Scala——基于Akka的并发编程和分布式应用程序开发

基于Akka分布式技术开发分布式应用程序,分为两个角色:
1、master
作用:接收worker的注册,并将worker的注册信息保存下来;感知worker的上下线;接收worker的汇报心跳,更新worker的相关信息;
定时检测超时的worker,并将超时的worker从集群中移除掉。
2、worker

作用:向master进行注册,加入到集群中去;定时向master汇报心跳。

Scala——基于Akka的并发编程和分布式应用程序开发

案例代码如下:

import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

/**
  * Master.scala
  */
class Master extends Actor {
  //worker ---->workInfo
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()
  //WorkInfo
  val workers = new mutable.HashSet[WorkerInfo]()
  val checkTimeOutWorkerInterval = 5000

  //preStart()是Master启动之后立即执行的方法
  override def preStart(): Unit = {
    import context.dispatcher
    //启动一个定时器,定时检查超时的Worker
    context.system.scheduler.schedule(0 millis, checkTimeOutWorkerInterval millis, self, CheckTimeOutWorker)
  }

  override def receive: Receive = {
    case "started" => println("master startup successful...")
    //接收Worker发送过来的注册信息
    case RegisterWorker(workerId, cores, memory) => {
      if (!idToWorker.contains(workerId)) {
        //3、将发送过来的Worker信息封装成WorkerInfo保存到HashMap和HashSet中
        val workerInfo = new WorkerInfo(workerId, cores, memory)
        idToWorker.put(workerId, workerInfo)
        workers += workerInfo

        //4、master向worker发送注册成功的消息给worker
        //sender指代的是消息源,即谁发送过来的消息,就指代谁
        sender() ! RegisteredWorker
      }
    }
    //Master接收Worker汇报的心跳信息
    case SendHeartBeat(workerId) => {
      //从idToWorker中取出对应的Worker,并更新最近一次汇报心跳的时间
      if (idToWorker.contains(workerId)) {
        val workerInfo = idToWorker(workerId)
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
        /////////////////////////////////////////
        workers += workerInfo
      }
    }
    case CheckTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      //过滤出超时的Worker(即在指定的时间范围内没有向Master进行汇报)
      val toRemoves = workers.filter(w => currentTime - w.lastHeartBeatTime > checkTimeOutWorkerInterval)
      toRemoves.foreach(worker => {
        idToWorker.remove(worker.workerId)
        workers -= worker
      })
      println(s"worker count is ${workers.size}")
    }
  }
}

object Master {
  def main(args: Array[String]): Unit = {
    val masterHost = "localhost"
    val masterPort = "8081"
    /**
      * 在实际应用场景下,有时候我们就是确实需要在scala创建多少字符串,但是每一行需要固定对齐。
      * 解决该问题的方法就是应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符,
      * 在多行换行的行头前面加一个“|”符号即可。
      * 当然stripMargin方法也可以自己指定“定界符”,同时更有趣的是利用stripMargin.replaceAll方法,
      * 还可以将多行字符串”合并”一行显示。
      */
    val configStr =
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname="$masterHost"
         |akka.remote.netty.tcp.port="$masterPort"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //创建老大ActorSystem
    val masterActorSystem = ActorSystem("masterActorSystem", config)
    //使用ActorSystem创建actor
    val masterActor = masterActorSystem.actorOf(Props[Master], "masterActor")

    //给新创建的masterActor发送一条消息,发送消息使用感叹号"!"
    masterActor ! "started"
    //将ActorSystem阻塞在这,不要让其停止
    masterActorSystem.whenTerminated
  }
}

============================================================

import java.util.UUID

import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
  * Worker.scala
  */
class Worker(var cores: Int, var memory: Int) extends Actor {
  var masterActor: ActorSelection = null
  var workerId = UUID.randomUUID().toString
  val heartBeatInterval = 3000

  //actor启动后会立即执行此方法,只会执行一次
  override def preStart(): Unit = {
    masterActor = context.actorSelection("akka.tcp://[email protected]:8081/user/masterActor")
    //2、worker启动后立即向master进行注册
    masterActor ! RegisterWorker(workerId, cores, memory)
  }

  //用来接收消息的方法,这个方法会被多次执行,只要有消息过来就会被执行
  override def receive: Receive = {
    case "started" => println("worker setup sucessful.....")
    //master发送过来的注册成功的消息
    case RegisteredWorker => {
      //导入定时器
      import context.dispatcher
      //开启定时任务
      //此处的millis需要导入import scala.concurrent.duration._
      context.system.scheduler.schedule(0 millis, heartBeatInterval millis, self, SendHeartBeat)
    }
    //向master汇报心跳
    case SendHeartBeat => {
      masterActor ! SendHeartBeat(workerId)
    }
  }
}

object Worker {
  def main(args: Array[String]): Unit = {
    val workerHost = "localhost"
    val workerPort = "8084"
    /**
      * 在实际应用场景下,有时候我们就是确实需要在scala创建多少字符串,但是每一行需要固定对齐。
      * 解决该问题的方法就是应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符,
      * 在多行换行的行头前面加一个“|”符号即可。
      * 当然stripMargin方法也可以自己指定“定界符”,同时更有趣的是利用stripMargin.replaceAll方法,
      * 还可以将多行字符串”合并”一行显示。
      */
    val configStr =
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname="$workerHost"
         |akka.remote.netty.tcp.port="$workerPort"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //创建老大ActorSystem
    val workerActorSystem = ActorSystem("workerActorSystem", config)
    //使用ActorSystem创建actor
    //创建一个Worker对象,Cores为62,memory为128
    val workerActor = workerActorSystem.actorOf(Props(new Worker(62, 128)), "workerActor")

    //给新创建的masterActor发送一条消息,发送消息使用感叹号"!"
    workerActor ! "started"
    //将ActorSystem阻塞在这,不要让其停止
    workerActorSystem.whenTerminated
  }
}
========================================================
/**
  * WorkInfo.class
  * 用来封装Worker的基本信息
  */
class WorkerInfo(var workerId: String, var cores: Int, var memory: Int) {
  //worker最近汇报心跳的时间
  var lastHeartBeatTime:Long = System.currentTimeMillis()
}
========================================================================

/**
  * RemoteMessage.scala
  */
//因为需要走网络,所以此trait需要继承Serializable
trait RemoteMessage extends Serializable {}

//worker------>master
case class RegisterWorker(var workerId: String, var cores: Int, var memory: Int) extends RemoteMessage

//master ---->worker
case class RegisteredWorker() extends RemoteMessage

//worker ------> worker
object SendHeartBeat

//worker------>master
case class SendHeartBeat(var workerId: String)

//master ------> master
object CheckTimeOutWorker