-
Notifications
You must be signed in to change notification settings - Fork 4
/
bwtv.py
262 lines (212 loc) · 7.56 KB
/
bwtv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
from base64 import b64decode
from configparser import ConfigParser, NoSectionError, NoOptionError
from hashlib import sha512
from os import environ, getpid
from os.path import expanduser
from subprocess import check_output, CalledProcessError
from bundlewrap.exceptions import FaultUnavailable
from bundlewrap.utils import Fault
from bundlewrap.utils.text import bold, mark_for_translation as _, yellow
from bundlewrap.utils.ui import io
from passlib.hash import apr_md5_crypt, sha512_crypt
from requests import Session
from requests.exceptions import RequestException
# Location of the TeamVault credentials file; can be overridden through the
# BW_TEAMVAULT_SECRETS_FILE environment variable.
CONFIG_PATH = expanduser(
    environ.get("BW_TEAMVAULT_SECRETS_FILE", "~/.bw_teamvault_secrets.cfg"),
)

# Dummy mode is active only when BW_TEAMVAULT_DUMMY_MODE is exactly "1"; the
# accessor callbacks then return fixed placeholder strings instead of
# contacting TeamVault.
DUMMY_MODE = environ.get("BW_TEAMVAULT_DUMMY_MODE", "0") == "1"

# site -> {secret_id -> secret dict}, filled by _fetch_secret().
cache = {}
# process ID -> requests Session, filled by _fetch_secret().
sessions = {}
# site -> (url, username, password) tuple, or None once running the site's
# pass_command has failed; filled by load_site_credentials().
cached_credentials = {}

config = ConfigParser()
try:
    config.read([CONFIG_PATH])
except Exception:
    # Best effort: a config file that cannot be read only produces a warning
    # here; lookups against the empty config fail later, per site.
    io.stderr(
        "{x} WARNING: Unable to read TeamVault config at {path}".format(
            x=yellow("!"),
            path=CONFIG_PATH,
        )
    )
class BWTVCredentialsError(Exception):
    """Raised when TeamVault credentials for a site are missing from the config."""
def load_site_credentials(site):
    """
    Make sure ``cached_credentials[site]`` is populated.

    Credentials are read from the section named ``site`` of the parsed
    config. If that section has no (or an empty) ``password`` option but
    does have a ``pass_command`` option, the command is run through the
    shell and the stripped first line of its output is used as password.

    Nothing is done if ``site`` already has an entry in
    ``cached_credentials`` (including a ``None`` entry left behind by an
    earlier failed ``pass_command``).

    Raises:
        BWTVCredentialsError: the section or one of the required options
            is missing from the config.
        FaultUnavailable: ``pass_command`` could not be run, failed, or
            produced no output.
    """
    if site in cached_credentials:
        return

    try:
        section = config[site]
        has_password = 'password' in section and bool(section['password'])

        if has_password or 'pass_command' not in section:
            # Plain variant: everything comes straight from the config file.
            cached_credentials[site] = (
                config.get(site, "url"),
                config.get(site, "username"),
                config.get(site, "password"),
            )
            return

        try:
            with io.job(_("{tv} running pass_command for site {site}").format(tv=bold("TeamVault"), site=site)):
                command_output = check_output(
                    section['pass_command'],
                    shell=True
                )
                password = command_output.decode('UTF-8').splitlines()[0].strip()
        except (FileNotFoundError, CalledProcessError, IndexError) as e:
            # To avoid trying to get the password over and over.
            cached_credentials[site] = None
            raise FaultUnavailable from e

        cached_credentials[site] = (
            config.get(site, "url"),
            config.get(site, 'username'),
            password,
        )
    except (KeyError, NoSectionError, NoOptionError):
        raise BWTVCredentialsError(
            "Could not find TeamVault credentials for '{site}' in {path}".format(
                path=CONFIG_PATH,
                site=site,
            )
        )
