Scala入门(三) Actor并发编程

Actor 编程

Actor简介

Scala 中的 Actor 能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala是运用消息的发送、接收来实现高并发的。

​ Actor 可以看作是一个个独立的实体,他们之间是毫无关联的。但是,他们可以通过消息来通信。一个 Actor 收到其他 Actor 的信息后,它可以根据需要作出各种相应。消息的类型可以是任意的,消息的内容也可以是任意的。

与 Java 的基于共享数据和锁的线程模型不同,Scala 的 actor 包则提供了另外一种不共享任何数据、依赖消息传递的模型,从而进行并发编程。

Scala入门(三) Actor并发编程

Actor执行顺序

  • 首先调用 start()方法启动 Actor
  • 调用 start()方法后其 act()方法会被执行
  • 向 Actor 发送消息
  • act 方法执行完成之后,程序会调用 exit 方法

Actor发送消息

  • ! 发送异步消息,没有返回值。
  • !? 发送同步消息,等待返回值。返回值时Any
  • !! 发送异步消息,返回值是 Future[Any]。

Future 表示一个异步操作的结果状态,可能还没有实际完成的异步任务的结果。
Any 是所有类的超类,Future[Any]的泛型是异步操作结果的类型。

Actor接收消息

  • act 方法中通过 receive 方法接受消息并进行相应的处理
  • 使用 react 方法代替 receive 方法去接受消息
    • react 方式会复用线程,避免频繁的线程创建、销毁和切换。比 receive 更高效
    • react 如果要反复执行消息处理,react 外层要用 loop,不能用 while
  • Actor 可以返回消息给发送方。通过 sender 方法向当前消息发送方返回消息
package com.hrh.obj

import scala.actors.{Actor, Future}

case class Syn(message:String)
case class Asyn(message:String)
case class Result(message:String)

class MyActor extends Actor{
  override def act(): Unit ={
    loop{

      react{
        case Syn(message)=>{
          println(message)

          sender ! Result("同步有返回值的返回")
        }

        case Asyn(message)=>{
          println(message)

          sender ! Result("异步有返回值的返回")
        }

        case _ =>{
          println("haha")

        }

      }
    }

  }
}

object MyActor {
  def main(args: Array[String]): Unit = {

    val actor = new MyActor
    actor.start()

    actor ! Asyn("异步无返回值")

    val synResult: Any = actor !? Syn("同步有返回值")
    val message: String = synResult.asInstanceOf[Result].message
    println(message)


    val asynResult: Future[Any] = actor !! Asyn("异步有返回值")
    val message2: String = asynResult.apply().asInstanceOf[Result].message
    println(message2)

    actor ! "nihao"
  }
}

Actor WordCount

​ 用 actor 并发编程写一个单机版的 WordCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果。

  • 通过 loop +react 方式去不断的接受消息
  • 利用 case class 样例类去匹配对应的操作
  • 其中 scala 中提供了文件读取的接口 Source,通过调用其 fromFile 方法去获取文件内容
  • 将每个文件的单词数量进行局部汇总,存放在一个 ListBuffer 中
  • 最后将 ListBuffer 中的结果进行全局汇总。
package com.hrh.wordcount

import scala.actors.{Actor, Future}
import scala.collection.mutable.ListBuffer
import scala.io.Source

case class File(filename:String)

case class Result(result:Map[String, Int])

