使用Spark机器学习,流技术和Kafka API监视实时Uber数据(第2部分)
这篇文章是该系列文章的第二部分,我们将建立一个实时示例,用于分析和监视Uber车载GPS行程数据 。 如果您尚未阅读本系列的第一部分 ,则应先阅读。
第一篇文章讨论了使用Apache Spark的K-means算法创建机器学习模型,以基于位置对Uber数据进行聚类。 第二篇文章将讨论将保存的K-means模型与流数据一起使用,以对Uber汽车在何时何地进行集群进行实时分析。
用例示例:地理上集群的车辆/物品的实时分析
使用Kafka API将 Uber行程数据发布到MapR Streams主题 。 订阅该主题的Spark流媒体应用程序使用k-means模型使用与位置对应的集群ID丰富数据,并将结果以JSON格式发布到另一个主题。 订阅第二个主题的Spark流媒体应用程序实时分析JSON消息。
用例数据示例
示例数据集是Uber行程数据,您可以在本系列的第1部分中了解更多信息。 传入的数据记录为CSV格式。
示例行如下所示:
丰富的数据记录为JSON格式。 示例行如下所示:
Spark Kafka消费者生产者代码
解析数据集记录
Scala Uber案例类定义了与CSV记录相对应的架构。 parseUber函数将逗号分隔的值解析为Uber案例类。
加载K均值模型
Spark KMeansModel类用于加载拟合在历史Uber行程数据上的已保存K均值模型。
模型clusterCenters的输出:
集群中心下方显示在Google地图上:
Spark流代码
这些是Spark Streaming Consumer Producer代码的基本步骤:
- 配置Kafka Consumer Producer属性。
- 初始化Spark StreamingContext对象。 使用此上下文,创建一个从主题读取消息的DStream。
- 应用转换(创建新的DStreams)。
- 将消息从转换的DStream写入主题。
- 开始接收数据并进行处理。 等待处理停止。
我们将通过示例应用程序代码完成所有这些步骤。
- 配置Kafka Consumer Producer属性
第一步是设置KafkaConsumer和KafkaProducer配置属性,稍后将使用它们来创建DStream,以接收/发送主题消息。 您需要设置以下参数:
- 键和值反序列化器:用于反序列化消息。
- 自动偏移量重置:从最早或最新的消息开始读取。
- 引导服务器:由于代理地址实际上未由MapR流使用,因此可以将其设置为虚拟host:port。
有关配置参数的更多信息,请参见MapR Streams文档 。
- 初始化Spark StreamingContext对象。
如下所示,ConsumerStrategies.Subscribe用于设置主题和Kafka配置参数。 我们将KafkaUtils createDirectStream方法与StreamingContext,消费者和位置策略一起使用,以根据MapR Streams主题创建输入流。 这将创建一个DStream来表示传入的数据流,其中每个消息都是一个键值对。 我们使用DStream映射转换来创建带有消息值的DStream。
- 应用转换(创建新的DStreams)
我们使用DStream foreachRDD方法将处理应用于此DStream中的每个RDD。 我们使用DStream上的map操作将消息值解析为Uber对象。 然后,我们将RDD转换为DataFrame,这使您可以对流数据使用DataFrame和SQL操作。
这是df.show的示例输出:
VectorAssembler用于转换并返回带有向量列中的纬度和经度特征列的新DataFrame。
然后使用模型通过模型转换方法从要素中获取聚类,该方法将返回具有聚类预测的DataFrame。
category.show的输出如下:
然后将DataFrame注册为表,以便可以在SQL语句中使用它。 SQL查询的输出如下所示:
- 将消息从转换的DStream写入主题
查询的数据集结果将转换为JSON RDD字符串,然后使用RDD sendToKafka方法将JSON键值消息发送到主题(在这种情况下,键为null)。
示例消息值(temp.take(2)的输出)如下所示:
{“ dt”:“ 2014-08-01 00:00:00”,“ lat”:40.729,“ lon”:-73.9422,“ base”:“ B02598”,“ cluster”:7}
{“ dt”:“ 2014-08-01 00:00:00”,“ lat”:40.7406,“ lon”:-73.9902,“ base”:“ B02598”,“ cluster”:7}
- 开始接收数据并进行处理。 等待处理停止。
要开始接收数据,我们必须在StreamingContext上显式调用start(),然后调用awaitTermination以等待流计算完成。
Spark Kafka消费者代码
接下来,我们将介绍一些使用了JSON丰富消息的Spark流代码。
我们使用Spark Structype指定架构:
下面是代码:
- 创建直接卡夫卡流
- 使用带有模式的spark.read.json将JSON消息值转换为Dataset [Row]
- 为后续的SQL查询创建两个临时视图
- 使用ssc.remember缓存查询数据
现在,我们可以查询流数据以询问类似以下问题:哪个小时的接机次数最多? (输出显示在Zeppelin笔记本中):
spark.sql(“将小时数(uber.dt)选择为小时,将计数(cluster)转换为ct,从uber组中按小时(uber.dt)”)
每个群集中发生了多少次接机?
df.groupBy(“群集”).count()。show()
要么
spark.sql(“选择集群,从集群中按uber组计数作为count(集群)”)
一天中的哪个小时和哪个集群的接机次数最多?
spark.sql(“将小时数(uber.dt)选择为小时,将计数(cluster)转换为ct,从uber组中按小时(uber.dt)”)
显示Uber行程的日期时间和集群计数:
%sql选择集群,dt,count(cluster)作为uber按dt分组的计数,按dt的集群顺序,集群
软件
- 您可以从此处下载完整的代码,数据和说明,以运行此示例 。
- 本示例在带有Spark 2.0.1的MapR 5.2上运行。 如果您在MapR v5.2 沙盒上运行,则需要将Spark升级到2.0.1(MEP 2.0)。 有关升级的更多信息,请参见: 此处和此处 。
摘要
在此博客文章中,您学习了如何在Spark Streaming应用程序中使用Spark机器学习模型,以及如何将Spark Streaming与MapR Streams集成以使用Kafka API来消费和产生消息。
参考和更多信息:
- 将Spark与MapR Streams文档集成
- 有关MapR Streams的免费在线培训,请访问Learn.mapr.com,获取Spark
- Apache Spark流编程指南
- 使用Apache API的实时流数据管道:Kafka,Spark流和HBase
- Apache Kafka和MapR Streams:术语,技术和新设计