Spark Streaming — Example 2: batch-processing a directory
This is simple:
just replace socketTextStream with textFileStream.
You can keep adding files to the monitored directory and watch the results appear batch by batch.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/** Spark Streaming word count over a monitored HDFS directory.
  *
  * Watches `hdfs://hadoop01:9000/ssc` for files added after startup and,
  * every 5-second micro-batch, prints the word counts for that batch's
  * new file contents.
  */
object test1 {
  def main(args: Array[String]): Unit = {
    // Local run with two threads so the file source and processing can overlap.
    val conf = new SparkConf().setAppName("demo1").setMaster("local[2]")
    // StreamingContext with a 5-second batch interval.
    val ssc = new StreamingContext(conf, Seconds(5))
    // Checkpoint directory on HDFS for recovery metadata.
    ssc.checkpoint("hdfs://hadoop01:9000/ck-20190502")
    // textFileStream creates a file-based input DStream over the directory;
    // only files created after the job starts are picked up.
    val lines = ssc.textFileStream("hdfs://hadoop01:9000/ssc")
    // Classic word count: split on spaces, pair each word with 1, sum per key.
    val wordCounts = lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    // print() shows at most the first 10 elements of each batch.
    wordCounts.print()
    ssc.start()            // begin receiving and processing batches
    ssc.awaitTermination() // block the driver until the job is stopped
  }
}