并行运行

问题描述:

阿卡流我有一个流是并行运行

  1. 侦听HTTP后接收事件的
  2. 列表mapconcat事件的流元素
  3. 转换事件卡夫卡记录列表
  4. 产生与反应性卡夫卡(akka流卡夫卡生产者水槽)的记录

这里是简化代码

// flow to split group of lines into lines 
    val splitLines = Flow[List[Evt]].mapConcat(list=>list) 

// sink to produce kafka records in kafka 
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt] 
    .map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value)) 
    .toMat(Producer.plainSink(kafka))(Keep.right) 

val routes = { 
    path("ingest") { 
     post { 
     (entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) => 
      val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat) 
      val result = onComplete(ingest){ 
       case Success(value) => complete(s"OK") 
       case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
      } 
      complete("eventList ingested: " + result) 
      } 
     } 
    } 
    } 

您能否突出显示什么是并行运行,什么是顺序?

我认为mapConcat序列化流中的事件,所以如何在并行处理mapConcat每个步骤之后并行化流?

简单的mapAsyncUnordered是否足够?或者我应该使用GraphDSL进行平衡和合并?

在你的情况下,它会顺序我想。在开始向Kafka推送数据之前,您也会收到整个请求。我会用extractDataBytes指令给你src: Source[ByteString, Any]。然后我会处理它像

src 
    .via(Framing.delimiter(ByteString("\n"), 1024 /* Max size of line */ , allowTruncation = true).map(_.utf8String)) 
    .mapConcat { line => 
    line.split(",") 
    }.async 
    .runWith(kafkaSink)(mat) 
+0

extractDataBytes的问题是,我不能解开JSON easly ... – vgkowski

+0

嗯,我明白了。我从来不需要解析json的无限流,但我听说jawn支持它。 https://github.com/non/jawn/blob/master/parser/src/main/scala/jawn/AsyncParser.scala – expert

+0

也可以查看http://owlike.github.io/genson/ – expert