Skip to content

Commit

Permalink
wip: panda
Browse files Browse the repository at this point in the history
  • Loading branch information
tcjennings committed Feb 20, 2025
1 parent ba7a12f commit e3fd403
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 118 deletions.
22 changes: 13 additions & 9 deletions src/lsst/cmservice/common/htcondor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Utility functions for working with htcondor jobs"""

import importlib.util
import os
import sys
from collections.abc import Mapping
from types import ModuleType
Expand Down Expand Up @@ -198,7 +197,18 @@ def build_htcondor_submit_environment() -> Mapping[str, str]:
# condor_environment = config.htcondor.model_dump(by_alias=True)
# TODO we should not always use the same schedd host. We could get a list
# of all schedds from the collector and pick one at random.
return dict(

# FIXME / TODO
# This is nothing to do with htcondor vs panda as a WMS, but because CM
# uses htcondor as its primary script-running engine for bps workflows even
# if that workflow uses panda. Because of this, we need to also serialize
# all of the panda environmental config for the subprocess to pick up.
# We do this instead of delegating panda config to some arbitrary bash
# script elsewhere in the filesystem whose only job is to set these env
# vars for panda. This also allows us to provide our specific panda idtoken
# as an env var instead of requiring the target process to pick it up from
# some .token file that may or may not be present or valid.
return config.panda.model_dump(by_alias=True, exclude_none=True) | dict(
CONDOR_CONFIG=config.htcondor.config_source,
_CONDOR_CONDOR_HOST=config.htcondor.collector_host,
_CONDOR_COLLECTOR_HOST=config.htcondor.collector_host,
Expand All @@ -210,7 +220,7 @@ def build_htcondor_submit_environment() -> Mapping[str, str]:
HOME=config.htcondor.remote_user_home,
LSST_VERSION=config.bps.lsst_version,
LSST_DISTRIB_DIR=config.bps.lsst_distrib_dir,
# FIX: because there is no db-auth.yaml in lsstsvc1's home directory
# FIXME: because there is no db-auth.yaml in lsstsvc1's home directory
PGPASSFILE=f"{config.htcondor.remote_user_home}/.lsst/postgres-credentials.txt",
PGUSER=config.butler.default_username,
PATH=(
Expand All @@ -233,12 +243,6 @@ def import_htcondor() -> ModuleType | None:
logger.warning("HTcondor not available.")
return None

# Ensure environment is configured for htcondor operations
# FIXME: the python process needs the correct condor env set up. Alternate
# to setting these values JIT in the os.environ would be to hack a way to
# have the config.htcondor submodel's validation_alias match the
# serialization_alias, e.g., "_CONDOR_value"
os.environ |= config.htcondor.model_dump(by_alias=True)
htcondor.reload_config()

return htcondor
51 changes: 42 additions & 9 deletions src/lsst/cmservice/common/panda.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,36 @@ def refresh_token(url: str, data: dict[str, str]) -> str | None:
token_data: dict[str, str] = session.post(
url=url, data=data, headers={"content-type": "application/x-www-form-urlencoded"}
).json()
# TODO do what with the new token_data?
# - write it out to the token file?
# Can't do that, it's going to be readonly
# - update the configuration object?
# with the new token...
# - update the configuration object
config.panda.id_token = token_data["id_token"]
config.panda.refresh_token = token_data["refresh_token"]
# - update the process environment?
# - update the process environment
os.environ["PANDA_AUTH_ID_TOKEN"] = config.panda.id_token
return config.panda.id_token


def get_token() -> str | None:
"""Load a panda auth token from a ``.token`` file in the appropriate
location.
location. Renew the token if it's near expiration.
TODO: make this async if necessary
TODO: does this function really need to return anything, or is the side
effect of updating the configuration object sufficient?
TODO: when does this function need to be called? At least before any PanDA
operation, either in- or out- of process, to ensure token validity.
With respect to the potential expense of this function, we should
call it at application startup and make the expiry datetime part of
the panda configuration object so it can be checked quickly
"""

# If a token has been added to the configuration object, use it instead of
# loading one from disk
try:
if config.panda.id_token is None or config.panda.refresh_token is None:
token_path = config.panda.config_root or os.getenv("PANDA_CONFIG_ROOT", "/")
token_data = json.loads((Path(token_path) / ".token").read_text())
token_data = json.loads((Path(config.panda.config_root) / ".token").read_text())
config.panda.id_token = token_data["id_token"]
config.panda.refresh_token = token_data["refresh_token"]
except (FileNotFoundError, json.JSONDecodeError):
Expand All @@ -74,11 +80,12 @@ def get_token() -> str | None:

# TODO if "exp" not in decoded_token: ...
token_expiry = float(decoded_token["exp"])
# TODO update config.panda.idtoken_expiration[datetime]

if (token_expiry - datetime.datetime.now(datetime.UTC).timestamp()) < config.panda.renew_after:
if config.panda.auth_config_url is None:
logger.error("There is no PanDA auth config url known to the service, cannot refresh token.")
return token_data["id_token"]
return config.panda.id_token
with http_client() as session:
panda_auth_config = session.get(config.panda.auth_config_url).json()
token_endpoint = session.get(panda_auth_config["oidc_config_url"]).json()["token_endpoint"]
Expand All @@ -91,3 +98,29 @@ def get_token() -> str | None:
return refresh_token(token_endpoint, data)
else:
return config.panda.id_token


# TODO not a production function, experimental-only.
def get_workflow_status(workflow_id: int) -> None:
"""Access PandaMon API to get status of a workflow_id."""
# TODO make async

# or POST to tls_url + "/relay_idds_command" with the command name in the
# data body, as in pandaclient.Client.call_idds_command()
if config.panda.monitor_url is None:
# log error/warning
return

# TODO check token expiry and get a new one if we need it
_ = get_token()

url = config.panda.monitor_url + f"/tasks/?reqid={workflow_id}"
with http_client() as session:
headers = {
"Authorization": f"Bearer {config.panda.id_token}",
"Origin": config.panda.virtual_organization,
}
response = session.get(url, headers=headers).json()

# parse response
logger.info(response)
51 changes: 46 additions & 5 deletions src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import Self
from typing import TYPE_CHECKING, Self
from urllib.parse import urlparse
from warnings import warn

from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator, model_validator
from pydantic import BaseModel, Field, field_serializer, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from .common.enums import ScriptMethodEnum, StatusEnum, WmsComputeSite
Expand Down Expand Up @@ -234,6 +234,12 @@ class PandaConfiguration(BaseModel):
default=None,
)

cache_url: str | None = Field(
description="Base URL of PanDA sandbox server",
serialization_alias="PANDACACHE_URL",
default=None,
)

virtual_organization: str = Field(
description="Virtual organization name used with Panda OIDC",
serialization_alias="PANDA_AUTH_VO",
Expand Down Expand Up @@ -261,10 +267,11 @@ class PandaConfiguration(BaseModel):
exclude=True,
)

config_root: str | None = Field(
config_root: str = Field(
description="Location of the PanDA .token file",
serialization_alias="PANDA_CONFIG_ROOT",
default=None,
default="/var/run/secrets/panda",
exclude=True,
)

auth_type: str = Field(
Expand All @@ -279,18 +286,47 @@ class PandaConfiguration(BaseModel):
exclude=True,
)

behind_lb: bool = Field(
description="Whether Panda is behind a loadbalancer",
default=False,
serialization_alias="PANDA_BEHIND_REAL_LB",
)

verify_host: bool = Field(
description="Whether to verify PanDA host TLS",
default=True,
serialization_alias="PANDA_VERIFY_HOST",
)

use_native_httplib: bool = Field(
description="Use native http lib instead of curl",
default=True,
serialization_alias="PANDA_USE_NATIVE_HTTPLIB",
)

@model_validator(mode="after")
def set_base_url_fields(self) -> Self:
"""Set the base url field when only a tls url is configured and vice
versa.
"""
# If no panda urls have been specified there is no need to continue
# with model validation
if self.url is None and self.tls_url is None:
return self
# It does not seem critical that these URLs actually use the scheme
# with which they are nominally associated, only that both be set.
if self.url is None:
elif self.url is None:
self.url = self.tls_url
elif self.tls_url is None:
self.tls_url = self.url

# default the cache url to the tls url
if self.cache_url is None:
self.cache_url = self.tls_url

if TYPE_CHECKING:
# mypy needs to be convinced of this case
assert self.tls_url is not None
url_parts = urlparse(self.tls_url)
self.auth_config_url = f"{url_parts.scheme}://{url_parts.hostname}:{url_parts.port}/auth/{self.virtual_organization}_auth_config.json"
return self
Expand All @@ -301,6 +337,11 @@ def set_id_token(cls, value: str | None) -> str | None:
# TODO read the token file and extract the idtoken
return value

@field_serializer("behind_lb", "verify_host", "use_native_httplib")
def serialize_booleans(self, value: bool) -> str: # noqa: FBT001
"""Serialize boolean fields as string values."""
return "on" if value else "off"


# TODO deprecate and remove "slurm"-specific logic from cm-service; it is
# unlikely that interfacing with slurm directly from k8s will be possible.
Expand Down
4 changes: 4 additions & 0 deletions src/lsst/cmservice/daemon.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from asyncio import create_task
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
Expand All @@ -22,6 +23,9 @@
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
# start
# Update process environment with configuration models
os.environ |= config.panda.model_dump(by_alias=True, exclude_none=True)
os.environ |= config.htcondor.model_dump(by_alias=True, exclude_none=True)
app.state.tasks = set()
daemon = create_task(main_loop(), name="daemon")
app.state.tasks.add(daemon)
Expand Down
7 changes: 6 additions & 1 deletion src/lsst/cmservice/handlers/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from ..common.enums import StatusEnum
from ..common.errors import CMMissingFullnameError, CMYamlParseError
from ..common.logging import LOGGER
from ..common.utils import update_include_dict
from ..config import config
from ..db.campaign import Campaign
Expand All @@ -27,6 +28,8 @@
from ..db.task_set import TaskSet
from ..db.wms_task_report import WmsTaskReport

logger = LOGGER.bind(module=__name__)


async def upsert_spec_block(
session: async_scoped_session,
Expand Down Expand Up @@ -573,7 +576,7 @@ async def load_manifest_report(

def status_from_bps_report(
wms_run_report: WmsRunReport | None,
fake_status: StatusEnum | None,
fake_status: StatusEnum | None = None,
) -> StatusEnum | None: # pragma: no cover
"""Decide the status for a workflow for a bps report
Expand All @@ -592,6 +595,8 @@ def status_from_bps_report(
if wms_run_report is None:
return fake_status or config.mock_status

logger.debug(wms_run_report)

the_state = wms_run_report.state
# We treat RUNNING as running from the CM point of view,
if the_state == WmsStates.RUNNING:
Expand Down
Loading

0 comments on commit e3fd403

Please # to comment.