Akka Streams: concurrent flows

Problem description:

I am trying to test the throughput of Akka Streams to see how it scales as the number of requests increases.

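The problem I'm currently facing is that the stream does not work concurrently. My stream consists of flows, each of which sleeps for one second to simulate work. What happens is that every element passing through the stream is processed synchronously. I would like this to be asynchronous as well, to optimize performance.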

Here is the code I am using:

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

// Flow that's being used
def processingStage(name: String): Flow[TestObject, TestObject, NotUsed] =
  Flow[TestObject].map { s =>
    println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
    Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
    println(name + " finished processing " + s)
    s
  }

// Stream (needs an implicit Materializer in scope; returns the completion Future)
def startStream(list: Seq[TestObject]) =
  Source[TestObject](list.toList)
    .via(processingStage("A")).async
    .via(processingStage("B")).async
    .runWith(Sink.foreach(s => println("Got output " + s)))
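Too bad people don't post self-contained examples that we can run. But I would try adding '.async' at the end of the stages. You could also try using 'mapAsync(4)' instead. – expert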
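A minimal sketch of the 'mapAsync(4)' suggestion applied to the question's stages, assuming Akka 2.6.x (where an implicit `ActorSystem` is enough to materialize a stream) and Scala 2.12/2.13. The sleep now runs inside a `Future`, so each stage can have up to four elements in flight. The object name `MapAsyncSketch`, the `demo` helper and the 200 ms sleep are illustrative; the sleep is still only a stand-in for real work.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, blocking}

object MapAsyncSketch {
  final case class TestObject(x: String)

  // The question's stage rewritten with mapAsync(4): the (still blocking) work runs
  // inside a Future, so up to 4 elements are processed by this stage at the same time.
  def processingStage(name: String)(implicit ec: ExecutionContext): Flow[TestObject, TestObject, NotUsed] =
    Flow[TestObject].mapAsync(4) { s =>
      Future {
        println(s"$name started processing $s on thread ${Thread.currentThread().getName}")
        blocking(Thread.sleep(200)) // simulated work; 200 ms keeps the demo short
        println(s"$name finished processing $s")
        s
      }
    }

  // Runs four elements through stages A and B; returns (outputs, elapsed ms).
  def demo(): (Seq[TestObject], Long) = {
    implicit val system: ActorSystem = ActorSystem("mapAsync-sketch")
    import system.dispatcher // ExecutionContext for the Futures above
    val input = List("a", "b", "c", "d").map(TestObject(_))
    val start = System.nanoTime()
    try {
      val out = Await.result(
        Source(input).via(processingStage("A")).via(processingStage("B")).runWith(Sink.seq),
        10.seconds)
      (out, (System.nanoTime() - start) / 1000000)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (out, ms) = demo()
    println(s"Got $out in ~$ms ms")
  }
}
```

With four elements, the run should take roughly two sleeps (one per stage) rather than eight, provided the dispatcher has free threads; exact timings vary by machine. `mapAsync` still emits results in input order.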

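They always tell you not to use Thread.sleep in actors; instead, create a new actor and schedule a task. Next, if you don't care about ordering, use mapAsyncUnordered, since it avoids the overhead mapAsync incurs to preserve order, and, as mentioned, create async boundaries (which split the work across different actors so it can be processed concurrently). Cheers –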
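A sketch of the same idea without any sleeping thread, assuming Akka 2.6.x. Instead of a dedicated actor, it swaps in `akka.pattern.after`, which completes a `Future` via the actor system's scheduler, and uses `mapAsyncUnordered` so results are emitted as soon as they complete. The latencies and the `UnorderedSketch` name are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object UnorderedSketch {
  // Hypothetical per-element latencies: slow elements first, fast ones last.
  val latencies: List[(String, FiniteDuration)] =
    List("a" -> 300.millis, "b" -> 200.millis, "c" -> 100.millis, "d" -> 10.millis)

  def demo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("unordered-sketch")
    import system.dispatcher // ExecutionContext required by `after`
    try {
      val done = Source(latencies)
        // Non-blocking simulated latency: the scheduler completes each Future later,
        // no thread sleeps. Up to 4 elements are in flight at once.
        .mapAsyncUnordered(4) { case (id, delay) =>
          after(delay, system.scheduler)(Future.successful(id))
        }
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(demo()) // results arrive in completion order, so "d" should come first
}
```

Because ordering is not preserved, the fastest element ("d") is expected to come out first; using `mapAsync(4)` instead would emit a, b, c, d in input order.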

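Don't use Thread.sleep(1000) to simulate latency; use the time-based combinators instead. Also, if you want to force concurrency between multiple stages of the same stream, demarcate them with .async. See the documentation for more details.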
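One time-based combinator that fits here is `delay`, which holds each element for a given duration using the stream's internal timers rather than a blocked thread. The sketch below, assuming Akka 2.6.x, replaces Thread.sleep with `delay` and keeps the .async boundaries from the question; the object name, the `demo` helper and the 200 ms latency are illustrative.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.DelayOverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object DelaySketch {
  // Time-based replacement for `map { s => Thread.sleep(...); s }`: each element is
  // held for `latency` in the delay stage's internal buffer; no thread sleeps.
  def processingStage(name: String, latency: FiniteDuration): Flow[String, String, NotUsed] =
    Flow[String]
      .delay(latency, DelayOverflowStrategy.backpressure) // backpressure instead of dropping when the buffer is full
      .map { s => println(s"$name finished $s"); s }

  // Runs four elements through stages A and B; returns (outputs, elapsed ms).
  def demo(): (Seq[String], Long) = {
    implicit val system: ActorSystem = ActorSystem("delay-sketch")
    val start = System.nanoTime()
    try {
      val out = Await.result(
        Source(List("a", "b", "c", "d"))
          .via(processingStage("A", 200.millis)).async // async boundary after A
          .via(processingStage("B", 200.millis)).async // async boundary after B
          .runWith(Sink.seq),
        10.seconds)
      (out, (System.nanoTime() - start) / 1000000)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (out, ms) = demo()
    println(s"Got $out after ~$ms ms")
  }
}
```

Since waiting elements sit in the delay buffer at the same time, four elements should take about two delays in total (one per stage), not one delay per element per stage.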

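I have been using '.async' in the stream. What it does in the example posted in the question is that, once an element has passed through flow A, the next element immediately starts in flow A without waiting for the previous element to finish the whole stream (which is what happens when '.async' is removed). – RemcoW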
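The behaviour described here (pipelining across .async boundaries, but still one element at a time inside each stage) can be checked with a small timing sketch, assuming Akka 2.6.x. It keeps a blocking Thread.sleep on purpose so the effect is visible; the object name, the helpers and the 100 ms sleep are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PipeliningSketch {
  // Same shape as the question's stage: a blocking sleep (demo only, never in real code).
  def blockingStage(millis: Long): Flow[Int, Int, NotUsed] =
    Flow[Int].map { s => Thread.sleep(millis); s }

  // Runs n elements through two stages and returns the elapsed time in ms.
  def timeRun(n: Int, withAsync: Boolean)(implicit system: ActorSystem): Long = {
    val a = blockingStage(100)
    val b = blockingStage(100)
    val source = Source(1 to n)
    val graph =
      if (withAsync) source.via(a).async.via(b).async // A and B run in separate actors
      else source.via(a).via(b)                       // fused: one actor runs A then B per element
    val start = System.nanoTime()
    Await.result(graph.runWith(Sink.ignore), 30.seconds)
    (System.nanoTime() - start) / 1000000
  }

  // Returns (fused ms, pipelined ms) for 4 elements.
  def demo(): (Long, Long) = {
    implicit val system: ActorSystem = ActorSystem("pipelining-sketch")
    try {
      (timeRun(4, withAsync = false), timeRun(4, withAsync = true))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (fused, pipelined) = demo()
    println(s"fused: $fused ms, with .async: $pipelined ms")
  }
}
```

For n elements and per-stage time t, the fused run should take roughly 2 * n * t and the .async run roughly (n + 1) * t: stage B works on one element while stage A works on the next, but neither stage handles two elements at once. Actual numbers vary with the machine and JVM warm-up.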

@Viktor Klang, could you give some examples of the time-based combinators? – MaatDeamon

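Streams are sequential by default. If you want the elements of a stream to be processed in parallel, you have to ask for it explicitly.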

The documentation describes one way to achieve this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

Here it is added to your code:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}

object q40545440 {

  def main(args: Array[String]): Unit = {

    implicit val sys = ActorSystem()
    implicit val mat = ActorMaterializer()
    import sys.dispatcher // ExecutionContext for onComplete below

    case class TestObject(x: String)

    // Flow that's being used
    def processingStage(name: String): Flow[TestObject, TestObject, NotUsed] =
      Flow[TestObject].map { s =>
        println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
        Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
        println(name + " finished processing " + s)
        s
      }

    // Stream to a parallel processing pool of workers
    // See http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
    def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
      import GraphDSL.Implicits._

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
        val merge = b.add(Merge[Out](workerCount))

        for (_ <- 1 to workerCount) {
          // for each worker, add an edge from the balancer to the worker, then wire
          // it to the merge element; .async runs each worker copy in its own actor
          balancer ~> worker.async ~> merge
        }

        FlowShape(balancer.in, merge.out)
      })
    }

    def startStream(list: List[TestObject]) =
      Source[TestObject](list)
        .via(balancer(processingStage("A"), 5))
        .via(balancer(processingStage("B"), 5))
        .runWith(Sink.foreach(s => println("Got output " + s)))

    // Shut the actor system down only once the stream has completed; calling
    // sys.terminate() right away would abort the stream while elements are in flight.
    startStream(List(
      TestObject("a"),
      TestObject("b"),
      TestObject("c"),
      TestObject("d")))
      .onComplete(_ => sys.terminate())
  }
}

Here is a slightly updated version of your code:

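import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{blocking, Future}
import scala.concurrent.ExecutionContext.Implicits.global

object Test extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer() // required by runWith

  // Processing step, now returning a Future
  def processingStage(s: String): Future[String] = Future {
    println(s + " started processing on thread " + Thread.currentThread().getName)
    // Simulate long processing *don't sleep in your real code!*
    // `blocking` lets the global ExecutionContext add threads while this one sleeps
    blocking(Thread.sleep(1000))
    println(s + " finished processing")
    s
  }

  // Stream
  def startStream() = {
    val parallelism = 10 // max number of elements processed concurrently
    val list = (1 to 1000000).toList.map(_.toString) // sample input

    Source[String](list)
      .mapAsync(parallelism)(processingStage) // up to `parallelism` items at once, emitted in input order
      .runWith(Sink.foreach(s => println("Got output " + s)))
  }

  // Terminate the actor system once the stream has completed
  startStream().onComplete(_ => system.terminate())
}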
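  • First, you should turn your processingStage function into a method that returns a Future. This way the work runs concurrently instead of blocking the stream, which better simulates concurrent tasks.

  • Second, you should use the mapAsync method to get concurrency within a stage. As far as I know, this is exactly the feature you are looking for.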
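To illustrate the second point: `mapAsync` runs up to `parallelism` futures at once but still emits results in input order. The sketch below assumes Akka 2.6.x; `work` is a hypothetical non-blocking task built with `akka.pattern.after`, arranged so that later elements finish first, and the object name is illustrative.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapAsyncOrderSketch {
  // Runs elements 1..4 through mapAsync and returns (emitted elements, elapsed ms).
  def demo(): (Seq[Int], Long) = {
    implicit val system: ActorSystem = ActorSystem("mapAsync-order-sketch")
    import system.dispatcher // ExecutionContext required by `after`

    // Hypothetical non-blocking work: element i completes after (5 - i) * 200 ms,
    // so element 4 finishes first and element 1 finishes last.
    def work(i: Int): Future[Int] =
      after(((5 - i) * 200).millis, system.scheduler)(Future.successful(i))

    val start = System.nanoTime()
    try {
      val emitted = Await.result(
        Source(1 to 4).mapAsync(parallelism = 4)(work).runWith(Sink.seq),
        5.seconds)
      (emitted, (System.nanoTime() - start) / 1000000)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (emitted, ms) = demo()
    // Output keeps input order although completions were reversed; total time should be
    // close to the slowest element (800 ms) rather than the sum (2000 ms).
    println(s"emitted $emitted in ~$ms ms")
  }
}
```

If emission order does not matter, `mapAsyncUnordered` takes the same arguments and emits results in completion order instead.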