Akka HTTP Websocket,如何识别演员内部的连接
问题描述:
我正在通过websockets为JS客户端展示scala中的简单多人游戏游戏。Akka HTTP Websocket,如何识别演员内部的连接
这里是我WebsocketServer类
class WebsocketServer(actorRef: ActorRef, protocol: Protocol, system: ActorSystem, materializer: ActorMaterializer) extends Directives {
val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}
def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.map {
case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage)
}
.via(actorFlow)
.map(event => TextMessage.Strict(protocol.serialize(event)))
def actorFlow : Flow[Protocol.Message, Protocol.Event, Any] = {
val sink =
Flow[Protocol.Message]
.to(Sink.actorRef[Protocol.Message](actorRef, Protocol.CloseConnection()))
val source =
Source.actorRef[Protocol.Event](1, OverflowStrategy.fail)
.mapMaterializedValue(actor => actorRef ! Protocol.OpenConnection(actor))
Flow.fromSinkAndSource(sink, source)
}
}
这是简化我的演员代码应该从WebSocket伺服器接收邮件。
class GameActor() extends Actor {
private var connections: List[ActorRef] = List()
override def receive: Receive = {
case message: Protocol.OpenConnection => {
this.connections = message.connection :: this.connections
message.connection ! Protocol.ConnectionEstablished()
}
case message: Protocol.CloseConnection => {
// how can I remove actor from this.connections ?
}
case message: Protocol.DoSomething => {
// how can I identify from which connection this message came in?
}
}
}
到目前为止好,目前我能够用简单的WelcomeMessage到客户端响应,但是我还是不知道如何:从连接列表
- 删除演员,每当演员收到
CloseConnection
信息? - 确定从哪个连接消息到演员?
答
我想你需要有某种key
或id
来映射你的连接actor。
def websocketFlow: Flow[Message, Message, Any] =
val randomKey = Random.nextInt()
Flow[Message]
.map {
case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage)
}
.via(actorFlow(randomKey))
.map(event => TextMessage.Strict(protocol.serialize(event)))
def actorFlow(flowID: Int) : Flow[Protocol.Message, Protocol.Event, Any] = {
val sink =
Flow[Protocol.Message]
.to(Sink.actorRef[Protocol.Message](actorRef, Protocol.CloseConnection(flowID)))
val source =
Source.actorRef[Protocol.Event](1, OverflowStrategy.fail)
.mapMaterializedValue(actor => actorRef ! Protocol.OpenConnection(actor, flowID))
Flow.fromSinkAndSource(sink, source)
}
然后在您的演员中,您可以将连接存储在一个Map而不是List中,这也可以更有效地被删除。
答
此问题已经得到解答。对于那里的Java人,这里是Java版本:
public class WebsocketRoutes extends AllDirectives {
private final ActorSystem actorSystem;
private final ActorRef connectionManager;
public WebsocketRoutes(final ActorSystem actorSystem, final ActorRef connectionManager) {
this.actorSystem = actorSystem;
this.connectionManager = connectionManager;
}
public Route handleWebsocket() {
return path(PathMatchers.segment(compile("router_v\\d+")).slash(PathMatchers.segment("websocket")).slash(PathMatchers.segment(compile("[^\\\\/\\s]+"))), (version, routerId) ->
handleWebSocketMessages(createWebsocketFlow(routerId))
);
}
private Flow<Message, Message, NotUsed> createWebsocketFlow(final String routerId) {
final ActorRef connection = actorSystem.actorOf(WebsocketConnectionActor.props(connectionManager));
final Source<Message, NotUsed> source = Source.<RouterWireMessage.Outbound>actorRef(5, OverflowStrategy.fail())
.map((outbound) -> (Message) TextMessage.create(new String(outbound.message, "utf-8")))
.throttle(5, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
.mapMaterializedValue(destinationRef -> {
connection.tell(new RouterConnected(routerId, destinationRef), ActorRef.noSender());
return NotUsed.getInstance();
});
final Sink<Message, NotUsed> sink = Flow.<Message>create()
.map((inbound) -> new RouterWireMessage.Inbound(inbound.asTextMessage().getStrictText().getBytes()))
.throttle(5, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
.to(Sink.actorRef(connection, PoisonPill.getInstance()));
return Flow.fromSinkAndSource(sink, source);
}
}
我正在寻找某种内置机制,从未想过有史以来最简单的解决方案,谢谢! –