From 19622f5ac1a8c29cd2e982f665084e5d373d4fb0 Mon Sep 17 00:00:00 2001 From: Pilmo Date: Mon, 28 Aug 2023 14:15:19 +0900 Subject: [PATCH 1/4] Implement event validation code using crc32 4-byte --- pymysqlreplication/binlogstream.py | 8 +++- pymysqlreplication/event.py | 19 ++++++++- pymysqlreplication/packet.py | 7 +++- pymysqlreplication/tests/test_basic.py | 55 +++++++++++++++++++++++++- 4 files changed, 83 insertions(+), 6 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 4f8213d5..101f0abe 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -141,7 +141,8 @@ def __init__(self, connection_settings, server_id, slave_heartbeat=None, is_mariadb=False, annotate_rows_event=False, - ignore_decode_errors=False): + ignore_decode_errors=False, + use_crc32=False,): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -183,6 +184,7 @@ def __init__(self, connection_settings, server_id, used with 'is_mariadb' ignore_decode_errors: If true, any decode errors encountered when reading column data will be ignored. + use_crc32: If true, use CRC32 4-byte for events validation, ensuring data integrity. """ self.__connection_settings = connection_settings @@ -205,6 +207,7 @@ def __init__(self, connection_settings, server_id, only_events, ignored_events, filter_non_implemented_events) self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable self.__ignore_decode_errors = ignore_decode_errors + self.__use_crc32 = use_crc32 # We can't filter on packet level TABLE_MAP and rotate event because # we need them for handling other operations @@ -534,7 +537,8 @@ def fetchone(self): self.__ignored_schemas, self.__freeze_schema, self.__fail_on_table_metadata_unavailable, - self.__ignore_decode_errors) + self.__ignore_decode_errors, + self.__use_crc32,) if binlog_event.event_type == ROTATE_EVENT: self.log_pos = binlog_event.event.position diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 12c285c8..e39c35fc 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,6 +3,7 @@ import binascii import struct import datetime +import zlib from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch @@ -16,7 +17,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, ignored_schemas=None, freeze_schema=False, fail_on_table_metadata_unavailable=False, - ignore_decode_errors=False): + ignore_decode_errors=False, + use_crc32=False,): self.packet = from_packet self.table_map = table_map self.event_type = self.packet.event_type @@ -26,10 +28,13 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, self.mysql_version = mysql_version self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable self._ignore_decode_errors = ignore_decode_errors + self._use_crc32 = use_crc32 + self._is_event_valid = None # The event have been fully processed, if processed is false # the event will be skipped self._processed = True self.complete = True + self._validate_event() def _read_table_id(self): # Table ID is 6 byte @@ -37,6 +42,18 @@ def _read_table_id(self): table_id = self.packet.read(6) + b"\x00\x00" return struct.unpack(' U(0110111) + modified_byte = b"U" + wrong_event_data = correct_event_data[:1] + modified_byte + correct_event_data[2:] + + packet = MysqlPacket(correct_event_data, 0) + wrong_packet = MysqlPacket(wrong_event_data, 0) + self.stream.fetchone() # for '_ctl_connection' parameter + binlog_event = create_binlog_packet_wrapper(packet) + wrong_event = create_binlog_packet_wrapper(wrong_packet) + self.assertEqual(binlog_event.event._is_event_valid, True) + self.assertNotEqual(wrong_event.event._is_event_valid, True) + class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self): From 78d55d45014ec535c86f8a0f58b5bef91838bf11 Mon Sep 17 00:00:00 2001 From: Pilmo Date: Mon, 28 Aug 2023 14:21:13 +0900 Subject: [PATCH 2/4] correct a typo --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index dce63e73..38bd2841 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -539,7 +539,7 @@ def create_binlog_packet_wrapper(pkt): self.stream._BinLogStreamReader__freeze_schema, self.stream._BinLogStreamReader__fail_on_table_metadata_unavailable, self.stream._BinLogStreamReader__ignore_decode_errors, - self.stream._BinLogStreamReader__verify_crc32) + self.stream._BinLogStreamReader__use_crc32) self.stream.close() self.stream = BinLogStreamReader( From 6398ecb2f900a0824eb0ef116db9425aa18a607d Mon Sep 17 00:00:00 2001 From: Pilmo Date: Mon, 28 Aug 2023 14:38:50 +0900 Subject: [PATCH 3/4] Fix typo --- pymysqlreplication/tests/test_basic.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 4baa763c..03c474e0 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -539,8 +539,7 @@ def create_binlog_packet_wrapper(pkt): self.stream._BinLogStreamReader__freeze_schema, self.stream._BinLogStreamReader__fail_on_table_metadata_unavailable, self.stream._BinLogStreamReader__ignore_decode_errors, - self.stream._BinLogStreamReader__use_crc32) - + self.stream._BinLogStreamReader__use_crc32,) self.stream.close() self.stream = BinLogStreamReader( self.database, From a2896ac0e808c9cc00135139181a43647d4cef19 Mon Sep 17 00:00:00 2001 From: Pilmo Date: Mon, 28 Aug 2023 16:50:01 +0900 Subject: [PATCH 4/4] rename a parameter and a function --- pymysqlreplication/binlogstream.py | 8 ++++---- pymysqlreplication/event.py | 10 +++++----- pymysqlreplication/packet.py | 6 +++--- pymysqlreplication/tests/test_basic.py | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index c012251b..0f51f1a7 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -143,7 +143,7 @@ def __init__(self, connection_settings, server_id, is_mariadb=False, annotate_rows_event=False, ignore_decode_errors=False, - use_crc32=False,): + verify_checksum=False,): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -185,7 +185,7 @@ def __init__(self, connection_settings, server_id, used with 'is_mariadb' ignore_decode_errors: If true, any decode errors encountered when reading column data will be ignored. - use_crc32: If true, use CRC32 4-byte for events validation, ensuring data integrity. + verify_checksum: If true, verify events read from the binary log by examining checksums. """ self.__connection_settings = connection_settings @@ -208,7 +208,7 @@ def __init__(self, connection_settings, server_id, only_events, ignored_events, filter_non_implemented_events) self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable self.__ignore_decode_errors = ignore_decode_errors - self.__use_crc32 = use_crc32 + self.__verify_checksum = verify_checksum # We can't filter on packet level TABLE_MAP and rotate event because # we need them for handling other operations @@ -539,7 +539,7 @@ def fetchone(self): self.__freeze_schema, self.__fail_on_table_metadata_unavailable, self.__ignore_decode_errors, - self.__use_crc32,) + self.__verify_checksum,) if binlog_event.event_type == ROTATE_EVENT: self.log_pos = binlog_event.event.position diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index bd3c362e..7bab24b5 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -18,7 +18,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, freeze_schema=False, fail_on_table_metadata_unavailable=False, ignore_decode_errors=False, - use_crc32=False,): + verify_checksum=False,): self.packet = from_packet self.table_map = table_map self.event_type = self.packet.event_type @@ -28,13 +28,13 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, self.mysql_version = mysql_version self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable self._ignore_decode_errors = ignore_decode_errors - self._use_crc32 = use_crc32 + self._verify_checksum = verify_checksum self._is_event_valid = None # The event have been fully processed, if processed is false # the event will be skipped self._processed = True self.complete = True - self._validate_event() + self._verify_event() def _read_table_id(self): # Table ID is 6 byte @@ -42,8 +42,8 @@ def _read_table_id(self): table_id = self.packet.read(6) + b"\x00\x00" return struct.unpack('