异常在线程“主”org.apache.spark.sql.AnalysisException:
问题描述:
我想我的手在kafka火花结构化流,但得到一些异常,如异常在线程“主”“org.apache.spark.sql.AnalysisException :无法解析'device
'给定的输入列:[value,offset,partition,key,timestamp,timestampType,topic];异常在线程“主”org.apache.spark.sql.AnalysisException:
附上我的代码
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime
case class DeviceData(device: String, deviceType: String, signal: String)
object dataset_kafka {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
println(df.isStreaming)
println(df.printSchema())
val ds: Dataset[DeviceData] = df.as[DeviceData]
val values = df.select("device").where("signal == Strong")
values.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
任何帮助如何解决这个问题?
答
卡夫卡流总是产生以下字段:value
,offset
,partition
,key
,timestamp
,timestampType
,topic
。在你的情况下,你对value
感兴趣,但请注意values are always deserialized as byte arrays,因此,在反序列化JSON之前需要强制转换为字符串。
试试下面的代码:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import spark.implicits._
val kafkaStream =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
// If you don't want to build the schema manually
val schema = ExpressionEncoder[DeviceData]().schema
val ds = kafkaStream.select(from_json($"value".cast("string"), schema)).as[DeviceData]
val values = ds.filter(_.signal == "Strong").map(_.device)