Spark Streaming - 根据按键分组的键值对计算统计信息

问题描述:

背景: 我使用Spark Streaming从Kafka流式处理以逗号分隔键值对形式出现的事件 以下是事件流入我的火花应用程序。Spark Streaming - 根据按键分组的键值对计算统计信息

Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200 
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150 
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100 

输出

我想要计算不同的度量(平均,计数等)由不同的密钥流中分组的对于给定批次的时间间隔例如

  1. 通过密钥1,密钥2(RESPONSETIME是在每一个事件的关键之一)
  2. 计数通过密钥1,密钥2

我尝试到目前为止平均RESPONSETIME:

val stream = KafkaUtils 
    .createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 

val pStream = stream.persist() 

val events: DStream[String] = pStream.flatMap(_._2.split(",")) 
val pairs= events.map(data => data.split("=")).map(array => (array(0), array(1))) 
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on. 

更新 - 03/04 钥匙Key1,Key2 ...可能在传入流中乱序出现。

欣赏您的输入/提示。

+0

您能否提供预期的输出类型和您迄今为止的尝试?这不是你想要的东西。 – zero323

+0

@ zero323 - 好吧,我用期望的输出类型和我迄今为止的尝试更新了我的问题。如果我能够解决这个问题,我会发布答案。 – codehammer

+1

谢谢。我对这一点进行了格式化。 – zero323

一个可行的办法是这样的:

  • 创建代表每个记录的情况下类,所以我们没有处理的元组:

    case class Record(
        key1: String, key2: String, key3: String, key4: String, rt: Double) 
    
  • 使用正则表达式解析记录,丢弃残缺的条目:

    import scala.util.matching.Regex 
    
    val recordPattern = new Regex(
        "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++ 
        "responseTime=(0-9+)$" 
    ) 
    
    val records = pStream.map { 
        case recordPattern(key1, key2, key3, key4, rt) => 
        Some(Record(key1, key2, key3, key4, rt.toDouble)) 
        case _ => None 
    }.flatMap(x => x) // Drop malformed 
    
  • 重塑数据键值对:

    val pairs = records.map(r => ((r.key1, r.key2), r.rt)) 
    
  • 创建一个分区,并使用StatCounter来汇总统计:

    import org.apache.spark.util.StatCounter 
    import org.apache.spark.HashPartitioner 
    
    val paritioner: HashPartitioner = ??? 
    
    pairs.combineByKey[StatCounter](
        StatCounter(_), _ merge _, _ merge _, paritioner 
    ) 
    
  • 的兴趣提取物领域:

    stats.mapValues(s => (s.count, s.mean)) 
    

您也可以尝试像这样的无序数据,虽然我强烈建议修复th在上游:

val kvPattern = "(\\w+)=(\\w+)".r 
val pairs = pStream.map(line => { 
    val kvs = kvPattern.findAllMatchIn(line) 
    .map(m => (m.group(1), m.group(2))).toMap 

    // This will discard any malformed lines 
    // (lack of key1, key2, lack or invalid format of responseTime) 
    Try((
    (kvs("Key1"), kvs("Key2")), 
    kvs("responseTime").toDouble 
)) 

}).flatMap(_.toOption) 

并且按照以前那样继续。

+0

谢谢+1。这可能工作。唯一的警告,我可能忘记在我的问题中提到的是key1,key2可能不一定每次都在相同的序列中,所以正则表达式不匹配。我正在考虑应用更通用的解决方案,因此需要通过关键值元组来查找配置的key1,key2。想法? – codehammer

+0

使用StatsCounter也很棒。 – codehammer

+1

然后,你必须解析这个其他的方式。将键值对提取到“Map”中应该没问题。 – zero323