如何从Kafka读取XML格式的流数据?
我正在尝试使用Spark结构化流从卡夫卡主题读取XML数据。如何从Kafka读取XML格式的流数据?
我试过使用Databricks spark-xml
包,但是我收到一个错误消息,说这个包不支持流式阅读。有什么方法可以使用结构化流从Kafka主题中提取XML数据?
我当前的代码:
df = spark \
.readStream \
.format("kafka") \
.format('com.databricks.spark.xml') \
.options(rowTag="MainElement")\
.option("kafka.bootstrap.servers", "localhost:9092") \
.option(subscribeType, "test") \
.load()
错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
.format("kafka") \ .format('com.databricks.spark.xml') \
最后一个与com.databricks.spark.xml
胜变为(隐藏卡夫卡作为源)的数据流源。
换句话说,以上相当于.format('com.databricks.spark.xml')
单独。
正如您可能已经体验的那样,Databricks spark-xml
包不支持流式读取(即不能用作流式源)。该软件包不适用于流媒体。
Is there any way I can extract XML data from Kafka topic using structured streaming?
您只需使用标准函数或UDF自行访问和处理XML。在结构化流式传输到Spark 2.2.0中没有内置的支持流式处理XML。
无论如何这应该不是什么大问题。一个Scala代码可能如下所示。
val input = spark.
readStream.
format("kafka").
...
load
val values = input.select('value cast "string")
val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))
// print XMLs and numbers to the stdout
val q = numbersFromXML.
writeStream.
format("console").
start
另一种可能的解决办法是写自己的自定义流Source将应对def getBatch(start: Option[Offset], end: Offset): DataFrame
的XML格式。那是应该工作。
不能混合格式这种方式。卡夫卡源加载为包括值的数目,如key
,value
和topic
,与value
列存储payload as a binary
type:
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
...
value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
解析该内容是用户的责任,并且不能被委派给其他数据源。例如,请参阅我对How to read records in JSON format from Kafka using Structured Streaming?的回答。
对于XML,您可能需要一个UDF(UserDefinedFunction
),但您可以先尝试Hive XPath functions。您还应该解码二进制数据。
import xml.etree.ElementTree as ET
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option(subscribeType, "test") \
.load()
然后我写了一个Python UDF
def parse(s):
xml = ET.fromstring(s)
ns = {'real_person': 'http://people.example.com',
'role': 'http://characters.example.com'}
actor_el = xml.find("DNmS:actor",ns)
if(actor_el):
actor = actor_el.text
role_el.find('real_person:role', ns)
if(role_el):
role = role_el.text
return actor+"|"+role
注册此UDF
extractValuesFromXML = udf(parse)
XML_DF= df .withColumn("mergedCol",extractroot("value"))
AllCol_DF= xml_DF.withColumn("actorName", split(col("mergedCol"), "\\|").getItem(0))\
.withColumn("Role", split(col("mergedCol"), "\\|").getItem(1))
谢谢,亚采。我写了UDF来解析XML数据。它正在工作。我将很快发布该UDF。 –