Skip to content

Commit

Permalink
feat(panda): Support panda wms and idtoken operations
Browse files Browse the repository at this point in the history
  • Loading branch information
tcjennings committed Feb 21, 2025
1 parent 9774bfa commit 4cf4d11
Show file tree
Hide file tree
Showing 11 changed files with 516 additions and 50 deletions.
22 changes: 13 additions & 9 deletions src/lsst/cmservice/common/htcondor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Utility functions for working with htcondor jobs"""

import importlib.util
import os
import sys
from collections.abc import Mapping
from types import ModuleType
Expand Down Expand Up @@ -198,7 +197,18 @@ def build_htcondor_submit_environment() -> Mapping[str, str]:
# condor_environment = config.htcondor.model_dump(by_alias=True)
# TODO we should not always use the same schedd host. We could get a list
# of all schedds from the collector and pick one at random.
return dict(

# FIXME / TODO
# This is nothing to do with htcondor vs panda as a WMS, but because CM
# uses htcondor as its primary script-running engine for bps workflows even
# if that workflow uses panda. Because of this, we need to also serialize
# all of the panda environmental config for the subprocess to pick up.
# We do this instead of delegating panda config to some arbitrary bash
# script elsewhere in the filesystem whose only job is to set these env
# vars for panda. This also allows us to provide our specific panda idtoken
# as an env var instead of requiring the target process to pick it up from
# some .token file that may or may not be present or valid.
return config.panda.model_dump(by_alias=True, exclude_none=True) | dict(
CONDOR_CONFIG=config.htcondor.config_source,
_CONDOR_CONDOR_HOST=config.htcondor.collector_host,
_CONDOR_COLLECTOR_HOST=config.htcondor.collector_host,
Expand All @@ -210,7 +220,7 @@ def build_htcondor_submit_environment() -> Mapping[str, str]:
HOME=config.htcondor.remote_user_home,
LSST_VERSION=config.bps.lsst_version,
LSST_DISTRIB_DIR=config.bps.lsst_distrib_dir,
# FIX: because there is no db-auth.yaml in lsstsvc1's home directory
# FIXME: because there is no db-auth.yaml in lsstsvc1's home directory
PGPASSFILE=f"{config.htcondor.remote_user_home}/.lsst/postgres-credentials.txt",
PGUSER=config.butler.default_username,
PATH=(
Expand All @@ -233,12 +243,6 @@ def import_htcondor() -> ModuleType | None:
logger.warning("HTcondor not available.")
return None

# Ensure environment is configured for htcondor operations
# FIXME: the python process needs the correct condor env set up. Alternate
# to setting these values JIT in the os.environ would be to hack a way to
# have the config.htcondor submodel's validation_alias match the
# serialization_alias, e.g., "_CONDOR_value"
os.environ |= config.htcondor.model_dump(by_alias=True)
htcondor.reload_config()

return htcondor
131 changes: 131 additions & 0 deletions src/lsst/cmservice/common/panda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Module for PanDA operations within CM-Service"""

import datetime
import json
import os
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path

import httpx
from pandaclient.openidc_utils import decode_id_token

from ..config import config
from .logging import LOGGER

# Bound copy of the shared LOGGER with ``module`` set to this module's name;
# used for all logging in this module.
logger = LOGGER.bind(module=__name__)
"""A module-level logger"""


@contextmanager
def http_client() -> Generator[httpx.Client]:
    """Yield an ``httpx`` client session for PanDA API operations.

    The client is built on an ``httpx.HTTPTransport`` created with
    ``retries=3`` and with its ``verify`` setting taken from
    ``config.panda.verify_host``. The client is used as a context manager
    here, so it is closed when the caller's ``with`` block exits.

    Yields
    ------
    httpx.Client
        The open client session.
    """
    panda_transport = httpx.HTTPTransport(retries=3, verify=config.panda.verify_host)
    client = httpx.Client(transport=panda_transport)
    with client:
        yield client


