Spark: collapse column values across DataFrames

Problem description:

I have two DataFrames:

case class UserTransactions(id: Long, transactionDate: java.sql.Date, currencyUsed: String, value: Long) 

ID, TransactionDate, CurrencyUsed, value 
1, 2016-01-05, USD, 100 
1, 2016-01-09, GBP, 150 
1, 2016-02-01, USD, 50 
1, 2016-02-10, JPN, 10 
2, 2016-01-10, EURO, 50 
2, 2016-01-10, GBP, 100 

case class ReportingTime(userId: Long, reportDate: java.sql.Date) 

userId, reportDate 
1, 2016-01-05 
1, 2016-01-31 
1, 2016-02-15 
2, 2016-01-10 
2, 2016-02-01 

Now, for every userId and reportDate, I want to sum up all previously used currencies to get a summary. The result should look like this:

userId, reportDate, transactionSummary 
1, 2016-01-05, None 
1, 2016-01-31, (USD -> 100)(GBP -> 150) // combines the 2 transactions above dated before 2016-01-31 
1, 2016-02-15, (USD -> 150)(GBP -> 150)(JPN -> 10) // combines all transactions dated before 2016-02-15 
2, 2016-01-10, None 
2, 2016-02-01, (EURO -> 50)(GBP -> 100) 

What is the best way to do this? We have more than 300 million transactions, and each user can have up to 10,000 transactions.

In your example output, why do you show 'None' for the reportDate that corresponds to the first transaction in the 'UserTransactions' DataFrame? Do you always want to 'skip' the first transaction? – Metropolis

At the first transaction the user has no history yet, so the summary shows as None; from the second transaction onward the summaries accumulate, e.g. (USD -> 100)(GBP -> 150). – Rahul

The following snippet achieves what you are asking for. The initial join and aggregation are done through the PySpark DataFrame API. The grouping of the data (using reduceByKey) and the preparation of the final dataset are then done through the RDD API, since it is better suited to this kind of operation.

from datetime import datetime 
from pyspark.sql.functions import udf 
from pyspark.sql.types import DateType 
from pyspark.sql import functions as F 

df1 = spark.createDataFrame([(1, '2016-01-05', 'USD', 100), 
                             (1, '2016-01-09', 'GBP', 150), 
                             (1, '2016-02-01', 'USD', 50), 
                             (1, '2016-02-10', 'JPN', 10), 
                             (2, '2016-01-10', 'EURO', 50), 
                             (2, '2016-01-10', 'GBP', 100)], 
                            ['id', 'tdate', 'currency', 'value']) 

df2 = spark.createDataFrame([(1, '2016-01-05'), 
                             (1, '2016-01-31'), 
                             (1, '2016-02-15'), 
                             (2, '2016-01-10'), 
                             (2, '2016-02-01')], 
                            ['user_id', 'report_date']) 


func = udf(lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType())  # convert the string dates to DateType 

df2 = df2.withColumn('tdate', func(df2.report_date)) 
df1 = df1.withColumn('tdate', func(df1.tdate)) 
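# the left outer join keeps every report date, even when the user has no earlier
# transactions; those rows come back with NULL currency/value and become the
# empty summaries in the final output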
result = (df2.join(df1, (df1.id == df2.user_id) & (df1.tdate < df2.report_date), 'left_outer') 
    .select('user_id', 'report_date', 'currency', 'value') 
    .groupBy('user_id', 'report_date', 'currency') 
    .agg(F.sum('value').alias('value'))) 

data = (result.rdd 
    .map(lambda x: (x.user_id, x.report_date, x.currency, x.value)) 
    .keyBy(lambda x: (x[0], x[1]))                          # key on (user_id, report_date) 
    .mapValues(lambda x: [(x[2], x[3])] if x[2] else [])    # keep (currency, value); drop the null rows from the left outer join 
    .reduceByKey(lambda x, y: x + y)                        # concatenate the per-currency pairs for each key 
    .map(lambda x: (x[0][0], x[0][1], x[1]))) 

The final result generated is shown below.

>>> spark.createDataFrame([ (x[0],x[1],str(x[2])) for x in data.collect()], ['id', 'date', 'values']).orderBy('id', 'date').show(20, False) 
+---+----------+--------------------------------------------+ 
|id |date      |values                                      | 
+---+----------+--------------------------------------------+ 
|1  |2016-01-05|[]                                          | 
|1  |2016-01-31|[(u'USD', 100), (u'GBP', 150)]              | 
|1  |2016-02-15|[(u'USD', 150), (u'GBP', 150), (u'JPN', 10)]| 
|2  |2016-01-10|[]                                          | 
|2  |2016-02-01|[(u'EURO', 50), (u'GBP', 100)]              | 
+---+----------+--------------------------------------------+ 
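
If you would rather stay in the DataFrame API end to end, the same per-report summary can also be built with collect_list and map_from_entries. This is a minimal sketch, assuming Spark 2.4+ (where map_from_entries was added) and reusing the result DataFrame and the functions import F from the snippet above:

summary = (result 
    .groupBy('user_id', 'report_date') 
    .agg(F.map_from_entries( 
            F.collect_list(                              # collect_list skips NULLs, so report dates 
                F.when(F.col('currency').isNotNull(),    # with no earlier transactions end up with 
                       F.struct('currency', 'value'))))  # an empty map instead of a null key 
         .alias('transaction_summary'))) 

This keeps the whole job inside Catalyst-optimised DataFrame operations; the reduceByKey version above trades that for more explicit control over how the per-user pairs are merged.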

Great work! | 1 | 2016-01-05 | Map() | | 1 | 2016-01-31 | Map(USD -> 100, GBP -> 150) | | 1 | 2016-02-15 | Map(USD -> 150, GBP -> 150, JPN -> 10) | | 2 | 2016-01-10 | Map() | | 2 | 2016-02-01 | Map(EURO -> 50, GBP -> 100) | – Rahul

In case anyone needs it in Scala:

import java.text.SimpleDateFormat 

import org.apache.spark.sql.Dataset 
import org.apache.spark.sql.functions._ 
import spark.implicits._ 

case class Transaction(id: String, date: java.sql.Date, currency: Option[String], value: Option[Long]) 
case class Report(id: String, date: java.sql.Date) 

def toDate(date: String): java.sql.Date = { 
    val sf = new SimpleDateFormat("yyyy-MM-dd") 
    new java.sql.Date(sf.parse(date).getTime) 
} 

val allTransactions = Seq(
    Transaction("1", toDate("2016-01-05"),Some("USD"),Some(100L)), 
    Transaction("1", toDate("2016-01-09"),Some("GBP"),Some(150L)), 
    Transaction("1",toDate("2016-02-01"),Some("USD"),Some(50L)), 
    Transaction("1",toDate("2016-02-10"),Some("JPN"),Some(10L)), 
    Transaction("2",toDate("2016-01-10"),Some("EURO"),Some(50L)), 
    Transaction("2",toDate("2016-01-10"),Some("GBP"),Some(100L)) 
) 
val allReports = Seq(
    Report("1",toDate("2016-01-05")), 
    Report("1",toDate("2016-01-31")), 
    Report("1",toDate("2016-02-15")), 
    Report("2",toDate("2016-01-10")), 
    Report("2",toDate("2016-02-01")) 
) 

val transactions: Dataset[Transaction] = spark.createDataFrame(allTransactions).as[Transaction] 
val reports: Dataset[Report] = spark.createDataFrame(allReports).as[Report] 

val result = reports.alias("rp").join(transactions.alias("tx"), (col("tx.id") === col("rp.id")) && (col("tx.date") < col("rp.date")), "left_outer") 
    .select("rp.id", "rp.date", "currency", "value") 
    .groupBy("rp.id", "rp.date", "currency").agg(sum("value")) 
    .toDF("id", "date", "currency", "value") 
    .as[Transaction] 

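// key each aggregated row by (id, date) and merge its single-currency map into one Map per report date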
val data = result.rdd.keyBy(x => (x.id , x.date)) 
    .mapValues(x => if (x.currency.isDefined) collection.Map[String, Long](x.currency.get -> x.value.get) else collection.Map[String, Long]()) 
    .reduceByKey((x,y) => x ++ y).map(x => (x._1._1, x._1._2, x._2)) 
    .toDF("id", "date", "map") 
    .orderBy("id", "date") 

Console output:

+---+----------+--------------------------------------+ 
|id |date      |map                                   | 
+---+----------+--------------------------------------+ 
|1  |2016-01-05|Map()                                 | 
|1  |2016-01-31|Map(GBP -> 150, USD -> 100)           | 
|1  |2016-02-15|Map(USD -> 150, GBP -> 150, JPN -> 10)| 
|2  |2016-01-10|Map()                                 | 
|2  |2016-02-01|Map(GBP -> 100, EURO -> 50)           | 
+---+----------+--------------------------------------+