Spark Streaming一个简单例子

楔子

《Spark快速大数据分析》学习

10 Spark Streaming

​ 许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用、训练机器学习模型的应用,还有自动检测异常的应用。Spark Streaming是Spark为这些应用而设计的模型。它允许用户使用一套和批处理非常接近的API来编写流式计算应用,这样就可以大量重用批处理的技术甚至代码。

​ 和Spark的基于RDD的概念很相似,Spark Streaming使用离散化流作为抽象表示,叫做Dstream。它是随时间推移而受到的数据序列。在内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。Dstream可以从各种输入源创建,例如Flume、Kafka、HDFS。创建出来的Dstream支持两种操作,一种是转化操作,会生成一个新的Dstream,另一种是输出操作,可以把数据写入外部系统中。Dstream提供了许多RDD所支持的操作相类似的操作支持,还增加了与时间的新操作,比如滑动窗口。

10.1 一个简单的例子

​ 从一台服务器的8888端口接收以换行符分隔的多行文本,要从中筛选出包含error的行,并打印。

我是用的是2.1.1版本

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>

​ 从创建StreamingContext开始,它是流计算功能的主要入口。StreamingContext会在底层创建SparkContext,用来处理数据,其构造函数还接收指定多长时间处理一次新数据的批次间隔作为输入。

/**
 * 打印包含error的行 <br>
 * 需要注意的是配置一个拥有两个线程的应用程序 local[2]<br>
 * nc 模拟数据  先启动发送端,<br>
 * 
 * nc 在windows 发送端: nc -l -p 端口 <br>
 * nc 在windows 接收端: nc 127.0.0.1 端口 <br>
 * 
 * @param ssc
 */
private static void errorDemo(JavaStreamingContext ssc) throws InterruptedException {
	// 以8888 端口作为输入创建DStream
	JavaReceiverInputDStream<String> socketTextStream = ssc.socketTextStream("127.0.0.1", 8888);
	// 从中筛选出包含error的行

	// JavaDStream<String> javaDStream = socketTextStream.filter(t->t.contains("error"));
	// 打印有error的行
	JavaDStream<String> dStream = socketTextStream.filter(new Function<String, Boolean>() {
		@Override
		public Boolean call(String v1) throws Exception {
			return v1.contains("error");
		}
	});
	dStream.print();
	// 启动流计算环境
	ssc.start();
	// 等待作业完成
	ssc.awaitTermination();
}
/**
 * 创建 JavaStreamingContext
 * 
 * @returnJavaStreamingContext
 */
private static JavaStreamingContext createStreamingContext() {
	// Create the context with a 5 second batch size
	// TODO 此处 创建一个拥有两个线程的应用程序 local[2] 一定是2个否则看不到效果
	SparkConf sparkConf = new SparkConf().setAppName("sparkStreaming_Demo").setMaster("local[2]");
	return new JavaStreamingContext(sparkConf, Durations.seconds(5));
}

​ 数据源使用nc 模拟

# windows上使用nc 模拟数据源

# 发送端 nc -l -p 端口
# 接收端 nc ip 端口

Spark Streaming一个简单例子

Spark Streaming一个简单例子
Spark Streaming一个简单例子
Spark Streaming一个简单例子