def refresh_panda_token(url: str, data: dict[str, str]) -> str | None:
    """Exchange a refresh token for a new panda id token.

    Parameters
    ----------
    url : str
        The token endpoint to which the refresh request is posted.
    data : dict[str, str]
        The form fields sent as the body of the refresh request.

    Returns
    -------
    str or None
        The id token stored on ``config.panda`` after the refresh.

    Raises
    ------
    httpx.HTTPStatusError
        If the token endpoint responds with an error status.

    Notes
    -----
    On success this updates ``config.panda.id_token``,
    ``config.panda.refresh_token`` and ``config.panda.token_expiry``, and
    sets ``PANDA_AUTH_ID_TOKEN`` in the process environment.
    """
    form_headers = {"content-type": "application/x-www-form-urlencoded"}
    with http_client() as session:
        response = session.post(url=url, data=data, headers=form_headers)
        response.raise_for_status()

    payload: dict[str, str] = response.json()

    # Store the freshly issued tokens on the configuration object first...
    panda = config.panda
    panda.id_token = payload["id_token"]
    panda.refresh_token = payload["refresh_token"]

    # ...then expose the id token through the process environment...
    os.environ["PANDA_AUTH_ID_TOKEN"] = panda.id_token

    # ...and finally record the expiry time carried in the token itself.
    claims = decode_id_token(panda.id_token)
    panda.token_expiry = float(claims["exp"])  # type: ignore
    return panda.id_token


def get_panda_token() -> str | None:
    """Fetch a panda id token from configuration or a token file as necessary.

    If a token does not exist or is near expiry, create or refresh a token.

    Returns
    -------
    str or None
        The string value of a panda id token or None if no such token exists or
        can be created.

    Notes
    -----
    This function should be called at application startup to bootstrap an id
    token, and again before panda operations that may require the use of the
    id token, to ensure the validity within the token expiry time.

    The refresh operation never actually uses the current idtoken except to
    discover the expiry time. We don't actually need any bootstrap value for
    the idtoken if we start with a refresh token; the auth_config_url is
    determined from the panda url and the oidc VO.

    A failed refresh (HTTP error status or transport-level failure) is logged
    and the currently configured id token, which may be None or stale, is
    returned instead of raising.

    TODO: make this async if necessary, but the daemon is less sensitive to
    sync operations as long as they do not block indefinitely.
    """

    # If a token has been added to the configuration object, use it instead of
    # loading one from disk
    try:
        if config.panda.refresh_token is None:
            token_data = json.loads((Path(config.panda.config_root) / ".token").read_text())
            config.panda.id_token = token_data["id_token"]
            config.panda.refresh_token = token_data["refresh_token"]
    except (FileNotFoundError, json.JSONDecodeError):
        logger.exception()
        return None

    now_utc = datetime.datetime.now(datetime.UTC)

    # Determine whether the token should be renewed
    # The token expiry time is part of the encoded token
    try:
        decoded_token = decode_id_token(config.panda.id_token)
        # TODO if "exp" not in decoded_token: ...
        config.panda.token_expiry = float(decoded_token["exp"])  # type: ignore
    except Exception:
        # FIXME: this should generally be an AttributeError but the 3rdparty
        # function may change its operation.
        # If current id_token is None or otherwise not decodable, we will get a
        # new one from the refresh operation
        logger.exception()
        config.panda.token_expiry = now_utc

    # `renew_after` is a number of seconds; it must be passed by keyword
    # because the first positional argument of `timedelta` is *days*.
    renewal_window = datetime.timedelta(seconds=config.panda.renew_after)
    if (config.panda.token_expiry - now_utc) >= renewal_window:
        # The token still has enough lifetime left; no refresh is needed.
        return config.panda.id_token

    if config.panda.auth_config_url is None:
        logger.error("There is no PanDA auth config url known to the service, cannot refresh token.")
        return config.panda.id_token

    try:
        # TODO it is probably safe to cache these response tokens
        with http_client() as session:
            auth_config_response = session.get(config.panda.auth_config_url)
            auth_config_response.raise_for_status()
            panda_auth_config = auth_config_response.json()

            token_response = session.get(panda_auth_config["oidc_config_url"])
            token_response.raise_for_status()
            token_endpoint = token_response.json()["token_endpoint"]

        data = dict(
            client_id=panda_auth_config["client_id"],
            client_secret=panda_auth_config["client_secret"],
            grant_type="refresh_token",
            refresh_token=config.panda.refresh_token,
        )

        _ = refresh_panda_token(token_endpoint, data)
    except httpx.HTTPError:
        # HTTPError covers both error status codes and transport failures
        # (connection errors, timeouts), so an unreachable PanDA or identity
        # provider does not propagate out of this best-effort refresh.
        logger.exception()

    return config.panda.id_token
