在scala中实现Producer Consumer的正确方法是什么
问题描述:
我试图在没有使用Queue的情况下在scala中实现Producer Consumer程序。因为我认为Actor已经实现了“邮件队列”或其他的东西,所以再次编写代码将是多余的。在scala中实现Producer Consumer的正确方法是什么
我试图纯粹在Actor中编写程序。 下面是一个多生产者多个消费者程序。 生产者睡一会儿,模拟做某事。消费者根本不睡觉。
但是我不知道怎么关闭程序,如果我不添加主管演员监控消费者,以及使用“等待”(代码中的超级类)
是一种承诺对象无论如何摆脱他们?
import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._;
import akka.util._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
class Producer(val pool:ActorRef)(val name:String) extends Actor {
def receive = {
case _ =>
while (true) {
val sleepTime = scala.util.Random.nextInt(1000)
Thread.sleep(sleepTime)
println("Producer %s send food" format name)
pool ! name
}
}
}
class Consumer(supervisor : ActorRef)(val name:String) extends Actor {
var counter = 0
def receive = {
case s =>
counter += 1
println("%s eat food produced by %s" format (name,s))
if (counter >= 10) {
println("%s is full" format name)
context.stop(self)
supervisor ! 1
}
}
}
class Supervisor(p:Promise[String]) extends Actor {
var r = 3
def receive = {
case _ =>
r -= 1
if (0 == r) {
println("All consumer stopped")
context.stop(self)
p success ("Good")
}
}
}
object Try3 {
def work(): Unit = {
val system = ActorSystem("sys1")
val nProducer = 5;
val nConsumer = 3;
val p = Promise[String]
val supervisor = system.actorOf(Props(new Supervisor(p)));
val arrConsumer = for (i <- 1 to nConsumer) yield system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i))))
val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer)))
val arrProducer = for (i <- 1 to nProducer) yield system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i))))
arrProducer foreach (_ ! "start")
Await.result(p.future,Duration.Inf)
println("great!")
system.shutdown
}
def main(args:Array[String]): Unit = {
work()
}
}
接收功能产生类有一个问题,它不会被关闭,因为它虽然没有打破的条件。
我能想到的唯一方法是“向制作者本身发送信息”。 我不知道这是实现这种请求的正常方式吗?
下面是修改代码:
class Producer(val pool:ActorRef)(val name:String) extends Actor {
// original implementation:
// def receive = {
// case _ =>
// while (true){
// val sleepTime = scala.util.Random.nextInt(1000)
// Thread.sleep(sleepTime)
// println("Producer %s send food" format name)
// pool ! name
// }
// }
case object Loop;
def receive = {
case _ =>
val sleepTime = scala.util.Random.nextInt(1000)
Thread.sleep(sleepTime)
println("Producer %s send food" format name)
pool ! name
self ! Loop //send message to itself
}
}
不管我的实现,什么是Scala实现生产者消费者程序的正确方法,与演员或未来/无极?
答
你不应该在actor中阻塞(在你的情况下是Thread.sleep,while循环)。在演员内部阻止从所有演员中使用的线程池中获取线程。即使像你这样少量的制作者也会让ActorSystem中的所有actor脱离线程并使其无法使用。
取而代之的是使用Scheduler
来定期在Producer中定期发送meesage。
override def preStart(): Unit = {
import scala.concurrent.duration._
import context.dispatcher
context.system.scheduler.schedule(
initialDelay = 0.seconds,
interval = 1.second,
receiver = pool,
message = name
)
}
答
你想想实现Terminator
演员:)
object Terminator {
case class WatchMe(ref: ActorRef)
}
class Terminator extends Actor {
var consumers: Map[ActorRef, ActorRef] = Map()
def receive = {
case WatchMe(ref) => {
consumers += ref -> ref
context.watch(ref)
}
case Terminated(ref) => {
context.unwatch(ref)
consumers.get(ref).foreach { ref -> ref ! PoisonPill }
consumers -= ref
//If all consumers are dead stop.self and delegate NoConsumers message higher in hierarchy
if(consumers.size == 0) {
delegate()
context.stop(self)
}
}
}
}
谢谢@Martynas什么。你解决了我的“循环”问题。我仍在寻求生产者 - 消费者优雅实施的答案。 – worldterminator 2014-10-10 07:26:22