将具有不同格式的文本文件映射到数据集
问题描述:
我有很多文本文件充满了不同种类的日志消息,但每个文件只能显示一种类型的消息。将具有不同格式的文本文件映射到数据集
File1中:(I); 2017年1月12日; 16:54:45;随机文本;其他文本
文件2: (I); 2017年1月13日15:34:56;同样文本;再一次//即空间 日期和时间
我已经得到了这个工作,但我想问这是否是“正确”的方式来做到这一点。另外我的方法只适用于分号和空格之间的变化总是出现在同一位置。
关于这个问题的任何意见,赞赏,因为我是新来的斯卡拉/火花。
//read file
val df = spark.read.textFile(file.path).filter(f => f.nonEmpty && f.length > 1 && f.startsWith("("))
//create empty dataset of type OutputMessage
var df3 = Seq.empty[OutputMessage].toDS()
//get number of semicolons within first line of the dataset to determine type
val message_type = df.take(1).mkString(",").count(_ == ';')
if(message_type == 5){
//split by semicolon and create dataset of type InputMessage
var df2 = df.map(x => x.split(";")).map(x => InputMessage(x(0), x(1), x(2), x(3), x(4), x(5)))
//map to dataset of type output message
df3 = df2.map(
x =>
OutputMessage(x.status,
x.messages_datestring,
x.messages_timestring,
x.device,
x.device_fullmessage,
x.device_message,
fileName,
getWeekday(x.messages_datestring),
(x.messages_datestring + "T" + x.messages_timestring),
data_company,
data_location,
data_systemname)
)
}
else if (message_type == 4){
var df2 = df.map(x => x.split(";")).map(x => InputMessage1(x(0), x(1), x(2), x(3), x(4)))
df3 = df2.map(
x=>
OutputMessage(x.status,
x.messages_datetimestring.split(" ").take(1).mkString(","),
x.messages_datetimestring.split(" ").takeRight(1).mkString(","),
x.device,
x.device_fullmessage,
x.device_message,
fileName,
getWeekday(x.messages_datetimestring.split(" ").take(1).mkString(",")),
x.messages_datetimestring.replace(' ', 'T'),
data_company,
data_location,
data_systemname)
)
}
//convert to rdd
val dsToRDD = df3_filtered.rdd
//laod to elasticsearch
dsToRDD.saveToEs("abdata/log")
编辑:我刚才看到有些文件之间有不一致的行。这意味着我的解决方案不再适用于
编辑:将其更改为基于行的执行。除了行内的随机分隔符之外,大部分工作都很有效。我得到这个案件的输出,但不是通缉犯。
object MapRawData{
def mapRawLine (line: String): Option[RawMessage] ={
var msgtype = 0;
val fields = line.split(";")
if (fields(0).length == 3 && fields(1).length == 10) msgtype = 1
if (fields(0).length == 3 && fields(1).length > 10) msgtype = 3
if (fields(0).length > 16) msgtype = 2
try {
fields.map(_.trim)
Some(
RawMessage(
status = fields(0).take(3),
messages_datestring = if(msgtype == 1) fields(1) else if(msgtype == 2) fields(0).drop(4).take(10) else fields(1).take(10),
messages_timestring = if(msgtype == 1) fields(2).take(8) else if (msgtype == 2) fields(0).drop(15).take(8) else (fields(1).drop(11).take(8)),
device = if(msgtype == 1) fields(3) else if (msgtype == 2) fields(1) else fields(2),
device_fullmessage = if(msgtype == 1) fields(4) else if (msgtype == 2) fields(2) else fields(3),
device_message = if(msgtype == 1) fields(5) else if (msgtype == 2) fields(3) else fields(4)
)
)
}
catch {
case e: Exception =>
println(s"Unable to parse line: $line")
None
}
}
}
这种变化方式比第一种方式更耗费时间/资源吗?
答
将其更改为基于行的执行。除了行内的随机分隔符之外,大部分工作都很有效。我得到这个案件的输出,但不是通缉犯。
object MapRawData{
def mapRawLine (line: String): Option[RawMessage] ={
var msgtype = 0;
val fields = line.split(";")
if (fields(0).length == 3 && fields(1).length == 10) msgtype = 1
if (fields(0).length == 3 && fields(1).length > 10) msgtype = 3
if (fields(0).length > 16) msgtype = 2
try {
fields.map(_.trim)
Some(
RawMessage(
status = fields(0).take(3),
messages_datestring = if(msgtype == 1) fields(1) else if(msgtype == 2) fields(0).drop(4).take(10) else fields(1).take(10),
messages_timestring = if(msgtype == 1) fields(2).take(8) else if (msgtype == 2) fields(0).drop(15).take(8) else (fields(1).drop(11).take(8)),
device = if(msgtype == 1) fields(3) else if (msgtype == 2) fields(1) else fields(2),
device_fullmessage = if(msgtype == 1) fields(4) else if (msgtype == 2) fields(2) else fields(3),
device_message = if(msgtype == 1) fields(5) else if (msgtype == 2) fields(3) else fields(4)
)
)
}
catch {
case e: Exception =>
println(s"Unable to parse line: $line")
None
}
}
}