From b79b5e5f89fd917c76265ec8b59137e3d013d24c Mon Sep 17 00:00:00 2001
From: Cedric Hombourger
Date: Fri, 27 Sep 2024 16:50:22 +0200
Subject: [PATCH] fix(storage): add retry mechanism to recover from a full input queue

If the client writes data too quickly, the agent may see its input queue
full. Give ourselves up to 30 seconds to submit the data to be written to
the shared storage before giving up.

Signed-off-by: Cedric Hombourger
---
 mtda/client.py    | 32 ++++++++++++++++++++++++++------
 mtda/constants.py |  4 ++++
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/mtda/client.py b/mtda/client.py
index 5d0a5eb8..63dfd5e2 100644
--- a/mtda/client.py
+++ b/mtda/client.py
@@ -525,18 +525,18 @@ def copy(self):
                 self.progress()
 
                 # Write block to shared storage device
-                bytes_wanted = self._agent.storage_write(data, self._session)
+                bytes_wanted = self._write(data)
 
                 # Check what to do next
                 if bytes_wanted < 0:
-                    raise IOError('write or decompression error from the '
-                                  'shared storage')
+                    exc = 'write or decompression error from shared storage'
+                    raise IOError(exc)
+                elif bytes_wanted == 0:
+                    exc = 'timeout from shared storage'
+                    raise IOError(exc)
                 elif bytes_wanted > 0:
                     # Read next block
                     data = inputstream.read(bytes_wanted)
-                else:
-                    # Agent may continue without further data
-                    data = b''
 
         finally:
             if comp_on_the_fly:
@@ -548,6 +548,26 @@ def size(self):
         st = os.stat(self._path)
         return st.st_size
 
+    def _write(self, data):
+        """ Write data to shared storage, retrying while its queue is full """
+
+        # A result of 0 from storage_write() is treated as "try again": the
+        # write is re-submitted every RETRY_INTERVAL seconds until TIMEOUT
+        # seconds worth of attempts were made. Non-zero results are returned
+        # as-is and 0 is returned when all attempts were exhausted.
+        interval = CONSTS.STORAGE.RETRY_INTERVAL
+        # Always attempt the write at least once
+        max_tries = max(1, int(CONSTS.STORAGE.TIMEOUT / interval))
+
+        for attempt in range(max_tries):
+            if attempt > 0:
+                # Only wait between attempts, not after the last one
+                time.sleep(interval)
+            result = self._agent.storage_write(data, self._session)
+            if result != 0:
+                return result
+        return 0
+
 
 class ImageS3(ImageFile):
     """ An image to be downloaded from a S3 bucket """
diff --git a/mtda/constants.py b/mtda/constants.py
index 475d8ca2..ee74eb0c 100644
--- a/mtda/constants.py
+++ b/mtda/constants.py
@@ -67,6 +67,10 @@ class STORAGE:
     LOCKED = "LOCKED"
     UNLOCKED = "UNLOCKED"
     UNKNOWN = "???"
+    # Seconds to wait between two attempts to submit the same write
+    RETRY_INTERVAL = 0.5
+    # Seconds worth of attempts after which a write is abandoned
+    TIMEOUT = 30
 
 
 class WRITER: