diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index c153fcda..0f51f1a7 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -142,7 +142,8 @@ def __init__(self, connection_settings, server_id, slave_heartbeat=None, is_mariadb=False, annotate_rows_event=False, - ignore_decode_errors=False): + ignore_decode_errors=False, + verify_checksum=False,): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -184,6 +185,7 @@ def __init__(self, connection_settings, server_id, used with 'is_mariadb' ignore_decode_errors: If true, any decode errors encountered when reading column data will be ignored. + verify_checksum: If true, verify events read from the binary log by examining checksums. """ self.__connection_settings = connection_settings @@ -206,6 +208,7 @@ def __init__(self, connection_settings, server_id, only_events, ignored_events, filter_non_implemented_events) self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable self.__ignore_decode_errors = ignore_decode_errors + self.__verify_checksum = verify_checksum # We can't filter on packet level TABLE_MAP and rotate event because # we need them for handling other operations @@ -535,7 +538,8 @@ def fetchone(self): self.__ignored_schemas, self.__freeze_schema, self.__fail_on_table_metadata_unavailable, - self.__ignore_decode_errors) + self.__ignore_decode_errors, + self.__verify_checksum,) if binlog_event.event_type == ROTATE_EVENT: self.log_pos = binlog_event.event.position diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 12db2915..7bab24b5 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,6 +3,7 @@ import binascii import struct import datetime +import zlib from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch @@ -16,7 +17,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, ignored_schemas=None, freeze_schema=False, fail_on_table_metadata_unavailable=False, - ignore_decode_errors=False): + ignore_decode_errors=False, + verify_checksum=False,): self.packet = from_packet self.table_map = table_map self.event_type = self.packet.event_type @@ -26,10 +28,13 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, self.mysql_version = mysql_version self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable self._ignore_decode_errors = ignore_decode_errors + self._verify_checksum = verify_checksum + self._is_event_valid = None # The event have been fully processed, if processed is false # the event will be skipped self._processed = True self.complete = True + self._verify_event() def _read_table_id(self): # Table ID is 6 byte @@ -37,6 +42,18 @@ def _read_table_id(self): table_id = self.packet.read(6) + b"\x00\x00" return struct.unpack(' U(0110111) + modified_byte = b"U" + wrong_event_data = correct_event_data[:1] + modified_byte + correct_event_data[2:] + + packet = MysqlPacket(correct_event_data, 0) + wrong_packet = MysqlPacket(wrong_event_data, 0) + self.stream.fetchone() # for '_ctl_connection' parameter + binlog_event = create_binlog_packet_wrapper(packet) + wrong_event = create_binlog_packet_wrapper(wrong_packet) + self.assertEqual(binlog_event.event._is_event_valid, True) + self.assertNotEqual(wrong_event.event._is_event_valid, True) + class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self):