使用spark中的其他csv文件更新csv文件

问题描述:

如果table1的主键不存在于table2中,我必须使用table2更新table1,并将table2的整行附加到table1。如果table1的主键存在于table2中那么table1的所有列都将被更新,除了table1的c3和table2的值之外。使用spark中的其他csv文件更新csv文件

表1

c1  c2  c3 
... .... ... 
k1  a  e 
k2  b  f 
k3  c  g 
k4  d  h 

表2

c1  c2  c3 
... .... ... 
k1  i  k 
k5  j  l 

所需输出

c1 c2  c3 
... ... ... 
k1  i  e 
k2  b  f 
k3  c  g 
k4  d  h 
k5  j  l 

我尝试下面的代码

import org.apache.spark.sql.{Row, SQLContext} 
import org.apache.spark.{SparkConf, SparkContext} 

object Update { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("tabUP").setMaster("local[2]") 
    val sc = new SparkContext(sparkConf) 
    val sqlContext = new SQLContext(sc) 

    val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("f1.csv") 
    val df2= sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("f2.csv") 
    df1.registerTempTable("tab1") 
    df2.registerTempTable("tab2") 
    val df3=sqlContext.sql("UPDATE tab1,tab2 SET tab1.val2 = tab2.val1,tab1.val3 = tab2.val3 WHERE tab1.val1 = tab2.val1").show() 


    } 
} 

由于数据帧是不可变的,我不能做临时表的更新是有什么办法,我可以实现它

+0

为什么K1的结果得到的具有i和e输出?是不是应该是一个和E? –

+0

table2已经更新了数据,因此table1将被更新为table2中的数据。只有c2栏更新不是c3 – sri

+0

我在下面回答了请检查 –

您可以使用outer join表与c1列和复制的table2值来table1如下。 重命名表2的列c2c3,因为它们与table1具有相同的名称。

val tempTable2 = table2.select('c1, 'c2.as("c22"), 'c3.as("c23")) 

import org.apache.spark.sql.functions._ 
table1.join(tempTable2, Seq("c1"), "outer") 
    .withColumn("c2", when('c22.isNotNull, 'c22).otherwise('c2)) 
    .withColumn("c3", when('c3.isNull, 'c23).otherwise('c3)) 
    .drop("c22", "c23") 

您应该尽可能拥有

+---+---+---+ 
|c1 |c2 |c3 | 
+---+---+---+ 
|k2 |b |f | 
|k4 |d |h | 
|k5 |j |l | 
|k1 |i |e | 
|k3 |c |g | 
+---+---+---+