def _fetch_secret(site, secret_id):
    """
    Return the TeamVault secret ``secret_id`` from ``site`` as a dict.

    The result is the JSON document returned by
    ``<site url>/api/secrets/<secret_id>/`` with an additional ``'data'``
    key holding the JSON document found at the secret's
    ``current_revision`` URL plus ``"data"``. Results are memoized in the
    module-level ``cache`` per site and secret ID.

    Raises:
        FaultUnavailable: credentials for ``site`` are missing or could
            not be obtained, an HTTP request failed, or TeamVault answered
            with a status other than 200.
    """
    try:
        return cache[site][secret_id]
    except KeyError:
        pass

    # Sessions are kept per process ID. Look the session up first and only
    # build a new one when needed: dict.setdefault() would evaluate
    # Session() on every call, creating a throwaway session each time.
    pid = getpid()
    session = sessions.get(pid)
    if session is None:
        session = sessions[pid] = Session()

    try:
        load_site_credentials(site)
    except BWTVCredentialsError as e:
        raise FaultUnavailable(
            "Tried to get TeamVault secret with ID '{secret_id}' "
            "from site '{site}', but credentials missing in {path}".format(
                path=CONFIG_PATH,
                secret_id=secret_id,
                site=site,
            ),
        ) from e

    credentials = cached_credentials[site]
    if credentials is None:
        # load_site_credentials() stores None after a failed pass_command.
        raise FaultUnavailable(
            "Getting credentials for {site} failed in earlier try".format(
                site=site,
            ),
        )

    # credentials is (url, username, password); the last two are the auth pair.
    auth = credentials[1:3]

    full_url = "{}/api/secrets/{}/".format(
        credentials[0],
        secret_id,
    )

    try:
        with io.job(_("{tv} fetching {secret}").format(tv=bold("TeamVault"), secret=secret_id)):
            response = session.get(full_url, auth=auth)
    except RequestException as e:
        raise FaultUnavailable(
            "Exception while getting secret {secret} from TeamVault: {exc}".format(
                secret=secret_id,
                exc=repr(e),
            )
        ) from e

    if response.status_code != 200:
        raise FaultUnavailable(
            "TeamVault returned {status} for secret {secret} with url {url}".format(
                status=response.status_code,
                secret=secret_id,
                url=full_url,
            )
        )

    secret = response.json()

    # The actual payload lives below the current revision of the secret.
    data_url = secret['current_revision'] + "data"

    try:
        response = session.get(data_url, auth=auth)
    except RequestException as e:
        raise FaultUnavailable(
            "Exception while getting secret {secret} from TeamVault: {exc}".format(
                secret=secret_id,
                exc=repr(e),
            )
        ) from e

    if response.status_code != 200:
        raise FaultUnavailable(
            "TeamVault returned {status} for secret {secret} with url {url}".format(
                status=response.status_code,
                secret=secret_id,
                url=data_url,
            )
        )

    secret['data'] = response.json()

    cache.setdefault(site, {})[secret_id] = secret
    return secret
def _file(secret_id=None, site=None):
    """
    Return the file stored in a TeamVault secret as text.

    The secret's ``data.file`` value is base64-decoded and the resulting
    bytes are decoded as UTF-8. In dummy mode a fixed placeholder string
    is returned and TeamVault is not contacted.
    """
    if DUMMY_MODE:
        return "TEAMVAULT DUMMY CONTENT"

    encoded_content = _fetch_secret(site, secret_id)['data']['file']
    return b64decode(encoded_content).decode('utf-8')
def file(secret_id, site="default"):
    """
    Build a Fault for the decoded file content of a TeamVault secret.

    The Fault is created with ``_file`` as callback and ``secret_id`` and
    ``site`` as its keyword arguments.
    NOTE(review): presumably bundlewrap calls the callback lazily when the
    value is needed -- confirm against the bundlewrap Fault documentation.
    """
    callback_kwargs = {"secret_id": secret_id, "site": site}
    return Fault("bwtv file", _file, **callback_kwargs)
def _file_as_base64(secret_id=None, site=None):
    """
    Return the ``data.file`` value of a TeamVault secret without decoding it.

    In dummy mode a fixed placeholder string is returned and TeamVault is
    not contacted.
    """
    if DUMMY_MODE:
        return "TEAMVAULT DUMMY CONTENT"

    return _fetch_secret(site, secret_id)['data']['file']
def file_as_base64(secret_id, site="default"):
    """
    Build a Fault for the still base64-encoded file of a TeamVault secret.

    The Fault is created with ``_file_as_base64`` as callback and
    ``secret_id`` and ``site`` as its keyword arguments.
    """
    callback_kwargs = {"secret_id": secret_id, "site": site}
    return Fault("bwtv file_as_base64", _file_as_base64, **callback_kwargs)
