-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfs2json.py
126 lines (110 loc) · 3.98 KB
/
fs2json.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os, re
import sys
import hashlib
from .xdict import XDict
# Module-level slot initialised to None; presumably meant to hold a logger
# (NOTE(review): nothing in the visible code reads or assigns it - confirm).
LOG = None
# Directory that contains this module file.
CDIR = os.path.dirname(__file__)
def to_bytes(obj, charset='utf-8', errors='strict'):
    """Coerce *obj* to an immutable ``bytes`` value.

    ``None`` is passed through unchanged, ``bytes``/``bytearray`` are turned
    into ``bytes`` and ``str`` is encoded using *charset* and *errors*.

    Raises:
        TypeError: if *obj* is of any other type.
    """
    if isinstance(obj, str):
        return obj.encode(charset, errors)
    if isinstance(obj, (bytes, bytearray)):
        return bytes(obj)
    if obj is not None:
        raise TypeError('Expected bytes')
    return None
def md5_hash(text):
    """Return the hexadecimal MD5 digest of *text*.

    *text* is converted with ``to_bytes`` first (``str`` is encoded with that
    helper's default UTF-8 charset) and the resulting bytes are hashed.
    """
    payload = to_bytes(text)
    return hashlib.md5(payload).hexdigest()
# Counter behind get_id(); it stays None until dir_to_fs() resets it to -1.
LAST_ID = None
# Default file-name filter (case-insensitive): accepts names ending in one of
# the listed extensions and rejects names that contain ".min" before the
# extension (e.g. "app.min.js").
# Written as a raw string so that "\." reaches the regex engine without
# relying on Python's handling of invalid string escapes.
# NOTE: the pattern needs at least two characters before the extension, so a
# name such as "a.js" does not match.
FILE_MASK_REX = re.compile(r'(.(?!\.min))+.\.(js|py|css|html|vuepy|pyj)$', flags=re.I)
def get_id():
    """Advance the module-wide ``LAST_ID`` counter and return the new id.

    A counter value of 0 yields the string ``'0'``; every other value ``n``
    yields ``'ID<n>'``.
    """
    global LAST_ID
    LAST_ID = LAST_ID + 1
    if LAST_ID == 0:
        return '0'
    return 'ID%s' % LAST_ID
def safe_read(fp):
    """Return the whole content of the file at *fp*, decoded as UTF-8 text."""
    with open(fp, 'r', encoding='utf-8') as handle:
        return handle.read()
def get_file(fp, parent_id):
    """Build the record describing the file at path *fp*.

    The file is stat'ed and read as UTF-8 text before a fresh id is drawn
    from ``get_id``.

    Returns:
        dict with the keys ``id``, ``name`` (last path component),
        ``parent`` (*parent_id*), ``content`` (file text), ``ctime`` and
        ``mtime`` (stat times multiplied by 1000 and truncated to int) and
        ``md5_hash`` (MD5 hex digest of the content).
    """
    info = os.stat(fp)
    text = safe_read(fp)
    return {
        'id': get_id(),
        'name': os.path.basename(fp),
        'parent': parent_id,
        'content': text,
        'ctime': int(info.st_ctime * 1000),
        'mtime': int(info.st_mtime * 1000),
        'md5_hash': md5_hash(text),
    }
def get_dir(pth_to_dir, parent_id, files, dirs, dir_list=None, file_mask=FILE_MASK_REX):
    """Scan one directory and register its files and sub-directories.

    Args:
        pth_to_dir: path of the directory to scan.
        parent_id: id stored as ``parent`` of the returned record.
        files: dict updated in place with ``id -> file record`` for every
            matching file found (records come from ``get_file``).
        dirs: dict updated in place with ``id -> directory record`` for every
            sub-directory that is descended into. The record returned by this
            call is NOT added to *dirs* here.
        dir_list: controls recursion. ``None`` - do not descend at all;
            ``'*'`` - descend into every sub-directory; otherwise a mapping
            ``name -> nested dir_list`` in which a truthy ``'*'`` entry makes
            every unlisted sub-directory be scanned with ``'*'``.
        file_mask: compiled regex; only file names it matches are included.

    Returns:
        dict ``{id, name, parent, content}`` where ``content`` lists the ids of
        the registered children in ``os.listdir`` order.
    """
    # Record shape: {id: 0, name: '', parent: None, content: []}
    ret = dict(id=get_id(),
               name=os.path.split(pth_to_dir)[1],
               parent=parent_id,
               content=[])
    for it in os.listdir(pth_to_dir):
        fp = os.path.join(pth_to_dir, it)
        if os.path.isfile(fp):
            if file_mask.match(it):
                fl = get_file(fp, ret['id'])
                files[fl['id']] = fl
                ret['content'].append(fl['id'])
            continue
        # Entries that are neither regular files nor directories (broken
        # symlinks, sockets, FIFOs, ...) are skipped: recursing into them
        # would make os.listdir() raise.
        if dir_list is None or not os.path.isdir(fp):
            continue
        if dir_list == '*':
            sub_dir_list = '*'
        else:
            # A truthy '*' entry is the fallback for unlisted names; a falsy
            # one is passed through unchanged (None means "skip").
            wildcard = dir_list.get('*')
            sub_dir_list = dir_list.get(it, '*' if wildcard else wildcard)
        if sub_dir_list is not None:
            d = get_dir(fp, ret['id'], files, dirs, sub_dir_list, file_mask)
            dirs[d['id']] = d
            ret['content'].append(d['id'])
    return ret
