Akka通过http的对象流

问题描述:

我有一段代码(见下文),它产生了一个服务器,它回应从端口6001接收到的每一个ByteString流。该示例还定义了一个客户端,该客户端连接到服务器并发送包含从字母'a'到'z'的字符列表的ByteString流。Akka通过http的对象流

我现在的问题是,akka提供了一种方法来发送和接收对象的流,而不是通过http的ByStreams?例如,类Client的对象。

如果是这样,我该如何发送和接收这样的对象流?你能否给我提供一个展示如何实施的片段?

阿卡文档不是用户友好的非玩具的例子...

感谢您的帮助

公共类TcpEcho {

/** 
* Use without parameters to start both client and server. 
* 
* Use parameters `server 0.0.0.0 6001` to start server listening on port 
* 6001. 
* 
* Use parameters `client 127.0.0.1 6001` to start client connecting to 
* server on 127.0.0.1:6001. 
* 
*/ 
public static void main(String[] args) throws IOException { 
    if (args.length == 0) { 
     ActorSystem system = ActorSystem.create("ClientAndServer"); 
     InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     server(system, serverAddress); 
     client(system, serverAddress); 
    } else { 
     InetSocketAddress serverAddress; 
     if (args.length == 3) { 
      serverAddress = new InetSocketAddress(args[1], Integer.valueOf(args[2])); 
     } else { 
      serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     } 
     if (args[0].equals("server")) { 
      ActorSystem system = ActorSystem.create("Server"); 
      server(system, serverAddress); 
     } else if (args[0].equals("client")) { 
      ActorSystem system = ActorSystem.create("Client"); 
      client(system, serverAddress); 
     } 
    } 
} 

public static void server(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> { 
     System.out.println("Client connected from: " + conn.remoteAddress()); 
     conn.handleWith(Flow.<ByteString> create(), materializer); 
    }); 

    final CompletionStage<ServerBinding> bindingFuture = Tcp.get(system) 
      .bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer); 

    bindingFuture.whenComplete((binding, throwable) -> { 
     System.out.println("Server started, listening on: " + binding.localAddress()); 
    }); 

    bindingFuture.exceptionally(e -> { 
     System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage()); 
     system.terminate(); 
     return null; 
    }); 

} 

public static void client(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final List<ByteString> testInput = new ArrayList<>(); 
    for (char c = 'a'; c <= 'z'; c++) { 
     testInput.add(ByteString.fromString(String.valueOf(c))); 
    } 

    Source<ByteString, NotUsed> responseStream = Source.from(testInput) 
      .via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort())); 

    CompletionStage<ByteString> result = responseStream.runFold(ByteString.empty(), (acc, in) -> acc.concat(in), 
      materializer); 

    result.whenComplete((success, failure) -> { 

     if (failure != null) { 
      System.err.println("Failure: " + failure.getMessage()); 
     } else { 
      System.out.println("Result: " + success.utf8String()); 
     } 
     System.out.println("Shutting down client"); 
     system.terminate(); 

    }); 
} 

}

+0

你见过[这个例子](http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-graphs.html#Bidirectional_Flows)如何创建双向流,我认为它或多或少的,你要求什么? – lpiepiora

+0

我没有看,但我会。无论如何,你有一个更好的想法/建议的代码片段?谢谢 – broga

akka.stream.{javadsl,scaladsl}.Framing包含水电费帮助您建立一致的信息。例如,您可以通过Framing.simpleFramingProtocolEncoder(maxLength)发送消息来自动向他们添加长度信息。另一方面,Framing.simpleFramingProtocolDecoder(maxLength)将根据其封闭的长度信息来解码消息。

如果您想操作普通对象,您只需在将它们发送到ByteString之前将它们发送到编码器,并在从解码器接收它们的表示之后将它们从ByteString反序列化。