Akka Streams concurrent flows
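I'm trying to test the throughput of Akka Streams to see how it scales as the number of requests increases.
The problem I'm facing is that the stream does not work concurrently. My stream consists of flows that each sleep for one second to simulate real work. What happens is that every element passed through the stream is processed synchronously. I want this to happen asynchronously as well, to improve performance.
This is the code I'm using: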
// Flow that's being used
def processingStage(name: String): Flow[TestObject, TestObject, NotUsed] =
  Flow[TestObject].map { s ⇒
    println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
    Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
    println(name + " finished processing " + s)
    s
  }

// Stream
def startStream() = {
  val completion = Source[TestObject](list.toList)
    .via(processingStage("A")).async
    .via(processingStage("B")).async
    .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
}
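Don't use Thread.sleep(1000) to simulate latency; use the time-based combinators instead. Also, if you want to force concurrency between several stages of the same stream, use .async to demarcate them. See the documentation for more details.
I'm already using '.async' in my stream. What it does in the example posted in the question is that once an element has passed through flow A, the next element immediately starts in flow A without waiting for the previous element to complete the whole stream (which is what happens when '.async' is removed). – RemcoW
@Viktor Klang Any examples of time-based combinators? – MaatDeamon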
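One possible way to simulate a delay without blocking a thread, as a minimal sketch rather than the approach the commenter necessarily had in mind: complete each element through the scheduler with akka.pattern.after inside mapAsync. The names NonBlockingDelaySketch, delayedStage and run are made up for illustration, plain strings stand in for TestObject, and the Akka 2.4/2.5-style ActorMaterializer is assumed.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object NonBlockingDelaySketch {

  // Hypothetical stage: each element completes after `delay` via the
  // scheduler, so no thread is parked in Thread.sleep while waiting.
  def delayedStage(name: String, delay: FiniteDuration, parallelism: Int)(
      implicit system: ActorSystem): Flow[String, String, NotUsed] = {
    import system.dispatcher // execution context required by `after`
    Flow[String].mapAsync(parallelism) { s =>
      after(delay, system.scheduler)(Future.successful(s"$name($s)"))
    }
  }

  // Runs four elements through two delayed stages and collects the results.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("delay-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(List("a", "b", "c", "d"))
        .via(delayedStage("A", 1.second, parallelism = 4))
        .via(delayedStage("B", 1.second, parallelism = 4))
        .runWith(Sink.seq)
      // Blocking here is only for the demo; real code would compose the Future.
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

Because mapAsync keeps up to `parallelism` futures in flight and emits them in upstream order, the four elements should wait out their delays together in each stage rather than one after another.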
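Streams are serial by default. If you want the elements of a stream to be processed in parallel, you have to ask for it explicitly.
The documentation describes one way to achieve this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
Here it is, added to your code: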
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Future

object q40545440 {
  def main(args: Array[String]): Unit = {
    implicit val sys = ActorSystem()
    implicit val mat = ActorMaterializer()
    import sys.dispatcher // execution context for the onComplete callback below

    case class TestObject(x: String)

    // Flow that's being used
    def processingStage(name: String): Flow[TestObject, TestObject, NotUsed] =
      Flow[TestObject].map { s ⇒
        println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
        Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
        println(name + " finished processing " + s)
        s
      }

    // Stream to a parallel processing pool of workers
    // See http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
    def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
      import GraphDSL.Implicits._

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
        val merge = b.add(Merge[Out](workerCount))

        for (_ <- 1 to workerCount) {
          // for each worker, add an edge from the balancer to the worker, then wire
          // it to the merge element
          balancer ~> worker.async ~> merge
        }

        FlowShape(balancer.in, merge.out)
      })
    }

    // Returns the materialized Future so the caller can react to stream completion
    def startStream(list: List[TestObject]): Future[Done] =
      Source[TestObject](list)
        .via(balancer(processingStage("A"), 5))
        .via(balancer(processingStage("B"), 5))
        .runWith(Sink.foreach(s ⇒ println("Got output " + s)))

    // Terminate the actor system only after the stream has completed;
    // terminating right away could stop the stream before it finishes
    startStream(List(
      TestObject("a"),
      TestObject("b"),
      TestObject("c"),
      TestObject("d"))).onComplete(_ ⇒ sys.terminate())
  }
}
Here is a slightly updated version of your code:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Test extends App {
  // runWith needs an implicit materializer, which in turn needs an actor system
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  // Flow that's being used
  def processingStage(name: String): Future[String] = Future {
    println(name + " started processing " + name + " on thread " + Thread.currentThread().getName)
    Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
    println(name + " finished processing " + name)
    name
  }

  // Stream
  def startStream() = {
    val parallelism = 10 //max number of parallel instances
    val list = (1 to 1000000).toList.map(_.toString) //sample input
    Source[String](list)
      .mapAsync(parallelism)(processingStage) //handles the items concurrently
      .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
  }

  startStream()
}
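The first point is that you should turn your processingStage function into a method that returns a Future. By doing so, you simulate concurrent tasks more faithfully. Second, you should use the mapAsync method to get concurrency within a stage. As far as I can tell, this is exactly the functionality you are looking for.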
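Too bad people don't post self-contained examples that we can run. But I would try adding '.async' at the end of the stage. You could also try using 'mapAsync(4)' instead. – expert
They always tell you not to use Thread.sleep in an actor. Create a new actor and schedule a task. Next, if you don't care about ordering, use mapAsyncUnordered, since it avoids the overhead mapAsync incurs to preserve ordering, and, as mentioned earlier, create asynchronous boundaries (which split the work across different actors for possibly concurrent processing). Cheers –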
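To make the mapAsyncUnordered suggestion concrete, here is a rough sketch under a few assumptions: the object name UnorderedSketch, the input range and the per-element delays are invented for illustration, and the Akka 2.4/2.5-style ActorMaterializer is assumed. Elements are emitted as soon as their futures complete, so the output order is not guaranteed to match the input order.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnorderedSketch {

  // Doubles the numbers 1 to 8 with up to four futures in flight at once.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("unordered-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // execution context required by `after`
    try {
      val result = Source(1 to 8)
        .mapAsyncUnordered(4) { n =>
          // Smaller numbers get longer (illustrative) delays, so later
          // elements may overtake earlier ones in the output.
          after((100 - n * 10).millis, system.scheduler)(Future.successful(n * 2))
        }
        .runWith(Sink.seq)
      // Blocking here is only for the demo; real code would compose the Future.
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

If downstream stages depend on the input order, mapAsync is the safer choice; the unordered variant only makes sense when each result can be handled independently.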