Writing StructuredStreaming_socket in IDEA

Real-time computation over socket data:
Preparation: start a netcat server on the host the job will connect to (hadoop01 in the code below), then type a test line into it, for example:
nc -lk 9999

hadoop spark sqoop hadoop spark hive hadoop
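Before writing the code, the Spark SQL module (which contains Structured Streaming) has to be on the project classpath. A minimal sbt sketch, assuming Scala 2.11 and Spark 2.4.x; adjust both versions to match your cluster:

// build.sbt -- minimal sketch; the Scala/Spark versions here are assumptions
scalaVersion := "2.11.12"

// spark-sql brings in the Structured Streaming APIs used below
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"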

Code demo:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object StructStreaming_socket {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("StructStreaming_socket")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Read the real-time data; each record arrives as a Row with a single "value" column
    val socketDatasRow: DataFrame = spark.readStream
      .option("host", "hadoop01")
      .option("port", "9999")
      .format("socket")
      .load()

    // 3. Process and aggregate the data
    import spark.implicits._
    val socketDatasString: Dataset[String] = socketDatasRow.as[String]
    val words: Dataset[String] = socketDatasString.flatMap(_.split(" "))
    // Use the DSL (SQL-style) API to count the words
    val structWordCount: Dataset[Row] = words.groupBy("value").count().sort($"count")

    // 4. Output (start the query and wait for termination)
    structWordCount.writeStream
      .trigger(Trigger.ProcessingTime(0)) // trigger as fast as possible
      .format("console")                  // write results to the console
      .outputMode("complete")             // output the full result table each batch
      .start()                            // start the streaming query
      .awaitTermination()                 // block until the query terminates
  }
}
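
After starting the job and typing the sample line above (hadoop spark sqoop hadoop spark hive hadoop) into nc, the console sink prints the full result table in complete mode, sorted by count ascending. The output looks roughly like the following; the batch number and the order of rows that tie on count may differ:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| sqoop|    1|
|  hive|    1|
| spark|    2|
|hadoop|    3|
+------+-----+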