操纵序列元素在阿卡流动
问题描述:
我有如下所示2个流量:操纵序列元素在阿卡流动
val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ...
val bToC: Flow[B, C, NotUsed] = ...
我想这些组合成一个方便的方法如下所示:
val aToSeqOfC: Flow[A, Seq[C], NotUsed]
到目前为止,我有以下,但我知道它只是以C
元素结束而不是Seq[C]
。
Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)
如何在这种情况下保留Seq
?
答
间接回答
在我看来你的问题突出了“菜鸟错误”与阿卡流处理时是常见的一种。将业务逻辑放在aka流构造中通常不是一个好的组织。你的问题表明你有形式的东西:
val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B =>
//business logic
}
更理想的情况是,如果你有:
//normal function, no akka involved
val bToCFunc : B => C = { b : B =>
//business logic
}
val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc
在上面的“理想”例如Flow
只是一个很薄的木皮上正常的,非阿卡,业务逻辑的顶部。
单独的逻辑能简单地解决您原来的问题:
val aToSeqOfC : Flow[A, Seq[C], NotUsed] =
aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))
直接回答
如果无法重新组织你的代码,那么唯一的选择是应对期货。你需要一个独立的子流中使用bToC
:
val mat : akka.stream.Materializer = ???
val seqBToSeqC : Seq[B] => Future[Seq[C]] =
(seqB) =>
Source
.apply(seqB.toIterable)
.via(bToC)
.to(Sink.seq[C])
.run()
然后,您可以使用一个mapAsync
内这个功能来构建流程您正在寻找:
val parallelism = 10
val aToSeqOfC: Flow[A, Seq[C], NotUsed] =
aToSeqB.mapAsync(parallelism)(seqBtoSeqC)
在我的代码'seqBToSeqC '不编译。它说'Source.apply(_)'签名是无效的。 –
它为我编译,但答复赞扬。 –
试过了。与没有签名一样。类型不匹配期望'Iterable [NotInferedT]'实际'Seq [B]' –