博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MySql Binlog事件数据篇
阅读量:5768 次
发布时间:2019-06-18

本文共 11942 字,大约阅读时间需要 39 分钟。

hot3.png

系列文章

前言

前两篇文章和分别从Binlog入门和Binlog事件如何产生的两个角度来介绍Binlog,本文将从Binlog事件的数据来更深入的了解Binlog。

Binlog事件数据

1.QUERY_EVENT
执行更新语句时会生成此事件,包括:create,insert,update,delete;
Fixed data part,总长度13字节:
4字节:执行sql的线程id;
4字节:执行sql的时间;
1字节:数据库名称的长度;
2字节:执行sql产生的错误码;
2字节:状态变量的长度,具体内容在Variable part;

Variable part:

可变字节:状态变量,每个状态变量key为一个字节,后面跟着value,不同的key对应不同长度的value,但是总长度在Fixed data part中已经定义;
可变字节:数据库名称
可变字节:sql语句,通过事件的总长度-header长度-Fixed data-状态变量,剩余的字节数组通过utf-8编码即可获取;

2.STOP_EVENT

当mysqld停止时生成此事件;
Fixed data part:空的

Variable part:空的

3.ROTATE_EVENT

当mysqld切换到新的binlog文件生成此事件;
Fixed data part,总长度8字节:
8字节:下一个binlog文件的第一个事件的position,这个值一直是4,因为魔数占用了4字节;

Variable data part:

可变字节:下一个binlog的名称,它的长度=事件总长度-header长度-Fixed data

4.INTVAR_EVENT

当sql语句中使用了AUTO_INCREMENT的字段或者LAST_INSERT_ID()函数;
Fixed data part:空的

Variable part:

1字节:一个变量类型的值:LAST_INSERT_ID_EVENT = 1 或者 INSERT_ID_EVENT = 2;
8字节:LAST_INSERT_ID()函数调用,或者AUTO_INCREMENT字段生成的一个无符号的整型;

5.RAND_EVENT

| bin-log.000003 |  438 | RAND               |         1 |         473 | rand_seed1=223769196,rand_seed2=1013907192

执行包含RAND()函数的语句产生此事件,此事件没有被用在binlog_format为ROW模式的情况下;

Fixed data part:空的

Variable part:

8字节:第一个种子值(ex:rand_seed1=223769196)
8字节:第二个种子值(ex:rand_seed2=1013907192)

6.USER_VAR_EVENT

| bin-log.000003 |  711 | User var           |         1 |         756 | @`age`=50

执行包含了用户变量的语句产生此事件,此事件没有被用在binlog_format为ROW模式的情况下;

Fixed data part:空的

Variable part:

4字节:用户变量名的大小;
可变字节:用户变量名,具体长度上一个4字节的数据指定了;
1字节:如果是变量值是NULL,那么此值是非0的;如果是此值是0,那么才有接下来的其他数据;应该是对有空值情况的一种优化;
1字节:用户变量类型,包括:(STRING_RESULT=0, REAL_RESULT=1, INT_RESULT=2, ROW_RESULT=3, DECIMAL_RESULT=4);
4字节:用户变量字符的数量;
4字节:用户变量值的长度;
可变字节:变量的值,通过变量类型和变量值的长度,可以解析出具体的变量值;

7.FORMAT_DESCRIPTION_EVENT

| bin-log.000003 |    4 | Format_desc        |         1 |         107 | Server ver: 5.5.29-log, Binlog ver: 4

描述事件,被写在每个binlog文件的开始位置,用在MySQL5.0以后的版本中,代替了START_EVENT_V3;

Fixed data part:
2字节:binlog版本,Mysql5.0以及以上的版本值为:4
50字节:Mysql Server版本;
4字节:事件创建的时间戳;
1字节:header的长度,binlog版本为4的情况下header长度是19;
可变字节:从START_EVENT_V3开始到第27个Event,每个Event的fixed part lengths,每个事件一个字节,总共27个字节;

Variable part:空的

8.XID_EVENT

| bin-log.000003 |  315 | Xid                |         1 |         342 | COMMIT /* xid=32 */

事务提交产生的XID_EVENT事件;

Fixed data part:空的

Variable part:

8字节:事务编号;

9.BEGIN_LOAD_QUERY_EVENT

| bin-log.000003 |  964 | Begin_load_query   |         1 |        1008 | ;file_id=3;block_len=21

执行LOAD DATA INFILE 语句时产生此事件

Fixed data part:
4字节:加载Data File的ID,防止加载的Data File内容是相同的;

Variable part:

加载数据的第一个块,如果文件大小超过某个阀值,后面会有多个APPEND_BLOCK_EVENT事件,每一个包含一个数据块;可变字节长度 = 事件的总长度 – header长度 – Fixed data;因为测试数据量比较少(999, 101, ‘zhaohui’)总共就21个字节,所以一个块足够了;

10.EXECUTE_LOAD_QUERY_EVENT

| bin-log.000003 | 1008 | Execute_load_query |         1 |        1237 | use `test`; LOAD DATA INFILE 'D:/btest.sql' INTO TABLE `btest` FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\' LINES TERMINATED BY '\n' (`id`, `age`, `name`) ;file_id=3 |

