阿卡演员的消息聚集

阿卡演员的消息聚集

问题描述:

下面的演员表演一次,分子和分母都收到一个部门,阿卡演员的消息聚集

package funnelTest 

import akka.actor.{Actor, ActorSystem, Props} 

object Main extends App { 

    val system1 = ActorSystem("funnelTest") 
    val input = system1.actorOf(Props[Funnel], "input") 

    input ! 3 
    input ! 2.718 

} 

case object Run 

class Funnel extends Actor { 

    var i: Option[Int] = None 
    var d: Option[Double] = None 

    def isReady = i.nonEmpty && d.nonEmpty 

    def receive = { 
    case v: Int => i = Some(v) ; if (isReady) self ! Run 
    case v: Double => d = Some(v) ; if (isReady) self ! Run 
    case Run  => println(s"aggregated, $d/$i = " + d.get/i.get) 
    case _   => 
    } 
} 

有聚合所有的消息更可扩展的方式?

+0

为什么你认为你的实现是不可扩展的? (它可以稍微改进,用一个新的方法run()来替换“!Run”,这个方法做了现在的“case Run”)。 –

标识请求的唯一标识符是解决问题的一种方法。演员内的地图(calcRegistry)拥有先前到达的FractionComponentNumeratorDenominator)。一旦第二部分进入,我们就可以开始按照你已经完成的计算来运行计算。

该实现仍然没有解决内存泄漏的问题,其中第二对将不会被接收,并且地图将继续增长。

import akka.actor.{Actor, ActorSystem, Props} 

object Main extends App { 

    import Funnel._ 

    val system1 = ActorSystem("funnelTest") 
    val input = system1.actorOf(Props[Funnel], "input") 

    (1 to 10) foreach { number => 

    val id = java.util.UUID.randomUUID().toString 
    input ! Numerator(id, value = number + 2) 
    input ! Denominator(id, value = number + 1) 
    } 

    system1.awaitTermination() 

} 

class Funnel extends Actor { 

    import Funnel._ 
    import scala.collection._ 

    val calcRegistry = mutable.Map[String, FractionComponent]() 

    def saveToRegistry(comp: FractionComponent) = calcRegistry(comp.id) = comp 

    def printValue(num: Numerator, den: Denominator) = println(s"aggregated, ${num.value}/${den.value} = ${num.value/den.value}") 

    def receive = { 
    case [email protected](id, _) => 
     if (calcRegistry contains id) 
     self ! Run(num, calcRegistry(id).asInstanceOf[Denominator]) 
     else saveToRegistry(num) 
    case [email protected](id, _) => 
     if (calcRegistry contains id) 
     self ! Run(calcRegistry(id).asInstanceOf[Numerator], den) 
     else saveToRegistry(den) 
    case Run(num: Numerator, den: Denominator) => 
     calcRegistry.remove(num.id) 
     printValue(num, den) 
    case _ => 
    } 
} 

object Funnel { 

    sealed trait FractionComponent { 
    def id: String 
    } 

    case class Numerator(override val id: String, value: Double) extends FractionComponent 

    case class Denominator(override val id: String, value: Integer) extends FractionComponent 

    case class Run(num: Numerator, denominator: Denominator) 

} 

输出示例:

aggregated, 3.0/2 = 1.5 aggregated, 4.0/3 = 1.3333333333333333 aggregated, 5.0/4 = 1.25 aggregated, 6.0/5 = 1.2 aggregated, 7.0/6 = 1.1666666666666667 aggregated, 8.0/7 = 1.1428571428571428 aggregated, 9.0/8 = 1.125 aggregated, 10.0/9 = 1.1111111111111112 aggregated, 11.0/10 = 1.1 aggregated, 12.0/11 = 1.0909090909090908

参考:Reactive Messaging Patterns with the Actor Model