卡夫卡主题websocket
我想实现一个安装程序,我有多个Web浏览器打开一个websocket连接到我的akka-http服务器,以便读取发布到卡夫卡主题的所有消息。卡夫卡主题websocket
所以消息流应该走这条路
kafka topic -> akka-http -> websocket connection 1
-> websocket connection 2
-> websocket connection 3
对于WebSocket的,现在我已经创建了一个路径:
val route: Route =
path("ws") {
handleWebSocketMessages(notificationWs)
}
然后我创建了一个消费者对我的卡夫卡话题:
val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
.plainSource(consumerSettings, Subscriptions.topics("topic1"))
然后最后我想把这个源连接到handleWebSocketMessages中的websocket
def handleWebSocketMessages: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
TextMessage(source)::Nil
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm.dataStream.runWith(Sink.ignore)
Nil
}
这里是我的错误,当我尝试在TextMessage中使用source
:
Error:(77, 9) overloaded method value apply with alternatives: (textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage (text: String)akka.http.scaladsl.model.ws.TextMessage.Strict cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control]) TextMessage(source)::Nil
我想我前进的道路上做出频频失误,但我要说的是,大部分阻塞部分是handleWebSocketMessages
。
第一件事,就是要了解该源的类型:Source[ConsumerRecord[K, V], Control]
。 因此,它不是您可以作为TextMessage参数传递的内容。
现在,让我们来看网页套接字的点:
- 传出消息被内置在卡夫卡源中的每个消息。该消息将是来自Kafka消息的字符串转换的TextMessage。
- 对于每个收到的消息,只是println()一样它
所以,Flow
可以被看作是两个组件:Source
&的Sink
。
val incomingMessages: Sink[Message, NotUsed] =
Sink.foreach(println(_))
val outgoingMessages: Source[Message, NotUsed] =
source
.map { consumerRecord => TextMessage(consumerRecord.record.value) }
val handleWebSocketMessages: Flow[Message, Message, Any]
= Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
希望它有帮助。
非常感谢!您的答案在我使用'Consumer.committableSource'而不是'Consumer.plainSource',和'consumerRecord.record.value()'而不是'consumerRecord.getkey.toString'之后起作用。 –
太棒了!我更新了我的答案。 – n1r3