【图文详细 】Scala——Akka Actor
4、Akka Actor
4.1、Akka 概述
Akka 基于 Actor 模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快 速响应的(Responsive)应用程序的平台。
Actor 模型:在计算机科学领域,Actor 模型是一个并行计算(Concurrent Computation)模型, 它把 actor 作为并行计算的基本元素来对待:为响应一个接收到的消息,一个 actor 能够自 己做出一些决策,如创建更多的 actor,或发送更多的消息,或者确定如何去响应接收到的 下一个消息。
Actor 是 Akka 中最核心的概念,它是一个封装了状态和行为的对象,Actor 之间可以通过交 换消息的方式进行通信,每个 Actor 都有自己的收件箱(Mailbox)。通过 Actor 能够简化锁 及线程管理,可以非常容易地开发出正确地并发程序和并行系统。
Actor 具有如下特性:
1、提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下 的编程开发
2、提供了异步非阻塞的、高性能的事件驱动编程模型
3、超级轻量级事件处理(每 GB 堆内存几百万 Actor)
4.2、重要 API 介绍
4.2.1、ActorSystem
在 Akka 中,ActorSystem 是一个重量级的结构,他需要分配多个线程,所以在实际应用中, ActorSystem 通常是一个单例对象,我们可以使用这个 ActorSystem 的 actorOf 方法创建很多 Actor。
4.2.2、Actor
在 Akka 中,Actor 负责通信,在 Actor 中有一些重要的生命周期方法。
1、preStart()方法:该方法在 Actor 对象构造方法执行后执行,整个 Actor 生命周期中仅执行 一次。
2、receive()方法:该方法在 Actor 的 preStart 方法执行完成后执行,用于接收消息,会被反 复执行。
4.2.3、ActorSystem 和 Actor 对比
Actor: 就是用来做消息传递的 用来接收和发送消息的,一个 Actor 就相当于是一个老师或者是学生。 如果我们想要多个老师,或者学生,就需要创建多个 Actor 实例。
ActorSystem: 用来创建和管理 Actor,并且还需要监控 Actor。ActorSystem 是单例的(object) 在同一个进程里面,只需要一个 ActorSystem 就可以了
4.3、利用 Akka 构建 RPC 应用案例
4.3.1、需求
目前大多数的分布式架构底层通信都是通过 RPC 实现的,RPC 框架非常多,比如前我们学过 的 Hadoop 项目的 RPC 通信框架,但是 Hadoop 在设计之初就是为了运行长达数小时的批量 而设计的,在某些极端的情况下,任务提交的延迟很高,所有 Hadoop 的 RPC 显得有些笨重。
Spark 的 RPC 是通过 Akka 类库实现的,Akka 用 Scala 语言开发,基于 Actor 并发模型实现, Akka 具有高可靠、高性能、可扩展等特点,使用 Akka 可以轻松实现分布式 RPC 功能。
4.3.2、应用架构
4.2.3、具体实现
Master 代码实现:
class Master extends Actor{
def doHello(): Unit ={ println("我是 Master, 我接收到了 Worker 的 hello 的消息");
}
/**
* 其实就是一个死循环 : 接收消息
* while(true)
*/
override def receive: Receive = {
case "hello" =>{
doHello()
//sender() 谁发送过来消息这个就是谁
//sender() ! "hi" 给 sender() 发送一个 hi的消息
sender() ! "hi"
}
}
}
object Master {
def main(args: Array[String]): Unit = {
val str=
"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = localhost
|akka.remote.netty.tcp.port = 6790
""".stripMargin
val conf: Config = ConfigFactory.parseString(str)
// def apply(name: String, config: Config)
val actorSystem = ActorSystem("MasterActorSystem", conf)
// 创建并启动 actor def actorOf(props: Props, name: String): ActorRef
//new Master() 会导致主构造函数会运行!!
actorSystem.actorOf(Props(new Master()), "MasterActor")
}
}
Worker 代码实现:
class Worker extends Actor{// 生命周期
def doHi(): Unit ={
println("我是 Worker,我接收到了 Master 的 hi 的消息");
}
// 如果 actor一执行首先运行的是这个方法,只运行一次。
override def preStart(): Unit = {
// 实现的是给 Master 发送消息 地址
val workerActor =
context.actorSelection("akka.tcp://[email protected]:6790/user/MasterActo
r")
workerActor ! "hello"
}
override def receive: Receive = {
case "hi" => {
doHi()
}
}
}
object Worker {
def main(args: Array[String]): Unit = {
val str=
"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = localhost
""".stripMargin
val conf = ConfigFactory.parseString(str)
val actorSystem = ActorSystem("WorkerActorSystem", conf)
actorSystem.actorOf(Props(new Worker()), "WorkerActor")
}
}
4.2.4、执行测试
先启动 Master,再启动 Worker