diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index c41643aa..73159350 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -19,11 +19,12 @@ import time from testflinger_agent.errors import TFServerError -from .runner import CommandRunner +from .runner import CommandRunner, RunnerEvents from .handlers import LiveOutputHandler, LogUpdateHandler from .stop_condition_checkers import ( JobCancelledChecker, GlobalTimeoutChecker, + OutputTimeoutChecker, ) logger = logging.getLogger(__name__) @@ -94,6 +95,16 @@ def run_test_phase(self, phase, rundir): ) runner.register_stop_condition_checker(global_timeout_checker) + # We only need to check for output timeouts during the test phase + if phase == "test": + output_timeout_checker = OutputTimeoutChecker( + self.get_output_timeout() + ) + runner.register_stop_condition_checker(output_timeout_checker) + runner.subscribe_event( + RunnerEvents.OUTPUT_RECEIVED, output_timeout_checker.update + ) + # Do not allow cancellation during provision for safety reasons if phase != "provision": job_cancelled_checker = JobCancelledChecker( diff --git a/agent/testflinger_agent/runner.py b/agent/testflinger_agent/runner.py index 1a30ffda..2403e406 100644 --- a/agent/testflinger_agent/runner.py +++ b/agent/testflinger_agent/runner.py @@ -21,6 +21,8 @@ import threading import time +from collections import defaultdict +from enum import Enum from typing import Callable, Optional, List logger = logging.getLogger(__name__) @@ -29,13 +31,29 @@ StopConditionType = Callable[[], Optional[str]] +class RunnerEvents(Enum): + """ + Runner events that can be subscribed to. + """ + + OUTPUT_RECEIVED = "output_received" + + class CommandRunner: + """ + Run a command and handle output and stop conditions. + + There are also events that can be subscribed to for notifications. The + known event types are defined in RunnerEvents. + """ + def __init__(self, cwd: Optional[str], env: Optional[dict]): self.output_handlers: List[OutputHandlerType] = [] self.stop_condition_checkers: List[StopConditionType] = [] self.process: Optional[subprocess.Popen] = None self.cwd = cwd self.env = os.environ.copy() + self.events = defaultdict(list) if env: self.env.update( {k: str(v) for k, v in env.items() if isinstance(v, str)} @@ -44,6 +62,15 @@ def __init__(self, cwd: Optional[str], env: Optional[dict]): def register_output_handler(self, handler: OutputHandlerType): self.output_handlers.append(handler) + def subscribe_event(self, event_name: RunnerEvents, handler: Callable): + """Set a callback for an event that we want to be notified of""" + self.events[event_name].append(handler) + + def post_event(self, event_name: RunnerEvents): + """Post an event for subscribers to be notified of""" + for handler in self.events[event_name]: + handler() + def post_output(self, data: str): for handler in self.output_handlers: handler(data) @@ -63,6 +90,7 @@ def check_and_post_output(self): raw_output = self.process.stdout.read() if not raw_output: return + self.post_event(RunnerEvents.OUTPUT_RECEIVED) output = raw_output.decode(sys.stdout.encoding, errors="replace") self.post_output(output) diff --git a/agent/testflinger_agent/stop_condition_checkers.py b/agent/testflinger_agent/stop_condition_checkers.py index 704b8cf8..9a01cde0 100644 --- a/agent/testflinger_agent/stop_condition_checkers.py +++ b/agent/testflinger_agent/stop_condition_checkers.py @@ -43,9 +43,13 @@ def __call__(self) -> Optional[str]: class OutputTimeoutChecker: def __init__(self, timeout: int): self.timeout = timeout - self.start_time = time.time() + self.last_output_time = time.time() def __call__(self) -> Optional[str]: - if time.time() - self.start_time > self.timeout: + if time.time() - self.last_output_time > self.timeout: return f"\nERROR: Output timeout reached! ({self.timeout}s)\n" return None + + def update(self): + """Update the last output time to the current time.""" + self.last_output_time = time.time() diff --git a/agent/testflinger_agent/tests/test_stop_condition_checkers.py b/agent/testflinger_agent/tests/test_stop_condition_checkers.py index 8f175dad..bf785546 100644 --- a/agent/testflinger_agent/tests/test_stop_condition_checkers.py +++ b/agent/testflinger_agent/tests/test_stop_condition_checkers.py @@ -51,3 +51,14 @@ def test_output_timeout_checker(self): assert checker() is None time.sleep(0.6) assert "ERROR: Output timeout reached! (0.5s)" in checker() + + def test_output_timeout_update(self): + """ + Test that the output timeout checker doesn't get triggered when we + keep updating the last output time. + """ + checker = OutputTimeoutChecker(0.3) + for _ in range(5): + time.sleep(0.1) + checker.update() + assert checker() is None