执行LOAD DATA INFILE产生的事件,类似QUERY_EVENT事件,Fixed data的前13个字节和QUERY_EVENT类似;

Fixed data part:
4字节:执行sql的线程id;
4字节:执行sql的时间;
1字节:数据库名称的长度;
2字节:执行sql产生的错误码;
2字节:状态变量的长度,具体内容在Variable part;
4字节:加载Data File的ID;
4字节:文件名替换语句中的起始位置;
4字节:文件名替换语句中的结束位置;
1字节:如何处理重复数据,三个选项:LOAD_DUP_ERROR = 0, LOAD_DUP_IGNORE = 1, LOAD_DUP_REPLACE = 2

Variable part:

1.状态变量,每个状态变量key为一个字节,后面跟着value,不同的key对应不同长度的value,但是总长度在Fixed data part中已经定义;
2.sql语句,通过事件的总长度-header长度-Fixed data-状态变量,剩余的字节数组通过utf-8编码即可获取;

11.TABLE_MAP_EVENT

| bin-log.000004 |  844 | Table_map   |         1 |         892 | table_id: 33 (test.btest)

将表的定义映射到一个数字,在行操作事件之前记录(包括:WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT,DELETE_ROWS_EVENT);

Fixed data part:
6字节:表Id;
2字节:保留字段为将来使用;

Variable part:

1字节:数据库名字的长度;
可变字节:数据库名字,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取;
1字节:表名的长度;
可变字节:表名,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取;
Packed integer:用来记录表中字段的数量;
注:Packed integer是一个可变字节的类型,根据数据大小字节大小不一样,
更多详细:
可变字节:表字段类型数组,每个字段一个字节;
Packed integer:用来记录表元数据的长度;
可变字节:元数据块,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取;
可变字节:用位域表示每一个字段是否为null,一个字节有8位,所以N个字段需要(N+7)/8个字节;

12.WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT和DELETE_ROWS_EVENT

binlog_format为ROW模式下,执行insert,update和delete操作产生的事件;
Fixed data part:
6字节:表Id;
2字节:保留字段为将来使用;

Variable part:

Packed integer:记录表中字段的数量;
可变字节:用位域表示每个字段是否被使用(比如只有更新、插入的字段才是被使用的),N个字段需要(N+7)/8个字节;
可变字节:仅用在UPDATE_ROWS_EVENT事件中,用位域表示每个字段更新之后是否被使用(值只有真正被更新了才是被使用的),N个字段需要(N+7)/8个字节;
接下来是记录的每一行的数据:
可变字节:当前行中的字段值是否为NULL,只有这个字段被标识为被使用,才会出现在这;
可变字节:当前行所有字段的值,只有这个字段被标识为被使用,并且值不为NULL才会有值;

13.INCIDENT_EVENT

主服务器发生了不正常的事件,通知从服务器并告知可能会导致数据处于不一致的状态;
Fixed data part:
1字节:不正常事件的编号;
1字节:消息的长度;

Variable part:

消息的内容,根据Fixed data part中指定的消息长度读取消息的内容;

14.HEARTBEAT_LOG_EVENT

主服务器告诉从服务器,主服务器还活着,不写入到日志文件中;
Fixed data part:空的

Variable part:空的

更多参考:

Java读取简单实例

1.创建表,并插入数据,产生binlog日志文件;

2.查看binlog中的事件;

