Ambari HDP 3.0: error running Spark SQL
I installed HDP 3.0, and when I create a table with Spark SQL I get this error:
Table default.src failed strict managed table checks due to the following reason: Table is marked as a managed table but is not transactional.
After I added format("orc") the error on create went away, but the newly created table cannot be used and reports the same error.
Reading the newly generated ORC table with Spark SQL also throws this error.
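For reference, here is a minimal sketch of one direction that might avoid the check, assuming Spark 2.x with Hive support on HDP 3.0 and no Hive Warehouse Connector; the object name, table name, and HDFS path are made up. Passing an explicit path to saveAsTable makes Spark register the table as EXTERNAL in the metastore, and the strict managed-table (transactional) check in the error above applies to managed tables, so it should not be triggered for an external table.

import org.apache.spark.sql.{SaveMode, SparkSession}

object ExternalOrcTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("external_orc_table_sketch")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("select 1 as id, 'a' as name")

    // An explicit path makes Spark register the table as EXTERNAL in the Hive
    // metastore, so the strict managed-table (ACID) check should not apply.
    df.write
      .mode(SaveMode.Overwrite)
      .format("orc")
      .option("path", "/tmp/external/src_external") // hypothetical HDFS location
      .saveAsTable("default.src_external")

    spark.stop()
  }
}

The full job that produces the table follows below.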
package com.hcf.streaming
import java.security.MessageDigest
import java.text.SimpleDateFormat
import com.google.gson.{JsonObject, JsonParser}
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import scala.collection.mutable
/**
* Created by hyt on 2018/9/10.
*/
object Id_mapping2 {
  def main(args: Array[String]): Unit = {
    // Hard-coded test configuration; a local val named `args` would shadow the method
    // parameter and fail to compile, so use a different name (or pass the JSON on the command line).
    //val testArgs = Array("{'input_csv':'C:/Users/hyt/Desktop/national_tax.csv','csv_head':'nsrsbh, identity','table_name':'hyzs.id_mapping','mapping_colume':'identity=identity'}")
    val testArgs = Array("{'input_csv':'C:/Users/hyt/Desktop/jsyh_idmapping.txt','csv_head':'identity, client_no','table_name':'data_bridge.id_mapping','mapping_colume':'id=xxx'}")
    val json = new JsonParser()
    val obj = json.parse(testArgs(0).replace(" ", "")).asInstanceOf[JsonObject]
    val input_csv = obj.get("input_csv").getAsString
    val table_name = obj.get("table_name").getAsString
    val csv_head = obj.get("csv_head").getAsString.split(",")
    val mapping_colume = obj.get("mapping_colume").getAsString.split(",")
    val sparkSession = SparkSession.builder()
      .appName("id_mapping2")
      .master("local[*]")
      .enableHiveSupport() // enable Hive support
      .getOrCreate()
    val dataFrame = sparkSession.sqlContext.sql("select * from " + table_name)
    val field = dataFrame.schema.fields.map(b => b.name)
    // output columns: existing table columns plus any CSV columns that are not mapped onto them
    val all_colume = field.++(csv_head.filter(a => !mapping_colume.map(b => b.split("=")(1)).contains(a)))
    // build the mapping rules: CSV column name -> table column name
    val mapp = mutable.Map.empty[String, String]
    for (colume <- mapping_colume) {
      val columes = colume.split("=")
      mapp.+=((columes(1), columes(0)))
    }
    // original id_mapping table: each row becomes a set of "column__value" ids
    val table_rdd = dataFrame.rdd.map(row => {
      val set = mutable.Set.empty[String]
      for (i <- 0 to (field.size - 1)) {
        if (StringUtils.isNotBlank(row.getString(i))) {
          set.add(field(i) + "__" + row.getString(i))
        }
      }
      set
    })
    // newly added CSV file: same encoding, with CSV columns renamed via the mapping rules
    val csv_rdd = sparkSession.sparkContext.textFile(input_csv).map(line => {
      val lines = line.split(",")
      val set = mutable.Set.empty[String]
      for (i <- 0 to (csv_head.size - 1)) {
        if (StringUtils.isNotBlank(lines(i))) {
          if (mapp.contains(csv_head(i)))
            set.add(mapp(csv_head(i)) + "__" + lines(i))
          else
            set.add(csv_head(i) + "__" + lines(i))
        }
      }
      set
    })
    // MR1: key each record's id set by every id it contains, then merge sets that share
    // an id (the Int counts how many records were merged); the empty set tracks visited keys
    val rdd1: RDD[(String, (mutable.Set[String], mutable.Set[String], Int))] = table_rdd.++(csv_rdd)
      .flatMap { set =>
        set.map(t => (t, (set, 1)))
      }.reduceByKey { (t1, t2) =>
        t1._1 ++= t2._1
        val added = t1._2 + t2._2
        (t1._1, added)
      }.map { t =>
        (t._1, (t._2._1, mutable.Set.empty[String], t._2._2))
      }
    // MR2: propagate the merged sets once more
    val rdd2: RDD[(String, (mutable.Set[String], mutable.Set[String], Int))] = rdd1
      .flatMap(flatIdSet).reduceByKey(tuple3Add)
    // MR3: a further propagation round
    val rdd3: RDD[(String, (mutable.Set[String], mutable.Set[String], Int))] = rdd2
      .flatMap(flatIdSet).reduceByKey(tuple3Add)
    // filter: keep sets seen in a single record, or sets where every member id has
    // already served as a key (fully merged), then dedupe
    val rdd4 = rdd3.filter { t =>
      t._2._2 += t._1
      t._2._3 == 1 || (t._2._1 -- t._2._2).isEmpty
    }.map(_._2._1).distinct()
    // duplicate handling: when one user has several values for the same column,
    // expand the id set into multiple output rows (cartesian product per column)
    val rdd5 = rdd4.flatMap(set => {
      val line = mutable.ListBuffer.empty[mutable.Set[String]]
      all_colume.foreach(a => {
        line.+=(set.filter(b => b.startsWith(a)))
      })
      cartesion_product(line).map(list => Row.fromSeq(list.map(t => if (StringUtils.isBlank(t)) "" else t.substring(t.indexOf("__") + 2))))
    })
    // save to Hive
    val schema = StructType(all_colume.map(col_name => StructField(col_name, StringType, true)))
    val result = sparkSession.createDataFrame(rdd5, schema)
    result.write.mode(SaveMode.Overwrite).saveAsTable(table_name + "_1")
  }
  // re-key an id set: sets seen only once keep their key, merged sets are re-emitted
  // once per member id, recording the current key as visited
  def flatIdSet(row: (String, (mutable.Set[String], mutable.Set[String], Int))): Array[(String, (mutable.Set[String], mutable.Set[String], Int))] = {
    row._2._3 match {
      case 1 =>
        Array((row._1, (row._2._1, row._2._2, row._2._3)))
      case _ =>
        row._2._2 += row._1 // add key to keySet
        row._2._1.map(d => (d, (row._2._1, row._2._2, row._2._3))).toArray
    }
  }

  // merge two (idSet, keySet, count) triples
  def tuple3Add(t1: (mutable.Set[String], mutable.Set[String], Int),
                t2: (mutable.Set[String], mutable.Set[String], Int)) = {
    t1._1 ++= t2._1
    t1._2 ++= t2._2
    val added = t1._3 + t2._3
    (t1._1, t1._2, added)
  }
  val digest = MessageDigest.getInstance("MD5") // note: MessageDigest is not thread-safe
  val sdf = new SimpleDateFormat("yyyyMMddHHmmss")

  // cartesian product of the per-column value sets (one ListBuffer per output row);
  // element 0 of each row is then overwritten with the MD5 hex digest of element 1
  def cartesion_product(arrs: mutable.ListBuffer[mutable.Set[String]]): List[mutable.ListBuffer[String]] = {
    arrs.foldLeft(List[mutable.ListBuffer[String]]()) { (cumArr, addArr) => {
      if (cumArr.isEmpty && addArr.isEmpty) List(mutable.ListBuffer[String](""))
      else if (cumArr.isEmpty) addArr.map(t2 => mutable.ListBuffer(t2)).toList
      else if (addArr.isEmpty) cumArr.map(list => list.:+(""))
      else cumArr.flatMap { t1 => addArr.map(t2 => t1.:+(t2)) }
    }
    }.map(list => {
      try {
        list(0) = digest.digest(list(1).getBytes).map("%02x".format(_)).mkString
      } catch {
        case e: Exception => {
          println(e)
          println(list)
        }
      }
      list
    })
  }
}
The code above is my entire workflow. I'd be grateful if anyone with a spare moment could take a look.