diff --git a/packages/opal-client/opal_client/engine/runner.py b/packages/opal-client/opal_client/engine/runner.py index 8ef3cfcc..8e596225 100644 --- a/packages/opal-client/opal_client/engine/runner.py +++ b/packages/opal-client/opal_client/engine/runner.py @@ -244,6 +244,8 @@ def _init_events(self): class OpaRunner(PolicyEngineRunner): PANIC_DETECTION_SUBSTRINGS = [b"go/src/runtime/panic.go"] + MAX_HEALTH_CHECK_ATTEMPTS = 30 + HEALTH_CHECK_INTERVAL = 1 def __init__( self, @@ -279,6 +281,54 @@ def get_arguments(self) -> list[str]: return args + async def _check_opa_health(self) -> bool: + """Check if OPA is healthy by making a request to its health endpoint.""" + try: + # Get the configured address from options, default to localhost:8181 + addr = self._options.addr if self._options.addr else ":8181" + host, port = addr.split(":") + host = host if host else "localhost" + + async with aiohttp.ClientSession() as session: + url = f"http://{host}:{port}/health" + async with session.get(url) as response: + return response.status == 200 + except Exception as e: + logger.debug(f"OPA health check failed: {str(e)}") + return False + + async def _wait_for_opa_ready(self) -> bool: + for attempt in range(self.MAX_HEALTH_CHECK_ATTEMPTS): + if await self._check_opa_health(): + logger.info("OPA is healthy and ready to accept requests") + return True + + if attempt < self.MAX_HEALTH_CHECK_ATTEMPTS - 1: + logger.debug(f"OPA not ready yet, attempt {attempt + 1}/{self.MAX_HEALTH_CHECK_ATTEMPTS}") + await asyncio.sleep(self.HEALTH_CHECK_INTERVAL) + + logger.error(f"OPA failed to become healthy after {self.MAX_HEALTH_CHECK_ATTEMPTS} attempts") + return False + + async def _run_start_callbacks(self): + """Runs callbacks after OPA process starts and is healthy.""" + # Wait for OPA to be healthy before proceeding + if not await self._wait_for_opa_ready(): + # If OPA doesn't become healthy, terminate it + self._terminate_engine() + raise Exception("OPA failed to start and become healthy") + + if self._process_was_never_up_before: + # no need to rehydrate the first time + self._process_was_never_up_before = False + logger.info("Running policy engine initial start callbacks") + asyncio.create_task( + self._run_callbacks(self._on_process_initial_start_callbacks) + ) + else: + logger.info("Running policy engine rehydration callbacks") + asyncio.create_task(self._run_callbacks(self._on_process_restart_callbacks)) + @staticmethod def setup_opa_runner( options: Optional[OpaServerOptions] = None,