-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathstream.py
107 lines (84 loc) · 3.21 KB
/
stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from struct import unpack
class MemoryNotDefined(Exception):
def __init__(self, ea, length, file_offset):
self.message = f'Error: {length} bytes @{ea:04x}..@{ea+length-1:04x} not defined ({file_offset})'
super().__init__(self.message)
class STREAM:
def __init__(self):
self.bin = None # bytearray
self.len = 0
self.pos = 0 # смещение в bin
self.mem_offset = 0 # соответсвующее смещение в памяти
self.file_offset = 0 # соответсвующее смещение в файле
def set_binary(self, binary, mem_offset=0):
self.bin = binary
self.len = len(binary)
self.pos = mem_offset
self.mem_offset = mem_offset
self.file_offset = 0
def load_binary(self, filename, mem_offset=0):
with open(filename, 'rb') as f:
self.set_binary(f.read(), mem_offset)
def load_hex(self, filename, mem_offset=0):
with open(filename, 'r') as f:
content = f.read()
self.set_binary(bytes.fromhex(content), mem_offset)
def get_all(self):
return self.bin[0:self.len] if self.bin is not None else None
def get_block(self, start, end):
length = end-start
p = self.__get_abs_pos(start, length)
result = self.bin[p:p+length]
return result
def is_defined(self, pos, length):
return False if self.__is_defined(pos, length) is None else True
def __is_defined(self, pos, length):
p = pos-self.mem_offset
if length > 0 and (p < 0 or p > self.len-length):
return None
return p
def __get_abs_pos(self, pos, length):
p = self.__is_defined(pos, length)
if p is None:
raise MemoryNotDefined(pos, length, pos-self.mem_offset+self.file_offset)
return p
def __read(self, fmt, length):
p = self.__get_abs_pos(self.pos, length)
result = unpack(fmt, self.bin[p:p+length])
self.pos += length
return result[0]
def peek_char(self):
p = self.__get_abs_pos(self.pos, 1)
return unpack('c', self.bin[p:p + 1])[0]
def read_block(self, length):
p = self.__get_abs_pos(self.pos, length)
result = self.bin[p:p+length]
self.pos += length
return result
def read_stream(self, length):
result = STREAM()
p = self.__get_abs_pos(self.pos, length)
result.set_binary(self.bin[p:p+length])
result.file_offset = p+self.file_offset
self.pos += length
return result
def read_char(self):
return self.__read('c', 1)
def read_byte(self):
return self.__read('B', 1)
def read_word_be(self):
return self.__read('>H', 2)
def read_dword_be(self):
return self.__read('>I', 4)
def read_word_le(self):
return self.__read('<H', 2)
def read_dword_le(self):
return self.__read('<I', 4)
def read_str(self, end=b'\0', encoding='cp1251'):
result = b''
while True:
c = self.read_char()
if c == end:
break
result += c
return result.decode(encoding=encoding, errors='ignore')