Scala——基于Akka的并发编程和分布式应用程序开发
基于Akka分布式技术开发分布式应用程序,分为两个角色:
1、master
作用:接收worker的注册,并将worker的注册信息保存下来;感知worker的上下线;接收worker的汇报心跳,更新worker的相关信息;
定时检测超时的worker,并将超时的worker从集群中移除掉。
2、worker
1、master
作用:接收worker的注册,并将worker的注册信息保存下来;感知worker的上下线;接收worker的汇报心跳,更新worker的相关信息;
定时检测超时的worker,并将超时的worker从集群中移除掉。
2、worker
作用:向master进行注册,加入到集群中去;定时向master汇报心跳。
案例代码如下:
import akka.actor.Actor.Receive import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable import scala.concurrent.duration._ /** * Master.scala */ class Master extends Actor { //worker ---->workInfo val idToWorker = new mutable.HashMap[String, WorkerInfo]() //WorkInfo val workers = new mutable.HashSet[WorkerInfo]() val checkTimeOutWorkerInterval = 5000 //preStart()是Master启动之后立即执行的方法 override def preStart(): Unit = { import context.dispatcher //启动一个定时器,定时检查超时的Worker context.system.scheduler.schedule(0 millis, checkTimeOutWorkerInterval millis, self, CheckTimeOutWorker) } override def receive: Receive = { case "started" => println("master startup successful...") //接收Worker发送过来的注册信息 case RegisterWorker(workerId, cores, memory) => { if (!idToWorker.contains(workerId)) { //3、将发送过来的Worker信息封装成WorkerInfo保存到HashMap和HashSet中 val workerInfo = new WorkerInfo(workerId, cores, memory) idToWorker.put(workerId, workerInfo) workers += workerInfo //4、master向worker发送注册成功的消息给worker //sender指代的是消息源,即谁发送过来的消息,就指代谁 sender() ! RegisteredWorker } } //Master接收Worker汇报的心跳信息 case SendHeartBeat(workerId) => { //从idToWorker中取出对应的Worker,并更新最近一次汇报心跳的时间 if (idToWorker.contains(workerId)) { val workerInfo = idToWorker(workerId) workerInfo.lastHeartBeatTime = System.currentTimeMillis() ///////////////////////////////////////// workers += workerInfo } } case CheckTimeOutWorker => { val currentTime = System.currentTimeMillis() //过滤出超时的Worker(即在指定的时间范围内没有向Master进行汇报) val toRemoves = workers.filter(w => currentTime - w.lastHeartBeatTime > checkTimeOutWorkerInterval) toRemoves.foreach(worker => { idToWorker.remove(worker.workerId) workers -= worker }) println(s"worker count is ${workers.size}") } } } object Master { def main(args: Array[String]): Unit = { val masterHost = "localhost" val masterPort = "8081" /** * 在实际应用场景下,有时候我们就是确实需要在scala创建多少字符串,但是每一行需要固定对齐。 * 解决该问题的方法就是应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符, * 在多行换行的行头前面加一个“|”符号即可。 * 当然stripMargin方法也可以自己指定“定界符”,同时更有趣的是利用stripMargin.replaceAll方法, * 还可以将多行字符串”合并”一行显示。 */ val configStr = s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname="$masterHost" |akka.remote.netty.tcp.port="$masterPort" """.stripMargin val config = ConfigFactory.parseString(configStr) //创建老大ActorSystem val masterActorSystem = ActorSystem("masterActorSystem", config) //使用ActorSystem创建actor val masterActor = masterActorSystem.actorOf(Props[Master], "masterActor") //给新创建的masterActor发送一条消息,发送消息使用感叹号"!" masterActor ! "started" //将ActorSystem阻塞在这,不要让其停止 masterActorSystem.whenTerminated } }
============================================================ import java.util.UUID import akka.actor.Actor.Receive import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /** * Worker.scala */ class Worker(var cores: Int, var memory: Int) extends Actor { var masterActor: ActorSelection = null var workerId = UUID.randomUUID().toString val heartBeatInterval = 3000 //actor启动后会立即执行此方法,只会执行一次 override def preStart(): Unit = { masterActor = context.actorSelection("akka.tcp://[email protected]:8081/user/masterActor") //2、worker启动后立即向master进行注册 masterActor ! RegisterWorker(workerId, cores, memory) } //用来接收消息的方法,这个方法会被多次执行,只要有消息过来就会被执行 override def receive: Receive = { case "started" => println("worker setup sucessful.....") //master发送过来的注册成功的消息 case RegisteredWorker => { //导入定时器 import context.dispatcher //开启定时任务 //此处的millis需要导入import scala.concurrent.duration._ context.system.scheduler.schedule(0 millis, heartBeatInterval millis, self, SendHeartBeat) } //向master汇报心跳 case SendHeartBeat => { masterActor ! SendHeartBeat(workerId) } } } object Worker { def main(args: Array[String]): Unit = { val workerHost = "localhost" val workerPort = "8084" /** * 在实际应用场景下,有时候我们就是确实需要在scala创建多少字符串,但是每一行需要固定对齐。 * 解决该问题的方法就是应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符, * 在多行换行的行头前面加一个“|”符号即可。 * 当然stripMargin方法也可以自己指定“定界符”,同时更有趣的是利用stripMargin.replaceAll方法, * 还可以将多行字符串”合并”一行显示。 */ val configStr = s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname="$workerHost" |akka.remote.netty.tcp.port="$workerPort" """.stripMargin val config = ConfigFactory.parseString(configStr) //创建老大ActorSystem val workerActorSystem = ActorSystem("workerActorSystem", config) //使用ActorSystem创建actor //创建一个Worker对象,Cores为62,memory为128 val workerActor = workerActorSystem.actorOf(Props(new Worker(62, 128)), "workerActor") //给新创建的masterActor发送一条消息,发送消息使用感叹号"!" workerActor ! "started" //将ActorSystem阻塞在这,不要让其停止 workerActorSystem.whenTerminated } }
========================================================
/** * WorkInfo.class * 用来封装Worker的基本信息 */ class WorkerInfo(var workerId: String, var cores: Int, var memory: Int) { //worker最近汇报心跳的时间 var lastHeartBeatTime:Long = System.currentTimeMillis() }
======================================================================== /** * RemoteMessage.scala */ //因为需要走网络,所以此trait需要继承Serializable trait RemoteMessage extends Serializable {} //worker------>master case class RegisterWorker(var workerId: String, var cores: Int, var memory: Int) extends RemoteMessage //master ---->worker case class RegisteredWorker() extends RemoteMessage //worker ------> worker object SendHeartBeat //worker------>master case class SendHeartBeat(var workerId: String) //master ------> master object CheckTimeOutWorker