Important Spark Streaming Operators

Transform

The transform operator lets you apply RDD-to-RDD operations to a DStream, i.e., interoperate between a DStream and ordinary RDDs; the return value is still a DStream.

package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object TransformApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Build the blacklist as (name, true) pairs and turn it into an RDD
    val blackTuple = new ListBuffer[(String, Boolean)]
    blackTuple.append(("doudou", true))
    blackTuple.append(("huahua", true))
    val blacksRDD = ssc.sparkContext.parallelize(blackTuple)

    val lines = ssc.socketTextStream("hadoop000", 9999)
    // "xx,xx info" => (xx, "xx,xx info"), then left-join each record against the blacklist
    // transform takes an RDD => RDD function and still returns a DStream
    lines.map(x => (x.split(",")(0), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD).filter(x => {
        x._2._2.getOrElse(false) != true // keep records whose key is not blacklisted
      }).map(_._2._1)
    }).print()


    ssc.start()
    ssc.awaitTermination()
  }
}
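
A quick way to verify the join-and-filter logic inside transform is to run it on plain RDDs. Below is a minimal, standalone sketch (the object name TransformLogicCheck and the sample records are made up for illustration; it is not part of the streaming job):

package com.ruozedata.bigdata.streaming03

import org.apache.spark.{SparkConf, SparkContext}

object TransformLogicCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("TransformLogicCheck"))

    // Same blacklist as in TransformApp
    val blacksRDD = sc.parallelize(Seq(("doudou", true), ("huahua", true)))

    // Simulated batch of "name,info" lines, keyed by name
    val batch = sc.parallelize(Seq("doudou,login", "ruoze,login", "jepson,logout"))
      .map(x => (x.split(",")(0), x))

    // leftOuterJoin yields (name, (line, Option[Boolean])); keep names not on the blacklist
    val survivors = batch.leftOuterJoin(blacksRDD)
      .filter(_._2._2.getOrElse(false) != true)
      .map(_._2._1)

    survivors.collect().foreach(println) // expected: the ruoze and jepson lines
    sc.stop()
  }
}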

UpdateStateByKey

updateStateByKey accumulates and updates state across batches, so each new batch's computation can incorporate the data of all previous batches. Checkpointing must be enabled so that the per-key state from earlier batches can be persisted.

package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object UpdateStateByKeyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyApp")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Checkpoint directory for persisting the state of previous batches
    ssc.checkpoint("hdfs://hadoop000:9000/ss/logs")

    val lines = ssc.socketTextStream("hadoop000", 9999)
    val results = lines.flatMap(_.split(",")).map((_, 1))

    val state = results.updateStateByKey(updateFunction)
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // currentValues: the values for a key in the current batch
  // preValues: the accumulated state for that key from previous batches
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val curr = currentValues.sum
    val pre = preValues.getOrElse(0)

    Some(curr + pre)
  }

}
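
To make the semantics of updateFunction concrete, here is a small self-contained check (the object name and the numbers are made up for illustration): the current batch's values for a key are summed and added to the previous state.

package com.ruozedata.bigdata.streaming03

object UpdateFunctionCheck {
  // Same signature and logic as updateFunction in UpdateStateByKeyApp
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] =
    Some(currentValues.sum + preValues.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // First batch: the key appears 3 times, no previous state => Some(3)
    println(updateFunction(Seq(1, 1, 1), None))
    // Next batch: the key appears twice, previous state Some(3) => Some(5)
    println(updateFunction(Seq(1, 1), Some(3)))
  }
}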

ForeachRDDApp

foreachRDD writes the data of every RDD in a DStream out to an external system (MySQL in this example).


package com.ruozedata.bigdata.streaming03

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
    val ssc = new StreamingContext(conf,Seconds(10))


    val lines = ssc.socketTextStream("hadoop000", 9999)
    val results = lines.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)

    // e.g. (ruoze,2) (jepson,1)
    results.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // One connection per partition, created on the executor side
        val connection = createConnection()

        partition.foreach(pair => {
          val sql = s"insert into wc(word,count) values('${pair._1}',${pair._2})"
          connection.createStatement().execute(sql)
        })

        // Close the connection only after the whole partition has been written,
        // not inside the per-record loop
        connection.close()
      })


//      rdd.foreach(pair => {
//        // The connection must be created on the worker side, but this version
//        // opens and closes one connection per record, so use foreachPartition instead
//        val connection = createConnection()
//        val sql = s"insert into wc(word,count) values('${pair._1}',${pair._2})"
//        connection.createStatement().execute(sql)
//        connection.close()
//      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Obtain a JDBC connection to MySQL (requires the mysql-connector-java dependency listed at the end)
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://hadoop000:3306/g5_spark", "root", "123456")
  }
}
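
One refinement worth considering: the string-built SQL above creates a new Statement per record and is open to SQL injection. Below is a minimal sketch of a foreachPartition writer based on a PreparedStatement; the object name ForeachRDDWithPreparedStatement and the helper saveToMySQL are illustrative, and it assumes the same wc(word, count) table and the createConnection() shown above.

package com.ruozedata.bigdata.streaming03

import org.apache.spark.rdd.RDD

object ForeachRDDWithPreparedStatement {
  // One connection and one PreparedStatement per partition, parameters bound per record
  def saveToMySQL(rdd: RDD[(String, Int)]): Unit = {
    rdd.foreachPartition(partition => {
      val connection = ForeachRDDApp.createConnection()
      val pstmt = connection.prepareStatement("insert into wc(word, count) values (?, ?)")
      partition.foreach { case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        pstmt.executeUpdate()
      }
      pstmt.close()
      connection.close()
    })
  }
}

Inside ForeachRDDApp it would be called as results.foreachRDD(rdd => ForeachRDDWithPreparedStatement.saveToMySQL(rdd)).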

WindowsApp

Window operations accumulate over a sliding window of time, rather than over everything since the application started.
For example, with one batch every 5s, a window length of 15s and a slide interval of 10s, each computation covers the last 3 batches and runs every 2 batches. Both the window length and the slide interval must be integer multiples of the batchInterval; otherwise validation fails with an error.

package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowsApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowsApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("hadoop000", 9999)
    // Word count over the last 10 seconds, recomputed every 5 seconds
    lines.flatMap(_.split(",")).map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(5))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
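
For longer windows with short slide intervals, reduceByKeyAndWindow also has an overload that takes an inverse reduce function, so each window is computed incrementally from the previous one instead of re-aggregating every batch; this form requires checkpointing. A minimal sketch under the same source and intervals (the object name and checkpoint path are assumptions):

package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowsIncrementalApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowsIncrementalApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Required by the incremental (inverse-function) form of reduceByKeyAndWindow;
    // the path here is just an example
    ssc.checkpoint("hdfs://hadoop000:9000/ss/window-logs")

    val lines = ssc.socketTextStream("hadoop000", 9999)
    lines.flatMap(_.split(",")).map((_, 1))
      // (a + b) adds counts entering the window, (a - b) subtracts counts leaving it
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(10), Seconds(5))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}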

The ForeachRDDApp example needs the MySQL JDBC driver on the classpath; add the following Maven dependency:

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>