-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathgpslogger.py
91 lines (66 loc) · 2.7 KB
/
gpslogger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
Parse [[https://github.com/mendhak/gpslogger][gpslogger]] .gpx (xml) files
"""
REQUIRES = ["gpxpy"]
from dataclasses import dataclass
from my.config import location
from my.core import Paths
@dataclass
class config(location.gpslogger):
# path[s]/glob to the synced gpx (XML) files
export_path: Paths
# default accuracy for gpslogger
accuracy: float = 50.0
from collections.abc import Iterator, Sequence
from datetime import datetime, timezone
from itertools import chain
from pathlib import Path
import gpxpy
from gpxpy.gpx import GPXXMLSyntaxException
from more_itertools import unique_everseen
from my.core import LazyLogger, Stats
from my.core.cachew import mcachew
from my.core.common import get_files
from .common import Location
logger = LazyLogger(__name__, level="warning")
def _input_sort_key(path: Path) -> str:
if "_" in path.name:
return path.name.split("_", maxsplit=1)[1]
return path.name
def inputs() -> Sequence[Path]:
# gpslogger files can optionally be prefixed by a device id,
# like b5760c66102a5269_20211214142156.gpx
return sorted(get_files(config.export_path, glob="*.gpx", sort=False), key=_input_sort_key)
def _cachew_depends_on() -> list[float]:
return [p.stat().st_mtime for p in inputs()]
# TODO: could use a better cachew key/this has to recompute every file whenever the newest one changes
@mcachew(depends_on=_cachew_depends_on, logger=logger)
def locations() -> Iterator[Location]:
yield from unique_everseen(
chain(*map(_extract_locations, inputs())), key=lambda loc: loc.dt
)
def _extract_locations(path: Path) -> Iterator[Location]:
with path.open("r") as gf:
try:
gpx_obj = gpxpy.parse(gf)
except GPXXMLSyntaxException as e:
logger.warning("failed to parse XML %s: %s", path, e)
return
for track in gpx_obj.tracks:
for segment in track.segments:
for point in segment.points:
if point.time is None:
continue
# hmm - for gpslogger, seems that timezone is always SimpleTZ('Z'), which
# specifies UTC -- see https://github.com/tkrajina/gpxpy/blob/cb243b22841bd2ce9e603fe3a96672fc75edecf2/gpxpy/gpxfield.py#L38
yield Location(
lat=point.latitude,
lon=point.longitude,
accuracy=config.accuracy,
elevation=point.elevation,
dt=datetime.replace(point.time, tzinfo=timezone.utc),
datasource="gpslogger",
)
def stats() -> Stats:
from my.core import stat
return {**stat(locations)}