静下心来学spark06
1.spark Streaming是spark Core API 的扩展,它支持弹性的,高吞吐的,容错的实时数据流的处理,数据可以通过多种数据原获取,例如Kafka,fume,kinesis以及TCP sockets,也可以通过map,reduce,join,window等高级函数组成的复杂算法处理,最总,处理后的数据可以输出到文件系统,数据库以及实时仪表盘中,事实上,你还可以在data stream(数据流)上使用机器学习一起图计算算法
spark中文学习网站
Dstream 是一个离散的流,是spark streaming基本的抽象,是一个连续同类型的RDD的序列,代表一个连续的数据流
在内部, 它工作原理如下, Spark Streaming 接收实时输入数据流并将数据切分成多个 batch(批)数据, 然后由 Spark 引擎处理它们以生成最终的 stream of results in batches(分批流结果).
此处对比Flink:Flink是流模拟批
Spark Streaming特性:
- A list of other DStreams that the DStream depends on
一个放了多个DStream的列表,并且DStream之间有依赖关系
- A time interval at which the DStream generates an RDD
每隔一段时间DStream会生成一个RDD,DStream可以放多个同类型的RDD
- A function that is used to generate an RDD after each time interval
每隔一段时间生成的RDD都会有一个函数作用到这个RDD上
Stremaing api练习
// linux 安装netcat
yum -y install nc
### 使用
nc -lk 端口号(port)
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 实现一个简单的wordcount
* 数据需要从netcat中获取
*/
object HomeWorkStreaming_WC {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HomeWorkStreaming_WC").setMaster("local[2]")
val sc = new SparkContext(conf)
// 创建spark的上下文
val ssc = new StreamingContext(sc, Durations.seconds(5))
// 也可以用这种方法
// val ssc = new StreamingContext(sc,Seconds(5))
// 开始获取netcat数据 这种获取方式会把获取的数据以缓存的方式放在指定缓存级别的地方
val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
// 开始分析数据
val res: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// 打印到控制台
res.print()
// 开始提交任务到集群
ssc.start()
// 线程等待
ssc.awaitTermination()
}
}
awaitTermination() 线程池方法
当前线程阻塞,直到
等所有已提交的任务(包括正在跑的和队列中等待的)执行完
或者等超时时间到
或者线程被中断,抛出InterruptedException
然后返回true(shutdown请求后所有任务执行完毕)或false(已超时)
过程图解
2.用Kafka模拟数据,streaming实现wordcount
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
object HomeWork09_LoadTopicDataWC {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HomeWork09_LoadTopicDataWC").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
// 设置检查点
ssc.checkpoint("hdfs://hadoop01:8020/out-20190110-2")
// 设置请求Kafka的配置信息
val Array(zkQuorum, group, topics, numThread) = args
// 将topics数据封装到map里面
val topicMap: Map[String, Int] = topics.split(",").map((_, numThread.toInt)).toMap
// 开始调用Kafka工具类获取topic的消息
val dstream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK)
// 获取到的Kafka的数据是key value 的,其中key为offset,在实际统计中不需要,可以过滤
val lines: DStream[String] = dstream.map(_._2)
// 开始统计
val tups: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))
val res: DStream[(String, Int)] = tups.updateStateByKey(func1, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
res.print()
ssc.start()
ssc.awaitTermination()
}
val func1 = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
it.map {
case (x, y, z) => {
(x, y.sum + z.getOrElse(0))
}
}
}
}
3.用netcat模拟数据流,streaming编写word count (包括历史数据)
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
/**
* 在将历史数据应用到当前批次的时候,可以使用updateStaeByKey 原语实现
*
* 实现历史批次累加功能也可以借助数据库来实现
* updateStaeByKey有获取历史批次结果应用到当前批次的功能,该原语是没有存储 历史批次结果的功能的
* 所以,实现批次累加只需要进行checkpoint --streaming中checkpoint具有存储历史数据的功能
*/
object HomeWork09_AccSparkStreamingWC {
def main(args: Array[String]): Unit = {
// 模板代码
val conf = new SparkConf().setAppName("HomeWork09_AccSparkStreamingWC").setMaster("local[2]")
// val ssc = new StreamingContext(conf,Seconds(5))
val ssc = new StreamingContext(conf, Milliseconds(5000))
// 做checkpoint
ssc.checkpoint("hdfs://hadoop01:8020/out2/cp-2019-1-10")
// 获取数据
val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
// 开始统计
val tups: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_, 1))
val res: DStream[(String, Int)] = tups.updateStateByKey(func2, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
res.print()
ssc.start()
ssc.awaitTermination()
}
/**
* 迭代器中
* 第一个参数:数据中的key
* 第二个参数:当前批次中相同key对应的value Seq(1,1,1)
* 第三个参数:历史结果中相同的key对应的value Some(2)
*/
val func2 = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
it.map(x => {
(x._1, x._2.sum + x._3.getOrElse(0))
})
}
}
4.transform操作DStream中的RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 用transform 可以操作DStream 中的RDD
*/
object HomeWork09_Tranfrom {
def main(args: Array[String]): Unit = {
// 模板代码
val conf = new SparkConf().setAppName("HomeWork09_Tranfrom").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
val res: DStream[(String, Int)] = dstream.transform(rdd => {
rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
})
res.print()
ssc.start()
ssc.awaitTermination()
}
}
5.窗口操作(window) 窗口时间应该是批次时间的倍数
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object HomeWork09_WindowOperation {
def main(args: Array[String]): Unit = {
// 模板代码
val conf = new SparkConf().setAppName("HomeWork09_WindowOperation").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
val tups: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map((_, 1))
val res: DStream[(String, Int)] = tups.reduceByKeyAndWindow((x, y) => x + y, Seconds(6))
res.print()
ssc.start()
ssc.awaitTermination()
}
}
6.SparkStreaming窗口:
是指展示数据结果的范围,是streaming中用来描述展示数据结果范围的,所以一个窗口往往会包含多个批次的间隔的结果范围
窗口操作:一段时间内数据发生的变化
窗口操作过程中,需要我们指定两个重要的参数
1窗口长度:指窗口的持续时间(每次展示的结果范围)
2.滑动间隔:是指窗口操作的间隔(窗口从一个地方滑动到另一个地方的间隔)
### 注意,这两个参数必须是DStream批次间隔的整数倍
应用场景:
比如:批次间隔为2秒,当我们需要每次展示一小时的结果范围,类似于这样的需求,就需要用窗口操作了
如果我们把批次间隔直接调整为1小时,可能会因为分析的数据太多而出现延迟或者OOM
!#消费数据反写到Kafka
未解答
7.streaming消费kafka的两种方式的优缺点的总结
两种方式第一种是利用接收器和Kafka的高级api
第二种是不使用接收器 即Direct
博客园这个兄弟描述的形象一些-By-奔跑的蜗牛-传送门
8.关于手动写入offset的方式
// KafkaUtils.createDirectStream
前提条件:
所有相关消息服务必须停止:
手动更新Kafka中存在zookeeper中的偏移量,我们需要手动将某个主题的偏移量设置为某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类。Kafka.tools.UpdateOffsetsInZK,我们可以通过她修改Zookeeper中某个主题的偏移量
未完待续。。。
9.streaming消费kafka可能遇到什么问题
未解答
10.消息传递语义–如何保证数据零丢失
未解答