class WordCountActor extends Actor{
  override def act(): Unit ={

    loop{
      react{
        case File(filename) => {
          val lines = Source.fromFile(filename).mkString
          //每个文件的内容字符串
          println(lines) //

          val split: Array[String] = lines.split("\r\n")
          println(split.toBuffer)//ArrayBuffer(hello hello hadoop, hadoop spark)

          val flatten: Array[String] = split.map(x=>x.split(" ")).flatten
          println(flatten.toBuffer)//ArrayBuffer(hello, hello, hadoop, hadoop, spark)

          val map: Array[(String, Int)] = flatten.map(x=>(x,1))
          println(map.map(x=>print(x)))//(hello,1)(hello,1)(hadoop,1)(hadoop,1)(spark,1)

          val groupBy: Map[String, Array[(String, Int)]] = map.groupBy(x=>x._1)
          println(for(i <- groupBy) print(i._1+" -> "+i._2.toBuffer))//spark -> ArrayBuffer((spark,1))hadoop -> ArrayBuffer((hadoop,1), (hadoop,1))hello -> ArrayBuffer((hello,1), (hello,1))()

          val result: Map[String, Int] = groupBy.map(x=>(x._1,x._2.length))
          println(result.map(x=>print(x)))//(spark,1)(hadoop,2)(hello,2)

          sender ! Result(result)

        }
      }
    }
  }
}

object WordCountActor{
  def main(args: Array[String]): Unit = {

    //文件列表
    val fileArr=Array("E:\\a.txt","E:\\b.txt","E:\\c.txt")

    //存储每个文件的future集合
    val futureList=ListBuffer[Future[Any]]()

    //最终单词列表
    val wordList=ListBuffer[Map[String, Int]]()


    for(i <- fileArr){
      val actor = new WordCountActor

      actor.start()

      val future: Future[Any] = actor !! File(i)



      futureList += future;
    }

    while(futureList.size>0){
      for( future <- futureList){
        if(future.isSet){
          val reply: Any = future.apply()//Result(Map(spark -> 1, hadoop -> 2, hello -> 2))

          val result: Map[String, Int] = reply.asInstanceOf[Result].result //Map(spark -> 1, hadoop -> 2, hello -> 2)

          wordList += result

          futureList -= future
        }
      }
    }
    println(wordList)//Map(spark -> 1, hadoop -> 1, storm -> 1, hello -> 1), Map(spark -> 1, hadoop -> 2, hello -> 2), Map(spark -> 2, storm -> 3)

    val map: ListBuffer[Array[(String, Int)]] = wordList.map(x=>x.toArray)
    println(map.map(x=>print(x.toBuffer)))  //ArrayBuffer((spark,1), (hadoop,1), (storm,1), (hello,1))ArrayBuffer((spark,1), (hadoop,2), (hello,2))ArrayBuffer((spark,2), (storm,3))

    val flatten: ListBuffer[(String, Int)] = map.flatten
    println(flatten)//ListBuffer((spark,2), (storm,3), (spark,1), (hadoop,1), (storm,1), (hello,1), (spark,1), (hadoop,2), (hello,2))

    val by: Map[String, ListBuffer[(String, Int)]] = flatten.groupBy(x=>x._1)
    println(for(i <- by) print(i._1+" -> "+i._2.toBuffer))//hadoop -> ArrayBuffer((hadoop,1), (hadoop,2))spark -> ArrayBuffer((spark,2), (spark,1), (spark,1))storm -> ArrayBuffer((storm,3), (storm,1))hello -> ArrayBuffer((hello,1), (hello,2))()


    val word: Map[String, Int] = by.mapValues(x=>x.foldLeft(0)((x,y)=>x+y._2))
    println(word)//Map(hadoop -> 3, spark -> 4, storm -> 4, hello -> 3)
  }
}
foldLeft 解析

val tuples: ListBuffer[(String,Int)] = ListBuffer(("hadoop",1),("hadoop",2),("hadoop",3))

 scala源码
def foldLeft[B](z: B)(op: (B, A) => B): B = {
	var result = z
	this foreach (x => result = op(result, x))
	result
}
//def 关键字
//foldLeft 方法名
//[B] 返回值类型
//z 返回值初始值
//op: 方法参数为函数
def foldLeft [Int](0)(op: (result,y)=>result+y._2): Int = {
	var result = 0
	this foreach (x => result = op(result, x))
	result
}

第一次循环: result= op(0,(hadoop,1))=>0+1=1
第二次循环: result= op(1,(hadoop,2))=>1+2=3
第三次循环: result= op(3,(hadoop,3))=>3+3=6