操纵序列元素在阿卡流动

问题描述:

我有如下所示2个流量:操纵序列元素在阿卡流动

val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ... 
val bToC: Flow[B, C, NotUsed] = ... 

我想这些组合成一个方便的方法如下所示:

val aToSeqOfC: Flow[A, Seq[C], NotUsed] 

到目前为止,我有以下,但我知道它只是以C元素结束而不是Seq[C]

Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC) 

如何在这种情况下保留Seq

间接回答

在我看来你的问题突出了“菜鸟错误”与阿卡流处理时是常见的一种。将业务逻辑放在aka流构造中通常不是一个好的组织。你的问题表明你有形式的东西:

val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B => 
    //business logic 
} 

更理想的情况是,如果你有:

//normal function, no akka involved 
val bToCFunc : B => C = { b : B => 
    //business logic 
} 

val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc 

在上面的“理想”例如Flow只是一个很薄的木皮上正常的,非阿卡,业务逻辑的顶部。

单独的逻辑能简单地解决您原来的问题:

val aToSeqOfC : Flow[A, Seq[C], NotUsed] = 
    aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc)) 

直接回答

如果无法重新组织你的代码,那么唯一的选择是应对期货。你需要一个独立的子流中使用bToC

val mat : akka.stream.Materializer = ??? 

val seqBToSeqC : Seq[B] => Future[Seq[C]] = 
    (seqB) => 
    Source 
     .apply(seqB.toIterable) 
     .via(bToC) 
     .to(Sink.seq[C]) 
     .run() 

然后,您可以使用一个mapAsync内这个功能来构建流程您正在寻找:

val parallelism = 10 

val aToSeqOfC: Flow[A, Seq[C], NotUsed] = 
    aToSeqB.mapAsync(parallelism)(seqBtoSeqC) 
+0

在我的代码'seqBToSeqC '不编译。它说'Source.apply(_)'签名是无效的。 –

+0

它为我编译,但答复赞扬。 –

+0

试过了。与没有签名一样。类型不匹配期望'Iterable [NotInferedT]'实际'Seq [B]' –