系列文章
前言
前两篇文章和分别从Binlog入门和Binlog事件如何产生的两个角度来介绍Binlog,本文将从Binlog事件的数据来更深入的了解Binlog。Binlog事件数据
1.QUERY_EVENT 执行更新语句时会生成此事件,包括:create,insert,update,delete; Fixed data part,总长度13字节: 4字节:执行sql的线程id; 4字节:执行sql的时间; 1字节:数据库名称的长度; 2字节:执行sql产生的错误码; 2字节:状态变量的长度,具体内容在Variable part;Variable part:
可变字节:状态变量,每个状态变量key为一个字节,后面跟着value,不同的key对应不同长度的value,但是总长度在Fixed data part中已经定义; 可变字节:数据库名称 可变字节:sql语句,通过事件的总长度-header长度-Fixed data-状态变量,剩余的字节数组通过utf-8编码即可获取;2.STOP_EVENT
当mysqld停止时生成此事件; Fixed data part:空的Variable part:空的
3.ROTATE_EVENT
当mysqld切换到新的binlog文件生成此事件; Fixed data part,总长度8字节: 8字节:下一个binlog文件的第一个事件的position,这个值一直是4,因为魔数占用了4字节;Variable data part:
可变字节:下一个binlog的名称,它的长度=事件总长度-header长度-Fixed data4.INTVAR_EVENT
当sql语句中使用了AUTO_INCREMENT的字段或者LAST_INSERT_ID()函数; Fixed data part:空的Variable part:
1字节:一个变量类型的值:LAST_INSERT_ID_EVENT = 1 或者 INSERT_ID_EVENT = 2; 8字节:LAST_INSERT_ID()函数调用,或者AUTO_INCREMENT字段生成的一个无符号的整型;5.RAND_EVENT
| bin-log.000003 | 438 | RAND | 1 | 473 | rand_seed1=223769196,rand_seed2=1013907192
执行包含RAND()函数的语句产生此事件,此事件没有被用在binlog_format为ROW模式的情况下;
Fixed data part:空的Variable part:
8字节:第一个种子值(ex:rand_seed1=223769196) 8字节:第二个种子值(ex:rand_seed2=1013907192)6.USER_VAR_EVENT
| bin-log.000003 | 711 | User var | 1 | 756 | @`age`=50
执行包含了用户变量的语句产生此事件,此事件没有被用在binlog_format为ROW模式的情况下;
Fixed data part:空的Variable part:
4字节:用户变量名的大小; 可变字节:用户变量名,具体长度上一个4字节的数据指定了; 1字节:如果是变量值是NULL,那么此值是非0的;如果是此值是0,那么才有接下来的其他数据;应该是对有空值情况的一种优化; 1字节:用户变量类型,包括:(STRING_RESULT=0, REAL_RESULT=1, INT_RESULT=2, ROW_RESULT=3, DECIMAL_RESULT=4); 4字节:用户变量字符的数量; 4字节:用户变量值的长度; 可变字节:变量的值,通过变量类型和变量值的长度,可以解析出具体的变量值;7.FORMAT_DESCRIPTION_EVENT
| bin-log.000003 | 4 | Format_desc | 1 | 107 | Server ver: 5.5.29-log, Binlog ver: 4
描述事件,被写在每个binlog文件的开始位置,用在MySQL5.0以后的版本中,代替了START_EVENT_V3;
Fixed data part: 2字节:binlog版本,Mysql5.0以及以上的版本值为:4 50字节:Mysql Server版本; 4字节:事件创建的时间戳; 1字节:header的长度,binlog版本为4的情况下header长度是19; 可变字节:从START_EVENT_V3开始到第27个Event,每个Event的fixed part lengths,每个事件一个字节,总共27个字节;Variable part:空的
8.XID_EVENT
| bin-log.000003 | 315 | Xid | 1 | 342 | COMMIT /* xid=32 */
事务提交产生的XID_EVENT事件;
Fixed data part:空的Variable part:
8字节:事务编号;9.BEGIN_LOAD_QUERY_EVENT
| bin-log.000003 | 964 | Begin_load_query | 1 | 1008 | ;file_id=3;block_len=21
执行LOAD DATA INFILE 语句时产生此事件
Fixed data part: 4字节:加载Data File的ID,防止加载的Data File内容是相同的;Variable part:
加载数据的第一个块,如果文件大小超过某个阀值,后面会有多个APPEND_BLOCK_EVENT事件,每一个包含一个数据块;可变字节长度 = 事件的总长度 – header长度 – Fixed data;因为测试数据量比较少(999, 101, ‘zhaohui’)总共就21个字节,所以一个块足够了;10.EXECUTE_LOAD_QUERY_EVENT
| bin-log.000003 | 1008 | Execute_load_query | 1 | 1237 | use `test`; LOAD DATA INFILE 'D:/btest.sql' INTO TABLE `btest` FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\' LINES TERMINATED BY '\n' (`id`, `age`, `name`) ;file_id=3 |
执行LOAD DATA INFILE产生的事件,类似QUERY_EVENT事件,Fixed data的前13个字节和QUERY_EVENT类似;
Fixed data part: 4字节:执行sql的线程id; 4字节:执行sql的时间; 1字节:数据库名称的长度; 2字节:执行sql产生的错误码; 2字节:状态变量的长度,具体内容在Variable part; 4字节:加载Data File的ID; 4字节:文件名替换语句中的起始位置; 4字节:文件名替换语句中的结束位置; 1字节:如何处理重复数据,三个选项:LOAD_DUP_ERROR = 0, LOAD_DUP_IGNORE = 1, LOAD_DUP_REPLACE = 2Variable part:
1.状态变量,每个状态变量key为一个字节,后面跟着value,不同的key对应不同长度的value,但是总长度在Fixed data part中已经定义; 2.sql语句,通过事件的总长度-header长度-Fixed data-状态变量,剩余的字节数组通过utf-8编码即可获取;11.TABLE_MAP_EVENT
| bin-log.000004 | 844 | Table_map | 1 | 892 | table_id: 33 (test.btest)
将表的定义映射到一个数字,在行操作事件之前记录(包括:WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT,DELETE_ROWS_EVENT);
Fixed data part: 6字节:表Id; 2字节:保留字段为将来使用;Variable part:
1字节:数据库名字的长度; 可变字节:数据库名字,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取; 1字节:表名的长度; 可变字节:表名,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取; Packed integer:用来记录表中字段的数量; 注:Packed integer是一个可变字节的类型,根据数据大小字节大小不一样, 更多详细: 可变字节:表字段类型数组,每个字段一个字节; Packed integer:用来记录表元数据的长度; 可变字节:元数据块,根据前一个字节记录的名字长度,获取的字节数组通过utf-8编码即可获取; 可变字节:用位域表示每一个字段是否为null,一个字节有8位,所以N个字段需要(N+7)/8个字节;12.WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT和DELETE_ROWS_EVENT
binlog_format为ROW模式下,执行insert,update和delete操作产生的事件; Fixed data part: 6字节:表Id; 2字节:保留字段为将来使用;Variable part:
Packed integer:记录表中字段的数量; 可变字节:用位域表示每个字段是否被使用(比如只有更新、插入的字段才是被使用的),N个字段需要(N+7)/8个字节; 可变字节:仅用在UPDATE_ROWS_EVENT事件中,用位域表示每个字段更新之后是否被使用(值只有真正被更新了才是被使用的),N个字段需要(N+7)/8个字节; 接下来是记录的每一行的数据: 可变字节:当前行中的字段值是否为NULL,只有这个字段被标识为被使用,才会出现在这; 可变字节:当前行所有字段的值,只有这个字段被标识为被使用,并且值不为NULL才会有值;13.INCIDENT_EVENT
主服务器发生了不正常的事件,通知从服务器并告知可能会导致数据处于不一致的状态; Fixed data part: 1字节:不正常事件的编号; 1字节:消息的长度;Variable part:
消息的内容,根据Fixed data part中指定的消息长度读取消息的内容;14.HEARTBEAT_LOG_EVENT
主服务器告诉从服务器,主服务器还活着,不写入到日志文件中; Fixed data part:空的Variable part:空的
更多参考:
Java读取简单实例
1.创建表,并插入数据,产生binlog日志文件;2.查看binlog中的事件;
mysql> show binlog events in 'bin-log.000001';+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| bin-log.000001 | 4 | Format_desc | 1 | 107 | Server ver: 5.5.29-log, Binlog ver: 4 || bin-log.000001 | 107 | Query | 1 | 364 | use `test`; CREATE TABLE `btest` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `age` int(11) DEFAULT NULL, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 || bin-log.000001 | 364 | Query | 1 | 432 | BEGIN || bin-log.000001 | 432 | Query | 1 | 536 | use `test`; insert into btest values(1,100,'zhaohui') || bin-log.000001 | 536 | Xid | 1 | 563 | COMMIT /* xid=30 */ || bin-log.000001 | 563 | Stop | 1 | 582 | |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3.通过java代码来读取binlog日志,具体代码如下:
public class BinlogRead { private static RandomAccessFile file; /** 魔数的字节长度 **/ private static final int MAGIN_LEN = 4; /** 事件header长度 **/ private static final int EVENT_HEADER_LEN = 19; /** Query_Event fix data长度 **/ private static final int QUERY_EVENT_FIX_LEN = 13; public static void main(String[] args) throws Exception { file = new RandomAccessFile(new File("D://bin-log.000001"), "rw"); FileChannel channel = file.getChannel(); /** 1.魔数4字节 **/ ByteBuffer magic = ByteBuffer.allocate(MAGIN_LEN); channel.read(magic); /** 2.Format_desc_Event事件 **/ EventHeader header = getEventHeader(channel); channel.position(header.getEventLen() + MAGIN_LEN); /** 3.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel)); /** 4.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel)); /** 5.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel)); /** 6.Xid_Event事件 **/ header = getEventHeader(channel); ByteBuffer xidNumber = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN); channel.read(xidNumber); xidNumber.flip(); System.out.println("xidNumber = " + xidNumber.getLong()); /** 7.Stop_Event事件 **/ header = getEventHeader(channel); } /** * 获取事件Header信息 * * @param channel * @return * @throws IOException */ private static EventHeader getEventHeader(FileChannel channel) throws IOException { ByteBuffer formatDescEventHeader = ByteBuffer.allocate(EVENT_HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN); channel.read(formatDescEventHeader); formatDescEventHeader.flip(); EventHeader header = new EventHeader(); header.setTimestamp(formatDescEventHeader.getInt()); header.setTypeCode(formatDescEventHeader.get()); header.setServerId(formatDescEventHeader.getInt()); header.setEventLen(formatDescEventHeader.getInt()); header.setNextPosition(formatDescEventHeader.getInt()); header.setFlags(formatDescEventHeader.getShort()); System.out.println(header.toString()); return header; } /** * 获取Query Event sql语句 * * @param queryEventLen * @param channel * @return * @throws IOException */ private static String getQueryEventSql(int queryEventLen, FileChannel channel) throws IOException { /** Query_Event fix data **/ ByteBuffer queryEventFix = ByteBuffer.allocate(QUERY_EVENT_FIX_LEN).order(ByteOrder.LITTLE_ENDIAN); channel.read(queryEventFix); queryEventFix.flip(); queryEventFix.position(11); /** 状态变量的长度 **/ int statusLen = queryEventFix.getShort(); int queryEventValLen = queryEventLen - EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN; ByteBuffer queryEventVal = ByteBuffer.allocate(queryEventValLen).order(ByteOrder.LITTLE_ENDIAN); channel.read(queryEventVal); queryEventVal.flip(); queryEventVal.position(statusLen); /** 数据库名称 **/ queryEventVal.mark(); int length = 0; while ('\0' != queryEventVal.get()) { length++; } queryEventVal.reset(); byte dbName[] = new byte[length]; queryEventVal.get(dbName); System.out.println("db name : " + new String(dbName, "utf-8")); /** sql语句 **/ byte sql[] = new byte[queryEventValLen - statusLen - length - 1]; queryEventVal.get(sql); return new String(sql, "utf-8"); }}
public class EventHeader { private int timestamp; private byte typeCode; private int serverId; private int eventLen; private int nextPosition; private int flags; @Override public String toString() { return "EventHeader [timestamp=" + timestamp + ", typeCode=" + typeCode + ", serverId=" + serverId + ", eventLen=" + eventLen + ", nextPosition=" + nextPosition + ", flags=" + flags + "]"; } //...get/set方法省略...}
总结
本文对事件的数据格式做了详细的介绍,因为所有事件的event header部分都是一样的,所以文中主要介绍的event data部分,event data主要包括两个部分:Fixed data part和Variable part;最后通过一个简单实例来大致了解事件数据的读取方式,后续会提供更详细的binlog事件数据读取。