Skip to content

Commit 527a324

Browse files
karajan1001pmrowla
andcommitted
Add read lock to msg file reading.
fix: #91 Co-authored-by: Peter Rowlands (변기호) <peter@pmrowla.com>
1 parent af70c33 commit 527a324

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

src/dvc_task/app/filesystem.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@
88
from kombu.utils.encoding import bytes_to_str
99
from kombu.utils.json import loads
1010

11-
from ..contrib.kombu_filesystem import backport_filesystem_transport
11+
from ..contrib.kombu_filesystem import (
12+
LOCK_SH,
13+
backport_filesystem_transport,
14+
lock,
15+
unlock,
16+
)
1217
from ..utils import makedirs, remove, unc_path
1318

1419
logger = logging.getLogger(__name__)
@@ -122,12 +127,18 @@ def _iter_folder(
122127
path = os.path.join(folder, filename)
123128
try:
124129
with open(path, "rb") as fobj:
125-
payload = fobj.read()
130+
lock(fobj, LOCK_SH)
131+
try:
132+
payload = fobj.read()
133+
finally:
134+
unlock(fobj)
126135
except FileNotFoundError:
127136
# Messages returned by `listdir` call may have been
128137
# acknowledged and moved to `processed_folder` by the
129138
# time we try to read them here
130139
continue
140+
if not payload:
141+
continue
131142
msg = channel.Message(
132143
loads(bytes_to_str(payload)), channel=channel
133144
)

0 commit comments

Comments
 (0)