-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflow_control.py
124 lines (98 loc) · 3.22 KB
/
flow_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import sys
# The bytecode patching in this module is written against one specific
# interpreter release; refuse to import on anything other than Python 3.9.
assert sys.version_info.major == 3
assert sys.version_info.minor == 9
import inspect
import struct
import dis
from itertools import count
# Project-local helper module; Frame uses Mem.view on a code object's bytes.
from mem_view import Mem, _p_hex
# At module level locals() is the module namespace, so this defines every
# opcode name in dis.opmap (CALL_FUNCTION, RETURN_VALUE, JUMP_ABSOLUTE, ...)
# as a module-level integer constant.
locals().update(dis.opmap)
class Frame:
    """Wrapper around a frame's bytecode that allows overwriting it.

    Attributes:
        code: the ``co_code`` bytes object of the wrapped frame.
        code_view: result of ``Mem.view(code)``; all patches are written
            through it.  Presumably a writable view over the memory of the
            (normally immutable) bytes object -- confirm in ``mem_view``.
        pos: ``f_lasti`` of the frame at construction time, i.e. the byte
            offset of the instruction that was last executed.
    """

    def __init__(self, frame):
        self.code = frame.f_code.co_code
        self.code_view = Mem.view(self.code)
        self.pos = frame.f_lasti

    @property
    def current_opcode(self):
        """Opcode byte found at ``pos``."""
        return self.code[self.pos]

    @property
    def last_opcode(self):
        """Second-to-last byte of the code.

        With two-byte instructions this is the opcode of the final
        instruction.
        """
        return self.code[-2]

    def __len__(self):
        """Length of the bytecode in bytes."""
        return len(self.code)

    def __setitem__(self, pos, patch):
        """Overwrite ``len(patch)`` bytes of the bytecode starting at ``pos``.

        Raises:
            AssertionError: if the patch is longer than the code or does not
                fit entirely inside it at ``pos``.
        """
        size = len(self.code_view)
        # The message used to reference an undefined name ``code``, which
        # turned this failure into a NameError instead of an AssertionError.
        assert len(patch) <= size, f"len(patch) = {len(patch)} > len(code) = {size}"
        # The largest valid start index is size - len(patch), inclusive.
        assert 0 <= pos <= size - len(patch), f"Index {pos:d} out of range [0, {size - len(patch)}]"
        self.code_view[pos:pos + len(patch)] = patch

    def patch(self, patch, pos, anchor="head"):
        """Write ``patch`` at ``pos``, interpreted relative to ``anchor``.

        Args:
            patch: bytes to write.
            pos: byte offset; absolute for ``anchor="head"``, relative to
                ``self.pos`` for ``anchor="current"``.
            anchor: ``"head"`` or ``"current"``.

        Raises:
            NotImplementedError: for any other ``anchor`` value.
        """
        if anchor == "head":
            self[pos] = patch
        elif anchor == "current":
            self[pos + self.pos] = patch
        else:
            raise NotImplementedError(f"Unsupported anchor: {anchor!r}")
def return_(what):
    """Patch the caller so that this call acts as ``return what``.

    The caller's bytecode is overwritten in place, starting at the
    instruction it is currently executing (which must be CALL_FUNCTION),
    with two RETURN_VALUE instructions.  The intended effect is that, once
    this function returns, the caller immediately returns ``what`` itself.

    Raises:
        AssertionError: if the caller is not positioned on CALL_FUNCTION.
    """
    caller = Frame(inspect.currentframe().f_back)
    assert caller.current_opcode == CALL_FUNCTION
    # Two back-to-back two-byte RETURN_VALUE instructions.
    double_return = bytes((RETURN_VALUE, 0, RETURN_VALUE, 0))
    caller.patch(double_return, 0, anchor="current")
    return what
