并行运行
问题描述:
阿卡流我有一个流是并行运行
- 侦听HTTP后接收事件的
- 列表mapconcat事件的流元素
- 转换事件卡夫卡记录列表
- 产生与反应性卡夫卡(akka流卡夫卡生产者水槽)的记录
这里是简化代码
// flow to split group of lines into lines
val splitLines = Flow[List[Evt]].mapConcat(list=>list)
// sink to produce kafka records in kafka
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt]
.map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value))
.toMat(Producer.plainSink(kafka))(Keep.right)
val routes = {
path("ingest") {
post {
(entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) =>
val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat)
val result = onComplete(ingest){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
complete("eventList ingested: " + result)
}
}
}
}
您能否突出显示什么是并行运行,什么是顺序?
我认为mapConcat序列化流中的事件,所以如何在并行处理mapConcat每个步骤之后并行化流?
简单的mapAsyncUnordered是否足够?或者我应该使用GraphDSL进行平衡和合并?
答
在你的情况下,它会顺序我想。在开始向Kafka推送数据之前,您也会收到整个请求。我会用extractDataBytes
指令给你src: Source[ByteString, Any]
。然后我会处理它像
src
.via(Framing.delimiter(ByteString("\n"), 1024 /* Max size of line */ , allowTruncation = true).map(_.utf8String))
.mapConcat { line =>
line.split(",")
}.async
.runWith(kafkaSink)(mat)
extractDataBytes的问题是,我不能解开JSON easly ... – vgkowski
嗯,我明白了。我从来不需要解析json的无限流,但我听说jawn支持它。 https://github.com/non/jawn/blob/master/parser/src/main/scala/jawn/AsyncParser.scala – expert
也可以查看http://owlike.github.io/genson/ – expert