mysql binlog系列(二)----java解析binlog
在进入正题之前,我们需要知道binlog的event的类型,先来看看自己binlog文件有哪些?
其中红色部分为event_type。
binlog event 的类型有很多,具体可以参见mysql官方文档:http://dev.mysql.com/doc/internals/en/event-meanings.html
(一)Open Replicator中相关的Event类与接口
Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog
events以回调的方式通知应用。
所有的Event实现了BinlogEventV4接口。
BinlogEventV4的接口如下:
/** * +=====================================+ * | event | timestamp 0 : 4 | * | header +----------------------------+ * | | type_code 4 : 1 | * | +----------------------------+ * | | server_id 5 : 4 | * | +----------------------------+ * | | event_length 9 : 4 | * | +----------------------------+ * | | next_position 13 : 4 | * | +----------------------------+ * | | flags 17 : 2 | * +=====================================+ * | event | fixed part 19 : y | * | data +----------------------------+ * | | variable part | * +=====================================+ * @author Jingqi Xu */ public interface BinlogEventV4 { BinlogEventV4Header getHeader(); }
(二)利用Open Replicator解析binlog
在这里首先申明本人的测试环境为:mysql 5.1.61 ,binlog的类型设置为Row,本次解析只考虑insert、update、delete三种事件类型。我们先将三种类型的时间包装为一个新的Event,如下所示:
public class LogEvent implements Serializable{ /** * 只针对delete、insert、update事件 */ private static final long serialVersionUID = 5503152746318421290L; private String eventId = null; private String databaseName = null; private String tableName = null; private String eventType = null; private Long timestamp = null; private Long timestampRecepite = null; private String binlogName = null; private Long position = null; private Long nextPosition = null; private Long serverId = null; private Map<String, String> before =null; private Map<String, String> after = null; public LogEvent(){ } public LogEvent(final QueryEvent qe,String databaseName,String tableName){ this.init(qe); this.databaseName=databaseName; this.tableName=tableName; } public LogEvent(final AbstractRowEvent re){ this.init(re); TableMapEvent tableMapEvent =re.getTme(); this.databaseName=tableMapEvent.getDatabaseName().toString(); this.tableName=tableMapEvent.getTableName().toString(); } private void init(final BinlogEventV4 be){ this.eventId=UUID.randomUUID().toString(); BinlogEventV4Header header = be.getHeader(); this.binlogName = header.getBinlogName(); this.position = header.getPosition(); this.nextPosition = header.getNextPosition(); this.timestamp = header.getTimestamp(); this.timestampRecepite = header.getTimestampOfReceipt(); this.serverId=header.getServerId(); this.eventType=MySqlEventTypeIdToString.getInstance().get(header.getEventType()); } @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("{ eventId:").append(eventId); builder.append(",databaseName:").append(databaseName); builder.append(",tableName:").append(tableName); builder.append(",eventType:").append(eventType); builder.append(",timestamp:").append(timestamp); builder.append(",timestampRecepite:").append(timestampRecepite); builder.append(",binlogName:").append(binlogName); builder.append(",position:").append(position); builder.append(",nextPosition:").append(nextPosition); builder.append(",serverId:").append(serverId); builder.append(",before:").append(before); builder.append(",after:").append(after).append(" }"); return builder.toString(); } public String getEventId() { return eventId; } public void setEventId(String eventId) { this.eventId = eventId; } public String getDatabaseName() { return databaseName; } public void setDatabaseName(String databaseName) { this.databaseName = databaseName; } public String getTableName() { return tableName; } public void setTableName(String tableName) { this.tableName = tableName; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } public Long getTimestamp() { return timestamp; } public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } public Long getTimestampRecepite() { return timestampRecepite; } public void setTimestampRecepite(Long timestampRecepite) { this.timestampRecepite = timestampRecepite; } public String getBinlogName() { return binlogName; } public void setBinlogName(String binlogName) { this.binlogName = binlogName; } public Long getPosition() { return position; } public void setPosition(Long position) { this.position = position; } public Long getNextPosition() { return nextPosition; } public void setNextPosition(Long nextPosition) { this.nextPosition = nextPosition; } public Long getServerId() { return serverId; } public void setServerId(Long serverId) { this.serverId = serverId; } public Map<String, String> getBefore() { return before; } public void setBefore(Map<String, String> before) { this.before = before; } public Map<String, String> getAfter() { return after; } public void setAfter(Map<String, String> after) { this.after = after; }
其中 before、after为一个map,表示变化前后所在行的所有数据(columnName:columnValue)!
好的,先上主程序:
public class OpenReplicatorTest { public static void main(String args[]) throws Exception { final OpenReplicator or = new OpenReplicator(); or.setUser("root"); or.setPassword("root"); or.setHost("xx.xxx.xx.xx"); or.setPort(3306); or.setServerId(23); or.setBinlogPosition(106); or.setBinlogFileName("mysql-bin.000001"); or.setBinlogEventListener(new NotificationListener()); or.start(); } }
设置监控器NotificationListener,NotificationListener需要实现BinlogEventListener接口:
public class NotificationListener implements BinlogEventListener{ private static Logger logger = LoggerFactory.getLogger(NotificationListener.class); private String host="xx.xx.xx.xx"; private Integer port=3306; private String username="root"; private String password="root"; public void onEvents(BinlogEventV4 event) { if(event==null){ logger.error("binlog event is null"); return; } if(event instanceof UpdateRowsEvent){ UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent)event; LogEvent logEvent = new LogEvent(updateRowsEvent); List<Pair<Row>> rows = updateRowsEvent.getRows(); List<Column> cols_after = null; List<Column> cols_before = null; for(Pair<Row> p : rows){ Row after = p.getAfter(); Row before = p.getBefore(); cols_after = after.getColumns(); cols_before = before.getColumns(); break; } logEvent.setBefore(getMap(cols_before, updateRowsEvent.getTme().getDatabaseName().toString(), updateRowsEvent.getTme().getTableName().toString())); logEvent.setAfter(getMap(cols_after, updateRowsEvent.getTme().getDatabaseName().toString(), updateRowsEvent.getTme().getTableName().toString())); logger.info("update event is:"+logEvent); }else if(event instanceof DeleteRowsEvent){ DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent)event; LogEvent logEvent = new LogEvent(deleteRowsEvent); List<Row> rows = deleteRowsEvent.getRows(); List<Column> before = null; for(Row row:rows){ before = row.getColumns(); break; } logEvent.setBefore(getMap(before, deleteRowsEvent.getTme().getDatabaseName().toString(), deleteRowsEvent.getTme().getTableName().toString())); logger.info("delete event is:"+logEvent); }else if(event instanceof WriteRowsEvent){ WriteRowsEvent wrtiteRowsEvent = (WriteRowsEvent)event; LogEvent logEvent = new LogEvent(wrtiteRowsEvent); List<Row> rows = wrtiteRowsEvent.getRows(); List<Column> before = null; for(Row row:rows){ before = row.getColumns(); break; } logEvent.setAfter(getMap(before, wrtiteRowsEvent.getTme().getDatabaseName().toString(), wrtiteRowsEvent.getTme().getTableName().toString())); logger.info("write event is:"+logEvent); } } private Map<String, String> getMap(List<Column> cols,String databaseName,String tableName){ if(cols==null||cols.size()==0){ return null; } List<String> columnNames = new TableInfo(host,username,password, port).getColumns(databaseName, tableName); if(columnNames==null){ return null; } if(columnNames.size()!=cols.size()){ logger.error("the size does not match..."); return null; } Map<String, String> map = new HashMap<String, String>(); for(int i=0;i<columnNames.size();i++){ if(cols.get(i).getValue()==null){ map.put(columnNames.get(i).toString(),""); }else{ map.put(columnNames.get(i).toString(),cols.get(i).toString()); } } return map; }
由于Open Replicator提供的Event中不包含数据库表中所有字段column name的信息,DeleteRowsEvent、UpdateRowsEvent、WriteRowsEvent包含变化前后的字段column value信息,而我们需要将其组合成before与after,因此需要想办法获取column names:
public class TableInfo { private static Logger logger = LoggerFactory.getLogger(TableInfo.class); /** * key:databaseName+""+tableName * value:columns name */ private static Map<String, List<String>> columnsMap = new HashMap<String, List<String>>(); private String host; private Integer port; private String username; private String password; public TableInfo(String host,String username,String password,Integer port){ this.host=host; this.username=username; this.password=password; this.port = port; if(columnsMap==null||columnsMap.size()==0){ MysqlConnection.setConnection(this.host,this.port,this.username,this.password); columnsMap = MysqlConnection.getColumns(); } } public Map<String, List<String>> getMap(){ return columnsMap; } public List<String> getColumns(String databaseName,String tableName){ if(StringUtils.isNullOrEmpty(databaseName)||StringUtils.isNullOrEmpty(tableName)){ return null; } String key = databaseName + "."+tableName; List<String> list =null; if(columnsMap.size()==0){ MysqlConnection.setConnection(this.host,this.port,this.username,this.password); columnsMap = MysqlConnection.getColumns(); list = columnsMap.get(key); }else{ list=columnsMap.get(key); if(list==null||list.size()==0){ MysqlConnection.setConnection(this.host,this.port,this.username,this.password); columnsMap = MysqlConnection.getColumns(); list = columnsMap.get(key); } } return list; }
public class MysqlConnection { private static Connection conn; private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class); private static String host; private static Integer port; private static String user; private static String password; public static void setConnection(String mySQLHost, Integer mySQLPort, String mySQLUser, String mySQLPassword) { try { if (conn == null || conn.isClosed()) { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://" + mySQLHost + ":" + mySQLPort + "/", mySQLUser, mySQLPassword); logger.info("connected to mysql:{} : {}", mySQLHost, mySQLPort); host = mySQLHost; port = mySQLPort; user = mySQLUser; password = mySQLPassword; } } catch (Exception e) { logger.error(e.getMessage(), e); } } public static Connection getConnection() { try { if (conn == null || conn.isClosed()) { setConnection(host, port, user, password); } } catch (Exception e) { logger.error(e.getMessage(), e); } return conn; } public static Map<String, List<String>> getColumns(){ Map<String, List<String>> cols = new HashMap<String, List<String>>(); Connection conn = getConnection(); try { DatabaseMetaData metaData = conn.getMetaData(); ResultSet r = metaData.getCatalogs(); String tableType[] = { "TABLE" }; while (r.next()) { String databaseName = r.getString("TABLE_CAT"); ResultSet result = metaData.getTables(databaseName, null, null, tableType); while (result.next()) { String tableName = result.getString("TABLE_NAME"); String key = databaseName + "." + tableName; ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null); cols.put(key, new ArrayList<String>()); while (colSet.next()) { String column = colSet.getString("COLUMN_NAME"); cols.get(key).add(column); } } } } catch (SQLException e) { logger.error(e.getMessage(), e); return null; } return cols; } }
辅助类,根据event id获取event type:
public class MySqlEventTypeIdToString { private static Map<Integer, String> idToString = new HashMap<Integer, String>(); private MySqlEventTypeIdToString() { Init(); } public static MySqlEventTypeIdToString getInstance() { return m; } private void Init() { idToString.put(0,"UNKNOWN_EVENT"); idToString.put(1,"START_EVENT_V3"); idToString.put(2,"QUERY_EVENT"); idToString.put(3,"STOP_EVENT"); idToString.put(4,"ROTATE_EVENT"); idToString.put(5,"INTVAR_EVENT"); idToString.put(6,"LOAD_EVENT"); idToString.put(7,"SLAVE_EVENT"); idToString.put(8,"CREATE_FILE_EVENT"); idToString.put(9,"APPEND_BLOCK_EVENT"); idToString.put(10,"EXEC_LOAD_EVENT"); idToString.put(11,"DELETE_FILE_EVENT"); idToString.put(12,"NEW_LOAD_EVENT"); idToString.put(13,"RAND_EVENT"); idToString.put(14,"USER_VAR_EVENT"); idToString.put(15,"FORMAT_DESCRIPTION_EVENT"); idToString.put(16,"XID_EVENT"); idToString.put(17,"BEGIN_LOAD_QUERY_EVENT"); idToString.put(18,"EXECUTE_LOAD_QUERY_EVENT"); idToString.put(19,"TABLE_MAP_EVENT"); idToString.put(20,"PRE_GA_WRITE_ROWS_EVENT"); idToString.put(21,"PRE_GA_UPDATE_ROWS_EVENT"); idToString.put(22,"PRE_GA_DELETE_ROWS_EVENT"); idToString.put(23,"WRITE_ROWS_EVENT"); idToString.put(24,"UPDATE_ROWS_EVENT"); idToString.put(25,"DELETE_ROWS_EVENT"); idToString.put(26,"INCIDENT_EVENT"); idToString.put(27,"HEARTBEAT_LOG_EVENT"); idToString.put(28,"IGNORABLE_LOG_EVENT"); idToString.put(29,"ROWS_QUERY_LOG_EVENT"); idToString.put(30,"WRITE_ROWS_EVENT_V2"); idToString.put(31,"UPDATE_ROWS_EVENT_V2"); idToString.put(32,"DELETE_ROWS_EVENT_V2"); idToString.put(33,"GTID_LOG_EVENT"); idToString.put(34,"ANONYMOUS_GTID_LOG_EVENT"); idToString.put(35,"PREVIOUS_GTIDS_LOG_EVENT"); } public String get(Integer eventId) { return idToString.get(eventId); } }
运行:
update event is: { eventId: a7acc3d0-7721-4ffe-84d4-4c2b7db5423a, databaseName: test, tableName: task, eventType: UPDATE_ROWS_EVENT, timestamp: 1450753740000, timestampRecepite: 1450887259271, binlogName: mysql-bin.000001, position: 248, nextPosition: 358, serverId: 23, before: { id=791, user_name=123, topology_path=, update_time=2015-08-05 10:53:57.0, status=1, department=, name=user01, create_time=2015-12-21 19:30:36.0, user_id=-1 }, after: { id=791, user_name=123, topology_path=, update_time=2015-08-05 10:53:57.0, status=2, department=, name=user02, create_time=2015-12-22 11:09:00.0, user_id=-1 } }
声明:该文观点仅代表作者本人,入门客AI创业平台信息发布平台仅提供信息存储空间服务,如有疑问请联系rumenke@qq.com。
- 上一篇:没有了
- 下一篇: mysql查询最近7天的数据,没有数据自动补0