Production case: migrating Hive data to MongoDB with Spark (republished)
This article is republished from Ruoze Data (若泽大数据, www.ruozedata.com); if you republish it, please credit the original source. Original title: 我司Spark迁移Hive数据到MongoDB生产案例代码.
The Hive emp table contains the following data:
hive (soul)> select * from emp;
OK
emp.empno emp.ename emp.job emp.age emp.deptno
7369 SMITH CLERK 24 10
7499 ALLEN SALESMAN 30 20
7521 WARD SALESMAN 25 30
7654 MARTIN SALESMAN 23 10
7698 BLAKE MANAGER 29 40
pom.xml
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <hive.version>1.1.0</hive.version>
</properties>

<!-- MongoDB Spark Connector: its release numbers track Spark, so 2.2.0 pairs with Spark 2.2.0 -->
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.6.3</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>bson</artifactId>
    <version>3.4.0</version>
</dependency>
<!-- Spark Hive support -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- MySQL driver (for the Hive metastore) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
A MongoDB utility class providing updateSave (update existing documents) and upsertSave (update if the document exists, insert if it does not):
package com.soul.utils

import com.mongodb.client.MongoCollection
import com.mongodb.client.model.{ReplaceOneModel, UpdateOneModel}
import com.mongodb.spark.MongoConnector
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.rdd.RDD
import org.bson.Document

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

/**
  * @author 若泽数据 soulChun
  * @create 2018-12-18-20:37
  */
object MongoUtils {

  val DefaultMaxBatchSize = 100000

  // Update existing documents, issuing one bulkWrite per batch within each partition
  def updateSave[D: ClassTag](rdd: RDD[UpdateOneModel[Document]]): Unit =
    updateSave(rdd, WriteConfig(rdd.sparkContext))

  def updateSave[D: ClassTag](rdd: RDD[UpdateOneModel[D]], writeConfig: WriteConfig): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    rdd.foreachPartition(iter => if (iter.nonEmpty) {
      mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
        iter.grouped(DefaultMaxBatchSize).foreach(batch => collection.bulkWrite(batch.toList.asJava))
      })
    })
  }

  // Replace existing documents, inserting when no match is found (upsert)
  def upsertSave[D: ClassTag](rdd: RDD[ReplaceOneModel[Document]]): Unit =
    upsertSave(rdd, WriteConfig(rdd.sparkContext))

  def upsertSave[D: ClassTag](rdd: RDD[ReplaceOneModel[D]], writeConfig: WriteConfig): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    rdd.foreachPartition(iter => if (iter.nonEmpty) {
      mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
        iter.grouped(DefaultMaxBatchSize).foreach(batch => collection.bulkWrite(batch.toList.asJava))
      })
    })
  }
}
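The core of both helpers is `iter.grouped(DefaultMaxBatchSize)`, which slices each partition's iterator into fixed-size chunks so that every `bulkWrite` stays bounded. A minimal Java sketch of that batching logic, with plain lists standing in for the MongoDB write models (the class and method names here are illustrative, not part of any library):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {
    static final int DEFAULT_MAX_BATCH_SIZE = 100000;

    // Slice a list into consecutive batches of at most batchSize elements,
    // mirroring what iter.grouped(DefaultMaxBatchSize) does in the Scala helper.
    static <T> List<List<T>> grouped(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 7; i++) rows.add(i);
        // With batchSize 3, 7 rows become batches of sizes 3, 3, 1
        List<List<Integer>> batches = grouped(rows, 3);
        System.out.println(batches.size());        // 3
        System.out.println(batches.get(2).size()); // 1
    }
}
```

Batching matters in production: without it, a single partition with millions of rows would attempt one enormous bulkWrite call.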
1. Saving the DataFrame to MongoDB
package com.soul.sparkmg;

import com.mongodb.spark.MongoSpark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        // If your password contains an @ character, encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:[email protected]/soul_db.emp");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        // Save the DataFrame straight to MongoDB
        MongoSpark.save(df);
        jsc.stop();
    }
}
When the program runs, the emp collection (named in the output URI; change it as needed) is created automatically in MongoDB and the data is inserted — all five rows end up in MongoDB.
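The %40 in the comment above is just standard percent-encoding of the reserved @ character, which would otherwise be parsed as the credentials/host separator in the connection URI. The encoding can also be done programmatically; a small sketch (the sample password and URI here are made up for illustration):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class UriPassword {
    public static void main(String[] args) throws UnsupportedEncodingException {
        // Percent-encode the password before splicing it into the MongoDB URI;
        // '@' becomes %40 so it is not mistaken for the userinfo/host separator.
        String encoded = URLEncoder.encode("p@ssw0rd", "UTF-8");
        System.out.println(encoded); // p%40ssw0rd
        String uri = "mongodb://root:" + encoded + "@localhost:27017/soul_db.emp";
        System.out.println(uri);
    }
}
```

Note that URLEncoder applies form encoding, which handles @ correctly but turns spaces into + rather than %20; for passwords containing spaces, encode them as %20 manually.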
2. Saving an RDD to MongoDB
package com.soul.sparkmg;

import com.mongodb.spark.MongoSpark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.bson.Document;

/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        // If your password contains an @ character, encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:[email protected]/soul_db.emp");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        JavaRDD<Row> rdd = df.toJavaRDD();
        // insert: map each Row to a BSON Document
        JavaRDD<Document> rddDoc = rdd.map(new Function<Row, Document>() {
            public Document call(Row row) throws Exception {
                Document doc = new Document();
                doc.put("empno", row.get(0));
                doc.put("ename", row.get(1));
                doc.put("job", row.get(2));
                doc.put("age", row.get(3));
                doc.put("deptno", row.get(4));
                return doc;
            }
        });
        MongoSpark.save(rddDoc);
        jsc.stop();
    }
}
3. Updating existing data
First change the age of the first document in MongoDB to 100, then run the following program:
package com.soul.sparkmg;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.soul.utils.MongoUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.bson.Document;

/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        // If your password contains an @ character, encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:[email protected]/soul_db.emp");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        JavaRDD<Row> rdd = df.toJavaRDD();
        // update: build a $set UpdateOneModel per row, matching on empno
        JavaRDD<UpdateOneModel<Document>> rddUpdate = rdd.map(new Function<Row, UpdateOneModel<Document>>() {
            public UpdateOneModel<Document> call(Row row) throws Exception {
                Document doc = new Document();
                doc.put("empno", row.get(0));
                doc.put("ename", row.get(1));
                doc.put("job", row.get(2));
                doc.put("age", row.get(3));
                doc.put("deptno", row.get(4));
                Document modifiers = new Document();
                modifiers.put("$set", doc);
                return new UpdateOneModel<Document>(Filters.eq("empno", doc.get("empno")), modifiers,
                        new UpdateOptions().upsert(true));
            }
        });
        MongoUtils.updateSave(rddUpdate.rdd(), rddUpdate.classTag());
        jsc.stop();
    }
}
After the run, MongoDB still holds five documents, and age has been restored to its original value of 30.
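The practical difference between UpdateOneModel with $set (used here) and ReplaceOneModel (used in the next section) is that $set merges the given fields into the existing document, while a replace swaps the whole document body. A toy sketch with plain Java maps standing in for BSON documents (no MongoDB involved; the helper names are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

public class SetVsReplace {
    // Mimic {$set: fields}: merge fields into the existing document, keeping the rest.
    static Map<String, Object> applySet(Map<String, Object> existing, Map<String, Object> fields) {
        Map<String, Object> result = new HashMap<>(existing);
        result.putAll(fields);
        return result;
    }

    // Mimic a replace: the new body wins wholesale (in MongoDB only _id would survive).
    static Map<String, Object> applyReplace(Map<String, Object> existing, Map<String, Object> body) {
        return new HashMap<>(body);
    }

    public static void main(String[] args) {
        Map<String, Object> existing = new HashMap<>();
        existing.put("empno", 7369);
        existing.put("age", 100);
        existing.put("note", "manually edited");

        Map<String, Object> fromHive = new HashMap<>();
        fromHive.put("empno", 7369);
        fromHive.put("age", 24);

        // $set keeps the extra "note" field; replace drops it.
        System.out.println(applySet(existing, fromHive).containsKey("note"));     // true
        System.out.println(applyReplace(existing, fromHive).containsKey("note")); // false
    }
}
```

So if the MongoDB documents carry fields that do not come from Hive, $set preserves them and a replace discards them; choose accordingly.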
4. Updating existing data and inserting what is missing (upsert)
Delete documents 4 and 5 from the emp collection in MongoDB and change the age of the first document to 200, then run the following program:
package com.soul.sparkmg;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.soul.utils.MongoUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.bson.Document;

/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        // If your password contains an @ character, encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:[email protected]/soul_db.emp");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        JavaRDD<Row> rdd = df.toJavaRDD();
        // upsert: replace the matching document, or insert it if empno is not found
        JavaRDD<ReplaceOneModel<Document>> rddUpsert = rdd.map(new Function<Row, ReplaceOneModel<Document>>() {
            public ReplaceOneModel<Document> call(Row row) throws Exception {
                Document doc = new Document();
                doc.put("empno", row.get(0));
                doc.put("ename", row.get(1));
                doc.put("job", row.get(2));
                doc.put("age", row.get(3));
                doc.put("deptno", row.get(4));
                return new ReplaceOneModel<Document>(Filters.eq("empno", doc.get("empno")), doc,
                        new UpdateOptions().upsert(true));
            }
        });
        MongoUtils.upsertSave(rddUpsert.rdd(), rddUpsert.classTag());
        jsc.stop();
    }
}
After the run, you will find the data has been restored.
When doing an update or upsert, remember that the match field in
Filters.eq("empno", doc.get("empno"))
— that is, empno — should be an indexed field in MongoDB, which noticeably improves write performance. If your company has a scheduling platform that supports dynamic parameters, the code above can be turned into a plugin that migrates any Hive table.
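For reference, that index can be created once in the mongo shell (collection and field names taken from the example above):

```
db.emp.createIndex({ empno: 1 })
```

Without it, every UpdateOneModel/ReplaceOneModel filter triggers a collection scan.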
For details on using MongoSpark, see:
https://docs.mongodb.com/