/**
 * Minimal Spark Streaming word count.
 *
 * Reads text lines from a socket on datanode1:9999, splits each line into
 * words, and prints the per-batch word counts every 5 seconds.
 */
object StreamingWordCount {

  def main(args: Array[String]): Unit = {
    // Create the Spark configuration. `local[*]` uses all available cores;
    // Spark Streaming needs at least 2 (one for the receiver, one for processing).
    val sparkConf = new SparkConf().setAppName("streaming word count").setMaster("local[*]")

    // Create the StreamingContext with a 5-second batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Receive data from the socket.
    val lineDStream = ssc.socketTextStream("datanode1", 9999)

    // Split each line into words, pair each word with 1, then sum per key
    // within each batch.
    val wordDStream = lineDStream.flatMap(_.split(" "))
    val word2CountDStream = wordDStream.map((_, 1))
    val result = word2CountDStream.reduceByKey(_ + _)

    // A DStream needs at least one output operation or Spark registers
    // nothing to compute; the original never materialized `result`.
    result.print()

    // Start the computation and block until it terminates; without these
    // calls the streaming job never runs.
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Start the execution of the streams (StreamingContext lifecycle entry point).
 *
 * Valid only when the context is in the INITIALIZED state: transitions it to
 * ACTIVE, registers it as the single active context, installs a shutdown
 * hook, and attaches metrics and the UI tab. Calling start on an ACTIVE
 * context only logs a warning; calling it on a STOPPED context throws.
 *
 * NOTE(review): this method references instance state (`state`, `startSite`,
 * `scheduler`, `env`, ...) declared outside this chunk — presumably it belongs
 * to Spark's StreamingContext class; verify against the enclosing file.
 *
 * @throws IllegalStateException if the context has already been stopped
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      // Only one StreamingContext may be active per JVM; the activation lock
      // serializes the check-and-activate sequence.
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Snapshot the local properties so they can be restored later.
            savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          // NonFatal only: let OutOfMemoryError, InterruptedException, etc. propagate.
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        // Register as the active context only after a successful start,
        // still inside the activation lock.
        StreamingContext.setActiveContext(this)
      }
      logDebug("Adding shutdown hook") // force eager creation of logger
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      // Idempotent-ish: repeated start is tolerated with a warning, not an error.
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}