canal 源码解析(2)-数据流转篇(1)

一、组装数据


       上一篇 只是正常启动,但是线程是等待中,没有数据接入处理。现在开始模拟同步数据,并分析其中原理


AbstractEventParser类的start方法

canal 源码解析(2)-数据流转篇(1)


2、mysql主从复制重点就在这里了,当前因为 startposition里部位空对象,

EntryPosition[included=false,journalName=mysql-bin.000023,position=123,serverId=1,gtid=<null>,timestamp=1528939448000]

3、

// 重新链接,因为在找position过程中可能有状态,需要断开后重建
erosaConnection.reconnect();

改变一下状态:connect为true、

4、dum数据

// 4. 开始dump数据
// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
if (isGTIDMode()) {
    erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
} else {
    if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
        erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
    } else {
        erosaConnection.dump(startPosition.getJournalName(),
                startPosition.getPosition(),
                sinkHandler);
    }
}

 4.1 dump详细内容

public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    sendRegisterSlave();
    sendBinlogDump(binlogfilename, binlogPosition);
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {
        LogEvent event = null;
        event = decoder.decode(fetcher, context);

        if (event == null) {
            throw new CanalParseException("parse failed");
        }

        if (!func.sink(event)) {
            break;
        }

        if (event.getSemival() == 1) {
            sendSemiAck(context.getLogPosition().getFileName(), binlogPosition);
        }
    }
}

4.1.1  设置mysql的通用参数,心跳间隔时间,内存参数

private void updateSettings() throws IOException {
    try {
        update("set wait_timeout=9999999");
    } catch (Exception e) {
        logger.warn("update wait_timeout failed", e);
    }
    try {
        update("set net_write_timeout=1800");
    } catch (Exception e) {
        logger.warn("update net_write_timeout failed", e);
    }

 构造器初始化第一个参数
public QueryCommandPacket(){
    setCommand((byte) 0x03);
}
public byte[] toBytes() throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(getCommand());
    out.write(getQueryString().getBytes("UTF-8"));// 链接建立时默认指定编码为UTF-8
    return out.toByteArray();
}


二、发送数据

canal 源码解析(2)-数据流转篇(1)


三、接受数据

mysql协议规定了,通用接受数据的每个类型;参考链接:

okpacket: https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html

erropacket https://dev.mysql.com/doc/internals/en/packet-ERR_Packet.html

canal 源码解析(2)-数据流转篇(1)

根据mysql协议okpacket的要求,则判断出该命令执行是否成功

canal 源码解析(2)-数据流转篇(1)

组装okpacket,参考上面协议规定字段

canal 源码解析(2)-数据流转篇(1)

canal 源码解析(2)-数据流转篇(1)

然后解析转化

public void fromBytes(byte[] data) throws IOException {
    int index = 0;
    // 1. read field count
    this.fieldCount = data[0];
    index++;
    // 2. read affected rows
    this.affectedRows = ByteHelper.readBinaryCodedLengthBytes(data, index);
    index += this.affectedRows.length;
    // 3. read insert id
    this.insertId = ByteHelper.readBinaryCodedLengthBytes(data, index);
    index += this.insertId.length;
    // 4. read server status
    this.serverStatus = ByteHelper.readUnsignedShortLittleEndian(data, index);
    index += 2;
    // 5. read warning count
    this.warningCount = ByteHelper.readUnsignedShortLittleEndian(data, index);
    index += 2;
    // 6. read message.
    this.message = new String(ByteHelper.readFixedLengthBytes(data, index, data.length - index));
    // end read
}
OKPacket [affectedRows=[0], fieldCount=0, insertId=[0], message=, serverStatus=2, warningCount=1]



4.1.2 向mysql发送slave指令

参考mysql event type。https://dev.mysql.com/doc/internals/en/event-meanings.html

private void sendRegisterSlave() throws IOException {
    RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
    cmd.reportHost = authInfo.getAddress().getAddress().getHostAddress();
    cmd.reportPasswd = authInfo.getPassword();
    cmd.reportUser = authInfo.getUsername();
    cmd.reportPort = authInfo.getAddress().getPort(); // 暂时先用master节点的port
    cmd.serverId = this.slaveId;
    byte[] cmdBody = cmd.toBytes();

    logger.info("Register slave {}", cmd);

    HeaderPacket header = new HeaderPacket();
    header.setPacketBodyLength(cmdBody.length);
    header.setPacketSequenceNumber((byte) 0x00);
    PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);

    header = PacketManager.readHeader(connector.getChannel(), 4);
    byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength());
    assert body != null;
    if (body[0] < 0) {
        if (body[0] == -1) {
            ErrorPacket err = new ErrorPacket();
            err.fromBytes(body);
            throw new IOException("Error When doing Register slave:" + err.toString());
        } else {
            throw new IOException("unpexpected packet with field_count=" + body[0]);
        }
    }
}

byte[] cmdBody = cmd.toBytes();

