Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

很多情况大数据集群需要获取业务数据,用于分析。通常有两种方式:

  • 业务直接或间接写入的方式
  • 业务的关系型数据库同步到大数据集群的方式

第一种可以是在业务中编写代码,将觉得需要发送的数据发送到消息队列,最终落地到大数据集群。

第二种则是通过数据同步的方式,将关系型数据同步到大数据集群,可以是存储在 hdfs 上,使用 hive 进行分析,或者是直接存储到 hbase 中。

其中数据同步又可以大致分为两种:增量同步、CRUD 同步。

增量同步是只将关系型数据库中新增的数据进行同步,对于修改、删除操作不进行同步,这种同步方式适用于那些一旦生成就不会变动的数据。
CRUD 同步则是数据的增、删、改都需要进行同步,保证两个库中的数据一致性。

本文不讲 binlog + Canal + 消息队列 + JAR 实现数据实时同步的方案,也不讲使用 Sqoop 进行离线同步。而是讲解如何使用 Streamsets 零代码完成整个实时同步流程。关于 Streamsets 具体是什么,以及能做哪些其他的事情,大家可以前往 Streamsets 官网进行了解。从笔者了解的信息,在数据同步方面 Streamsets 十分好用。

要实现 mysql 数据的实时同步,首先我们需要打开其 binlog 模式,具体怎么操作网上有很多教程,这里就不进行阐述了。

那么,现在就直接进入正题吧。

安装

下载

Streamsets 可以直接从官网下载: https://archives.streamsets.com

这里安装的是 Core Tarball 格式,当然你也可以直接选择下载 Full Tarball、Cloudera Parcel 或者其他格式。下载 Core Tarball 的好处是体积小,后期需要什么库的时候可以自行在 Streamsets Web 页进行下载。相对于 Core Tarball,Full Tarball 默认帮你下载了很多库,但是文件体积相对较大(>4G),并且可能很多库我们暂时使用不到。

Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

或者你可以直接使用这个链接进行下载:https://archives.streamsets.com/datacollector/3.7.1/tarball/streamsets-datacollector-core-3.7.1.tgz

解压启动

Streamsets Core Tarball 下载好后,直接解压就可以使用,非常方便。

 tar xvzf streamsets-datacollector-core-3.7.1.tgz
 cd streamsets-datacollector-3.7.1/bin/
 ./streamsets dc
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Configuration of maximum open file limit is too low: 1024 (expected at least 32768). Please consult https://goo.gl/6dmjXd

如果在运行的时候遇到上面的报错,修改操作系统的 open files 限制数量即可。

#vi /etc/security/limits.conf

添加两行内容:

  • soft nofile 65536
  • hard nofile 65536

运行 ‘ulimit -n’ 既可以看到 open files 设置值已生效。

Web 页

Streamsets 拥有一个 Web 页,默认端口是 18630。浏览器中输入 ip:18630 即可进入 streamsets 的页面,默认用户名、密码都是 admin。
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

Pipeline

准备工作

因为需要将 mysql 的数据实时同步到 hbase 中,但是下载的 Core Tarball 中没有 MySQL Binary Log 以及 hbase 两个 stage library,所以在 create new pipeline 之前需要先安装它们。

安装 MySQL Binary Log 库

Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

安装 Hbase 库,这里注意一下,hbase 库位于 CDH 中,所以选择一个 CDH 版本进行安装

Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

安装好后在 Installed Stage Libraries 中就能看到已经安装了 MySQL Binary Log 和 Hbase
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

创建 Pipeline

MySQL Binary Log

创建一个 MySQL Binary Log
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

设置 mysql 的连接参数(Hostname, Port 以及 Server ID),这里的 Server ID 与 mysql 配置文件(一般是 /etc/my.cnf)中的 server-id 保持一致
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

设置 mysql 的用户名、密码
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

其他设置:我们在 Include Tables 栏设置了两张表(表与表之间用逗号隔开),意思是监控这两张表的数据变化,其他表不关心。
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

Stream Selector

创建一个 Stream Selector,并将刚刚创建的 MySQL Binary Log 指向这个 Stream Selector。 设置过滤条件, 比如说 ${record:value("/Table")==‘cartype’} 就是过滤 cartype 表。

可以看到 Stream Selector 有两个出口(1 和 2),后面我们将会看到: 1 输出到 Hbase, 2 数据到 Trash
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

Hbase & Trash

分别创建 Hbase 和 Trash,连接到 Stream Selector 上
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

配置 Hbase
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

Trash 无需进行配置

验证 & 启动

验证

点击右上角的“眼镜”,验证整个流程是否有问题。

这里报错:“java.lang.RuntimeException:Unable to get driver instance for jdbcUrl”。这个报错的原因是缺少 mysql 连接的 jar 包。解决起来也很简单,下载一个 jar 包然后放到 streamsets 指定的目录下。我这边的完整目录是:/opt/streamsets/streamsets-datacollector-3.7.1/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/mysql-connector-java-5.1.26-bin.jar, mysql-connector-java-5.1.26-bin.jar 就是我下载的 jar 包。

还有一点就是事先要将 Hbase 中相对应的表创建好,不然验证会提示错误。
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

接着在页面上重启 streamsets 即可。

Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

重新验证,发现成功了。

Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

点击右上角播放标签,启动流程,这样整个流程就已经完成了(数据已经在进行实时同步),查看各个 Stage 既可以看到有多少数据流入,多少数据流出。也可以直接进入 hbase 数据库中查看是否有数据生成。
Mysql 到 Hbase 数据如何实时同步,强大的 Streamsets 告诉你

以上就是如何使用 Streamsets 实时同步 mysql 数据到 hbase 中的整个操作流程。大家肯定发现了,整个流程没有编写任何的代码,相对于 binlog + Canal + 消息队列 + JAR 的方案是不是高效一些呢。当然任何方案都会有优缺点,Streamsets 这种方案的更多实际体验还需要更多的观察。