如何使用不响应的Akka Java API创建TCP接收器

问题描述:

我正在使用这个example作为我的出发点。如何使用不响应的Akka Java API创建TCP接收器

在这个例子中,服务器响应修改过的文本,但在我的情况下,服务器不需要响应。

看着其他类似的问题,我知道我可以传递Empty ByteString或使用过滤器(p - > false)不发回任何东西。但是,在这种情况下,问题是我的whenComplete块没有得到执行。即异常被吞噬。有没有办法避免这种情况?帮助赞赏!

connections.runForeach(connection -> { 
System.out.println("New connection from: " + connection.remoteAddress()); 

final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class) 
.via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW)) 
.map(ByteString::utf8String) 
.map(s -> s + "!!!\n") 
.map(ByteString::fromString); 

connection.handleWith(echo, mat); 
}, mat).whenComplete((done,throwable) -> 
    { 
    //exception handling 
    } 
); 
+0

在进一步的分析,我认为这个问题是我想为每个连接completionStage而不是Source(连接)的末尾。 – tapasvi

您的分析是正确的,现在您在服务器关闭时发生反应,而不是每次连接。

起反应的各个连接完成将在传递到连接的流动,像这样的东西来完成:

final Tcp tcp = Tcp.get(system); 
tcp.bind("127.0.0.1", 6789).runForeach((connection) -> { 

    final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class) 
    .via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW)) 
    .map(ByteString::utf8String) 
    .map(s -> s + "!!!\n") 
    .map(ByteString::fromString) 
    .watchTermination((notUsed, completionStage) -> { 
     completionStage.whenComplete((done, exception) -> { 
     System.out.println("Connection from " + connection.remoteAddress() + " completed"); 
     }); 
     return notUsed; 
    }); 

    connection.handleWith(echo, materializer); 
}, materializer);