public byte[] toBytes() throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(getCommand());
    ByteHelper.writeUnsignedIntLittleEndian(serverId, out);
    out.write((byte) reportHost.getBytes().length);
    ByteHelper.writeFixedLengthBytesFromStart(reportHost.getBytes(), reportHost.getBytes().length, out);
    out.write((byte) reportUser.getBytes().length);
    ByteHelper.writeFixedLengthBytesFromStart(reportUser.getBytes(), reportUser.getBytes().length, out);
    out.write((byte) reportPasswd.getBytes().length);
    ByteHelper.writeFixedLengthBytesFromStart(reportPasswd.getBytes(), reportPasswd.getBytes().length, out);
    ByteHelper.writeUnsignedShortLittleEndian(reportPort, out);
    ByteHelper.writeUnsignedIntLittleEndian(0, out);// Fake
                                                    // rpl_recovery_rank
    ByteHelper.writeUnsignedIntLittleEndian(0, out);// master id
    return out.toByteArray();
}

RegisterSlaveCommandPacket对象转为byte数组

注意 :平常在使用中,把一个对象转为字节,调用的是对象流,且对象必须实现序列化接口,也就是 先把对象转为流,然后转为byte数组。然后遍历。

但是这里使用的编解码是自定义的编码规则进行编码解码。

canal 源码解析(2)-数据流转篇(1)

命令:用于标识当前请求消息的类型,例如切换数据库(0x02)、查询命令(0x03)等。命令值的取值范围及说明如下表(参考MySQL源代码/include/mysql_com.h头文件中的定义):

类型值 命令 功能 关联函数
0x00 COM_SLEEP (内部线程状态) (无)
0x01 COM_QUIT 关闭连接 mysql_close
0x02 COM_INIT_DB 切换数据库 mysql_select_db
0x03 COM_QUERY SQL查询请求 mysql_real_query
0x04 COM_FIELD_LIST 获取数据表字段信息 mysql_list_fields
0x05 COM_CREATE_DB 创建数据库 mysql_create_db
0x06 COM_DROP_DB 删除数据库 mysql_drop_db
0x07 COM_REFRESH 清除缓存 mysql_refresh
0x08 COM_SHUTDOWN 停止服务器 mysql_shutdown
0x09 COM_STATISTICS 获取服务器统计信息 mysql_stat
0x0A COM_PROCESS_INFO 获取当前连接的列表 mysql_list_processes
0x0B COM_CONNECT (内部线程状态) (无)
0x0C COM_PROCESS_KILL 中断某个连接 mysql_kill
0x0D COM_DEBUG 保存服务器调试信息 mysql_dump_debug_info
0x0E COM_PING 测试连通性 mysql_ping
0x0F COM_TIME (内部线程状态) (无)
0x10 COM_DELAYED_INSERT (内部线程状态) (无)
0x11 COM_CHANGE_USER 重新登陆(不断连接) mysql_change_user
0x12 COM_BINLOG_DUMP 获取二进制日志信息 (无)
0x13 COM_TABLE_DUMP 获取数据表结构信息 (无)
0x14 COM_CONNECT_OUT (内部线程状态) (无)
0x15 COM_REGISTER_SLAVE 从服务器向主服务器进行注册 (无)
0x16 COM_STMT_PREPARE 预处理SQL语句 mysql_stmt_prepare
0x17 COM_STMT_EXECUTE 执行预处理语句 mysql_stmt_execute
0x18 COM_STMT_SEND_LONG_DATA 发送BLOB类型的数据 mysql_stmt_send_long_data
0x19 COM_STMT_CLOSE 销毁预处理语句 mysql_stmt_close
0x1A COM_STMT_RESET 清除预处理语句参数缓存 mysql_stmt_reset
0x1B COM_SET_OPTION 设置语句选项 mysql_set_server_option
0x1C COM_STMT_FETCH 获取预处理语句的执行结果 mysql_stmt_fetch

参数:内容是用户在MySQL客户端输入的命令(不包括每行命令结尾的";"分号)。另外这个字段的字符串不是以NULL字符结尾,而是通过消息头中的长度值计算而来。


1)向mysql发送数据包:(日常的所有操作都是想mysql发送指令,然后ack,现在就以发送slave指令来解析)

canal 源码解析(2)-数据流转篇(1)

canal 源码解析(2)-数据流转篇(1)

每次向server发送报文,序号都从0开始,且header里包含第一个字段为消息体的长度

组装header

canal 源码解析(2)-数据流转篇(1)


组装header(头部4字节,第一位消息体长度,第四位位***)

public byte[] toBytes() {
    byte[] data = new byte[4];
    data[0] = (byte) (packetBodyLength & 0xFF);
    data[1] = (byte) (packetBodyLength >>> 8);
    data[2] = (byte) (packetBodyLength >>> 16);
    data[3] = getPacketSequenceNumber();
    return data;
}


组装为整个报文,

HeaderPacket header = new HeaderPacket();
header.setPacketBodyLength(cmdBody.length);
header.setPacketSequenceNumber((byte) 0x00);
PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);

bio为如下
public void write(byte[]... buf) throws IOException {
    OutputStream output = this.output;
    if (output != null) {
        for (byte[] bs : buf) {
            output.write(bs);
        }
    } else {
        throw new SocketException("Socket already closed.");
    }
}


nio为

public void write(byte[]... buf) throws IOException {
    if (channel != null && channel.isWritable()) {
        channel.writeAndFlush(Unpooled.copiedBuffer(buf));
    } else {
        throw new IOException("write failed ! please checking !");
    }
}

下一篇讲如何接收msyql发过来的数据。