-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: extract_mysql_binlog_stream_to_csv.py
73 lines (63 loc) · 2.2 KB
/
extract_mysql_binlog_stream_to_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Important Notice: This script requires the user
# to have the appropriate permissions.
# SUPER or REPLICATION CLIENT
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication import row_event
import configparser
import pymysqlreplication
import csv
import boto3
# Load the MySQL connection parameters from the local pipeline.conf file.
parser = configparser.ConfigParser()
parser.read('pipeline.conf')

# Pull the four credentials out of the [mysql_config] section in one pass.
hostname, port, username, password = (
    parser.get('mysql_config', option)
    for option in ('hostname', 'port', 'username', 'password')
)

# Connection settings in the shape pymysqlreplication expects
# (note the library uses 'passwd', not 'password').
mysql_settings = {
    'host': hostname,
    'port': int(port),  # configparser returns strings; the port must be numeric
    'user': username,
    'passwd': password,
}
# Open a replication (binlog) stream against the source MySQL instance,
# restricted to row-level change events only. Requires the connecting
# user to hold SUPER or REPLICATION CLIENT privileges.
watched_event_types = [
    row_event.DeleteRowsEvent,
    row_event.WriteRowsEvent,
    row_event.UpdateRowsEvent,
]
b_stream = BinLogStreamReader(
    connection_settings=mysql_settings,
    server_id=100,  # must be unique among replicas of this MySQL server
    only_events=watched_event_types,
)
# Collect every change event for the Orders table from the binlog stream.
# Each extracted record is the row's column values plus an 'action' field
# naming the kind of change ('insert', 'update', or 'delete').
order_events = []
for binlogevent in b_stream:
    # The table name is invariant for a whole event, so filter here once
    # rather than re-testing it for every row in the event.
    if binlogevent.table != 'Orders':  # type: ignore
        continue
    for row in binlogevent.rows:  # type: ignore
        event = {}
        if isinstance(binlogevent, row_event.DeleteRowsEvent):
            event['action'] = 'delete'
            event.update(row['values'])
        elif isinstance(binlogevent, row_event.UpdateRowsEvent):
            event['action'] = 'update'
            # For updates, keep the post-change column values.
            event.update(row['after_values'])
        elif isinstance(binlogevent, row_event.WriteRowsEvent):
            event['action'] = 'insert'
            event.update(row['values'])
        order_events.append(event)
# Release the replication connection now that extraction is done.
b_stream.close()
# Write the extracted events to a pipe-delimited CSV. The column set is
# taken from the first event; every event shares the same table schema
# plus the added 'action' field. Guard against an empty extract (a quiet
# source table) instead of raising IndexError on order_events[0] — an
# empty file is still produced so the upload step downstream succeeds.
keys = order_events[0].keys() if order_events else []
local_filename = 'orders_extract.csv'
with open(
    local_filename,
    'w',
    newline=''
) as output_file:
    dict_writer = csv.DictWriter(output_file, keys, delimiter='|')
    # Deliberately no header row: the downstream load expects data only.
    dict_writer.writerows(order_events)
# Upload the extract to S3, using credentials read from the
# [aws_boto_credentials] section of pipeline.conf.
# (Fixed the 'acces_key' misspelling so the variable matches the
# 'access_key' config option it holds.)
access_key = parser.get('aws_boto_credentials', 'access_key')
secret_key = parser.get('aws_boto_credentials', 'secret_key')
bucket_name = parser.get('aws_boto_credentials', 'bucket_name')

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)

# Reuse the local filename as the S3 object key.
s3_file = local_filename
s3.upload_file(local_filename, bucket_name, s3_file)