Apache Sqoop --- 数据导入导出工具

Apache Sqoop --- 数据导入导出工具

Apache Sqoop

概述

Apache Sqoop是在RDBMS和hadoop体系之间进行数据传输的工具

  • RDBMS体系

    • MySQL, Oracle, DB2 等
  • Hadoop体系

    • HDFS, Hive, Hbase等

底层通过MapReduce来实现, 针对输出输出组件进行了定制, 支持DB

sqoop也是一种ETL工具, 可以把数据从RDBMS经过抽取转换装载到Hive数仓中

导入导出方向

  • RDBMS --> Hadoop

    • 导入
  • Hadoop --> RDBMS

    • 导出

sqoop安装

安装前提

  • 已安装java和hadoop

上传安装包并解压

  • sqoop-1.4.6-cdh5.14.0.tar.gz

修改配置文件

  • conf/sqoop-env.sh

    • 导入hadoop环境变量

      • #Set path to where bin/hadoop is available
      • export HADOOP_COMMON_HOME=/export/servers/hadoop-2.6.0-cdh5.14.0
      • #Set path to where hadoop-*-core.jar is available
      • export HADOOP_MAPRED_HOME=/export/servers/hadoop-2.6.0-cdh5.14.0
    • 导入hive环境变量

      • #Set the path to where bin/hive is available
      • export HIVE_HOME=/export/servers/hive

添加需要的jar包到 lib路径下

  • RDBMS使用mysql, 需要导入mysql jdbc驱动

    • mysql-connector-java-5.1.32.jar
  • 要使用hive进行数据导入导出, 需要导入hive-exec执行类

    • hive-exec-1.1.0-cdh5.14.0.jar
  • 涉及到json格式, 需要导入json解析类

    • java-json.jar

验证是否安装成功

  • 列出所有的数据库

  • 命令

    • bin/sqoop list-databases \
    • –connect jdbc:mysql://localhost:3306/ \
    • –username root --password hadoop
  • 结果

Sqoop导入

方向

  • RDBMS --> HDFS

语法

  • sqoop import (generic-args) (import-args)

在mysql中准备三张表

  • emp 雇员表、 emp_add 雇员地址表、emp_conn 雇员联系表

全量导入 mysql 表数据到 HDFS

(mysql --> HDFS, 全量导入)

  • 导入命令

    • bin/sqoop import \
    • –connect jdbc:mysql://node-1:3306/userdb \
    • –username root \
    • –password hadoop \
    • –delete-target-dir \
    • –target-dir /sqoopresult \
    • –table emp
    • –m 1
  • 查看hdfs上的数据验证是否导出成功

    • hdfs dfs -cat /sqoopresult/part-m-00000
  • 注意

    • 分隔符

      • 导出数据字段之间默认使用逗号,作为分隔符,
        可通过参数 --fields-terminated-by ‘\t’ 指定分隔符
    • maptask个数

      • 默认只有一个maptask, 可通过-m 指定个数

      • 当指定的个数不为1时

        • 默认根据主键进行split切割

        • 若表没有主键

          • 通过 --split-by 指定数值类型的字段
          • 若无数值字段, 则只能使用 --m 1
      • 当指定根据某个字段 进行多个切片的处理 在sqoop底层会根据该字段进行边界查询 根据查询数据范围边界 结合参数指定m个数 对表的数据进行切片

        • BoundingValsQuery: SELECT MIN(id), MAX(id) FROM emp
        • db.IntegerSplitter: Split size: 2; Num splits: 2 from: 1201 to: 1205

全量导入 mysql 表数据到 HIVE

(mysql --> hive, 全量导入)

  • 方式1:
    先复制表结构到 hive 中再导入数据

    • 将关系型数据的表结构复制到 hive 中

      • bin/sqoop create-hive-table \
      • –connect jdbc:mysql://node-1:3306/userdb \
      • –username root \
      • –password hadoop \
      • –table emp_add \
      • –hive-table test.emp_add_sp
    • 从关系数据库导入文件到 hive 中

      • bin/sqoop import \
      • –connect jdbc:mysql://node-1:3306/userdb \
      • –username root \
      • –password hadoop \
      • –table emp_add \
      • –hive-table test.emp_add_sp \
      • –hive-import \
      • –m 1
  • 方式2:
    直接复制表结构数据到 hive 中

    • bin/sqoop import \
    • –connect jdbc:mysql://node-1:3306/userdb \
    • –username root \
    • –password hadoop \
    • –table emp_conn \
    • –hive-database test \
    • –hive-import \
    • –m 1

