From 097dc1ec80564e57f9b0ce7b5edd15ba8429adb4 Mon Sep 17 00:00:00 2001 From: Toby Jennings Date: Wed, 19 Feb 2025 15:52:44 -0600 Subject: [PATCH] wip: panda --- src/lsst/cmservice/common/htcondor.py | 22 +++--- src/lsst/cmservice/common/panda.py | 51 +++++++++++--- src/lsst/cmservice/config.py | 51 ++++++++++++-- src/lsst/cmservice/daemon.py | 4 ++ src/lsst/cmservice/handlers/functions.py | 7 +- src/lsst/cmservice/handlers/jobs.py | 64 +++++++++++------ .../cmservice/templates/bps_submit_yaml.j2 | 8 +++ src/lsst/cmservice/templates/wms_submit_sh.j2 | 20 +++--- tests/common/test_config.py | 9 +++ tests/common/test_panda.py | 69 ++++--------------- 10 files changed, 193 insertions(+), 112 deletions(-) diff --git a/src/lsst/cmservice/common/htcondor.py b/src/lsst/cmservice/common/htcondor.py index f80b1376..50b9cdf5 100644 --- a/src/lsst/cmservice/common/htcondor.py +++ b/src/lsst/cmservice/common/htcondor.py @@ -1,7 +1,6 @@ """Utility functions for working with htcondor jobs""" import importlib.util -import os import sys from collections.abc import Mapping from types import ModuleType @@ -198,7 +197,18 @@ def build_htcondor_submit_environment() -> Mapping[str, str]: # condor_environment = config.htcondor.model_dump(by_alias=True) # TODO we should not always use the same schedd host. We could get a list # of all schedds from the collector and pick one at random. - return dict( + + # FIXME / TODO + # This is nothing to do with htcondor vs panda as a WMS, but because CM + # uses htcondor as its primary script-running engine for bps workflows even + # if that workflow uses panda. Because of this, we need to also serialize + # all of the panda environmental config for the subprocess to pick up. + # We do this instead of delegating panda config to some arbitrary bash + # script elsewhere in the filesystem whose only job is to set these env + # vars for panda. This also allows us to provide our specific panda idtoken + # as an env var instead of requiring the target process to pick it up from + # some .token file that may or may not be present or valid. + return config.panda.model_dump(by_alias=True, exclude_none=True) | dict( CONDOR_CONFIG=config.htcondor.config_source, _CONDOR_CONDOR_HOST=config.htcondor.collector_host, _CONDOR_COLLECTOR_HOST=config.htcondor.collector_host, @@ -210,7 +220,7 @@ def build_htcondor_submit_environment() -> Mapping[str, str]: HOME=config.htcondor.remote_user_home, LSST_VERSION=config.bps.lsst_version, LSST_DISTRIB_DIR=config.bps.lsst_distrib_dir, - # FIX: because there is no db-auth.yaml in lsstsvc1's home directory + # FIXME: because there is no db-auth.yaml in lsstsvc1's home directory PGPASSFILE=f"{config.htcondor.remote_user_home}/.lsst/postgres-credentials.txt", PGUSER=config.butler.default_username, PATH=( @@ -233,12 +243,6 @@ def import_htcondor() -> ModuleType | None: logger.warning("HTcondor not available.") return None - # Ensure environment is configured for htcondor operations - # FIXME: the python process needs the correct condor env set up. Alternate - # to setting these values JIT in the os.environ would be to hack a way to - # have the config.htcondor submodel's validation_alias match the - # serialization_alias, e.g., "_CONDOR_value" - os.environ |= config.htcondor.model_dump(by_alias=True) htcondor.reload_config() return htcondor diff --git a/src/lsst/cmservice/common/panda.py b/src/lsst/cmservice/common/panda.py index a0620d1e..d3898fb3 100644 --- a/src/lsst/cmservice/common/panda.py +++ b/src/lsst/cmservice/common/panda.py @@ -38,30 +38,36 @@ def refresh_token(url: str, data: dict[str, str]) -> str | None: token_data: dict[str, str] = session.post( url=url, data=data, headers={"content-type": "application/x-www-form-urlencoded"} ).json() - # TODO do what with the new token_data? - # - write it out to the token file? - # Can't do that, it's going to be readonly - # - update the configuration object? + # with the new token... + # - update the configuration object config.panda.id_token = token_data["id_token"] config.panda.refresh_token = token_data["refresh_token"] - # - update the process environment? + # - update the process environment os.environ["PANDA_AUTH_ID_TOKEN"] = config.panda.id_token return config.panda.id_token def get_token() -> str | None: """Load a panda auth token from a ``.token`` file in the appropriate - location. + location. Renew the token if it's near expiration. TODO: make this async if necessary + + TODO: does this function really need to return anything, or is the side + effect of updating the configuration object sufficient? + + TODO: when does this function need to be called? At least before any PanDA + operation, either in- or out- of process, to ensure token validity. + With respect to the potential expense of this function, we should + call it at application startup and make the expiry datetime part of + the panda configuration object so it can be checked quickly """ # If a token has been added to the configuration object, use it instead of # loading one from disk try: if config.panda.id_token is None or config.panda.refresh_token is None: - token_path = config.panda.config_root or os.getenv("PANDA_CONFIG_ROOT", "/") - token_data = json.loads((Path(token_path) / ".token").read_text()) + token_data = json.loads((Path(config.panda.config_root) / ".token").read_text()) config.panda.id_token = token_data["id_token"] config.panda.refresh_token = token_data["refresh_token"] except (FileNotFoundError, json.JSONDecodeError): @@ -74,11 +80,12 @@ def get_token() -> str | None: # TODO if "exp" not in decoded_token: ... token_expiry = float(decoded_token["exp"]) + # TODO update config.panda.idtoken_expiration[datetime] if (token_expiry - datetime.datetime.now(datetime.UTC).timestamp()) < config.panda.renew_after: if config.panda.auth_config_url is None: logger.error("There is no PanDA auth config url known to the service, cannot refresh token.") - return token_data["id_token"] + return config.panda.id_token with http_client() as session: panda_auth_config = session.get(config.panda.auth_config_url).json() token_endpoint = session.get(panda_auth_config["oidc_config_url"]).json()["token_endpoint"] @@ -91,3 +98,29 @@ def get_token() -> str | None: return refresh_token(token_endpoint, data) else: return config.panda.id_token + + +# TODO not a production function, experimental-only. +def get_workflow_status(workflow_id: int) -> None: + """Access PandaMon API to get status of a workflow_id.""" + # TODO make async + + # or POST to tls_url + "/relay_idds_command" with the command name in the + # data body, as in pandaclient.Client.call_idds_command() + if config.panda.monitor_url is None: + # log error/warning + return + + # TODO check token expiry and get a new one if we need it + _ = get_token() + + url = config.panda.monitor_url + f"/tasks/?reqid={workflow_id}" + with http_client() as session: + headers = { + "Authorization": f"Bearer {config.panda.id_token}", + "Origin": config.panda.virtual_organization, + } + response = session.get(url, headers=headers).json() + + # parse response + logger.info(response) diff --git a/src/lsst/cmservice/config.py b/src/lsst/cmservice/config.py index a9450a60..b125b280 100644 --- a/src/lsst/cmservice/config.py +++ b/src/lsst/cmservice/config.py @@ -1,9 +1,9 @@ -from typing import Self +from typing import TYPE_CHECKING, Self from urllib.parse import urlparse from warnings import warn from dotenv import load_dotenv -from pydantic import BaseModel, Field, field_validator, model_validator +from pydantic import BaseModel, Field, field_serializer, field_validator, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict from .common.enums import ScriptMethodEnum, StatusEnum, WmsComputeSite @@ -234,6 +234,12 @@ class PandaConfiguration(BaseModel): default=None, ) + cache_url: str | None = Field( + description="Base URL of PanDA sandbox server", + serialization_alias="PANDACACHE_URL", + default=None, + ) + virtual_organization: str = Field( description="Virtual organization name used with Panda OIDC", serialization_alias="PANDA_AUTH_VO", @@ -261,10 +267,11 @@ class PandaConfiguration(BaseModel): exclude=True, ) - config_root: str | None = Field( + config_root: str = Field( description="Location of the PanDA .token file", serialization_alias="PANDA_CONFIG_ROOT", - default=None, + default="/var/run/secrets/panda", + exclude=True, ) auth_type: str = Field( @@ -279,18 +286,47 @@ class PandaConfiguration(BaseModel): exclude=True, ) + behind_lb: bool = Field( + description="Whether Panda is behind a loadbalancer", + default=False, + serialization_alias="PANDA_BEHIND_REAL_LB", + ) + + verify_host: bool = Field( + description="Whether to verify PanDA host TLS", + default=True, + serialization_alias="PANDA_VERIFY_HOST", + ) + + use_native_httplib: bool = Field( + description="Use native http lib instead of curl", + default=True, + serialization_alias="PANDA_USE_NATIVE_HTTPLIB", + ) + @model_validator(mode="after") def set_base_url_fields(self) -> Self: """Set the base url field when only a tls url is configured and vice versa. """ + # If no panda urls have been specified there is no need to continue + # with model validation + if self.url is None and self.tls_url is None: + return self # It does not seem critical that these URLs actually use the scheme # with which they are nominally associated, only that both be set. - if self.url is None: + elif self.url is None: self.url = self.tls_url elif self.tls_url is None: self.tls_url = self.url + # default the cache url to the tls url + if self.cache_url is None: + self.cache_url = self.tls_url + + if TYPE_CHECKING: + # mypy needs to be convinced of this case + assert self.tls_url is not None url_parts = urlparse(self.tls_url) self.auth_config_url = f"{url_parts.scheme}://{url_parts.hostname}:{url_parts.port}/auth/{self.virtual_organization}_auth_config.json" return self @@ -301,6 +337,11 @@ def set_id_token(cls, value: str | None) -> str | None: # TODO read the token file and extract the idtoken return value + @field_serializer("behind_lb", "verify_host", "use_native_httplib") + def serialize_booleans(self, value: bool) -> str: # noqa: FBT001 + """Serialize boolean fields as string values.""" + return "on" if value else "off" + # TODO deprecate and remove "slurm"-specific logic from cm-service; it is # unlikely that interfacing with slurm directly from k8s will be possible. diff --git a/src/lsst/cmservice/daemon.py b/src/lsst/cmservice/daemon.py index e760e29f..d05adae7 100644 --- a/src/lsst/cmservice/daemon.py +++ b/src/lsst/cmservice/daemon.py @@ -1,3 +1,4 @@ +import os from asyncio import create_task from collections.abc import AsyncGenerator from contextlib import asynccontextmanager @@ -22,6 +23,9 @@ @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator: # start + # Update process environment with configuration models + os.environ |= config.panda.model_dump(by_alias=True, exclude_none=True) + os.environ |= config.htcondor.model_dump(by_alias=True, exclude_none=True) app.state.tasks = set() daemon = create_task(main_loop(), name="daemon") app.state.tasks.add(daemon) diff --git a/src/lsst/cmservice/handlers/functions.py b/src/lsst/cmservice/handlers/functions.py index 74cd5b53..fae2aeb9 100644 --- a/src/lsst/cmservice/handlers/functions.py +++ b/src/lsst/cmservice/handlers/functions.py @@ -12,6 +12,7 @@ from ..common.enums import StatusEnum from ..common.errors import CMMissingFullnameError, CMYamlParseError +from ..common.logging import LOGGER from ..common.utils import update_include_dict from ..config import config from ..db.campaign import Campaign @@ -27,6 +28,8 @@ from ..db.task_set import TaskSet from ..db.wms_task_report import WmsTaskReport +logger = LOGGER.bind(module=__name__) + async def upsert_spec_block( session: async_scoped_session, @@ -573,7 +576,7 @@ async def load_manifest_report( def status_from_bps_report( wms_run_report: WmsRunReport | None, - fake_status: StatusEnum | None, + fake_status: StatusEnum | None = None, ) -> StatusEnum | None: # pragma: no cover """Decide the status for a workflow for a bps report @@ -592,6 +595,8 @@ def status_from_bps_report( if wms_run_report is None: return fake_status or config.mock_status + logger.debug(wms_run_report) + the_state = wms_run_report.state # We treat RUNNING as running from the CM point of view, if the_state == WmsStates.RUNNING: diff --git a/src/lsst/cmservice/handlers/jobs.py b/src/lsst/cmservice/handlers/jobs.py index bc3b22d0..00128e77 100644 --- a/src/lsst/cmservice/handlers/jobs.py +++ b/src/lsst/cmservice/handlers/jobs.py @@ -3,15 +3,16 @@ import os import shutil import types -from typing import Any +from functools import partial +from typing import TYPE_CHECKING, Any import yaml -from anyio import Path +from anyio import Path, to_thread from fastapi.concurrency import run_in_threadpool from jinja2 import Environment, PackageLoader from sqlalchemy.ext.asyncio import async_scoped_session -from lsst.ctrl.bps import BaseWmsService, WmsStates +from lsst.ctrl.bps import BaseWmsService, WmsRunReport, WmsStates from lsst.utils import doImport from ..common.bash import parse_bps_stdout, write_bash_script @@ -322,11 +323,19 @@ def __init__(self, spec_block_id: int, **kwargs: dict) -> None: self._wms_svc_class: types.ModuleType | type | None = None self._wms_svc: BaseWmsService | None = None - def _get_wms_svc(self, **kwargs: Any) -> BaseWmsService: - if self._wms_svc is None: - if self.wms_svc_class_name is None: # pragma: no cover - raise CMBadExecutionMethodError(f"{type(self)} should not be used, use a sub-class instead") + def _get_wms_svc(self, **kwargs: Any) -> BaseWmsService | None: + # FIXME this should happen in __init__ + if self.wms_svc_class_name is None: # pragma: no cover + raise CMBadExecutionMethodError(f"{type(self)} should not be used, use a sub-class instead") + + try: self._wms_svc_class = doImport(self.wms_svc_class_name) + except ImportError: + logger.exception() + # This may not be an error when under testing + return None + + if self._wms_svc is None: if isinstance(self._wms_svc_class, types.ModuleType): # pragma: no cover raise CMBadExecutionMethodError( f"Site class={self.wms_svc_class_name} is not a BaseWmsService subclass", @@ -357,21 +366,36 @@ async def _load_wms_reports( Status of requested job """ fake_status = kwargs.get("fake_status", config.mock_status) + wms_svc = self._get_wms_svc(config={}) + + # It is an error if the wms_svc_class cannot be imported when not under + # a fake status. + if all([wms_svc is None, fake_status is None]): + raise ImportError + + # Return early with fake status or a missing wf id + elif any([wms_workflow_id is None, fake_status is not None]): + return status_from_bps_report(None, fake_status=fake_status) + + if TYPE_CHECKING: + assert wms_svc is not None + assert wms_workflow_id is not None + run_reports: list[WmsRunReport] + wms_run_report: WmsRunReport | None try: - wms_svc = self._get_wms_svc(config={}) - except ImportError as msg: - if not fake_status: # pragma: no cover - raise msg - try: - if fake_status is not None or wms_workflow_id is None: - wms_run_report = None - else: # pragma: no cover - wms_run_report = wms_svc.report(wms_workflow_id=wms_workflow_id.strip())[0][0] - _job = await load_wms_reports(session, job, wms_run_report) - status = status_from_bps_report(wms_run_report, fake_status=fake_status) - except Exception as msg: - print(f"Catching wms_svc.report failure: {msg}, continuing") + # FIXME: Why does wms_workflow_id need to be stripped? + wms_svc_report = partial(wms_svc.report, wms_workflow_id=wms_workflow_id.strip()) + run_reports, message = await to_thread.run_sync(wms_svc_report) + logger.debug(message) + wms_run_report = run_reports[0] + _ = await load_wms_reports(session, job, wms_run_report) + status = status_from_bps_report(wms_run_report) + except Exception: + # FIXME setting status failed for any exception seems extreme, + # there should be *retryable* exceptions with some kind of + # backoff + logger.exception() status = StatusEnum.failed return status diff --git a/src/lsst/cmservice/templates/bps_submit_yaml.j2 b/src/lsst/cmservice/templates/bps_submit_yaml.j2 index 307b5868..f6bb10c7 100644 --- a/src/lsst/cmservice/templates/bps_submit_yaml.j2 +++ b/src/lsst/cmservice/templates/bps_submit_yaml.j2 @@ -53,5 +53,13 @@ memoryLimit: 400000 {%- if wms == "panda" %} wmsServiceClass: lsst.ctrl.bps.panda.PanDAService {%- if compute_site == "usdf" %} +computeCloud: US +computeSite: SLAC +requestMemory: 4000 +memoryMultiplier: 1.2 +{%- elif compute_site == "lanc" %} +computeSite: LANCS +{%- elif compute_site == "in2p3" %} +computeSite: CC-IN2P3 {%- endif %} {%- endif %} diff --git a/src/lsst/cmservice/templates/wms_submit_sh.j2 b/src/lsst/cmservice/templates/wms_submit_sh.j2 index fae24484..ecd38193 100644 --- a/src/lsst/cmservice/templates/wms_submit_sh.j2 +++ b/src/lsst/cmservice/templates/wms_submit_sh.j2 @@ -1,22 +1,20 @@ #!/usr/bin/env bash -{%- if script_method == "bash" %} -# Assuming native environment already setup -{%- elif script_method == "htcondor" %} - -# setup LSST env. export LSST_VERSION='{{ lsst_version }}' export LSST_DISTRIB_DIR='{{ lsst_distrib_dir }}' source ${LSST_DISTRIB_DIR}/${LSST_VERSION}/loadLSST.bash setup lsst_distrib -{%- elif script_method == "panda" %} - -# setup PanDA env. -latest_panda=$(ls -td /cvmfs/sw.lsst.eu/linux-x86_64/panda_env/v* | head -1) -source ${latest_panda}/setup_panda_usdf.sh ${WEEKLY} -panda_auth status +{%- if wms == "htcondor" %} +{%- elif wms == "panda" %} +{# what value is this LATEST_PANDA path? #} +{# LATEST_PANDA=$(ls -td /cvmfs/sw.lsst.eu/almalinux-x86_64/panda_env/v* | head -1) #} +export PANDA_SYS=${CONDA_PREFIX} +export IDDS_CONFIG=${PANDA_SYS}/etc/idds/idds.cfg.client.template +export IDDS_MAX_NAME_LENGTH=30000 +export BPS_WMS_SERVICE_CLASS=lsst.ctrl.bps.panda.PanDAService +unset PANDA_CONFIG_ROOT {%- endif %} {%- if custom_lsst_setup %} diff --git a/tests/common/test_config.py b/tests/common/test_config.py index 77d69bbf..540cfdf3 100644 --- a/tests/common/test_config.py +++ b/tests/common/test_config.py @@ -52,3 +52,12 @@ def test_config_enum_validation(monkeypatch: Any) -> None: config = Configuration() assert config.script_handler == ScriptMethodEnum.htcondor + + +def test_config_boolean_serialization(monkeypatch: Any) -> None: + """Test the serialization of boolean field values to strings""" + config = Configuration() + assert type(config.panda.verify_host) is bool + d = config.panda.model_dump(exclude_none=True) + assert type(d["verify_host"]) is str + assert d["verify_host"] in ["on", "off"] diff --git a/tests/common/test_panda.py b/tests/common/test_panda.py index 7bb57992..02a33bce 100644 --- a/tests/common/test_panda.py +++ b/tests/common/test_panda.py @@ -1,12 +1,11 @@ # ruff: noqa: F841 -import base64 import datetime import json -import os from base64 import urlsafe_b64encode +from collections.abc import Generator from pathlib import Path -from urllib.parse import urlparse +from typing import Any from uuid import uuid4 import pytest @@ -20,7 +19,7 @@ @pytest.fixture -def mock_id_token(): +def mock_id_token() -> Generator[str]: """Create a mock PanDA id token that expires in 3 days.""" expiry = int((datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=3)).timestamp()) token_payload = { @@ -46,7 +45,7 @@ def mock_id_token(): @pytest.fixture -def panda_env(monkeypatch, mock_id_token): +def panda_env(monkeypatch: Any, mock_id_token: Any) -> None: config.panda.tls_url = "https://mock-panda.local:8443/server/panda" config.panda.auth_config_url = "https://mock-panda.local:8443/auth/Rubin_auth_config.json" config.panda.url = config.panda.tls_url @@ -64,56 +63,8 @@ def panda_env(monkeypatch, mock_id_token): monkeypatch.setenv("PANDA_USE_NATIVE_HTTPLIB", "1") -@pytest.mark.skip -def test_panda_token(panda_env): - panda_config_home = config.panda.config_root or os.getenv("PANDA_CONFIG_ROOT", "/") - - # Load a panda token file - token_file = Path(panda_config_home) / ".token" - token_data = json.loads(token_file.read_text()) - - # tokens - refresh_token = token_data["refresh_token"] - id_token = token_data["id_token"] - - # Details of the token can be decoded from the token body, but this is not - # used in the refresh action. - enc = id_token.split(".")[1] - enc += "=" * (-len(enc) % 4) - _ = json.loads(base64.urlsafe_b64decode(enc.encode())) - - # The authurl to use for token ops is derived from the panda base url - url_parts = urlparse(config.panda.url) - auth_url = "{url_parts.scheme}://{url_parts.hostname}:{url_parts.port}/auth/{config.panda.virtual_organization}_auth_config.json" - - # the pandaclient uses a filesystem cache to cache the results of a dip to - # the auth config for up to 1 hour. Otherwise, accesses the url, writes the - # resulting text to a file, then reads it back in. - # the contents of this file are - # { - # "client_secret": "xxx", - # "audience": "https://pandaserver-doma.cern.ch", - # "client_id": "uuid", - # "oidc_config_url": "https://panda-iam-doma.cern.ch/.well-known/openid-configuration", - # "vo": "Rubin", "no_verify": "True", "robot_ids": "NONE" - # } - # - # The refresh token operation is assembled using - # - `client_id` and `client_secret` from the auth config endpoint - # - `token_endpoint` from the oidc_config_url, - # e.g., https://panda-iam-doma.cern.ch/.well-known/openid-configuration - # - `refresh_token_string` from the current token file - # self.refresh_token( - # endpoint_config["token_endpoint"], - # auth_config["client_id"], - # auth_config["client_secret"], - # refresh_token_string - # ) - token = get_token() - - @pytest.fixture -def auth_config_mock_response(): +def auth_config_mock_response() -> Generator[dict[str, str]]: yield { "client_secret": "XXXxxXxXxXXxXXXXxXxxxXXXXxxxXXXxxxXXX-XXXxxxxXxxxxxxXxxxXX", "audience": "https://pandaserver-doma.local", @@ -126,14 +77,18 @@ def auth_config_mock_response(): @pytest.fixture -def oidc_config_mock_response(): +def oidc_config_mock_response() -> Generator[str]: response_fixture = Path(__file__).parent.parent / "fixtures" / "panda" / "oidc_config_response.json" yield response_fixture.read_text() def test_get_panda_token( - respx_mock, panda_env, mock_id_token, auth_config_mock_response, oidc_config_mock_response -): + respx_mock: Any, + panda_env: Any, + mock_id_token: Any, + auth_config_mock_response: Any, + oidc_config_mock_response: Any, +) -> None: # Tests loading a panda token that has not been expired auth_config_mock = respx_mock.get(config.panda.auth_config_url) auth_config_mock.return_value = Response(200, json=auth_config_mock_response)