Skip to content

Commit

Permalink
Re-enable and fix output_timeout (#272)
Browse files Browse the repository at this point in the history
* Re-enable and fix output_timeout

* move known events to an enum
  • Loading branch information
plars authored May 9, 2024
1 parent 9eeddf4 commit 5af70e8
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
13 changes: 12 additions & 1 deletion agent/testflinger_agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import time

from testflinger_agent.errors import TFServerError
from .runner import CommandRunner
from .runner import CommandRunner, RunnerEvents
from .handlers import LiveOutputHandler, LogUpdateHandler
from .stop_condition_checkers import (
JobCancelledChecker,
GlobalTimeoutChecker,
OutputTimeoutChecker,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -94,6 +95,16 @@ def run_test_phase(self, phase, rundir):
)
runner.register_stop_condition_checker(global_timeout_checker)

# We only need to check for output timeouts during the test phase
if phase == "test":
output_timeout_checker = OutputTimeoutChecker(
self.get_output_timeout()
)
runner.register_stop_condition_checker(output_timeout_checker)
runner.subscribe_event(
RunnerEvents.OUTPUT_RECEIVED, output_timeout_checker.update
)

# Do not allow cancellation during provision for safety reasons
if phase != "provision":
job_cancelled_checker = JobCancelledChecker(
Expand Down
28 changes: 28 additions & 0 deletions agent/testflinger_agent/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import threading
import time

from collections import defaultdict
from enum import Enum
from typing import Callable, Optional, List

logger = logging.getLogger(__name__)
Expand All @@ -29,13 +31,29 @@
StopConditionType = Callable[[], Optional[str]]


class RunnerEvents(Enum):
"""
Runner events that can be subscribed to.
"""

OUTPUT_RECEIVED = "output_received"


class CommandRunner:
"""
Run a command and handle output and stop conditions.
There are also events that can be subscribed to for notifications. The
known event types are defined in RunnerEvents.
"""

def __init__(self, cwd: Optional[str], env: Optional[dict]):
self.output_handlers: List[OutputHandlerType] = []
self.stop_condition_checkers: List[StopConditionType] = []
self.process: Optional[subprocess.Popen] = None
self.cwd = cwd
self.env = os.environ.copy()
self.events = defaultdict(list)
if env:
self.env.update(
{k: str(v) for k, v in env.items() if isinstance(v, str)}
Expand All @@ -44,6 +62,15 @@ def __init__(self, cwd: Optional[str], env: Optional[dict]):
def register_output_handler(self, handler: OutputHandlerType):
self.output_handlers.append(handler)

def subscribe_event(self, event_name: RunnerEvents, handler: Callable):
"""Set a callback for an event that we want to be notified of"""
self.events[event_name].append(handler)

def post_event(self, event_name: RunnerEvents):
"""Post an event for subscribers to be notified of"""
for handler in self.events[event_name]:
handler()

def post_output(self, data: str):
for handler in self.output_handlers:
handler(data)
Expand All @@ -63,6 +90,7 @@ def check_and_post_output(self):
raw_output = self.process.stdout.read()
if not raw_output:
return
self.post_event(RunnerEvents.OUTPUT_RECEIVED)

output = raw_output.decode(sys.stdout.encoding, errors="replace")
self.post_output(output)
Expand Down
8 changes: 6 additions & 2 deletions agent/testflinger_agent/stop_condition_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ def __call__(self) -> Optional[str]:
class OutputTimeoutChecker:
def __init__(self, timeout: int):
self.timeout = timeout
self.start_time = time.time()
self.last_output_time = time.time()

def __call__(self) -> Optional[str]:
if time.time() - self.start_time > self.timeout:
if time.time() - self.last_output_time > self.timeout:
return f"\nERROR: Output timeout reached! ({self.timeout}s)\n"
return None

def update(self):
"""Update the last output time to the current time."""
self.last_output_time = time.time()
11 changes: 11 additions & 0 deletions agent/testflinger_agent/tests/test_stop_condition_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,14 @@ def test_output_timeout_checker(self):
assert checker() is None
time.sleep(0.6)
assert "ERROR: Output timeout reached! (0.5s)" in checker()

def test_output_timeout_update(self):
"""
Test that the output timeout checker doesn't get triggered when we
keep updating the last output time.
"""
checker = OutputTimeoutChecker(0.3)
for _ in range(5):
time.sleep(0.1)
checker.update()
assert checker() is None

0 comments on commit 5af70e8

Please # to comment.