Enhance Data Integrity with Binlog Event Checksum Verification #458

Merged
8 changes: 6 additions & 2 deletions pymysqlreplication/binlogstream.py
@@ -142,7 +142,8 @@ def __init__(self, connection_settings, server_id,
slave_heartbeat=None,
is_mariadb=False,
annotate_rows_event=False,
ignore_decode_errors=False):
ignore_decode_errors=False,
verify_checksum=False,):
"""
Attributes:
ctl_connection_settings: Connection settings for cluster holding
@@ -184,6 +185,7 @@ def __init__(self, connection_settings, server_id,
used with 'is_mariadb'
ignore_decode_errors: If true, any decode errors encountered
when reading column data will be ignored.
verify_checksum: If true, verify events read from the binary log by examining checksums.
"""

self.__connection_settings = connection_settings
@@ -206,6 +208,7 @@ def __init__(self, connection_settings, server_id,
only_events, ignored_events, filter_non_implemented_events)
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
self.__ignore_decode_errors = ignore_decode_errors
self.__verify_checksum = verify_checksum

# We can't filter on packet level TABLE_MAP and rotate event because
# we need them for handling other operations
@@ -535,7 +538,8 @@ def fetchone(self):
self.__ignored_schemas,
self.__freeze_schema,
self.__fail_on_table_metadata_unavailable,
self.__ignore_decode_errors)
self.__ignore_decode_errors,
self.__verify_checksum,)

if binlog_event.event_type == ROTATE_EVENT:
self.log_pos = binlog_event.event.position
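With this change, checksum verification is opt-in through the new constructor argument and defaults to False. A minimal usage sketch (the connection settings and server_id below are placeholders, not values from this PR):

from pymysqlreplication import BinLogStreamReader

MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}

stream = BinLogStreamReader(
    connection_settings=MYSQL_SETTINGS,
    server_id=1024,        # must be unique per replication client
    blocking=False,
    verify_checksum=True,  # added in this PR: CRC32-check every event read
)

for event in stream:
    event.dump()
stream.close()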
19 changes: 18 additions & 1 deletion pymysqlreplication/event.py
@@ -3,6 +3,7 @@
import binascii
import struct
import datetime
import zlib
from pymysqlreplication.constants.STATUS_VAR_KEY import *
from pymysqlreplication.exceptions import StatusVariableMismatch

@@ -16,7 +17,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
ignored_schemas=None,
freeze_schema=False,
fail_on_table_metadata_unavailable=False,
ignore_decode_errors=False):
ignore_decode_errors=False,
verify_checksum=False,):
self.packet = from_packet
self.table_map = table_map
self.event_type = self.packet.event_type
@@ -26,17 +28,32 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
self.mysql_version = mysql_version
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
self._ignore_decode_errors = ignore_decode_errors
self._verify_checksum = verify_checksum
self._is_event_valid = None
# The event has been fully processed; if _processed is False,
# the event will be skipped
self._processed = True
self.complete = True
self._verify_event()

def _read_table_id(self):
# Table ID is 6 byte
# pad little-endian number
table_id = self.packet.read(6) + b"\x00\x00"
return struct.unpack('<Q', table_id)[0]

def _verify_event(self):
if not self._verify_checksum:
return

self.packet.rewind(1)
data = self.packet.read(19 + self.event_size)
footer = self.packet.read(4)
byte_data = zlib.crc32(data).to_bytes(4, byteorder='little')
self._is_event_valid = True if byte_data == footer else False
self.packet.read_bytes -= (19 + self.event_size + 4)
self.packet.rewind(20)

def dump(self):
print("=== %s ===" % (self.__class__.__name__))
print("Date: %s" % (datetime.datetime.utcfromtimestamp(self.timestamp)
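For reference, _verify_event recomputes a CRC32 over the 19-byte common header plus the event body and compares it with the 4-byte footer the server appends when binlog_checksum=CRC32; it then rewinds and restores read_bytes so normal parsing is unaffected. A standalone sketch of the same comparison (the helper name crc32_matches is illustrative, not part of the library):

import struct
import zlib

def crc32_matches(event_bytes: bytes) -> bool:
    # event_bytes is assumed to be the 19-byte header plus event body,
    # followed by the 4-byte little-endian CRC32 footer.
    payload, footer = event_bytes[:-4], event_bytes[-4:]
    expected = struct.unpack("<I", footer)[0]
    return zlib.crc32(payload) == expected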
7 changes: 5 additions & 2 deletions pymysqlreplication/packet.py
@@ -105,7 +105,8 @@ def __init__(self, from_packet, table_map,
ignored_schemas,
freeze_schema,
fail_on_table_metadata_unavailable,
ignore_decode_errors):
ignore_decode_errors,
verify_checksum,):
# -1 because we ignore the ok byte
self.read_bytes = 0
# Used when we want to override a value in the data buffer
@@ -135,6 +136,7 @@ def __init__(self, from_packet, table_map,
if use_checksum:
event_size_without_header = self.event_size - 23
else:
verify_checksum = False
event_size_without_header = self.event_size - 19

self.event = None
@@ -151,7 +153,8 @@ def __init__(self, from_packet, table_map,
ignored_schemas=ignored_schemas,
freeze_schema=freeze_schema,
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable,
ignore_decode_errors=ignore_decode_errors)
ignore_decode_errors=ignore_decode_errors,
verify_checksum=verify_checksum)
if self.event._processed == False:
self.event = None

54 changes: 53 additions & 1 deletion pymysqlreplication/tests/test_basic.py
@@ -18,8 +18,12 @@
from pymysqlreplication.event import *
from pymysqlreplication.constants.BINLOG import *
from pymysqlreplication.row_event import *
from pymysqlreplication.packet import BinLogPacketWrapper
from pymysql.protocol import MysqlPacket

__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"]
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings",
"TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting",
"TestRowsQueryLogEvents"]


class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
@@ -522,6 +526,54 @@ def test_end_log_pos(self):
self.assertEqual(last_log_pos, 888)
self.assertEqual(last_event_type, TABLE_MAP_EVENT)

def test_event_validation(self):
def create_binlog_packet_wrapper(pkt):
return BinLogPacketWrapper(pkt, self.stream.table_map,
self.stream._ctl_connection, self.stream.mysql_version,
self.stream._BinLogStreamReader__use_checksum,
self.stream._BinLogStreamReader__allowed_events_in_packet,
self.stream._BinLogStreamReader__only_tables,
self.stream._BinLogStreamReader__ignored_tables,
self.stream._BinLogStreamReader__only_schemas,
self.stream._BinLogStreamReader__ignored_schemas,
self.stream._BinLogStreamReader__freeze_schema,
self.stream._BinLogStreamReader__fail_on_table_metadata_unavailable,
self.stream._BinLogStreamReader__ignore_decode_errors,
self.stream._BinLogStreamReader__verify_checksum,)
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
blocking=False,
verify_checksum=True
)
# The event data below is taken from the MariaDB documentation example:
# https://mariadb.com/kb/en/query_event/#example-with-crc32
correct_event_data = (
# OK value
b"\x00"
# Header
b"q\x17(Z\x02\x8c'\x00\x00U\x00\x00\x00\x01\t\x00\x00\x00\x00"
# Content
b"f\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1a\x00"
b"\x00\x00\x00\x00\x00\x01\x00\x00\x00P\x00\x00"
b"\x00\x00\x06\x03std\x04\x08\x00\x08\x00\x08\x00\x00"
b"TRUNCATE TABLE test.t4"
# CRC 32, 4 Bytes
b"Ji\x9e\xed"
)
# Assume bits were corrupted in transit: q (0b01110001) -> U (0b01010101)
modified_byte = b"U"
wrong_event_data = correct_event_data[:1] + modified_byte + correct_event_data[2:]

packet = MysqlPacket(correct_event_data, 0)
wrong_packet = MysqlPacket(wrong_event_data, 0)
self.stream.fetchone() # for '_ctl_connection' parameter
binlog_event = create_binlog_packet_wrapper(packet)
wrong_event = create_binlog_packet_wrapper(wrong_packet)
self.assertEqual(binlog_event.event._is_event_valid, True)
self.assertNotEqual(wrong_event.event._is_event_valid, True)


class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
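The test above exercises BinLogPacketWrapper directly; in a consumer, the verification result surfaces on the event object as _is_event_valid (None when verification was skipped). A hedged sketch of how calling code might react to a failed check, reusing the stream from the earlier sketch; the skip-and-log policy is illustrative, not prescribed by the library:

import logging

logger = logging.getLogger(__name__)

for event in stream:  # stream constructed with verify_checksum=True
    if event._is_event_valid is False:  # None means verification was skipped
        logger.warning("Skipping binlog event with bad CRC32: %s", type(event).__name__)
        continue
    event.dump()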