Important Operators in Spark Streaming
Transform
The transform operator lets you apply arbitrary RDD-to-RDD operations inside a DStream, so a DStream can be joined or otherwise combined with a plain RDD; the return value is still a DStream.
package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object TransformApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Build a blacklist RDD of (name, true) pairs
    val blackTuple = new ListBuffer[(String, Boolean)]
    blackTuple.append(("doudou", true))
    blackTuple.append(("huahua", true))
    val blacksRDD = ssc.sparkContext.parallelize(blackTuple)

    val lines = ssc.socketTextStream("hadoop000", 9999)

    // "name,info" => (name, "name,info"), then left-join against the blacklist
    // and keep only the lines whose name is NOT blacklisted.
    // transform returns a DStream.
    lines.map(x => (x.split(",")(0), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD).filter(x => {
        x._2._2.getOrElse(false) != true
      }).map(_._2._1)
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
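For example, if the socket feed sends the lines "doudou,login" and "pk,login" (hypothetical sample inputs), only "pk,login" is printed, because doudou is on the blacklist and pk is not.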
UpdateStateByKey
updateStateByKey maintains running state across batches: the accumulated result of previous batches is carried forward and combined with the data of each new batch. Checkpointing must be enabled so that the per-batch state can be recorded.
package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateStateByKeyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyApp")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Checkpoint directory on HDFS; required so state from previous batches can be recorded
    ssc.checkpoint("hdfs://hadoop000:9000/ss/logs")

    val lines = ssc.socketTextStream("hadoop000", 9999)
    val results = lines.flatMap(_.split(",")).map((_, 1))
    val state = results.updateStateByKey(updateFunction)
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // currentValues: the values seen for a key in the current batch
  // preValues: the accumulated state for that key from previous batches
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val curr = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(curr + pre)
  }
}
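For intuition, here is a small standalone check of how updateFunction accumulates across batches. This is only a sketch: the demo object and the sample values are made up, and it assumes UpdateStateByKeyApp above is on the classpath.

package com.ruozedata.bigdata.streaming03

object UpdateFunctionDemo {
  def main(args: Array[String]): Unit = {
    // Batch 1: the key appears twice, no previous state yet.
    println(UpdateStateByKeyApp.updateFunction(Seq(1, 1), None))    // Some(2)
    // Batch 2: the key appears once, previous state is 2.
    println(UpdateStateByKeyApp.updateFunction(Seq(1), Some(2)))    // Some(3)
  }
}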
ForeachRDDApp
foreachRDD writes the data of every RDD in the DStream out to an external store (here, MySQL).
package com.ruozedata.bigdata.streaming03

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
    val ssc = new StreamingContext(conf, Seconds(10))

    val lines = ssc.socketTextStream("hadoop000", 9999)
    val results = lines.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)

    // e.g. (ruoze,2) (jepson,1)
    results.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // One connection per partition, closed only after all records in the partition are written
        val connection = createConnection()
        partition.foreach(pair => {
          val sql = s"insert into wc(word,count) values('${pair._1}',${pair._2})"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })

      // rdd.foreach(pair => {
      //   // The connection must be created on the worker side, but doing it here
      //   // creates (and closes) one connection per record, so foreachPartition
      //   // should be used instead.
      //   val connection = createConnection()
      //   val sql = s"insert into wc(word,count) values('${pair._1}',${pair._2})"
      //   connection.createStatement().execute(sql)
      //   connection.close()
      // })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://hadoop000:3306/g5_spark", "root", "123456")
  }
}
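Opening a connection per partition per batch can still be costly. A common refinement is to keep one lazily created connection per executor JVM and reuse it across batches. The sketch below is an assumption, not part of the original app: the holder object name is hypothetical, and a production job would typically use a real connection pool instead.

package com.ruozedata.bigdata.streaming03

import java.sql.{Connection, DriverManager}

// Hypothetical helper: one JDBC connection per executor JVM, created lazily
// on first use and reused by every partition/batch running in that JVM.
object MySQLConnectionHolder {
  lazy val connection: Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://hadoop000:3306/g5_spark", "root", "123456")
  }
}

// Usage inside foreachPartition, instead of createConnection()/close():
//   val connection = MySQLConnectionHolder.connection
//   partition.foreach(pair => { ... })
//   // do not close the shared connection here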
WindowsApp
Window operations accumulate over a sliding window of time, rather than over everything since the program started.
For example, with one batch every 5s, the window length might be 15s and the slide interval 10s. Both the window length and the slide interval must be integer multiples of the batch interval; otherwise Spark reports an error when the DStream is validated.
package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowsApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowsApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("hadoop000", 9999)
    // Word count over a 10s window, recomputed every 5s
    lines.flatMap(_.split(",")).map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(5))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
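For long windows with a short slide interval, reduceByKeyAndWindow also has an overload that takes an inverse reduce function: each slide only adds the batch entering the window and subtracts the batch leaving it, instead of re-reducing the whole window. It requires checkpointing. Below is a minimal sketch; the object name and checkpoint path are assumptions, while the source and intervals mirror the app above.

package com.ruozedata.bigdata.streaming03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowsIncrementalApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowsIncrementalApp")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Required by the incremental (inverse-function) window variant
    ssc.checkpoint("hdfs://hadoop000:9000/ss/window-checkpoint")

    val lines = ssc.socketTextStream("hadoop000", 9999)
    lines.flatMap(_.split(",")).map((_, 1))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,   // add counts entering the window
        (a: Int, b: Int) => a - b,   // subtract counts leaving the window
        Seconds(10), Seconds(5))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}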
Maven dependency for the MySQL JDBC driver used by ForeachRDDApp:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>