-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathprocess.py
232 lines (196 loc) · 6.68 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""Managed process module."""
import json
import logging
import multiprocessing as mp
import os
import shlex
import subprocess # nosec B404
from contextlib import AbstractContextManager, ExitStack
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional, Union
from funcy import cached_property
from shortuuid import uuid
from ..contrib.kombu_filesystem import LOCK_EX, LOCK_SH, lock, unlock
from ..utils import makedirs
from .exceptions import TimeoutExpired
logger = logging.getLogger(__name__)
@dataclass
class ProcessInfo:
    """Process information.

    A plain record describing a managed process, serialisable to and from a
    JSON file. File access is guarded with the `lock`/`unlock` helpers: a
    shared lock for reading and an exclusive lock for writing.
    """

    pid: int
    # Paths of the redirected standard streams, or None when not redirected.
    stdin: Optional[str]
    stdout: Optional[str]
    stderr: Optional[str]
    # Exit status; None while no exit status has been recorded.
    returncode: Optional[int]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
        """Construct ProcessInfo from the specified dictionary.

        Arguments:
            data: Mapping whose keys match the dataclass field names.

        Raises:
            TypeError: If `data` has missing or unexpected keys.
        """
        return cls(**data)

    @classmethod
    def load(cls, filename: str) -> "ProcessInfo":
        """Construct the process information from a file.

        The file is read while holding a shared lock.
        """
        with open(filename, "r", encoding="utf-8") as fobj:
            lock(fobj, LOCK_SH)
            try:
                return cls.from_dict(json.load(fobj))
            finally:
                unlock(fobj)

    def asdict(self) -> Dict[str, Any]:
        """Return this info as a dictionary."""
        return asdict(self)

    def dump(self, filename: str) -> None:
        """Dump the process information into a file.

        The file is created if needed and its previous content is replaced.
        """
        # Open in append mode so that opening does NOT truncate the file:
        # truncating before the exclusive lock is held would let a reader
        # that holds the shared lock observe an empty file.
        with open(filename, "a", encoding="utf-8") as fobj:
            lock(fobj, LOCK_EX)
            try:
                # Discard the old content only once we own the lock.
                fobj.seek(0)
                fobj.truncate()
                json.dump(self.asdict(), fobj)
                # Push buffered data to the file while the lock is still
                # held; otherwise it would only be written on close, after
                # the lock has been released.
                fobj.flush()
            finally:
                unlock(fobj)
class ManagedProcess(AbstractContextManager):
    """Class to manage the specified process with redirected output.

    stdout and stderr will both be redirected to <name>.out.
    Interactive processes (requiring stdin input) are currently unsupported.

    When used as a context manager the process is started on entry (unless
    it has already been started) and waited for on exit.
    """

    def __init__(
        self,
        args: Union[str, List[str]],
        env: Optional[Dict[str, str]] = None,
        wdir: Optional[str] = None,
        name: Optional[str] = None,
    ):
        """Construct a ManagedProcess.

        Arguments:
            args: Command to be run. A string is split into arguments with
                `shlex.split`; any other iterable is copied into a list.
            env: Optional environment variables.
            wdir: If specified, redirected output files will be placed in
                `wdir`. Defaults to current working directory.
            name: Name to use for this process, if not specified a UUID will be
                generated instead.
        """
        self.args: List[str] = (
            shlex.split(args, posix=os.name == "posix")
            if isinstance(args, str)
            else list(args)
        )
        self.env = env
        self.wdir = wdir
        self.name = name or uuid()
        self.returncode: Optional[int] = None
        # Holds the open output file so it can be closed in one place.
        self._fd_stack = ExitStack()
        self._proc: Optional[subprocess.Popen] = None

    def __enter__(self):
        """Start the process if it has not been started yet."""
        if self._proc is None:
            self.run()
        return self

    def __exit__(self, *args, **kwargs):
        """Wait for the process to complete; exceptions are not suppressed."""
        self.wait()

    def _close_fds(self):
        """Close every file registered on the internal exit stack."""
        self._fd_stack.close()

    def _make_path(self, path: str) -> str:
        """Return `path` joined onto `wdir`, or `path` itself without one."""
        return os.path.join(self.wdir, path) if self.wdir else path

    @cached_property
    def stdout_path(self) -> str:
        """Return redirected stdout path."""
        return self._make_path(f"{self.name}.out")

    @cached_property
    def info_path(self) -> str:
        """Return process information file path."""
        return self._make_path(f"{self.name}.json")

    @cached_property
    def pidfile_path(self) -> str:
        """Return process pidfile path."""
        return self._make_path(f"{self.name}.pid")

    @property
    def info(self) -> "ProcessInfo":
        """Return process information.

        Raises:
            ValueError: Process is not running.
        """
        return ProcessInfo(
            pid=self.pid,
            stdin=None,
            stdout=self.stdout_path,
            stderr=None,
            returncode=self.returncode,
        )

    @property
    def pid(self) -> int:
        """Return process PID.

        Raises:
            ValueError: Process is not running.
        """
        if self._proc is None:
            raise ValueError("Process is not running.")
        return self._proc.pid

    def _make_wdir(self):
        """Create `wdir` when one was specified."""
        if self.wdir:
            makedirs(self.wdir, exist_ok=True)

    def _dump(self):
        """Write the process information file and the pidfile."""
        self._make_wdir()
        self.info.dump(self.info_path)
        with open(self.pidfile_path, "w", encoding="utf-8") as fobj:
            fobj.write(str(self.pid))

    def run(self):
        """Run this process.

        The child gets no stdin (DEVNULL); its stdout is appended to
        `stdout_path` and its stderr is merged into stdout. Once started,
        the process information file and pidfile are written.

        If starting the process or writing those files fails, the child (if
        any) is killed and reaped, the output file is closed and the error
        is re-raised.
        """
        self._make_wdir()
        logger.debug(
            "Appending output to '%s'",
            self.stdout_path,
        )
        stdout = self._fd_stack.enter_context(open(self.stdout_path, "ab"))
        try:
            # pylint: disable=consider-using-with
            self._proc = subprocess.Popen(  # nosec B603
                self.args,
                stdin=subprocess.DEVNULL,
                stdout=stdout,
                stderr=subprocess.STDOUT,
                close_fds=True,
                shell=False,
                env=self.env,
            )
            self._dump()
        except Exception:
            if self._proc is not None:
                self._proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                self._proc.wait()
            self._close_fds()
            raise

    def wait(self, timeout: Optional[int] = None) -> Optional[int]:
        """Block until a process started with `run` has completed.

        Arguments:
            timeout: Optional number of seconds to wait.

        Returns:
            The process return code, or None if the process was never
            started.

        Raises:
            TimeoutExpired if `timeout` was set and the process
            did not terminate after `timeout` seconds.
        """
        # Nothing to wait for: already finished, or never started.
        if self.returncode is not None or self._proc is None:
            return self.returncode
        try:
            self._proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired as exc:
            raise TimeoutExpired(exc.cmd, exc.timeout) from exc
        self.returncode = self._proc.returncode
        self._close_fds()
        # Record the final return code in the information file.
        self._dump()
        return self.returncode

    @classmethod
    def spawn(cls, *args, **kwargs) -> Optional[int]:
        """Spawn a ManagedProcess command in the background.

        Positional and keyword arguments are forwarded to the constructor.

        Returns: The spawned process PID.
        """
        proc = _DaemonProcess(
            target=cls._spawn,
            args=args,
            kwargs=kwargs,
            daemon=True,
        )
        proc.start()
        # Do not terminate the child daemon when the main process exits
        # pylint: disable=protected-access
        mp.process._children.discard(proc)  # type: ignore[attr-defined]
        return proc.pid

    @classmethod
    def _spawn(cls, *args, **kwargs):
        """Run a ManagedProcess to completion (target of `spawn`)."""
        with cls(*args, **kwargs):
            pass
class _DaemonProcess(mp.Process):
    """Process that calls `os.setpgid(0, 0)` before running its target."""

    def run(self):
        """Run the process target, changing process group first if not on NT."""
        on_windows = os.name == "nt"
        if not on_windows:
            # Skipped on Windows ("nt"), where this call is not made.
            os.setpgid(0, 0)  # pylint: disable=no-member
        super().run()