144 changes: 143 additions & 1 deletion src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from datetime import UTC, datetime
from typing import Self
from urllib.parse import urlparse
from warnings import warn

from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field, computed_field, field_serializer, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from .common.enums import ScriptMethodEnum, StatusEnum, WmsComputeSite
Expand Down Expand Up @@ -210,6 +213,144 @@ class HTCondorConfiguration(BaseModel):
)


class PandaConfiguration(BaseModel, validate_assignment=True):
    """Configuration parameters for the PanDA WMS

    Fields that declare a ``serialization_alias`` are emitted under that
    name when the model is dumped with ``by_alias=True``. Fields declared
    with ``exclude=True`` are left out of the dump. Because the model uses
    ``validate_assignment``, every attribute assignment re-runs validation,
    including the ``set_base_url_fields`` model validator.
    """

    tls_url: str | None = Field(
        description="Base HTTPS URL of PanDA server",
        serialization_alias="PANDA_URL_SSL",
        default=None,
    )

    url: str | None = Field(
        description="Base HTTP URL of PanDA server",
        serialization_alias="PANDA_URL",
        default=None,
    )

    monitor_url: str | None = Field(
        description="URL of PanDA monitor",
        serialization_alias="PANDAMON_URL",
        default=None,
    )

    cache_url: str | None = Field(
        description="Base URL of PanDA sandbox server",
        serialization_alias="PANDACACHE_URL",
        default=None,
    )

    virtual_organization: str = Field(
        description="Virtual organization name used with Panda OIDC",
        serialization_alias="PANDA_AUTH_VO",
        default="Rubin",
    )

    renew_after: int = Field(
        description="Minimum auth token lifetime in seconds before renewal attempts are made",
        default=302_400,
        exclude=True,
    )

    # The presence of this environment variable should cause the panda client
    # to use specified token directly, skipping IO related to reading a token
    # file.
    id_token: str | None = Field(
        description="Current id token for PanDA authentication",
        serialization_alias="PANDA_AUTH_ID_TOKEN",
        default=None,
    )

    refresh_token: str | None = Field(
        description="Current refresh token for PanDA token operations",
        default=None,
        exclude=True,
    )

    # A default factory is used so that the default expiry is computed when a
    # model instance is created rather than once when the class is defined.
    token_expiry: datetime = Field(
        description="Time at which the current idtoken expires",
        default_factory=lambda: datetime.now(tz=UTC),
        exclude=True,
    )

    config_root: str = Field(
        description="Location of the PanDA .token file",
        serialization_alias="PANDA_CONFIG_ROOT",
        default="/var/run/secrets/panda",
        exclude=True,
    )

    auth_type: str = Field(
        description="Panda Auth type",
        serialization_alias="PANDA_AUTH",
        default="oidc",
    )

    behind_lb: bool = Field(
        description="Whether Panda is behind a loadbalancer",
        default=False,
        serialization_alias="PANDA_BEHIND_REAL_LB",
    )

    verify_host: bool = Field(
        description="Whether to verify PanDA host TLS",
        default=True,
        serialization_alias="PANDA_VERIFY_HOST",
    )

    use_native_httplib: bool = Field(
        description="Use native http lib instead of curl",
        default=True,
        serialization_alias="PANDA_USE_NATIVE_HTTPLIB",
    )

    @computed_field(repr=False)  # type: ignore[prop-decorator]
    @property
    def auth_config_url(self) -> str | None:
        """Location of auth config for PanDA VO.

        Returns None when no ``tls_url`` is configured. The URL is built from
        the scheme and host of ``tls_url``; the port is included only when
        ``tls_url`` specifies one explicitly.
        """
        if self.tls_url is None:
            return None
        url_parts = urlparse(self.tls_url)
        # `port` is None when the URL carries no explicit port; interpolating
        # it unconditionally would yield a literal "host:None" authority.
        if url_parts.port is None:
            authority = f"{url_parts.hostname}"
        else:
            authority = f"{url_parts.hostname}:{url_parts.port}"
        return f"{url_parts.scheme}://{authority}/auth/{self.virtual_organization}_auth_config.json"

    @model_validator(mode="after")
    def set_base_url_fields(self) -> Self:
        """Set all url fields when only a subset of urls are supplied."""
        # NOTE: there is a danger of this validator creating a recursion error
        #       if unbounded field-setters are used. Every update to the model
        #       will itself trigger this validator because of the
        #       `validate_assignment` directive on the model itself.

        # If no panda urls have been specified there is no need to continue
        # with model validation
        if self.url is None and self.tls_url is None:
            return self
        # It does not seem critical that these URLs actually use the scheme
        # with which they are nominally associated, only that both be set.
        elif self.url is None:
            self.url = self.tls_url
        elif self.tls_url is None:
            self.tls_url = self.url

        # default the cache url to the tls url
        if self.cache_url is None:
            self.cache_url = self.tls_url
        return self

    @field_validator("token_expiry", mode="after")
    @classmethod
    def set_datetime_utc(cls, value: datetime) -> datetime:
        """Applies UTC timezone to datetime value."""
        # For tz-naive datetimes, treat the time as UTC in the first place
        # otherwise coerce the tz-aware datetime into UTC
        return value.replace(tzinfo=UTC) if value.tzinfo is None else value.astimezone(tz=UTC)

    @field_serializer("behind_lb", "verify_host", "use_native_httplib")
    def serialize_booleans(self, value: bool) -> str:  # noqa: FBT001
        """Serialize boolean fields as string values."""
        return "on" if value else "off"


