Equality of DataFrames

Problem description:

I have the following scenario:

I have two DataFrames, each containing only one column, say

DF1=(1,2,3,4,5) 
DF2=(3,6,7,8,9,10) 

Basically those values are keys, and I create a parquet file of DF1 if the keys in DF1 are not in DF2 (in the current example it should return false). My current way of achieving this is:

val df1count = DF1.count                 // action 1
val df2count = DF2.count                 // action 2
val diffDF = DF2.except(DF1)             // rows of DF2 that are not in DF1
val diffCount = diffDF.count             // action 3
if (diffCount == (df2count - df1count)) true
else false

The problem with this approach is that I am invoking actions four times, which is surely not the best way. Could someone suggest the best way to achieve this?
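A minimal sketch of one way to reduce this to a single action, assuming Spark 2.4+ (for Dataset.isEmpty) and that the single column is named "key" (an illustrative name, not from the question). A left_anti join keeps only the DF1 rows that have no match in DF2, so an empty result means every key of DF1 also occurs in DF2:

// one action instead of three counts; true iff all keys of DF1 appear in DF2
val allKeysPresent = DF1.join(DF2, Seq("key"), "left_anti").isEmpty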

You can use the following function:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame = {
  val fields = df1.schema.fields.map(_.name)
  val diffColumnName = "Diff"

  df1
    .join(df2, df1(key) === df2(key), "full_outer")
    .withColumn(
      diffColumnName,
      when(df1(key).isNull, "New row in DataFrame 2")
        .otherwise(
          when(df2(key).isNull, "New row in DataFrame 1")
            .otherwise(
              // concatenate the names of all columns whose values differ
              concat_ws("",
                fields.map(f => when(df1(f) =!= df2(f), s"$f ").otherwise("")): _*
              )
            )
        )
    )
    .filter(col(diffColumnName) =!= "")  // keep only rows with a difference
    .select(
      // take each column from df1 when the key exists there, otherwise from df2
      fields.map(f =>
        when(df1(key).isNotNull, df1(f)).otherwise(df2(f)).alias(f)
      ) :+ col(diffColumnName): _*
    )
}

For your case, run this:

diff("emp_id", df1, df2) 

import org.apache.spark.sql.{DataFrame, SparkSession} 
import org.apache.spark.sql.functions._ 

object DiffDataFrames extends App { 
    val session = SparkSession.builder().master("local").getOrCreate() 

    import session.implicits._ 

    val df1 = session.createDataset(Seq((1,"a",11),(2,"b",2),(3,"c",33),(5,"e",5))).toDF("n", "s", "i") 
    val df2 = session.createDataset(Seq((1,"a",11),(2,"bb",2),(3,"cc",34),(4,"d",4))).toDF("n", "s", "i") 

    def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame =
        ??? // replace ??? with the definition given above

    diff("n", df1, df2).show(false) 
} 
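With the sample data above, the output should look roughly like this (row order may vary, and each column-name entry in Diff carries a trailing space from concat_ws):

+---+---+---+----------------------+
|n  |s  |i  |Diff                  |
+---+---+---+----------------------+
|2  |b  |2  |s                     |
|3  |c  |33 |s i                   |
|5  |e  |5  |New row in DataFrame 1|
|4  |d  |4  |New row in DataFrame 2|
+---+---+---+----------------------+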

Could you let me know how to declare df1 and df2? I have declared them as follows: sqlContext = SQLContext(sc); df = sqlContext.sql("select * from table1"); df2 = sqlContext.sql("select * from table2"), then copied the above code as is... and I get a syntax error... I am very new to Spark Scala code.


Could you point out what I am doing wrong? When I try to run the code below I get an error: not found: value df1, not found: value df2:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext = SQLContext(sc)
df1 = sqlContext.sql("select * from table1")
df2 = sqlContext.sql("select * from table2")
diff("tenant", df1, df2)
def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame = { ... } // function code provided
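A minimal sketch of how those declarations could be made valid Scala, assuming the Spark 1.x SQLContext style the comment uses, an existing SparkContext sc (as in spark-shell), and the table and key names taken from the comment. Each binding needs val; plain df1 = ... is not a declaration, which is what causes "not found: value df1":

import org.apache.spark.sql.{DataFrame, SQLContext}

val sqlContext = new SQLContext(sc)  // sc: the SparkContext provided by spark-shell

// declare the DataFrames with `val` before using them
val df1: DataFrame = sqlContext.sql("select * from table1")
val df2: DataFrame = sqlContext.sql("select * from table2")

diff("tenant", df1, df2).show(false)  // diff as defined in the answer above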


Hi, I added a short example.

Below is a way to get the rows that are not common between two DataFrames:

val d1 = Seq((3, "Chennai", "rahman", "9848022330", 45000, "SanRamon"), (1, "Hyderabad", "ram", "9848022338", 50000, "SF"), (2, "Hyderabad", "robin", "9848022339", 40000, "LA"), (4, "sanjose", "romin", "9848022331", 45123, "SanRamon")) 
val d2 = Seq((3, "Chennai", "rahman", "9848022330", 45000, "SanRamon"), (1, "Hyderabad", "ram", "9848022338", 50000, "SF"), (2, "Hyderabad", "robin", "9848022339", 40000, "LA"), (4, "sanjose", "romin", "9848022331", 45123, "SanRamon"), (4, "sanjose", "romino", "9848022331", 45123, "SanRamon"), (5, "LA", "Test", "1234567890", 12345, "Testuser")) 

val df1 = d1.toDF("emp_id" ,"emp_city" ,"emp_name" ,"emp_phone" ,"emp_sal" ,"emp_site") 
val df2 = d2.toDF("emp_id" ,"emp_city" ,"emp_name" ,"emp_phone" ,"emp_sal" ,"emp_site") 

// the DataFrames must be registered as temp views before SQL can reference them
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql("((select * from df1) union (select * from df2)) minus ((select * from df1) intersect (select * from df2))").show // spark is the SparkSession
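The same symmetric difference can also be computed with the DataFrame API alone, avoiding the temp views (a sketch: union matches columns by position, and except applies set semantics, so duplicates are dropped just as in the SQL version):

val uncommonRows = df1.union(df2).except(df1.intersect(df2))
uncommonRows.show(false)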