使用spark中的其他csv文件更新csv文件
问题描述:
如果table1的主键不存在于table2中,我必须使用table2更新table1,并将table2的整行附加到table1。如果table1的主键存在于table2中那么table1的所有列都将被更新,除了table1的c3和table2的值之外。使用spark中的其他csv文件更新csv文件
表1
c1 c2 c3
... .... ...
k1 a e
k2 b f
k3 c g
k4 d h
表2
c1 c2 c3
... .... ...
k1 i k
k5 j l
所需输出
c1 c2 c3
... ... ...
k1 i e
k2 b f
k3 c g
k4 d h
k5 j l
我尝试下面的代码
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object Update {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("tabUP").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("f1.csv")
val df2= sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("f2.csv")
df1.registerTempTable("tab1")
df2.registerTempTable("tab2")
val df3=sqlContext.sql("UPDATE tab1,tab2 SET tab1.val2 = tab2.val1,tab1.val3 = tab2.val3 WHERE tab1.val1 = tab2.val1").show()
}
}
由于数据帧是不可变的,我不能做临时表的更新是有什么办法,我可以实现它
答
您可以使用outer join
表与c1
列和复制的table2
值来table1
如下。 重命名表2的列c2
和c3
,因为它们与table1具有相同的名称。
val tempTable2 = table2.select('c1, 'c2.as("c22"), 'c3.as("c23"))
import org.apache.spark.sql.functions._
table1.join(tempTable2, Seq("c1"), "outer")
.withColumn("c2", when('c22.isNotNull, 'c22).otherwise('c2))
.withColumn("c3", when('c3.isNull, 'c23).otherwise('c3))
.drop("c22", "c23")
您应该尽可能拥有
+---+---+---+
|c1 |c2 |c3 |
+---+---+---+
|k2 |b |f |
|k4 |d |h |
|k5 |j |l |
|k1 |i |e |
|k3 |c |g |
+---+---+---+
为什么K1的结果得到的具有i和e输出?是不是应该是一个和E? –
table2已经更新了数据,因此table1将被更新为table2中的数据。只有c2栏更新不是c3 – sri
我在下面回答了请检查 –