如何在set_identity关闭时将Spark数据框推送到Sql Server表中?

问题描述:

所以,我有一个在Id列的Sql Server表,它是一个标识列。我面临的问题是,当我尝试将数据框推入其中时,它抱怨identity_insert设置为“关闭”。现在,我明确地使用jdbc将其设置为'on',但由于这是sql服务器端的会话变量,因此在dataframe push命令被命中时它会变回'off',因为两者都是针对sql的不同会话服务器。如何在set_identity关闭时将Spark数据框推送到Sql Server表中?

如果'开启'并在同一会话中推送数据帧,是否有方法可用?

一些代码 - SQL Server表

create table dbo.testtable 
(
[Id] int identity, 
[Name] varchar(100), 
[Address] varchar(100), 
[ExtraColumn] int, 
[Age] int 
) 

我的数据帧 -

case class TestClass(Id: Int, Name: String, Address: String, ExtraColumn: 
Int, Age: Int) 

val seqClass = Seq(TestClass(1, "kv", "riata", 2, 30), 
       TestClass(2, "xyz", "xyz's place", 2, 31), 
       TestClass(3, "abc", "abc's place", 2, 32)) 

val sparkSession = createSparkSession //creating through some method 
val df = sparkSession.sqlContext.createDataFrame(seqClass) 
JDBCUtils.setIdentityInsertOn(conn, JDBC.SQL_SERVER.TYPE, 
"testdb1.dbo.testtable", None) //my method to turn on identity_insert 

//code to push data frame to sql server 
df.coalesce(1).write.mode("append").jdbc(jdbcUrl,"testdb1.dbo.testtable", 
getConnectionProperties(username,password, dbType)) 

//getConnectionProperties is my own method that provides connection 
//properties for jdbc. 

注意的是,以上所有工作正常,如果我删除从数据帧ID列。所以代码总体上起作用,只是我需要能够在数据框上保留Id并将其推送到测试表。为什么我不能简单地使用测试表中的身份代码?由于上述代码是非常复杂的工作流程的一部分,因此我需要在数据框中生成类似上述的Id列。

任何帮助表示赞赏!

感谢

能够深入到Vaibhav的下线谁已经实施的解决方案后,解决此问题。我在此发布相同的内容供其他人在将来使用。

在低于与位置 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

更新请SaveTable()和从JDBCUtils.java依赖函数的本地副本的SavePartition函数插入下面的代码行 - 行640之前

if(Identity_Insert_Off) { 
    val sql = "set IDENTITY_INSERT " + table + " ON"; 
    val statement = conn.createStatement() 
    statement.execute(sql) 
} 

while循环。

while (iterator.hasNext) {...} 

更新如果基于场景的条件(我只使用SqlServer的所以只检查标识插入标志,它被传递给函数的代码)

下面查询可以用来检查身份开启或关闭某一表 -

SELECT OBJECTPROPERTY(OBJECT_ID('<TableName>'), 'TableHasIdentity');