Akka scala具有不同分类器的事件总线(取决于订户)

问题描述:

我正在研究Akka EventBus以检查它是否可以解决我的设计问题,但我仍然不知道。 问题是以下。Akka scala具有不同分类器的事件总线(取决于订户)

简化,我有:

case class Request(requesterId: String, operation: String, header: RequestHeader) 
case class Response(requesterId: String, operation: String, header: ResponseHeader) 

我有几个演员有不同的功能,我想一些演员订阅Response取决于requesterId,取决于operation一些人。 有没有办法通过EventBus和分类器轻松实现?

感谢, 乔尔

当然,这就是所谓的LookupEventBus。您可以通过扩展它实现自己的巴士,并在classify方法提取requesterId,就像这样:

class LookupBusImpl extends EventBus with LookupClassification { 
    type Event = HasRequesterId // I made up a super type for you here 
    type Classifier = String 
    type Subscriber = ActorRef 

    override def classify(event: HasRequesterId): String = event.requesterId 

然后,你订阅了指定requesterId,就像这样:

lookupBus.subscribe(actorRef, "requester-100") 

而这个演员然后将仅接收被分类为requester-100的消息。

+0

谢谢,但它仅回答我问题的第一部分:我想要另一个演员能够订阅不同的分类器,即“操作”字段。基本上,请求者对结果感兴趣,因为...他是请求者,还有一些是因为他们想知道具体的操作。我应该创建几辆巴士并将事件发布两次? – Joel 2014-11-25 15:34:56

我同意康拉德,你应该实施新的LookupClassification总线来解决你的问题。我认为这些总线有两个单独的实例是最简单的,一个通过requesterId进行分类,另一个通过操作进行分类。一些这种方法的基本设置工作将是:

//Singleton to hold the instances of each stream type 
object ResponseEventStream{ 
    val RequestorIdStream = new RequestorIdResponseEventStream 
    val OperationStream = new OperationResponseEventStream 
} 

//Common functionality for the two different types of streams 
trait ResponseEventStream extends ActorEventBus with LookupClassification{ 
    import ResponseEventStream._ 
    type Event = Response 
    type Classifier = String 
    protected def mapSize = 128 
    protected def publish(resp:Response, subscriber: ActorRef) = { 
    if (subscriber.isTerminated) unsubscribe(subscriber) 
    else subscriber ! resp 
    } 
} 

//Concrete impl that uses requesterId to classify 
class RequestorIdResponseEventStream extends ResponseEventStream{ 
    protected def classify(resp:Response) = resp.requesterId 
} 

//Concrete impl that uses operation to classify 
class OperationResponseEventStream extends ResponseEventStream{ 
    protected def classify(resp:Response) = resp.operation 
} 

//Trait to mix into classes that need to publish or subscribe to response events 
//Has helper methods to simplify interaction with the two distinct streams 
trait ResponseEventing{ 
    import ResponseEventStream._ 

    def publishResponse(resp:Response){ 
    RequestorIdStream.publish(resp) 
    OperationStream.publish(resp) 
    } 

    def subscribeByRequestId(requestId:String, ref:ActorRef){ 
    RequestorIdStream.subscribe(ref, requestId) 
    } 

    def subscribeByOperartion(op:String, ref:ActorRef){ 
    OperationStream.subscribe(ref, op) 
    } 
} 

然后你只需要到ResponseEventing特质混合成需要或者发布Response事件或需要进行订阅者角色。正在发布的参与者将调用publishResponse,而需要订阅的演员将根据他们感兴趣的分类(requesterId或操作)调用subscribeXXX

+0

非常感谢,我会试试这个解决方案! – Joel 2014-11-25 15:42:54