Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(opa-runner) #760

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions packages/opal-client/opal_client/engine/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ def _init_events(self):

class OpaRunner(PolicyEngineRunner):
# Byte substrings associated with panic detection. The code that consumes this
# list is not visible here -- presumably it is matched against the engine's
# output; TODO confirm against the caller.
PANIC_DETECTION_SUBSTRINGS = [b"go/src/runtime/panic.go"]
# Upper bound on the number of health probes `_wait_for_opa_ready` performs
# before it gives up and reports OPA as not ready.
MAX_HEALTH_CHECK_ATTEMPTS = 30
# Pause between two consecutive health probes; the value is passed straight to
# `asyncio.sleep`, so it is expressed in seconds.
HEALTH_CHECK_INTERVAL = 1

def __init__(
self,
Expand Down Expand Up @@ -279,6 +281,54 @@ def get_arguments(self) -> list[str]:

return args

async def _check_opa_health(self) -> bool:
    """Probe OPA's ``/health`` endpoint once and report whether it answered 200.

    The target is derived from ``self._options.addr`` (``host:port``). A
    missing address falls back to ``:8181`` and an empty host part falls
    back to ``localhost``.

    Returns:
        True when the endpoint responded with HTTP 200. False for any other
        status, for an address without a usable port, or when the request
        failed or timed out. This method never raises for probe failures;
        they are logged at debug level instead.
    """
    # Bound every probe: without an explicit timeout aiohttp waits up to its
    # default total timeout (5 minutes), which would stall the retry loop
    # on a connection that is accepted but never answered.
    probe_timeout = aiohttp.ClientTimeout(total=5)
    try:
        # Get the configured address from options, default to :8181
        addr = self._options.addr or ":8181"
        # Split on the LAST colon so hosts that themselves contain colons
        # (e.g. the bracketed IPv6 literal "[::1]:8181") are kept intact
        # instead of blowing up a two-value unpack.
        host, separator, port = addr.rpartition(":")
        if not separator or not port:
            logger.debug(
                f"OPA health check failed: cannot parse address {addr!r}"
            )
            return False
        host = host or "localhost"

        url = f"http://{host}:{port}/health"
        async with aiohttp.ClientSession(timeout=probe_timeout) as session:
            async with session.get(url) as response:
                return response.status == 200
    except Exception as e:
        # Best-effort probe: connection errors and timeouts are expected
        # while OPA is still starting, so they only count as "not healthy".
        logger.debug(f"OPA health check failed: {str(e)}")
        return False

async def _wait_for_opa_ready(self) -> bool:
    """Poll the OPA health probe until it succeeds or the attempts run out.

    Up to ``MAX_HEALTH_CHECK_ATTEMPTS`` probes are made, sleeping
    ``HEALTH_CHECK_INTERVAL`` seconds between consecutive probes (there is
    no sleep after the final one).

    Returns:
        True as soon as a probe reports healthy, False once every attempt
        has failed.
    """
    attempt = 0
    while attempt < self.MAX_HEALTH_CHECK_ATTEMPTS:
        healthy = await self._check_opa_health()
        if healthy:
            logger.info("OPA is healthy and ready to accept requests")
            return True

        # Only log and back off when another probe is still to come.
        more_attempts_left = attempt + 1 < self.MAX_HEALTH_CHECK_ATTEMPTS
        if more_attempts_left:
            logger.debug(f"OPA not ready yet, attempt {attempt + 1}/{self.MAX_HEALTH_CHECK_ATTEMPTS}")
            await asyncio.sleep(self.HEALTH_CHECK_INTERVAL)
        attempt += 1

    logger.error(f"OPA failed to become healthy after {self.MAX_HEALTH_CHECK_ATTEMPTS} attempts")
    return False

async def _run_start_callbacks(self):
    """Wait for OPA to become healthy, then schedule the start callbacks.

    On the first successful start the initial-start callbacks are
    scheduled; on every later start the restart (rehydration) callbacks
    are scheduled instead. The callbacks run in a background task and are
    not awaited here.

    Raises:
        RuntimeError: OPA never reported healthy within the allowed number
            of probes. The engine is asked to terminate before raising.
    """
    # Wait for OPA to be healthy before proceeding
    if not await self._wait_for_opa_ready():
        # If OPA doesn't become healthy, terminate it.
        # NOTE(review): `_terminate_engine` is defined outside the code
        # visible here -- confirm it exists and is a plain (non-coroutine)
        # method; if it is a coroutine this call must be awaited.
        self._terminate_engine()
        raise RuntimeError("OPA failed to start and become healthy")

    if self._process_was_never_up_before:
        # no need to rehydrate the first time
        self._process_was_never_up_before = False
        logger.info("Running policy engine initial start callbacks")
        callbacks = self._on_process_initial_start_callbacks
    else:
        logger.info("Running policy engine rehydration callbacks")
        callbacks = self._on_process_restart_callbacks

    task = asyncio.create_task(self._run_callbacks(callbacks))
    # The event loop keeps only weak references to tasks, so a
    # fire-and-forget task may be garbage-collected before it finishes.
    # Hold a strong reference until the task completes.
    pending = getattr(self, "_pending_start_callback_tasks", None)
    if pending is None:
        pending = self._pending_start_callback_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)

@staticmethod
def setup_opa_runner(
options: Optional[OpaServerOptions] = None,
Expand Down
Loading