def _jump_absolute(i):
    """Encode a JUMP_ABSOLUTE instruction with argument ``i``.

    The argument is split into big-endian bytes.  Every byte except the last
    is emitted as an ``EXTENDED_ARG`` prefix instruction and the last byte
    becomes the argument of ``JUMP_ABSOLUTE`` itself.

    Args:
        i: non-negative jump argument that fits into four bytes.

    Returns:
        The encoded instruction sequence as ``bytes`` (2 to 8 bytes long).

    Raises:
        OverflowError: if ``i`` is negative (from ``int.to_bytes``).
        AssertionError: if ``i`` needs more than four bytes.
    """
    # 0 has bit_length() == 0, which would give an empty byte string and
    # make bts[-1] fail; always encode at least one argument byte.
    width = max(1, (i.bit_length() + 7) // 8)
    bts = i.to_bytes(width, byteorder="big")
    assert len(bts) < 5
    encoded = []
    for b in bts[:-1]:
        encoded += (EXTENDED_ARG, b)
    encoded += (JUMP_ABSOLUTE, bts[-1])
    return bytes(encoded)
def return2(what):
    """Patch the caller so that this call acts as ``return what``.

    Unlike ``return_``, the call site is replaced by a NOP followed by an
    absolute jump to the last two bytes of the caller's bytecode, which must
    hold a RETURN_VALUE instruction.

    Raises:
        AssertionError: if the caller is not positioned on CALL_FUNCTION or
            its bytecode does not end in RETURN_VALUE.
    """
    caller = Frame(inspect.currentframe().f_back)
    assert caller.current_opcode == CALL_FUNCTION
    assert caller.last_opcode == RETURN_VALUE
    # Offset of the final two-byte instruction of the caller.
    final_instruction = len(caller) - 2
    replacement = bytes([NOP, 0]) + _jump_absolute(final_instruction)
    caller.patch(replacement, 0, anchor="current")
    return what
def permajump(where, anchor="head", offset=2):
    """Write an absolute jump into the caller's bytecode after the call site.

    Args:
        where: jump target as a byte offset; absolute for ``anchor="head"``,
            relative to the caller's current instruction for
            ``anchor="current"``.
        anchor: ``"head"`` or ``"current"``.
        offset: the jump is written ``offset + 2`` bytes past the caller's
            current instruction.

    Raises:
        NotImplementedError: for any other ``anchor`` value.
        AssertionError: if the resolved target lies outside the bytecode or
            the caller is not positioned on CALL_FUNCTION.
    """
    caller = Frame(inspect.currentframe().f_back)
    if anchor == "current":
        where += caller.pos
    elif anchor != "head":
        raise NotImplementedError
    assert 0 <= where < len(caller), f"Cannot jump to {where:d}"
    assert caller.current_opcode == CALL_FUNCTION
    jump_instruction = _jump_absolute(where)
    caller.patch(jump_instruction, offset + 2, anchor="current")
    return None
def jump(where, anchor="head"):
    """Patch a jump into the caller and return a callable that undoes it.

    A copy of the caller's bytecode is taken first.  Then a jump to
    ``where - 2`` is written two bytes past the caller's current instruction,
    and a ``CALL_FUNCTION 0`` is written at ``where - 2``.
    NOTE(review): the planted CALL_FUNCTION presumably invokes the returned
    restore callable, so the original bytecode is back in place before
    execution reaches ``where`` -- confirm against the interpreter.

    Args:
        where: jump target as a byte offset; absolute for ``anchor="head"``,
            relative to the caller's current instruction for
            ``anchor="current"``.  Must be at least 2 as passed in.
        anchor: ``"head"`` or ``"current"``.

    Returns:
        A zero-argument function that writes the saved bytecode back.

    Raises:
        NotImplementedError: for any other ``anchor`` value.
        AssertionError: if ``where`` is below 2, the resolved target lies
            outside the bytecode, or the caller is not positioned on
            CALL_FUNCTION.
    """
    assert where >= 2, "Cannot jump to the top of the frame"
    frame = Frame(inspect.currentframe().f_back)
    # Snapshot taken before any patching so the restore is exact.
    backup = bytearray(frame.code)
    if anchor == "head":
        pass
    elif anchor == "current":
        where += frame.pos
    else:
        raise NotImplementedError
    assert 0 <= where < len(frame), f"Cannot jump to {where:d}"
    assert frame.current_opcode == CALL_FUNCTION
    frame.patch(_jump_absolute(where - 2), 2, "current")
    frame.patch(bytes([CALL_FUNCTION, 0]), where - 2)
    # The former debug output (a disassembly dump followed by a stray "None",
    # since dis.dis returns nothing) was removed here.

    def _restore_backup():
        frame.patch(bytes(backup), 0)

    return _restore_backup
if __name__ == "__main__":
    # Self-test: each function below is named ``a`` and is expected to return
    # "hacked" instead of the value its plain source would produce.
    # The functions carry no docstrings on purpose: the hard-coded offsets
    # depend on their exact compiled bytecode.

    # return2 should turn the call statement into a return of its argument,
    # so the trailing ``return 2`` is never reached.
    def a():
        return2("h" + "acked123"[:-3])
        return 2
    assert a() == "hacked"

    # permajump(8, "current") targets the offset 8 bytes past the call
    # instruction; presumably that is the ``return x`` statement, skipping
    # ``x = 42`` -- confirm with dis on Python 3.9.
    def a():
        x = "hacked"
        permajump(8, "current")
        x = 42
        return x
    assert a() == "hacked"

    # Same scenario using jump, which also returns a callable that restores
    # the original bytecode.
    def a():
        x = "hacked"
        jump(8, "current")
        x = 42
        return x
    assert a() == "hacked"