-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfilesystem.py
144 lines (125 loc) · 4.96 KB
/
filesystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""(Local) filesystem based Celery application."""
import logging
import os
from typing import Any, Dict, Generator, Optional
from celery import Celery
from kombu.message import Message
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads
from ..utils import makedirs
# Module logger; FSApp uses it to warn when broker/backend settings passed by
# the caller are overridden, and to log app initialization.
logger = logging.getLogger(__name__)
def _get_fs_config(
    wdir: str,
    mkdir: bool = False,
    task_serializer: str = "json",
    result_serializer: str = "json",
) -> Dict[str, Any]:
    """Build Celery settings for a filesystem broker and result backend.

    Arguments:
        wdir: Base directory. Broker messages live under ``<wdir>/broker``
            (``in`` and ``processed`` subfolders), results under
            ``<wdir>/result``.
        mkdir: Create the broker/result directories if they do not already
            exist.
        task_serializer: Serializer used for task messages.
        result_serializer: Serializer used for task results.

    Returns:
        Dict of Celery settings suitable for ``app.conf.update``.
    """
    broker_path = os.path.join(wdir, "broker")
    broker_in_path = _unc_path(os.path.join(broker_path, "in"))
    broker_processed_path = _unc_path(os.path.join(broker_path, "processed"))
    result_path = os.path.join(wdir, "result")

    if mkdir:
        for path in (broker_in_path, broker_processed_path, result_path):
            makedirs(path, exist_ok=True)

    return {
        "broker_url": "filesystem://",
        "broker_transport_options": {
            # Both the inbound and outbound data folders point at the same
            # directory, so published messages are read back from ``in``.
            "data_folder_in": broker_in_path,
            "data_folder_out": broker_in_path,
            "processed_folder": broker_processed_path,
            "store_processed": True,
        },
        "result_backend": f"file://{_unc_path(result_path)}",
        "result_persistent": True,
        "task_serializer": task_serializer,
        "result_serializer": result_serializer,
        "accept_content": [task_serializer],
        # The result backend decodes with ``result_accept_content`` and falls
        # back to ``accept_content`` when unset; without this entry, a
        # result_serializer that differs from task_serializer would be
        # rejected when results are read back.
        "result_accept_content": [result_serializer],
    }
def _unc_path(path: str) -> str:
    """Return an absolute form of ``path`` usable inside Celery/Kombu URLs.

    Celery/Kombu URLs only accept absolute filesystem paths. On non-Windows
    systems the absolute path is returned as-is. On Windows, separators are
    converted to ``/`` and drive-letter paths get the ``//?/`` prefix;
    paths without a drive letter only have their separators converted.
    """
    absolute = os.path.abspath(path)
    if os.name == "nt":
        drive, rest = os.path.splitdrive(absolute.replace(os.sep, "/"))
        prefix = "//?/" if drive.endswith(":") else ""
        return f"{prefix}{drive}{rest}"
    return absolute
class FSApp(Celery):
    """Local filesystem-based Celery application.

    Uses Kombu filesystem:// broker and results backend
    """

    def __init__(
        self,
        *args,
        wdir: Optional[str] = None,
        mkdir: bool = False,
        task_serializer: str = "json",
        result_serializer: str = "json",
        **kwargs: Any,
    ):
        """Construct an FSApp.

        Arguments:
            wdir: App broker/results directory. Defaults to current working
                directory.
            mkdir: Create broker/results subdirectories if they do not already
                exist.
            task_serializer: Default task serializer.
            result_serializer: Default result serializer.

        Additional arguments will be passed into the Celery constructor.
        """
        if "broker" in kwargs or "backend" in kwargs:
            # Any caller-supplied broker/backend is replaced by the
            # filesystem configuration applied below.
            logger.warning("Broker/Results config will be overridden")
        super().__init__(*args, **kwargs)
        self.wdir = wdir or os.getcwd()
        self.conf.update(
            _get_fs_config(
                self.wdir,
                mkdir=mkdir,
                task_serializer=task_serializer,
                result_serializer=result_serializer,
            )
        )
        # Log the resolved directory rather than the raw ``wdir`` argument,
        # which is None when defaulting to the current working directory.
        logger.debug("Initialized filesystem:// app in '%s'", self.wdir)

    def _iter_folder(
        self, folder_name: str, queue: Optional[str] = None
    ) -> Generator[Message, None, None]:
        """Iterate over broker messages stored in one of the channel folders.

        Arguments:
            folder_name: Name of the channel attribute holding the folder
                path (``"data_folder_in"`` or ``"processed_folder"``).
            queue: Optional name of queue. Defaults to the app's
                ``task_default_queue``.

        Files are visited in sorted filename order; only messages whose
        ``delivery_info`` routing key equals ``queue`` are yielded.
        """
        queue = queue or self.conf.task_default_queue
        with self.connection_for_read() as conn:  # type: ignore[attr-defined]
            with conn.channel() as channel:
                folder = getattr(channel, folder_name)
                for filename in sorted(os.listdir(folder)):
                    path = os.path.join(folder, filename)
                    try:
                        with open(path, "rb") as fobj:
                            payload = fobj.read()
                    except FileNotFoundError:
                        # The file may have been removed or moved (e.g. by a
                        # worker consuming it) between listdir() and open().
                        continue
                    if not payload:
                        # An empty file cannot be decoded into a message.
                        # NOTE(review): presumably a message file that has
                        # been created but not yet written; skip it.
                        continue
                    msg = channel.Message(
                        loads(bytes_to_str(payload)), channel=channel
                    )
                    delivery_info = msg.properties.get("delivery_info", {})
                    if delivery_info.get("routing_key") == queue:
                        yield msg

    def iter_queued(
        self, queue: Optional[str] = None
    ) -> Generator[Message, None, None]:
        """Iterate over queued tasks which have not been taken by a worker.

        Arguments:
            queue: Optional name of queue.
        """
        yield from self._iter_folder("data_folder_in", queue=queue)

    def iter_processed(
        self, queue: Optional[str] = None
    ) -> Generator[Message, None, None]:
        """Iterate over tasks which have been taken by a worker.

        Arguments:
            queue: Optional name of queue.
        """
        yield from self._iter_folder("processed_folder", queue=queue)