From 5d3c5836fc3f5233b74bcc29a9ac203b6a258a6a Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Tue, 8 Nov 2022 16:05:27 +0800 Subject: [PATCH] Add locks to process operations to prevent race conditions. fix: #93 Our old solution in #53 reduces the possibility of a race condition, but the race still happens occasionally. 1. Add locks to write and read operations to prevent this kind of error. 2. Remove the old file-replacing solution. 3. Use reraise to simplify the code in `__getitem__` --- src/dvc_task/proc/manager.py | 6 ++---- src/dvc_task/proc/process.py | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/dvc_task/proc/manager.py b/src/dvc_task/proc/manager.py index 374d3d0..9a0d3ed 100644 --- a/src/dvc_task/proc/manager.py +++ b/src/dvc_task/proc/manager.py @@ -45,12 +45,10 @@ def __iter__(self) -> Generator[str, None, None]: for name in os.listdir(self.wdir): yield name + @reraise(FileNotFoundError, KeyError) def __getitem__(self, key: str) -> "ProcessInfo": info_path = self._get_info_path(key) - try: - return ProcessInfo.load(info_path) - except FileNotFoundError as exc: - raise KeyError from exc + return ProcessInfo.load(info_path) @reraise(FileNotFoundError, KeyError) def __setitem__(self, key: str, value: "ProcessInfo"): diff --git a/src/dvc_task/proc/process.py b/src/dvc_task/proc/process.py index ba76d81..63dff3a 100644 --- a/src/dvc_task/proc/process.py +++ b/src/dvc_task/proc/process.py @@ -12,6 +12,7 @@ from funcy import cached_property from shortuuid import uuid +from ..contrib.kombu_filesystem import LOCK_EX, LOCK_SH, lock, unlock from ..utils import makedirs from .exceptions import TimeoutExpired @@ -37,7 +38,11 @@ def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo": def load(cls, filename: str) -> "ProcessInfo": """Construct the process information from a file.""" with open(filename, "r", encoding="utf-8") as fobj: - return cls.from_dict(json.load(fobj)) + lock(fobj, LOCK_SH) + try: + return 
cls.from_dict(json.load(fobj)) + finally: + unlock(fobj) def asdict(self) -> Dict[str, Any]: """Return this info as a dictionary.""" @@ -45,10 +50,12 @@ def asdict(self) -> Dict[str, Any]: def dump(self, filename: str) -> None: """Dump the process information into a file.""" - temp_info_file = f"{filename}.{uuid()}" - with open(temp_info_file, "w", encoding="utf-8") as fobj: - json.dump(self.asdict(), fobj) - os.replace(temp_info_file, filename) + with open(filename, "w", encoding="utf-8") as fobj: + lock(fobj, LOCK_EX) + try: + json.dump(self.asdict(), fobj) + finally: + unlock(fobj) class ManagedProcess(AbstractContextManager):