-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathradiologyassist.py
88 lines (69 loc) · 3.23 KB
/
radiologyassist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import concurrent.futures as cf
import datetime
import itertools
import pathlib
import string
import sys
from typing import Any, Optional
import diskcache
import requests
# Location of this module's source file.
FILE_PATH = pathlib.Path(__file__)
# Directory two levels above this file, i.e. the parent of the directory that contains it.
PACKAGE_PATH = FILE_PATH.parents[1]
# Cache directory under PACKAGE_PATH, named after this module's file stem.
DISKCACHE_PATH = PACKAGE_PATH / '.diskcache' / FILE_PATH.stem
# Persistent cache whose memoize decorator wraps get_data.
# NOTE(review): timeout is presumably in seconds and size_limit in bytes (1 GiB) - confirm against the diskcache docs.
DISKCACHE = diskcache.FanoutCache(directory=str(DISKCACHE_PATH), timeout=1, size_limit=1024 ** 3)
# main() enumerates search terms of every length from 0 through MAX_KEY_LEN inclusive.
MAX_KEY_LEN = 0 # Note: No additional benefit was observed with lengths 1 or 2.
# Passed as max_workers to the executor that main() creates.
MAX_WORKERS = 1
# Result names that get_results() removes from its output.
EXCLUDED_TESTS = {'SHORT DESCRIPTION', 'Virtual Consultation'}
# Note: If this script freezes during execution, it may be because of diskcache handling a process executor poorly. In this case, either stop and rerun the script, or otherwise use a thread executor instead.
# Upper bound, in seconds, on how long a single HTTP request may wait. Without an
# explicit timeout, requests waits indefinitely and a stalled connection would
# hang the worker (and therefore the whole script).
_REQUEST_TIMEOUT_SECONDS = 60


@DISKCACHE.memoize(expire=datetime.timedelta(weeks=4).total_seconds(), tag='get_data')
def get_data(term: str, /) -> Optional[list[dict[str, Any]]]:
    """Fetch the decoded JSON search response for ``term``.

    The call is memoized through ``DISKCACHE`` with a four-week expiry, so the
    body only runs when no cached value is available for ``term``.

    Args:
        term: Search term sent as the ``term`` query parameter.

    Returns:
        Whatever the endpoint's JSON body decodes to. Callers in this file
        treat a falsy value as "no results" and otherwise expect a list of
        dicts.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
        ValueError: If the response body is not valid JSON.
    """
    # print(f'Reading data for search term {term!r}.')
    try:
        response = requests.get(
            'https://radiologyassist.com/js/CPTSearch/CPTSearch_Select2.php',
            params={'term': term, 'type': 'public'},
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
    except requests.RequestException:
        print(f'Failed to get valid response for search term {term!r}.', file=sys.stderr)
        raise

    try:
        data = response.json()
    except ValueError:  # requests' JSONDecodeError is a ValueError subclass.
        print(f'Failed to parse data for search term {term!r}.', file=sys.stderr)
        raise

    print(f'Read data for search term {term!r}.')
    return data
def get_results(term: str, /) -> list[str]:
    """Return the cleaned result names that ``get_data`` yields for ``term``.

    For each entry, a leading ``'<id> - '`` prefix (built from the entry's own
    ``id``) is removed from its ``text`` when present and the remainder is
    stripped of surrounding whitespace. Names listed in ``EXCLUDED_TESTS`` are
    dropped, and any remaining name that contains no ``' - '`` separator is
    prefixed with ``'Other - '``.

    Args:
        term: Search term forwarded to ``get_data``.

    Returns:
        The cleaned names in the order received; an empty list when
        ``get_data`` returns nothing.
    """
    data = get_data(term)
    if not data:
        # print(f'No results exist for search term {term!r}.')
        return []

    # Double quotes around the f-string keep the inner d['id'] subscript valid
    # on Python versions before 3.12, where quote reuse inside a replacement
    # field is a SyntaxError.
    names = (d['text'].removeprefix(f"{d['id']} - ").strip() for d in data)
    return [
        name if ' - ' in name else f'Other - {name}'
        for name in names
        if name not in EXCLUDED_TESTS
    ]
def main() -> None:
    """Collect result names for every enumerated search term and write them to a file.

    Search terms of each length from 0 through ``MAX_KEY_LEN`` are generated
    (length 0 yields only the empty term) and passed to ``get_results`` via an
    executor with ``MAX_WORKERS`` workers. The de-duplicated names are sorted
    and written, one per line, to ``uploads/RadiologyAssist_tests_list.txt``
    under ``PACKAGE_PATH``.
    """
    final_results: set[str] = set()

    # When a trace function is installed (e.g. by a debugger), use threads
    # instead of processes.
    debugging = bool(sys.gettrace())
    executor_cls = cf.ThreadPoolExecutor if debugging else cf.ProcessPoolExecutor

    for key_len in range(MAX_KEY_LEN + 1):
        chars = string.ascii_lowercase
        if key_len in (1, 2):
            chars += string.digits
        # Note: Adding string.punctuation for key length 1 was tried; no benefit was observed.
        keys = [''.join(key) for key in itertools.product(chars, repeat=key_len)]

        # A distinct name for the instance keeps executor_cls bound to the class.
        with executor_cls(max_workers=MAX_WORKERS) as executor:
            for curr_results in executor.map(get_results, keys):
                final_results.update(curr_results)
        print(f'Obtained a total of {len(final_results)} results until key length {key_len}.')

    output_results = sorted(final_results)
    output_text = '\n'.join(output_results)
    output_path = PACKAGE_PATH / 'uploads/RadiologyAssist_tests_list.txt'
    print(f'Writing {len(output_results)} results having text length {len(output_text):,} to {output_path}.')
    # Create the target directory if needed so the write cannot fail on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # An explicit encoding avoids locale-dependent failures on non-ASCII names.
    output_path.write_text(output_text, encoding='utf-8')
# Run the scrape only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()