Spark Streaming - 根据按键分组的键值对计算统计信息
背景: 我使用Spark Streaming从Kafka流式处理以逗号分隔键值对形式出现的事件 以下是事件流入我的火花应用程序。Spark Streaming - 根据按键分组的键值对计算统计信息
Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100
输出:
我想要计算不同的度量(平均,计数等)由不同的密钥流中分组的对于给定批次的时间间隔例如
- 通过密钥1,密钥2(RESPONSETIME是在每一个事件的关键之一)
- 计数通过密钥1,密钥2
我尝试到目前为止平均RESPONSETIME:
val stream = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val pStream = stream.persist()
val events: DStream[String] = pStream.flatMap(_._2.split(","))
val pairs= events.map(data => data.split("=")).map(array => (array(0), array(1)))
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on.
更新 - 03/04 钥匙Key1,Key2 ...可能在传入流中乱序出现。
欣赏您的输入/提示。
一个可行的办法是这样的:
-
创建代表每个记录的情况下类,所以我们没有处理的元组:
case class Record( key1: String, key2: String, key3: String, key4: String, rt: Double)
-
使用正则表达式解析记录,丢弃残缺的条目:
import scala.util.matching.Regex val recordPattern = new Regex( "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++ "responseTime=(0-9+)$" ) val records = pStream.map { case recordPattern(key1, key2, key3, key4, rt) => Some(Record(key1, key2, key3, key4, rt.toDouble)) case _ => None }.flatMap(x => x) // Drop malformed
-
重塑数据键值对:
val pairs = records.map(r => ((r.key1, r.key2), r.rt))
-
创建一个分区,并使用
StatCounter
来汇总统计:import org.apache.spark.util.StatCounter import org.apache.spark.HashPartitioner val paritioner: HashPartitioner = ??? pairs.combineByKey[StatCounter]( StatCounter(_), _ merge _, _ merge _, paritioner )
-
的兴趣提取物领域:
stats.mapValues(s => (s.count, s.mean))
您也可以尝试像这样的无序数据,虽然我强烈建议修复th在上游:
val kvPattern = "(\\w+)=(\\w+)".r
val pairs = pStream.map(line => {
val kvs = kvPattern.findAllMatchIn(line)
.map(m => (m.group(1), m.group(2))).toMap
// This will discard any malformed lines
// (lack of key1, key2, lack or invalid format of responseTime)
Try((
(kvs("Key1"), kvs("Key2")),
kvs("responseTime").toDouble
))
}).flatMap(_.toOption)
并且按照以前那样继续。
谢谢+1。这可能工作。唯一的警告,我可能忘记在我的问题中提到的是key1,key2可能不一定每次都在相同的序列中,所以正则表达式不匹配。我正在考虑应用更通用的解决方案,因此需要通过关键值元组来查找配置的key1,key2。想法? – codehammer
使用StatsCounter也很棒。 – codehammer
然后,你必须解析这个其他的方式。将键值对提取到“Map”中应该没问题。 – zero323
您能否提供预期的输出类型和您迄今为止的尝试?这不是你想要的东西。 – zero323
@ zero323 - 好吧,我用期望的输出类型和我迄今为止的尝试更新了我的问题。如果我能够解决这个问题,我会发布答案。 – codehammer
谢谢。我对这一点进行了格式化。 – zero323