# main.py
import argparse
import html
import logging
import os
import re
import unicodedata
from datetime import datetime, timedelta
import pytz
import requests
import yaml
from bs4 import BeautifulSoup
from jinja2 import Environment, FileSystemLoader, select_autoescape
from munch import munchify
# load config from config.yaml into Munch object, exit if the file is not found
try:
    with open("config.yaml", "r") as file:
        config = munchify(yaml.safe_load(file))
except FileNotFoundError:
    print("Config file not found")
    exit(1)
# set timezone constant
TIMEZONE = pytz.timezone(config.timezone)
# set file constants
SOURCE_EXTRALIST_FILE = os.path.join(config.source.dir, config.source.extralist)
SOURCE_FIXLIST_FILE = os.path.join(config.source.dir, config.source.fixlist)
SOURCE_SKIPLIST_FILE = os.path.join(config.source.dir, config.source.skiplist)
DATA_SOURCE_FILE = os.path.join(config.data.dir, config.data.source)
DATA_SUMMARY_FILE = os.path.join(config.data.dir, config.data.summary)
DATA_ERRORS_FILE = os.path.join(config.data.dir, config.data.errors)
OUTPUT_YEAR_FILE = os.path.join(config.output.dir, config.output.year)
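# For reference, a minimal config.yaml sketch covering every key this script
# reads; all values are hypothetical placeholders, not the real configuration:
#
#   timezone: Europe/Prague
#   playtime_max: 500
#   newest_days: 7
#   base_url: https://example.com/
#   years: [2022, 2023]
#   nyx:
#     api_url: https://nyx.cz/api/discussion
#     discussion_id: 12345
#     query_base: order=newest
#     query_previous: "&from_id={from_id}"
#     post_url: "https://nyx.cz/discussion/{discussion_id}/id/{post_id}"
#   source:
#     dir: source
#     extralist: extralist.yaml
#     fixlist: fixlist.yaml
#     skiplist: skiplist.yaml
#   data:
#     dir: data
#     source: "source-{year}.yaml"
#     summary: "summary-{year}.yaml"
#     errors: "errors-{year}.yaml"
#   output:
#     dir: output
#     year: "{year}.html"
#   templates:
#     dir: templates
#     main: main.html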
# define an enum-like class for post status
class Status:
    EMPTY_SOURCE = "EMPTY_SOURCE"
    PLAYTIME_MISSING = "PLAYTIME_MISSING"
    PLAYTIME_TOO_HIGH = "PLAYTIME_TOO_HIGH"
    OK = "OK"
# parse command-line arguments; --year is required
parser = argparse.ArgumentParser()
parser.add_argument("--year", type=int, required=True, help="Year to process")
args = parser.parse_args()
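# e.g. invoked as: python main.py --year 2023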
# initialize logger, set output to console
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
def fetch_discussion(from_id=None):
    # construct the base URL for the request
    url = f"{config.nyx.api_url}/{config.nyx.discussion_id}?{config.nyx.query_base}"
    # add the query_previous to URL if from_id is set
    url += config.nyx.query_previous.format(from_id=from_id) if from_id else ""
    logger.info(f"Fetching {url}")
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")
        exit(1)
    return response.json()
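# The rest of the script only relies on this subset of the JSON returned
# above (a sketch based on how the fields are used in main(), not the full
# nyx API response):
#
#   {"posts": [{"id": 123,
#               "username": "someuser",
#               "inserted_at": "2023-03-01T20:00:00",
#               "content": "<p>...</p>"}]}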
def convert_html_to_plaintext(text):
    # unescape HTML entities
    text = html.unescape(text)
    # replace <br> with newlines
    text = text.replace("<br>", "\n")
    # remove all remaining HTML tags
    text = BeautifulSoup(text, "html.parser").get_text()
    # strip blank lines
    text = "\n".join([line for line in text.split("\n") if line.strip()])
    return text
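# Example of the transformation above, on a hypothetical post body:
#   convert_html_to_plaintext("#dohráno <b>Elden Ring</b> | PC | 80 hod")
#   -> "#dohráno Elden Ring | PC | 80 hod"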
def find_source_line(text):
    # find the first line that contains #dohrano or #dohráno
    for line in text.split("\n"):
        if "#dohrano" in line or "#dohráno" in line:
            return line
    return None
def get_source_parts(text):
    if text is None:
        return None
    # split text into parts by the first separator found (pipe, backslash,
    # or slash, in that order of preference) and trim whitespace;
    # remove #dohrano or #dohráno from the parts
    separator = "|" if "|" in text else "\\" if "\\" in text else "/"
    parts = [
        part.replace("#dohrano", "").replace("#dohráno", "").strip()
        for part in text.split(separator)
    ]
    # remove parts that are empty
    parts = [part for part in parts if part]
    return parts
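# Continuing the example, the source line is split and cleaned up:
#   get_source_parts("#dohráno Elden Ring | PC | 80 hod")
#   -> ["Elden Ring", "PC", "80 hod"]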
def convert_parts_to_data(parts):
    if parts is None:
        return None
    data = []
    # iterate through the parts to determine the type of each part
    for index, part in enumerate(parts):
        # if this is the first part, assume it's the name of a game
        if index == 0:
            type = "game"
            value = part
        else:
            # if the first character is a digit, assume it's a playtime
            if part[0].isdigit():
                # get the pure number from the part:
                # there might be a dot or comma in the middle of the part
                # there might be a unit at the end of the part
                # there are various units
                # the unit might not be separated by a space
                # (e.g. "5hod" "1h" "3 hodiny" "2,5 hod" "6.5h")
                value = re.match(r"^(\d*[,\.]?\d*)", part).group(1)
                # replace comma with dot
                value = value.replace(",", ".")
                # try to convert value to a float
                try:
                    type = "playtime"
                    value = float(value)
                except ValueError:
                    type = "error"
                    value = None
            # else it's probably a platform
            else:
                type = "platform"
                value = part
        data.append(
            {
                "type": type,
                "original": part,
                "value": value,
            }
        )
    # if there is no playtime in the data, try some magic
    if get_status(data) == Status.PLAYTIME_MISSING:
        # if there is a platform in the data, assume it can be a playtime
        # in some unusual format, e.g. "pres 35 hodin"
        for part in data:
            if part["type"] == "platform" and "hod" in unicodedata.normalize(
                "NFKD", part["value"]
            ):
                for chunk in part["value"].split(" "):
                    try:
                        value = re.match(r"^(\d*[,\.]?\d*)", chunk).group(1)
                        value = value.replace(",", ".")
                        part["type"] = "playtime"
                        part["value"] = float(value)
                        break
                    except ValueError:
                        part["type"] = "platform"
    return data
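# Continuing the example, the parts are typed (the playtime is parsed out
# of "80 hod" by the regex above):
#   convert_parts_to_data(["Elden Ring", "PC", "80 hod"]) ->
#   [{"type": "game", "original": "Elden Ring", "value": "Elden Ring"},
#    {"type": "platform", "original": "PC", "value": "PC"},
#    {"type": "playtime", "original": "80 hod", "value": 80.0}]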
# TODO: refactor this to a universal function for extralist and fixlist
def convert_extra_to_data(extra):
    data = []
    # iterate through the extra entry and create a data structure
    for field in ["id", "username", "inserted_at", "game", "platform", "playtime"]:
        if field in extra:
            data.append(
                {
                    "type": field,
                    "original": "",
                    "value": extra[field],
                }
            )
    return data
def convert_fix_to_data(fix):
    data = []
    # iterate through the fix entry and create a data structure
    for field in ["game", "platform", "playtime"]:
        if field in fix:
            data.append(
                {
                    "type": field,
                    "original": "",
                    "value": fix[field],
                }
            )
    return data
def get_status(data, force=False):
    if force:
        return Status.OK
    if data is None:
        return Status.EMPTY_SOURCE
    # check if there is a playtime in the data
    playtime = [part for part in data if part["type"] == "playtime"]
    if not playtime:
        return Status.PLAYTIME_MISSING
    # check that the playtime is not higher than config.playtime_max
    if playtime[0]["value"] > config.playtime_max:
        return Status.PLAYTIME_TOO_HIGH
    return Status.OK
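# For the example data sketched above, get_status() returns Status.OK;
# without a playtime part it returns Status.PLAYTIME_MISSING, and with a
# playtime above config.playtime_max it returns Status.PLAYTIME_TOO_HIGH.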
def main():
    # set the minimum and maximum date of the posts using datetime
    date_min = datetime(args.year, 1, 1, tzinfo=TIMEZONE)
    date_max = datetime(args.year + 1, 1, 1, tzinfo=TIMEZONE)
    logger.info(f"Date range: {date_min} - {date_max}")
    # create the data, output and source directories if they don't exist
    os.makedirs(config.data.dir, exist_ok=True)
    os.makedirs(config.output.dir, exist_ok=True)
    os.makedirs(config.source.dir, exist_ok=True)
    # set data and output file names for the requested year
    data_source_file = DATA_SOURCE_FILE.format(year=args.year)
    data_summary_file = DATA_SUMMARY_FILE.format(year=args.year)
    data_errors_file = DATA_ERRORS_FILE.format(year=args.year)
    output_year_file = OUTPUT_YEAR_FILE.format(year=args.year)
    # load extralist from YAML file into Munch object, use empty list if the file is not found
    try:
        with open(SOURCE_EXTRALIST_FILE, "r") as file:
            extralist = munchify(yaml.safe_load(file))
    except FileNotFoundError:
        extralist = []
    # load fixlist from YAML file into Munch object, use empty list if the file is not found
    try:
        with open(SOURCE_FIXLIST_FILE, "r") as file:
            fixlist = munchify(yaml.safe_load(file))
    except FileNotFoundError:
        fixlist = []
    # create a dictionary from fixlist with post id as key
    fixlist_by_id = {fix.id: fix for fix in fixlist}
    # load skiplist from YAML file into Munch object, use empty list if the file is not found
    try:
        with open(SOURCE_SKIPLIST_FILE, "r") as file:
            skiplist = munchify(yaml.safe_load(file))
    except FileNotFoundError:
        skiplist = []
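    # for reference, hypothetical sketches of the three list files; field
    # names follow convert_extra_to_data / convert_fix_to_data and the
    # membership checks below, all ids and values are made up:
    #
    #   extralist.yaml - posts added by hand, bypassing the discussion:
    #     - id: 111
    #       username: someuser
    #       inserted_at: "2023-03-01T20:00:00"
    #       game: Elden Ring
    #       platform: PC
    #       playtime: 80
    #
    #   fixlist.yaml - manual corrections for unparseable posts:
    #     - id: 222
    #       game: Elden Ring
    #       platform: PC
    #       playtime: 80
    #
    #   skiplist.yaml - ids of posts to ignore entirely:
    #     - 333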
    # set empty data variables
    data_source_by_username = {}
    data_summary_by_username = {}
    data_errors = []
    # TODO: refactor this to a function
    # iterate over extralist and append data to data_source_by_username
    for extra in extralist:
        # if inserted_at of the post is lower than date_min or higher than date_max, skip the post
        post_inserted_at = datetime.fromisoformat(extra.inserted_at).replace(
            tzinfo=TIMEZONE
        )
        if post_inserted_at < date_min or post_inserted_at > date_max:
            logger.info(f"Skipping extra post {extra.id} (not in date range)")
            continue
        logger.info(f"Added extra post {extra.id}")
        source_data = convert_extra_to_data(extra)
        status = get_status(source_data)
        data = {
            "id": extra.id,
            "status": status,
            "url": config.nyx.post_url.format(
                discussion_id=config.nyx.discussion_id, post_id=extra.id
            ),
            "username": extra.username,
            "inserted_at": extra.inserted_at,
            "content": "",
            "source_line": "",
            "source_parts": [],
            "source_data": source_data,
        }
        if extra.username in data_source_by_username:
            data_source_by_username[extra.username].append(data)
        else:
            data_source_by_username[extra.username] = [data]
        if status != Status.OK:
            data_errors.append(data)
    # fetch the first (newest) page of the discussion
    discussion = fetch_discussion()
    while True:
        for post in discussion["posts"]:
            # TODO: refactor this to a function
            # if inserted_at of the post is lower than date_min or higher than date_max, skip the post
            post_inserted_at = datetime.fromisoformat(post["inserted_at"]).replace(
                tzinfo=TIMEZONE
            )
            if post_inserted_at < date_min or post_inserted_at > date_max:
                logger.info(f"Skipping post {post['id']} (not in date range)")
                continue
            # transform the post content to source data and get the status
            content = convert_html_to_plaintext(post["content"])
            source_line = find_source_line(content)
            source_parts = get_source_parts(source_line)
            source_data = convert_parts_to_data(source_parts)
            status = get_status(source_data)
            # if the status is not OK, decide what to do with the post
            if status != Status.OK:
                # if the post is in the skiplist, skip the post
                if post["id"] in skiplist:
                    logger.info(f"Skipping post {post['id']} (in skiplist)")
                    continue
                # if the post is in the fixlist, use the fixlist data
                if post["id"] in fixlist_by_id:
                    logger.info(f"Fixing post {post['id']} (in fixlist)")
                    source_data = convert_fix_to_data(fixlist_by_id[post["id"]])
                    status = get_status(source_data, force=True)
            # log the status of the post
            logger.info(f"Post {post['id']} {status}")
            # prepare the data structure
            data = {
                "id": post["id"],
                "status": status,
                "url": config.nyx.post_url.format(
                    discussion_id=config.nyx.discussion_id, post_id=post["id"]
                ),
                "username": post["username"],
                "inserted_at": post["inserted_at"],
                "content": content,
                "source_line": source_line,
                "source_parts": source_parts,
                "source_data": source_data,
            }
            # append data to data_source_by_username
            if post["username"] in data_source_by_username:
                data_source_by_username[post["username"]].append(data)
            else:
                data_source_by_username[post["username"]] = [data]
            # if status is not OK, append data to data_errors
            if status != Status.OK:
                data_errors.append(data)
        # get the last post on this page
        last_post = discussion["posts"][-1]
        # if inserted_at of the last_post is lower than date_min, break the loop
        last_post_inserted_at = datetime.fromisoformat(
            last_post["inserted_at"]
        ).replace(tzinfo=TIMEZONE)
        if last_post_inserted_at < date_min:
            break
        # use the last_post id as the from_id for the next request
        discussion = fetch_discussion(from_id=last_post["id"])
    # sort each user's posts by inserted_at, newest first
    for username, data in data_source_by_username.items():
        data_source_by_username[username] = sorted(
            data, key=lambda post: post["inserted_at"], reverse=True
        )
    # create a dictionary from data_source_by_username with username as key
    # and a boolean value that indicates whether the newest post is not older
    # than config.newest_days days
    data_newest_by_username = {
        username: datetime.fromisoformat(data[0]["inserted_at"]).replace(
            tzinfo=TIMEZONE
        )
        > datetime.now(TIMEZONE) - timedelta(days=config.newest_days)
        for username, data in data_source_by_username.items()
    }
    # iterate over data_source_by_username and create data_summary_by_username
    for username, data in data_source_by_username.items():
        # count the number of posts with status OK
        count = len([post for post in data if post["status"] == Status.OK])
        if count > 0:
            # sum the playtime of posts with status OK
            playtime = sum(
                [
                    part
                    for part in post["source_data"]
                    if part["type"] == "playtime"
                ][0]["value"]
                for post in data
                if post["status"] == Status.OK
            )
            # format playtime without decimal places if there is no decimal part
            playtime = int(playtime) if playtime == int(playtime) else playtime
            # create a list of games with status OK
            games = [
                {
                    "name": [
                        part for part in post["source_data"] if part["type"] == "game"
                    ][0]["value"],
                    "playtime": [
                        part
                        for part in post["source_data"]
                        if part["type"] == "playtime"
                    ][0]["value"],
                    "date": datetime.fromisoformat(post["inserted_at"])
                    .replace(tzinfo=TIMEZONE)
                    .strftime("%-d.%-m."),
                    "url": post["url"],
                }
                for post in data
                if post["status"] == Status.OK
            ]
            # format playtime without decimal places if there is no decimal part
            for game in games:
                game["playtime"] = (
                    int(game["playtime"])
                    if game["playtime"] == int(game["playtime"])
                    else game["playtime"]
                )
            # append data to data_summary_by_username
            data_summary_by_username[username] = {
                "count": count,
                "playtime": playtime,
                "games": games,
            }
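    # at this point data_summary_by_username maps each username to a record
    # shaped like this (hypothetical values, continuing the example above):
    #   {"count": 1, "playtime": 80,
    #    "games": [{"name": "Elden Ring", "playtime": 80,
    #               "date": "1.3.", "url": "https://..."}]}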
    # sort data_summary_by_username by playtime, highest first
    data_summary_by_username = {
        k: v
        for k, v in sorted(
            data_summary_by_username.items(),
            key=lambda item: item[1]["playtime"],
            reverse=True,
        )
    }
    # save data_source_by_username as a YAML file
    with open(data_source_file, "w", encoding="utf8") as file:
        yaml.dump(data_source_by_username, file, allow_unicode=True)
    # save data_summary_by_username as a YAML file
    with open(data_summary_file, "w", encoding="utf8") as file:
        yaml.dump(data_summary_by_username, file, allow_unicode=True)
    # save data_errors as a YAML file
    with open(data_errors_file, "w", encoding="utf8") as file:
        yaml.dump(data_errors, file, allow_unicode=True)
    # create a Jinja2 environment
    env = Environment(
        loader=FileSystemLoader(config.templates.dir), autoescape=select_autoescape()
    )
    # load and render the template (html_output avoids shadowing the html module)
    template = env.get_template(config.templates.main)
    html_output = template.render(
        summary=data_summary_by_username,
        newest=data_newest_by_username,
        max_playtime=max(
            [summary["playtime"] for summary in data_summary_by_username.values()]
        ),
        generated_at=datetime.now(TIMEZONE).strftime("%-d.%-m.%Y @ %H:%M:%S"),
        years=config.years,
        year_current=args.year,
        base_url=config.base_url,
    )
    # save the rendered template as an HTML file
    with open(output_year_file, "w", encoding="utf8") as file:
        file.write(html_output)
if __name__ == "__main__":
    main()