导入表数据子集

  • 导入表全集的部分数据, 部分行 或 部分列

  • where 查询

    • bin/sqoop import \
    • –connect jdbc:mysql://node-1:3306/userdb \
    • –username root \
    • –password hadoop \
    • –where “city =‘sec-bad’” \
    • –target-dir /wherequery \
    • –table emp_add
    • –m 1
  • query 查询

    • bin/sqoop import \

    • –connect jdbc:mysql://node-1:3306/userdb \

    • –username root \

    • –password hadoop \

    • –target-dir /wherequery12 \

    • –query ‘select id,name,deg from emp WHERE id>1203 and $CONDITIONS’ \

    • –split-by id \

    • –fields-terminated-by ‘\t’ \

    • –m 2

    • 注意

      • 使用 query sql 语句来进行查找不能加参数 --table

      • 必须要添加 where 条件

        • where 条件后面必须带一个$CONDITIONS 这个字符串
      • sql 语句必须用单引号,不能用双引号

增量导入

  • 概述

    • 仅导入新增加到表中的数据
  • 核心参数

    • –check-column (col)

      • 用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似

      • 注意

        • 这些被指定的列的类型不能使任意字符类型,需要是唯一且能够判断大小的数据
    • –incremental (mode)

      • append

        • 追加模式
        • 对大于 last-value 指定的值之后的记录进行追加导入
      • lastmodified

        • 最后修改时间
        • 追加 last-value 指定的日期之后的记录
    • –last-value (value)

      • 最后一次值
      • 指定自从上次导入后列的最大值(大于该指定的值),也可以自己设定某一值
  • append 模式增量导入

    • bin/sqoop import \

    • –connect jdbc:mysql://node-1:3306/userdb \

    • –username root --password hadoop \

    • –table emp --m 1 \

    • –target-dir /appendresult \

    • –incremental append \

    • –check-column id \

    • –last-value 1205

    • 结果

  • lastmodified 模式增量导入

    • 建表

      • create table customertest(
        id int,
        name varchar(20),
        last_mod timestamp default current_timestamp on update current_timestamp
        );

      • 注意

        • default current_timestamp on update current_timestamp

          • 插入数据时, 不指定本字段, 默认用当前时间戳填充
          • 当进行数据修改后, 也修改为当前时间戳
    • 子模式

      • append 附加模式

        • bin/sqoop import \

        • –connect jdbc:mysql://node-1:3306/userdb \

        • –username root \

        • –password hadoop \

        • –table customertest \

        • –target-dir /lastmodifiedresult \

        • –check-column last_mod \

        • –incremental lastmodified \

        • –last-value “2020-06-28 16:24:25” \

        • –m 1 \

        • –append

        • 结果

          • 原文件
          • 追加文件
      • merge-key 合并模式

        • bin/sqoop import \

        • –connect jdbc:mysql://node-1:3306/userdb \

        • –username root \

        • –password hadoop \

        • –table customertest \

        • –target-dir /lastmodifiedresult \

        • –check-column last_mod \

        • –incremental lastmodified \

        • –last-value “2019-05-28 18:42:06” \

        • –m 1 \

        • –merge-key id

        • 结果

      • 注意

        • lastmodified 模式去处理增量时,会将大于等于 last-value 值的数据当做增量插入,
          因此使用append子模式, 可能会出现增量与原数据重复的情况
        • merge-key子模式经理了两个MapReduce过程, 第一个job判断有哪些新增数据, 第二个job将这些数据与原文件进行合并
        • 使用merge-key子模式处理增量时, 会针对指定的key,
          当没有该key时, 则会在原文件中新增数据, 表示新增了数据
          已有该key时, 会用新的数据覆盖原数据, 表示对数据进行了修改

Sqoop 导出

方向

  • HDFS --> RDBMS

语法

  • $ sqoop export (generic-args) (export-args)

export三种模式

  • 默认模式

    • 将文件中的数据使用 INSERT 语句插入到表中
  • 更新模式

    • Sqoop 将生成 UPDATE 替换数据库中现有记录的语句
  • 调用模式

    • Sqoop 将为每条记录创建一个存储过程调用

默认模式导出(全量导出)

(HDFS 数据到 mysql)

  • 默认情况下,sqoop export 将每行输入记录转换成一条 INSERT 语句,添加到目标数据库表中

    • 注意表的约束信息, 如主键约束, 唯一约束等
    • 如果INSERT 语句失败,导出过程将失败
  • 此模式主要用于将记录导出到可以接收这些结果的空表中
    通常用于全表数据导出

  • 导出命令

    • bin/sqoop export \
    • –connect jdbc:mysql://node-1:3306/userdb \
    • –username root \
    • –password hadoop \
    • –table employee \
    • –export-dir /emp/emp_data

