-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patht.py
166 lines (133 loc) · 5.32 KB
/
t.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import os
import random
import time
import uuid
from argparse import ArgumentParser, RawTextHelpFormatter
import psycopg
from psycopg.errors import SerializationFailure, Error
from psycopg.rows import namedtuple_row
def create_accounts(conn):
id1 = uuid.uuid4()
id2 = uuid.uuid4()
with conn.cursor() as cur:
cur.execute(
"CREATE TABLE IF NOT EXISTS accounts (id UUID PRIMARY KEY, balance INT)"
)
cur.execute(
"UPSERT INTO accounts (id, balance) VALUES (%s, 1000), (%s, 250)", (id1, id2))
print("create_accounts(): status message: %s",
cur.statusmessage)
return [id1, id2]
def delete_accounts(conn):
with conn.cursor() as cur:
cur.execute("DELETE FROM accounts")
print("delete_accounts(): status message: %s",cur.statusmessage)
def print_balances(conn):
with conn.cursor() as cur:
print(f"Balances at {time.asctime()}:")
for row in cur.execute("SELECT id, balance FROM accounts"):
print("account id: {0} balance: ${1:2d}".format(row.id, row.balance))
def transfer_funds(conn, frm, to, amount):
with conn.cursor() as cur:
# Check the current balance.
cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
from_balance = cur.fetchone()[0]
if from_balance < amount:
raise RuntimeError(
f"insufficient funds in {frm}: have {from_balance}, need {amount}"
)
# Perform the transfer.
cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s", (
amount, frm)
)
cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s", (
amount, to)
)
print("transfer_funds(): status message: %s", cur.statusmessage)
def run_transaction(conn, op, max_retries=3):
"""
Execute the operation *op(conn)* retrying serialization failure.
If the database returns an error asking to retry the transaction, retry it
*max_retries* times before giving up (and propagate it).
"""
# leaving this block the transaction will commit or rollback
# (if leaving with an exception)
with conn.transaction():
for retry in range(1, max_retries + 1):
try:
op(conn)
# If we reach this point, we were able to commit, so we break
# from the retry loop.
return
except SerializationFailure as e:
# This is a retry error, so we roll back the current
# transaction and sleep for a bit before retrying. The
# sleep time increases for each failed transaction.
print("got error: %s", e)
conn.rollback()
print("EXECUTE SERIALIZATION_FAILURE BRANCH")
sleep_seconds = (2**retry) * 0.1 * (random.random() + 0.5)
print("Sleeping %s seconds", sleep_seconds)
time.sleep(sleep_seconds)
except psycopg.Error as e:
print("got error: %s", e)
print("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
raise e
raise ValueError(
f"transaction did not succeed after {max_retries} retries")
def main():
opt = parse_cmdline()
try:
# Attempt to connect to cluster with connection string provided to
# script. By default, this script uses the value saved to the
# DATABASE_URL environment variable.
# For information on supported connection string formats, see
# https://www.cockroachlabs.com/docs/stable/connect-to-the-database.html.
db_url = opt.dsn
conn = psycopg.connect(db_url,
application_name="$ docs_simplecrud_psycopg3",
row_factory=namedtuple_row)
ids = create_accounts(conn)
print_balances(conn)
amount = 100
toId = ids.pop()
fromId = ids.pop()
try:
run_transaction(conn, lambda conn: transfer_funds(conn, fromId, toId, amount))
except ValueError as ve:
# Below, we print the error and continue on so this example is easy to
# run (and run, and run...). In real code you should handle this error
# and any others thrown by the database interaction.
print("run_transaction(conn, op) failed: %s", ve)
pass
except psycopg.Error as e:
print("got error: %s", e)
raise e
print_balances(conn)
delete_accounts(conn)
except Exception as e:
print("database connection failed")
print(e)
return
def parse_cmdline():
parser = ArgumentParser(description=__doc__,
formatter_class=RawTextHelpFormatter)
parser.add_argument("-v", "--verbose",
action="store_true", help="print debug info")
parser.add_argument(
"dsn",
default=os.environ.get("DATABASE_URL"),
nargs="?",
help="""\
database connection string\
(default: value of the DATABASE_URL environment variable)
""",
)
opt = parser.parse_args()
if opt.dsn is None:
parser.error("database connection string not set")
return opt
if __name__ == "__main__":
main()