# TODO deprecate and remove "slurm"-specific logic from cm-service; it is
# unlikely that interfacing with slurm directly from k8s will be possible.
class SlurmConfiguration(BaseModel):
Expand Down Expand Up @@ -383,6 +524,7 @@ class Configuration(BaseSettings):
htcondor: HTCondorConfiguration = HTCondorConfiguration()
logging: LoggingConfiguration = LoggingConfiguration()
slurm: SlurmConfiguration = SlurmConfiguration()
panda: PandaConfiguration = PandaConfiguration()

# Root fields
script_handler: ScriptMethodEnum = Field(
Expand Down
7 changes: 7 additions & 0 deletions src/lsst/cmservice/daemon.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from asyncio import create_task
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
Expand All @@ -11,6 +12,7 @@
from . import __version__
from .common.daemon import daemon_iteration
from .common.logging import LOGGER
from .common.panda import get_panda_token
from .config import config
from .routers.healthz import health_router

Expand All @@ -22,6 +24,11 @@
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
# start
# Bootstrap a panda id token
_ = get_panda_token()
# Update process environment with configuration models
os.environ |= config.panda.model_dump(by_alias=True, exclude_none=True)
os.environ |= config.htcondor.model_dump(by_alias=True, exclude_none=True)
app.state.tasks = set()
daemon = create_task(main_loop(), name="daemon")
app.state.tasks.add(daemon)
Expand Down
Loading

0 comments on commit 4cf4d11

Please sign in to comment.