MapReduce 读取 HBase 数据并写入 MySQL(HBase -> MySQL)
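本示例使用 MapReduce 读取 HBase 中的数据,汇总后写入 MySQL。首先看一下 HBase 中的数据。
HBase 表 `dianxin:phone` 的结构:rowkey 形如 `01_手机号_yyyyMMddhhmmss_1`,列族 `info` 下包含 `name`、`phone`、`nameTo`、`phoneTo`、`time`、`sum` 等列。
需求:根据 HBase 中每个用户的通话记录,统计每个用户每个月的通话总时长,并写入 MySQL。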
第一步:编写 Mapper 端代码
package phoneXM;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * Reads one call record per HBase row and emits it keyed by phone number and month.
 *
 * <p>The row key is expected to look like {@code 01_<phone>_<yyyyMMddhhmmss>_1}. The map output
 * key is {@code <phone>_<yyyyMM>}, which the reducer splits on {@code "_"} to recover the phone
 * number and the month.
 *
 * <p>The map output value is {@code name-nameTo-phone-phoneTo-sum}. The {@code sum} column is
 * deliberately kept as the last {@code '-'}-separated field, because that is the field the reducer
 * adds up.
 */
public class PhoneMapper extends TableMapper<Text, Text> {

    /** Column family that holds the call-record columns. */
    private static final String FAMILY = "info";

    /** Counter group used to report skipped rows. */
    private static final String COUNTER_GROUP = "PhoneMapper";

    // Reused output objects: context.write serializes them immediately, so reuse is safe.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // get() returns the whole backing array, so honour offset/length; Bytes.toString decodes
        // UTF-8 instead of relying on the platform default charset.
        String rowkey = Bytes.toString(key.get(), key.getOffset(), key.getLength());

        // Row key format: 01_<phone>_<yyyyMMddhhmmss>_1
        String[] parts = rowkey.split("_");
        if (parts.length < 3 || parts[2].length() < 6) {
            // Count and skip instead of failing the whole task on one malformed row.
            context.getCounter(COUNTER_GROUP, "MALFORMED_ROWKEY").increment(1);
            return;
        }
        String phoneNum = parts[1];
        String month = parts[2].substring(0, 6); // yyyyMM

        String name = "";
        String phone = "";
        String name2 = "";
        String phone2 = "";
        String sum = "";
        for (Cell cell : value.rawCells()) {
            if (!FAMILY.equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                continue;
            }
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            if ("name".equals(qualifier)) {
                name = Bytes.toString(CellUtil.cloneValue(cell));
            } else if ("phone".equals(qualifier)) {
                phone = Bytes.toString(CellUtil.cloneValue(cell));
            } else if ("nameTo".equals(qualifier)) {
                name2 = Bytes.toString(CellUtil.cloneValue(cell));
            } else if ("phoneTo".equals(qualifier)) {
                phone2 = Bytes.toString(CellUtil.cloneValue(cell));
            } else if ("sum".equals(qualifier)) {
                sum = Bytes.toString(CellUtil.cloneValue(cell));
            }
        }

        // Key is <phone>_<yyyyMM>. (Previously the phone number was prepended a second time,
        // producing <phone><phone>_<yyyyMM> and a doubled phone number in MySQL.)
        outKey.set(phoneNum + "_" + month);
        outValue.set(name + "-" + name2 + "-" + phone + "-" + phone2 + "-" + sum);
        context.write(outKey, outValue);
    }
}
第二步:编写 Reducer 端代码
package phoneXM;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Sums the call durations of one phone number in one month and emits a {@link UserInfo} row.
 *
 * <p>Input key: {@code <phone>_<yyyyMM>} as produced by the mapper. Input values:
 * {@code name-nameTo-phone-phoneTo-sum}, where the last field is the duration being summed.
 * Output: one {@code UserInfo} key per input key (the value is always {@link NullWritable}),
 * written by {@code DBOutputFormat}.
 */
public class PhoneReducer extends Reducer<Text, Text, UserInfo, NullWritable> {

    /** Counter group used to report skipped keys and values. */
    private static final String COUNTER_GROUP = "PhoneReducer";

    // Reused output record; DBOutputFormat binds its fields into the INSERT when it is written.
    private final UserInfo userInfo = new UserInfo();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Key format: <phone>_<yyyyMM>
        String[] keyParts = key.toString().split("_");
        if (keyParts.length < 2) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_KEY").increment(1);
            return;
        }

        // long rather than Integer: no autoboxing in the loop and no overflow on large totals.
        long total = 0L;
        for (Text text : values) {
            String record = text.toString();
            // The duration is the LAST '-'-separated field. Reading it via lastIndexOf (instead of
            // split("-")[4]) still works when a name/phone contains '-', and an empty trailing
            // field is not silently dropped the way String.split drops it.
            String duration = record.substring(record.lastIndexOf('-') + 1).trim();
            if (duration.isEmpty()) {
                context.getCounter(COUNTER_GROUP, "MISSING_DURATION").increment(1);
                continue;
            }
            try {
                total += Long.parseLong(duration);
            } catch (NumberFormatException e) {
                // One bad record should not fail the job; it stays visible through the counter.
                context.getCounter(COUNTER_GROUP, "BAD_DURATION").increment(1);
            }
        }

        userInfo.setPhone(keyParts[0]);
        userInfo.setMonth(keyParts[1]);
        userInfo.setSumTime(String.valueOf(total));
        // The row travels in the key; DBOutputFormat ignores the value.
        context.write(userInfo, NullWritable.get());
    }
}
第三步:编写 Driver 端代码
package phoneXM;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import phoneXM.PhoneMapper;
import phoneXM.PhoneReducer;
import java.io.FileOutputStream;
import java.io.IOException;
//将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。
public class Driver extends Configured implements Tool {
public static void main(String[] args) throws Exception{
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","es1,es2,es3");
configuration.set("hbase.zookeeper.property.clientport","2181");
int re = ToolRunner.run(configuration,new Driver(),args);
System.exit(re);
}
public int run(String[] args) throws Exception {
// 得到Conf
Configuration configuration = this.getConf();
//数据库配置
DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.244.162:3306/phone","root", "123456");
Job job = Job.getInstance(configuration, "db info1");
// 创建job任务
// Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
job.setJarByClass(Driver.class);
// 配置job
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);
// 设置Mapper
TableMapReduceUtil.initTableMapperJob(
"dianxin:phone", // 数据源的表名
scan, // scan扫描控制器
PhoneMapper.class, // 设置Mapper类
Text.class, // 设置Mapper输入key类型
Text.class, // 设置Mapper输出value值类型
job // 设置job
);
// 设置Reduce
/*TableMapReduceUtil.initTableReducerJob(
"hbase_mr", // 表名
Test_reduce.class, // 设置reduce
job
);*/
// 设置reduce数量,最少一个
job.setNumReduceTasks(1);
job.setReducerClass(PhoneReducer.class);
job.setOutputKeyClass(UserInfo.class);
job.setOutputValueClass(NullWritable.class);
//FileOutputFormat.setOutputPath(job, new Path("D:\\Demo\\hadoop\\ouput\\out1"));
DBOutputFormat.setOutput(job, "info1", "phone", "month", "sumTime");
job.setOutputFormatClass(DBOutputFormat.class);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
}
工具类:UserInfo
package phoneXM;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class UserInfo implements DBWritable {
//主要是把手机号,月份,通话总时间放入到mysql,所以把这3个封装一个类
private String phone;
private String month;
private String sumTime;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getMonth() {
return month;
}
public void setMonth(String month) {
this.month = month;
}
public String getSumTime() {
return sumTime;
}
public void setSumTime(String sumTime) {
this.sumTime = sumTime;
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1,this.getPhone());
statement.setString(2,this.getMonth());
statement.setString(3,this.getSumTime());
}
public void readFields(ResultSet resultSet) throws SQLException {
}
}
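测试
启动集群上的 HBase,运行代码,然后查看 MySQL 中 `phone` 库的 `info1` 表。注意:DBOutputFormat 只执行 INSERT,不会自动建表,需要事先创建包含 `phone`、`month`、`sumTime` 三列的 `info1` 表;MySQL 的 JDBC 驱动 jar 也需要在任务的 classpath 中。
到此就全部完成了,大家可以自己测试一下。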