更新导出(updateonly 模式)

  • 概述

    • 导出的时候只会对已有的数据进行更新操作
  • 命令

    • bin/sqoop export \
    • –connect jdbc:mysql://node-1:3306/userdb \
    • –username root \
    • –password hadoop \
    • –table updateonly \
    • –export-dir /updateonly_2/ \
    • –update-key id \
    • –update-mode updateonly

更新导出(allowinsert 模式)

  • 概述

    • 导出的时候 如果已有数据有更新进行更新,如果有新增数据就插入
  • 命令

    • bin/sqoop export \
    • –connect jdbc:mysql://node-1:3306/userdb \
    • –username root --password hadoop \
    • –table allowinsert \
    • –export-dir /allowinsert_2/ \
    • –update-key id \
    • –update-mode allowinsert

相关参数

  • –fields-terminated-by

    • 指定导出文件的分隔符
  • –input-fields-terminated-by

    • 指定导入文件的分隔符
  • –columns

    • 当导出文件的字段顺序, 与要导入表中的字段顺序不一致时, 可通过该参数来进行调整
  • –export-dir

    • 指定导出目录, HDFS体系的目录

    • 在执行导出的时候,必须指定这个参数,同时需要具备–table 或–call 参数两者之一

      • –table

        • 指定导入到哪个表
      • –call

        • 指定某个存储过程
  • –input-null-string

    • 若无该参数, 对于字符串类型的列来说,“NULL”这个字符串就会被翻译成空值
  • –input-null-non-string

    • 若无该参数, 无论是“NULL”字符串还是说空字符串也好,对于非字符串类型的字段来说,这两个类型的空串都会被翻译成空值
  • – update-key

    • 更新标识,即根据某个字段进行更新
    • sqoop要求该属性指定字段必须是主键,因为主键有唯一性,
      如果指定的字段不是主键 理解为增量导出是失效
    • 例如 id,可以指定多个更新标识的字段,多个字段之间用逗号分隔
  • – updatemod

    • 指定更新模式
    • updateonly
    • allowinsert

注意

  • 将数据从 Hadoop 生态体系导出到 RDBMS 数据库导出前,目标表必须存在于目标数据库中

  • hive数仓 导出到 mysql有两种方式

    • 方式1

      • 直接从hive中把数据导出到mysql

      • 直接执行 sqoop export

      • 优点

        • 直接导出到目的地, 简单直接
      • 缺点

        • 当导出文件的格式(如压缩), 字段顺序等存在差异时, 将无法达到预期效果
    • 方式2

      • 把数据从hive导出 hdfs某个路径下,然后再从hdfs导出到mysql中

      • 先通过 insert into directory + select 导出到 HDFS,
        在执行 sqoop export

        • 例如

          • 导出到hdfs

            • insert overwrite directory ‘/weblog/export/dw_pvs_referer_everyhour’
              row format delimited fields terminated by ‘\001’
              STORED AS textfile
              select referer_url,hour,pv_referer_cnt from dw_pvs_referer_everyhour where datestr = “20181101”;
          • 导出到mysql

            • bin/sqoop export \
            • –connect jdbc:mysql://node-1:3306/userdb \
            • –username root --password hadoop \
            • –table dw_pvs_referer_everyhour \
            • –fields-terminated-by ‘\001’ \
            • –columns referer_url,hour,pv_referer_cnt \
            • –export-dir /weblog/export/dw_pvs_referer_everyhour
      • 优点

        • 在将数据从hive 导出到 HDFS 上时, 可对数据的压缩格式进行调整
          在将数据从HDFS 导出到 数据库时, 可通过 column对字段的顺序进行调整
      • 缺点

        • 当数据的格式, 顺序, 类型等完全一致时, 没有直接导出简便
  • sqoop底层是由 MapReduce 实现, 因此当遇到报错时, 可到 YARN 上查看job的日志排查错误

  • sqoop导出 可编写成shell脚本, 定时执行

      1. 集中定义一些环境变量, 方便后续直接通过命令执行
      1. 定义一些可能发生改变的量(如:用户名, 密码, 路径, 日期等), 还可通过动态传参传入数据,
        增加脚本的通用性, 扩展性
      1. 具体的脚本内容, 执行具体业务逻辑