def _htpasswd_entry(secret_id=None, site=None):
    """
    Return a ``username:hash`` line built from a TeamVault secret.

    The password is hashed with passlib's ``apr_md5_crypt``. The salt is
    the first 8 hex digits of the SHA-512 digest of ``secret_id``, i.e. it
    is derived deterministically from the secret ID rather than being
    random (presumably so repeated runs produce the same entry).

    In dummy mode a fixed placeholder entry is returned and TeamVault is
    not contacted.
    """
    if DUMMY_MODE:
        return "TEAMVAULT:DUMMYCONTENT"

    secret = _fetch_secret(site, secret_id)
    user = secret['username']
    plaintext = secret['data']['password']
    salt = sha512(secret_id.encode('utf-8')).hexdigest()[:8]
    return "{}:{}".format(user, apr_md5_crypt.encrypt(plaintext, salt=salt))
def htpasswd_entry(secret_id, site="default"):
    """
    Build a Fault for a ``username:hash`` htpasswd line of a TeamVault secret.

    The Fault is created with ``_htpasswd_entry`` as callback and
    ``secret_id`` and ``site`` as its keyword arguments.
    """
    callback_kwargs = {"secret_id": secret_id, "site": site}
    return Fault("bwtv htpasswd_entry", _htpasswd_entry, **callback_kwargs)
def _password(secret_id=None, site=None):
    """
    Return the ``data.password`` value of a TeamVault secret.

    In dummy mode a fixed placeholder string is returned and TeamVault is
    not contacted.
    """
    if DUMMY_MODE:
        return "TEAMVAULT_DUMMY_CONTENT"

    return _fetch_secret(site, secret_id)['data']['password']
def password(secret_id, site="default"):
    """
    Build a Fault for the password of a TeamVault secret.

    The Fault is created with ``_password`` as callback and ``secret_id``
    and ``site`` as its keyword arguments.
    """
    callback_kwargs = {"secret_id": secret_id, "site": site}
    return Fault("bwtv password", _password, **callback_kwargs)
def _password_crypt_sha512(secret_id=None, site=None):
    """
    Return the password of a TeamVault secret hashed with ``sha512_crypt``.

    Hashing uses 5000 rounds. The salt is the first 16 hex digits of the
    SHA-512 digest of ``secret_id``, i.e. it is derived deterministically
    from the secret ID rather than being random (presumably so repeated
    runs produce the same hash).

    In dummy mode a fixed placeholder string is returned and TeamVault is
    not contacted.
    """
    if DUMMY_MODE:
        return "TEAMVAULT_DUMMY_CONTENT"

    secret = _fetch_secret(site, secret_id)
    plaintext = secret['data']['password']
    salt = sha512(secret_id.encode('utf-8')).hexdigest()[:16]
    return sha512_crypt.encrypt(plaintext, salt=salt, rounds=5000)
def password_crypt_sha512(secret_id, site="default"):
    """
    Build a Fault for the sha512_crypt hash of a TeamVault secret's password.

    The Fault is created with ``_password_crypt_sha512`` as callback and
    ``secret_id`` and ``site`` as its keyword arguments.
    """
    callback_kwargs = {"secret_id": secret_id, "site": site}
    return Fault(
        "bwtv password_crypt_sha512",
        _password_crypt_sha512,
        **callback_kwargs
    )
def _username(secret_id=None, site=None):
    """
    Return the ``username`` value of a TeamVault secret.

    In dummy mode a fixed placeholder string is returned and TeamVault is
    not contacted.
    """
    if DUMMY_MODE:
        return "teamvault_dummy_content"

    return _fetch_secret(site, secret_id)['username']
def username(secret_id, site="default"):
    """
    Build a Fault for the username of a TeamVault secret.

    The Fault is created with ``_username`` as callback and ``secret_id``
    and ``site`` as its keyword arguments.
    """
    callback_kwargs = {"secret_id": secret_id, "site": site}
    return Fault("bwtv username", _username, **callback_kwargs)
def _format(fault=None, format_string=None):
return format_string.format(fault.value)
def format(fault, format_string):
    """
    Build a Fault that renders another Fault's value into ``format_string``.

    The Fault is created with ``_format`` as callback and ``fault`` and
    ``format_string`` as its keyword arguments.
    """
    callback_kwargs = {"fault": fault, "format_string": format_string}
    return Fault("bwtv format", _format, **callback_kwargs)