Akka HTTP客户端websocket流的定义
我过去成功地使用了Akka Streams,但是我目前很难理解为什么客户端Websocket Streams在Akka-HTTP中被定义并且作为它显示在documentation中。Akka HTTP客户端websocket流的定义
由于WebSocket连接允许全双工通信,我期望这种连接由Akka HTTP中的两个单独的流表示,一个用于传入流量,另一个用于传出流量。并且实际上,文档指出以下几点:
甲的WebSocket由消息的两个流的[...]
它进一步指出,传入消息由Sink
和传出消息由a表示Source
。这是我的第一个困惑 - 当使用两个独立的流时,你会期望总共处理两个源和两个接收器,而不是每种类型中的一个。目前,我的猜测是,传入流的来源以及传出流的接收器对于开发人员来说并不是很有用,因此只是“隐藏起来”。
但是,当将所有东西连接在一起时,它确实令人困惑(请参阅上面链接的文档)。
使用singleWebSocketRequest
时,有问题的部分:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
或者用webSocketClientFlow
当相同的部分:
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
这违背我的电流流的工作流程的理解。
- 我为什么要到
Source
传出消息和Sink
传入的消息结合?上面的代码看起来像我发送消息给自己而不是服务器。 - 此外,什么是
Flow[Message, Message, ...]
的语义?将传出消息转换为传入消息似乎没有意义。 - 不应该有两个流而不是一个?
任何帮助提高我的理解是赞赏,谢谢。
编辑:
我必须使用Source
和Sink
并通过WebSocket的发送数据没有问题,我只是想知道为什么阶段的布线这样做。
WebSocket包含两个独立的流,它们只是(可能)不在同一个JVM上。
你有两个对等体进行通信,一个是服务器,另一个是客户机,但从建立的WebSocket连接的角度来看,差异不再重要。一个数据流是对等体1向对等体2发送消息,另一个流是对等体2向对等体1发送消息,然后在这两个对等体之间存在网络边界。如果您一次查看一个对等点,则您有对等点1接收来自对等点2的消息,而在第二个流中,对等点1正在向对等点2发送消息。
每个对等体都有一个接收部分的接收器和一个发送部分的源。实际上,您确实有两个源和两个共享池,但不是同一个ActorSystem(假设为了解释两个对等实现在Akka HTTP中)。来自对等体1的源连接到对等体2的宿并且对等体2的源连接到对等体1的接收器。
因此,您编写了一个接收器来描述如何处理第一个流中的传入消息以及描述如何通过第二个流发送消息的源。通常,您希望根据您收到的消息生成消息,因此您可以将这两者连接在一起,并通过不同的流程来路由消息,这些流程描述如何响应和传入消息并生成任意数量的传出消息。 Flow[Message, Message, _]
并不意味着您将传出消息转换为传入消息,而是将传入消息转换为传出消息。
webSocketFlow
是一个典型的异步边界,代表另一个对等体的流。它通过将传出消息发送到另一个对等体并发射其他对等体发送的内容来“转换”传出消息。
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
这种流动是两个流的同行的一半:
-
[message from other peer]
连接到printSink
-
helloSource
连接到[message to the other peer]
有传入邮件和传出之间没有关系消息,您只需打印收到的所有内容并发送一个“hello world!”。实际上,由于源在一条消息之后完成,所以WebSocket连接也关闭,但是如果用例如Source.repeat
替换源,则会不断发送(洪泛,真的)“hello,world!”。无论传入消息的速率如何。
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
在这里,您采取一切从outgoing
来了,这是你要发送的邮件,将其路由通过webSocketFlow
,通过与其他同行交流“转变”的消息,并生成每一个接收到的消息到incoming
。通常你有一个有线协议,你可以将你的case class/pojo/dto消息编码并解码成Wire格式。
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
或者你能想象某种聊天服务器(啊,的WebSockets和聊天室),其中广播和和一些客户的融合信息。这应该采取的任何消息从任何客户端,并将其发送给每个客户端(演示只,未经测试,可能不是你想要什么实际的聊天服务器):
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
希望这有助于一点。