def dir_to_fs(root_d, dir_list=None, file_mask=FILE_MASK_REX):
    """Snapshot the directory *root_d* into plain dictionaries.

    Resets the module-wide id counter, scans *root_d* with ``get_dir``
    (passing *dir_list* and *file_mask* through), blanks the root record's
    name and registers the root among the directories.

    Returns:
        dict with ``files`` (id -> file record), ``dirs`` (id -> directory
        record, root included) and ``last_id`` (counter value after the scan).
    """
    global LAST_ID
    LAST_ID = -1
    files, dirs = {}, {}
    root = get_dir(root_d, None, files, dirs, dir_list, file_mask)
    root['name'] = ''
    dirs[root['id']] = root
    return {'files': files, 'dirs': dirs, 'last_id': LAST_ID}
def validate_fdata(fdata, app_folder, must_exist = False):
    """Resolve and validate the file path carried by *fdata*.

    Args:
        fdata: mapping wrapped in ``XDict``; ``path`` and ``md5_hash`` are
            read from it via attribute access.
        app_folder: folder the (relative) path is resolved against.
        must_exist: when true, a missing path is reported as an error.

    Returns:
        XDict with ``md5_hash`` (always ``None`` here), ``error`` (empty
        string when the path is acceptable) and ``os_path`` (the joined path,
        or ``None`` when the path could not be parsed).

    The path is rejected when it contains whitespace, when it resolves
    outside *app_folder*, when it is a directory, when it exists but is not a
    regular file, or when it is an existing file whose on-disk MD5 differs
    from ``fdata.md5_hash`` (or that hash is missing).
    """
    fdata = XDict(fdata)
    ret = XDict(md5_hash = None, error = '', os_path = None)
    # Drop surrounding whitespace and any leading slashes/backslashes so the
    # path is always treated as relative to app_folder.
    matched = re.match(r'\s*(\\|/)*([^\s]*)\s*$', fdata.path.strip())
    if matched is None:
        # The pattern only fails when the path contains inner whitespace.
        ret.error = 'path must not contain whitespace: %s' % fdata.path
        return ret
    pth = matched.group(2)
    ret.os_path = os_path = os.path.join(app_folder, pth)

    # Refuse paths that leave app_folder (e.g. via ".." components or a drive
    # prefix). Symlinks are not resolved here; only the lexical path is checked.
    root = os.path.abspath(app_folder)
    try:
        common = os.path.commonpath([root, os.path.abspath(os_path)])
        inside = os.path.normcase(common) == os.path.normcase(root)
    except ValueError:
        # Raised e.g. for paths on different drives: cannot be inside root.
        inside = False
    if not inside:
        ret.error = 'path is outside of the app folder: %s' % fdata.path
        return ret

    if must_exist and not os.path.exists(os_path):
        ret.error = 'it seems that path does not exist: %s' % os_path
    elif os.path.isdir(os_path):
        ret.error = 'path to a file was expected: %s [%s]' % (os_path, fdata.path)
    elif os.path.isfile(os_path):
        # An existing file may only be touched by a caller that knows its
        # current content hash.
        if not fdata.md5_hash:
            ret.error = 'md5_hash is required'
        elif md5_hash(safe_read(os_path)) != fdata.md5_hash:
            ret.error = 'file was changed on disk'
    elif os.path.exists(os_path):
        # Exists but is neither a directory nor a regular file.
        ret.error = 'path exists but it`s to never : %s' % os_path
    return ret
def write_file(fdata, app_folder):
    """Write ``fdata['content']`` to the file addressed by *fdata*.

    Args:
        fdata: mapping with the target ``path``, the optional ``content``
            (``str``, ``bytes``/``bytearray``, ``None`` or an object with a
            ``read()`` method) and, for an existing file, its ``md5_hash``.
        app_folder: folder the path is resolved against.

    Returns:
        The result object of ``validate_fdata``. When validation reported an
        error nothing is written; otherwise ``md5_hash`` holds the MD5 hex
        digest of the bytes that were written.
    """
    ret = validate_fdata(fdata, app_folder)
    if ret.error:
        return ret
    content = fdata.get('content', '')
    if hasattr(content, 'read'):
        content = content.read()
    # Normalise to bytes BEFORE opening the target, so a bad payload cannot
    # leave a truncated file behind.
    if content is None:
        content = b''
    elif isinstance(content, (bytes, bytearray)):
        content = bytes(content)
    else:
        # str payload; any other type fails here with AttributeError.
        content = content.encode('utf8')
    with open(ret.os_path, 'wb') as fl:
        fl.write(content)
    ret.md5_hash = md5_hash(content)
    return ret
def del_file(fdata, app_folder):
    """Delete the file addressed by *fdata* inside *app_folder*.

    The path is validated with ``validate_fdata(..., must_exist=True)``.
    When validation reports an error the result is returned untouched and
    nothing is removed; otherwise the file is unlinked and ``msg`` is set to
    ``'done'`` on the returned result object.
    """
    outcome = validate_fdata(fdata, app_folder, must_exist=True)
    if not outcome.error:
        os.unlink(outcome.os_path)
        outcome.msg = 'done'
    return outcome