mysql> show binlog events in 'bin-log.000001';+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Log_name       | Pos | Event_type  | Server_id | End_log_pos | Info                                                                                                                                                                                                           |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| bin-log.000001 |   4 | Format_desc |         1 |         107 | Server ver: 5.5.29-log, Binlog ver: 4                                                                                                                                                                          || bin-log.000001 | 107 | Query       |         1 |         364 | use `test`; CREATE TABLE `btest` (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `age` int(11) DEFAULT NULL,  `name` varchar(255) DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 || bin-log.000001 | 364 | Query       |         1 |         432 | BEGIN                                                                                                                                                                                                          || bin-log.000001 | 432 | Query       |         1 |         536 | use `test`; insert into btest values(1,100,'zhaohui')                                                                                                                                                          || bin-log.000001 | 536 | Xid         |         1 |         563 | COMMIT /* xid=30 */                                                                                                                                                                                            || bin-log.000001 | 563 | Stop        |         1 |         582 |                                                                                                                                                                                                                |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

3.通过java代码来读取binlog日志,具体代码如下:

public class BinlogRead {     private static RandomAccessFile file;    /** 魔数的字节长度 **/    private static final int MAGIN_LEN = 4;    /** 事件header长度 **/    private static final int EVENT_HEADER_LEN = 19;    /** Query_Event fix data长度 **/    private static final int QUERY_EVENT_FIX_LEN = 13;     public static void main(String[] args) throws Exception {        file = new RandomAccessFile(new File("D://bin-log.000001"), "rw");        FileChannel channel = file.getChannel();         /** 1.魔数4字节 **/        ByteBuffer magic = ByteBuffer.allocate(MAGIN_LEN);        channel.read(magic);         /** 2.Format_desc_Event事件 **/        EventHeader header = getEventHeader(channel);        channel.position(header.getEventLen() + MAGIN_LEN);         /** 3.Query_Event事件 **/        header = getEventHeader(channel);        System.out.println(getQueryEventSql(header.getEventLen(), channel));         /** 4.Query_Event事件 **/        header = getEventHeader(channel);        System.out.println(getQueryEventSql(header.getEventLen(), channel));         /** 5.Query_Event事件 **/        header = getEventHeader(channel);        System.out.println(getQueryEventSql(header.getEventLen(), channel));         /** 6.Xid_Event事件 **/        header = getEventHeader(channel);        ByteBuffer xidNumber = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);        channel.read(xidNumber);        xidNumber.flip();        System.out.println("xidNumber = " + xidNumber.getLong());         /** 7.Stop_Event事件 **/        header = getEventHeader(channel);     }     /**     * 获取事件Header信息     *      * @param channel     * @return     * @throws IOException     */    private static EventHeader getEventHeader(FileChannel channel) throws IOException {        ByteBuffer formatDescEventHeader = ByteBuffer.allocate(EVENT_HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN);        channel.read(formatDescEventHeader);        formatDescEventHeader.flip();        EventHeader header = new EventHeader();        header.setTimestamp(formatDescEventHeader.getInt());        header.setTypeCode(formatDescEventHeader.get());        header.setServerId(formatDescEventHeader.getInt());        header.setEventLen(formatDescEventHeader.getInt());        header.setNextPosition(formatDescEventHeader.getInt());        header.setFlags(formatDescEventHeader.getShort());        System.out.println(header.toString());        return header;    }     /**     * 获取Query Event sql语句     *      * @param queryEventLen     * @param channel     * @return     * @throws IOException     */    private static String getQueryEventSql(int queryEventLen, FileChannel channel) throws IOException {        /** Query_Event fix data **/        ByteBuffer queryEventFix = ByteBuffer.allocate(QUERY_EVENT_FIX_LEN).order(ByteOrder.LITTLE_ENDIAN);        channel.read(queryEventFix);        queryEventFix.flip();        queryEventFix.position(11);         /** 状态变量的长度 **/        int statusLen = queryEventFix.getShort();         int queryEventValLen = queryEventLen - EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN;        ByteBuffer queryEventVal = ByteBuffer.allocate(queryEventValLen).order(ByteOrder.LITTLE_ENDIAN);        channel.read(queryEventVal);        queryEventVal.flip();        queryEventVal.position(statusLen);         /** 数据库名称 **/        queryEventVal.mark();        int length = 0;        while ('\0' != queryEventVal.get()) {            length++;        }        queryEventVal.reset();        byte dbName[] = new byte[length];        queryEventVal.get(dbName);        System.out.println("db name : " + new String(dbName, "utf-8"));         /** sql语句 **/        byte sql[] = new byte[queryEventValLen - statusLen - length - 1];        queryEventVal.get(sql);        return new String(sql, "utf-8");    }}
public class EventHeader {     private int timestamp;    private byte typeCode;    private int serverId;    private int eventLen;    private int nextPosition;    private int flags;     @Override    public String toString() {        return "EventHeader [timestamp=" + timestamp + ", typeCode=" + typeCode + ", serverId=" + serverId                + ", eventLen=" + eventLen + ", nextPosition=" + nextPosition + ", flags=" + flags + "]";    }                 //...get/set方法省略...}

总结

本文对事件的数据格式做了详细的介绍,因为所有事件的event header部分都是一样的,所以文中主要介绍的event data部分,event data主要包括两个部分:Fixed data part和Variable part;最后通过一个简单实例来大致了解事件数据的读取方式,后续会提供更详细的binlog事件数据读取。

转载于:https://my.oschina.net/OutOfMemory/blog/1579454

你可能感兴趣的文章
被遗忘的CSS
查看>>
Webpack中的sourcemap以及如何在生产和开发环境中合理的设置sourcemap的类型
查看>>
做完小程序项目、老板给我加了6k薪资~
查看>>
面试必备:LinkedList源码解析(JDK8)
查看>>
java工程师linux命令,这篇文章就够了
查看>>
翻译 | 玩转 React 表单 —— Refs 的运用
查看>>
关于React生命周期的学习
查看>>
从15000个Python开源项目中精选的Top30,Github平均star为3707,赶紧收藏!
查看>>
总奖金 200 万的 AI Challenger 开赛,可申请免费 GPU 资源
查看>>
webpack雪碧图生成
查看>>
搭建智能合约开发环境Remix IDE及使用
查看>>
iOS 12 is coming
查看>>
Blind Return Oriented Programming (BROP) Attack - 攻击原理
查看>>
在xib上removeFromSuperview引用计数问题
查看>>
开源一个企业官网
查看>>
移动端如何做适配?
查看>>
Java反射详细介绍
查看>>
Spring Cloud构建微服务架构—服务消费基础
查看>>
RAC实践采坑指北
查看>>
runtime运行时 isa指针 SEL方法选择器 IMP函数指针 Method方法 runtime消息机制 runtime的使用...
查看>>