From 92800d92febf17e42c37f6398b611b3307af9132 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Wed, 16 Dec 2020 02:12:23 -0800 Subject: [PATCH] Add spark k8s operator launcher (#1225) * support spark k8s operator launcher Signed-off-by: Oleg Avdeev * make bq package path constant Signed-off-by: Oleg Avdeev * address more comments Signed-off-by: Oleg Avdeev --- sdk/python/feast/constants.py | 9 + sdk/python/feast/pyspark/abc.py | 15 +- .../historical_feature_retrieval_job.py | 13 +- sdk/python/feast/pyspark/launcher.py | 12 + .../feast/pyspark/launchers/aws/emr_utils.py | 10 +- .../pyspark/launchers/gcloud/dataproc.py | 30 +- .../feast/pyspark/launchers/k8s/__init__.py | 13 + sdk/python/feast/pyspark/launchers/k8s/k8s.py | 341 ++++++++++++++++++ .../feast/pyspark/launchers/k8s/k8s_utils.py | 285 +++++++++++++++ .../pyspark/launchers/standalone/local.py | 5 +- sdk/python/feast/staging/storage_client.py | 23 +- sdk/python/requirements-dev.txt | 1 + sdk/python/setup.py | 1 + 13 files changed, 725 insertions(+), 33 deletions(-) create mode 100644 sdk/python/feast/pyspark/launchers/k8s/__init__.py create mode 100644 sdk/python/feast/pyspark/launchers/k8s/k8s.py create mode 100644 sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index ef6e5cf0372..af5efca1df6 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -180,6 +180,15 @@ class ConfigOptions(metaclass=ConfigMeta): #: No. of executor memory for Dataproc cluster DATAPROC_EXECUTOR_MEMORY = "2g" + # namespace to use for Spark jobs launched using k8s spark operator + SPARK_K8S_NAMESPACE = "default" + + # expect k8s spark operator to be running in the same cluster as Feast + SPARK_K8S_USE_INCLUSTER_CONFIG = True + + # SparkApplication resource template + SPARK_K8S_JOB_TEMPLATE_PATH = None + #: File format of historical retrieval features HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet" diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index 4ec2b79962a..216c31df2ba 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -2,6 +2,7 @@ import hashlib import json import os +from base64 import b64encode from datetime import datetime from enum import Enum from typing import Dict, List, Optional @@ -15,6 +16,9 @@ class SparkJobFailure(Exception): pass +BQ_SPARK_PACKAGE = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0" + + class SparkJobStatus(Enum): STARTING = 0 IN_PROGRESS = 1 @@ -243,15 +247,18 @@ def get_main_file_path(self) -> str: ) def get_arguments(self) -> List[str]: + def json_b64_encode(obj) -> str: + return b64encode(json.dumps(obj).encode("utf8")).decode("ascii") + return [ "--feature-tables", - json.dumps(self._feature_tables), + json_b64_encode(self._feature_tables), "--feature-tables-sources", - json.dumps(self._feature_tables_sources), + json_b64_encode(self._feature_tables_sources), "--entity-source", - json.dumps(self._entity_source), + json_b64_encode(self._entity_source), "--destination", - json.dumps(self._destination), + json_b64_encode(self._destination), ] def get_destination_path(self) -> str: diff --git a/sdk/python/feast/pyspark/historical_feature_retrieval_job.py b/sdk/python/feast/pyspark/historical_feature_retrieval_job.py index 234a096c647..26be6888f41 100644 --- a/sdk/python/feast/pyspark/historical_feature_retrieval_job.py +++ b/sdk/python/feast/pyspark/historical_feature_retrieval_job.py @@ -1,6 +1,7 @@ import abc import argparse 
import json +from base64 import b64decode from datetime import timedelta from typing import Any, Dict, List, NamedTuple, Optional @@ -794,13 +795,17 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable: ) +def json_b64_decode(s: str) -> Any: + return json.loads(b64decode(s.encode("ascii"))) + + if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() args = _get_args() - feature_tables_conf = json.loads(args.feature_tables) - feature_tables_sources_conf = json.loads(args.feature_tables_sources) - entity_source_conf = json.loads(args.entity_source) - destination_conf = json.loads(args.destination) + feature_tables_conf = json_b64_decode(args.feature_tables) + feature_tables_sources_conf = json_b64_decode(args.feature_tables_sources) + entity_source_conf = json_b64_decode(args.entity_source) + destination_conf = json_b64_decode(args.destination) start_job( spark, entity_source_conf, diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index d7919af5afd..2044389b880 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -65,10 +65,22 @@ def _get_optional(option): ) +def _k8s_launcher(config: Config) -> JobLauncher: + from feast.pyspark.launchers import k8s + + return k8s.KubernetesJobLauncher( + namespace=config.get(opt.SPARK_K8S_NAMESPACE), + resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None), + staging_location=config.get(opt.SPARK_STAGING_LOCATION), + incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG), + ) + + _launchers = { "standalone": _standalone_launcher, "dataproc": _dataproc_launcher, "emr": _emr_launcher, + "k8s": _k8s_launcher, } diff --git a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py index aed08dfd4ab..d8f848df01c 100644 --- a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py +++ b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py @@ -8,6 +8,8 @@ import yaml +from feast.pyspark.abc import BQ_SPARK_PACKAGE + __all__ = [ "FAILED_STEP_STATES", "HISTORICAL_RETRIEVAL_JOB_TYPE", @@ -107,7 +109,7 @@ def _sync_offline_to_online_step( "--class", "feast.ingestion.IngestionJob", "--packages", - "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0", + BQ_SPARK_PACKAGE, jar_path, ] + args, @@ -330,11 +332,7 @@ def _stream_ingestion_step( ], "Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"] + jars_args - + [ - "--packages", - "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0", - jar_path, - ] + + ["--packages", BQ_SPARK_PACKAGE, jar_path] + args, "Jar": "command-runner.jar", }, diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py index 1e66110b6c8..8fe24ece8ee 100644 --- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py +++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py @@ -1,4 +1,3 @@ -import json import os import time import uuid @@ -265,7 +264,7 @@ def _stage_file(self, file_path: str, job_id: str) -> str: return blob_uri_str def dataproc_submit( - self, job_params: SparkJobParameters + self, job_params: SparkJobParameters, extra_properties: Dict[str, str] ) -> Tuple[Job, Callable[[], Job], Callable[[], None]]: local_job_id = str(uuid.uuid4()) main_file_uri = self._stage_file(job_params.get_main_file_path(), local_job_id) @@ -280,18 +279,22 @@ def dataproc_submit( job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash() if 
job_params.get_class_name(): + properties = { + "spark.yarn.user.classpath.first": "true", + "spark.executor.instances": self.executor_instances, + "spark.executor.cores": self.executor_cores, + "spark.executor.memory": self.executor_memory, + } + + properties.update(extra_properties) + job_config.update( { "spark_job": { "jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS, "main_class": job_params.get_class_name(), "args": job_params.get_arguments(), - "properties": { - "spark.yarn.user.classpath.first": "true", - "spark.executor.instances": self.executor_instances, - "spark.executor.cores": self.executor_cores, - "spark.executor.memory": self.executor_memory, - }, + "properties": properties, } } ) @@ -302,6 +305,7 @@ def dataproc_submit( "main_python_file_uri": main_file_uri, "jar_file_uris": self.EXTERNAL_JARS, "args": job_params.get_arguments(), + "properties": extra_properties if extra_properties else {}, } } ) @@ -332,7 +336,9 @@ def dataproc_cancel(self, job_id): def historical_feature_retrieval( self, job_params: RetrievalJobParameters ) -> RetrievalJob: - job, refresh_fn, cancel_fn = self.dataproc_submit(job_params) + job, refresh_fn, cancel_fn = self.dataproc_submit( + job_params, {"dev.feast.outputuri": job_params.get_destination_path()} + ) return DataprocRetrievalJob( job, refresh_fn, cancel_fn, job_params.get_destination_path() ) @@ -340,13 +346,13 @@ def historical_feature_retrieval( def offline_to_online_ingestion( self, ingestion_job_params: BatchIngestionJobParameters ) -> BatchIngestionJob: - job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params) + job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {}) return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn) def start_stream_to_online_ingestion( self, ingestion_job_params: StreamIngestionJobParameters ) -> StreamIngestionJob: - job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params) + job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {}) job_hash = ingestion_job_params.get_job_hash() return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash) @@ -368,7 +374,7 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob: cancel_fn = partial(self.dataproc_cancel, job_id) if job_type == SparkJobType.HISTORICAL_RETRIEVAL.name.lower(): - output_path = json.loads(job.pyspark_job.args[-1])["path"] + output_path = job.pyspark_job.properties.get("dev.feast.outputuri") return DataprocRetrievalJob(job, refresh_fn, cancel_fn, output_path) if job_type == SparkJobType.BATCH_INGESTION.name.lower(): diff --git a/sdk/python/feast/pyspark/launchers/k8s/__init__.py b/sdk/python/feast/pyspark/launchers/k8s/__init__.py new file mode 100644 index 00000000000..816d9880904 --- /dev/null +++ b/sdk/python/feast/pyspark/launchers/k8s/__init__.py @@ -0,0 +1,13 @@ +from .k8s import ( + KubernetesBatchIngestionJob, + KubernetesJobLauncher, + KubernetesRetrievalJob, + KubernetesStreamIngestionJob, +) + +__all__ = [ + "KubernetesRetrievalJob", + "KubernetesBatchIngestionJob", + "KubernetesStreamIngestionJob", + "KubernetesJobLauncher", +] diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s.py b/sdk/python/feast/pyspark/launchers/k8s/k8s.py new file mode 100644 index 00000000000..7e7dbb56c0d --- /dev/null +++ b/sdk/python/feast/pyspark/launchers/k8s/k8s.py @@ -0,0 +1,341 @@ +import random +import string +import time +from io import BytesIO +from pathlib import Path +from typing import Any, Dict, List, Optional, cast +from urllib.parse import urlparse, 
urlunparse
+
+import yaml
+from kubernetes.client.api import CustomObjectsApi
+
+from feast.pyspark.abc import (
+    BQ_SPARK_PACKAGE,
+    BatchIngestionJob,
+    BatchIngestionJobParameters,
+    JobLauncher,
+    RetrievalJob,
+    RetrievalJobParameters,
+    SparkJob,
+    SparkJobFailure,
+    SparkJobStatus,
+    StreamIngestionJob,
+    StreamIngestionJobParameters,
+)
+from feast.staging.storage_client import get_staging_client
+
+from .k8s_utils import (
+    DEFAULT_JOB_TEMPLATE,
+    HISTORICAL_RETRIEVAL_JOB_TYPE,
+    METADATA_JOBHASH,
+    METADATA_OUTPUT_URI,
+    OFFLINE_TO_ONLINE_JOB_TYPE,
+    STREAM_TO_ONLINE_JOB_TYPE,
+    JobInfo,
+    _cancel_job_by_id,
+    _get_api,
+    _get_job_by_id,
+    _list_jobs,
+    _prepare_job_resource,
+    _submit_job,
+)
+
+
+def _load_resource_template(job_template_path: Path) -> Dict[str, Any]:
+    with open(job_template_path, "rt") as f:
+        return yaml.safe_load(f)
+
+
+def _generate_job_id() -> str:
+    return "feast-" + "".join(
+        random.choice(string.ascii_lowercase + string.digits) for _ in range(8)
+    )
+
+
+class KubernetesJobMixin:
+    def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str):
+        self._api = api
+        self._job_id = job_id
+        self._namespace = namespace
+
+    def get_id(self) -> str:
+        return self._job_id
+
+    def get_status(self) -> SparkJobStatus:
+        job = _get_job_by_id(self._api, self._namespace, self._job_id)
+        assert job is not None
+        return job.state
+
+    def cancel(self):
+        _cancel_job_by_id(self._api, self._namespace, self._job_id)
+
+    def _wait_for_complete(self, timeout_seconds: Optional[float]) -> bool:
+        """ Returns true if the job completed successfully """
+        start_time = time.time()
+        while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds):
+            status = self.get_status()
+            if status == SparkJobStatus.COMPLETED:
+                return True
+            elif status == SparkJobStatus.FAILED:
+                return False
+            else:
+                time.sleep(1)
+        else:
+            raise TimeoutError("Timeout waiting for job to complete")
+
+
+class KubernetesRetrievalJob(KubernetesJobMixin, RetrievalJob):
+    """
+    Historical feature retrieval job result for a k8s cluster
+    """
+
+    def __init__(
+        self, api: CustomObjectsApi, namespace: str, job_id: str, output_file_uri: str
+    ):
+        """
+        This is the job object representing the historical retrieval job, returned by KubernetesJobLauncher.
+
+        Args:
+            output_file_uri (str): Uri to the historical feature retrieval job output file.
+        """
+        super().__init__(api, namespace, job_id)
+        self._output_file_uri = output_file_uri
+
+    def get_output_file_uri(self, timeout_sec=None, block=True):
+        if not block:
+            return self._output_file_uri
+
+        if self._wait_for_complete(timeout_sec):
+            return self._output_file_uri
+        else:
+            raise SparkJobFailure("Spark job failed")
+
+
+class KubernetesBatchIngestionJob(KubernetesJobMixin, BatchIngestionJob):
+    """
+    Ingestion job result for a k8s cluster
+    """
+
+    def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str):
+        super().__init__(api, namespace, job_id)
+
+
+class KubernetesStreamIngestionJob(KubernetesJobMixin, StreamIngestionJob):
+    """
+    Ingestion streaming job for a k8s cluster
+    """
+
+    def __init__(
+        self, api: CustomObjectsApi, namespace: str, job_id: str, job_hash: str
+    ):
+        super().__init__(api, namespace, job_id)
+        self._job_hash = job_hash
+
+    def get_hash(self) -> str:
+        return self._job_hash
+
+
+class KubernetesJobLauncher(JobLauncher):
+    """
+    Submits spark jobs to a k8s cluster via the Spark Operator. Supports historical feature retrieval, batch ingestion and stream ingestion jobs.
+ """ + + def __init__( + self, + namespace: str, + incluster: bool, + staging_location: str, + resource_template_path: Optional[Path], + ): + self._namespace = namespace + self._api = _get_api(incluster=incluster) + self._staging_location = staging_location + if resource_template_path is not None: + self._resource_template = _load_resource_template(resource_template_path) + else: + self._resource_template = yaml.safe_load(DEFAULT_JOB_TEMPLATE) + + def _job_from_job_info(self, job_info: JobInfo) -> SparkJob: + if job_info.job_type == HISTORICAL_RETRIEVAL_JOB_TYPE: + assert METADATA_OUTPUT_URI in job_info.extra_metadata + return KubernetesRetrievalJob( + api=self._api, + namespace=job_info.namespace, + job_id=job_info.job_id, + output_file_uri=job_info.extra_metadata[METADATA_OUTPUT_URI], + ) + elif job_info.job_type == OFFLINE_TO_ONLINE_JOB_TYPE: + return KubernetesBatchIngestionJob( + api=self._api, namespace=job_info.namespace, job_id=job_info.job_id, + ) + elif job_info.job_type == STREAM_TO_ONLINE_JOB_TYPE: + # job_hash must not be None for stream ingestion jobs + assert METADATA_JOBHASH in job_info.extra_metadata + return KubernetesStreamIngestionJob( + api=self._api, + namespace=job_info.namespace, + job_id=job_info.job_id, + job_hash=job_info.extra_metadata[METADATA_JOBHASH], + ) + else: + # We should never get here + raise ValueError(f"Unknown job type {job_info.job_type}") + + def _get_staging_client(self): + uri = urlparse(self._staging_location) + return get_staging_client(uri.scheme) + + def historical_feature_retrieval( + self, job_params: RetrievalJobParameters + ) -> RetrievalJob: + """ + Submits a historical feature retrieval job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. + + Returns: + RetrievalJob: wrapper around remote job that returns file uri to the result file. + """ + + with open(job_params.get_main_file_path()) as f: + pyspark_script = f.read() + + pyspark_script_path = urlunparse( + self._get_staging_client().upload_fileobj( + BytesIO(pyspark_script.encode("utf8")), + local_path="historical_retrieval.py", + remote_path_prefix=self._staging_location, + remote_path_suffix=".py", + ) + ) + + job_id = _generate_job_id() + + resource = _prepare_job_resource( + job_template=self._resource_template, + job_id=job_id, + job_type=HISTORICAL_RETRIEVAL_JOB_TYPE, + main_application_file=pyspark_script_path, + main_class=None, + packages=[], + jars=[], + extra_metadata={METADATA_OUTPUT_URI: job_params.get_destination_path()}, + arguments=job_params.get_arguments(), + ) + + job_info = _submit_job( + api=self._api, resource=resource, namespace=self._namespace, + ) + + return cast(RetrievalJob, self._job_from_job_info(job_info)) + + def _upload_jar(self, jar_path: str) -> str: + if jar_path.startswith("s3://") or jar_path.startswith("s3a://"): + return jar_path + elif jar_path.startswith("file://"): + local_jar_path = urlparse(jar_path).path + else: + local_jar_path = jar_path + with open(local_jar_path, "rb") as f: + return urlunparse( + self._get_staging_client().upload_fileobj( + f, + local_jar_path, + remote_path_prefix=self._staging_location, + remote_path_suffix=".jar", + ) + ) + + def offline_to_online_ingestion( + self, ingestion_job_params: BatchIngestionJobParameters + ) -> BatchIngestionJob: + """ + Submits a batch ingestion job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. 
+
+        Returns:
+            BatchIngestionJob: wrapper around remote job that can be used to check when job completed.
+        """
+
+        jar_s3_path = self._upload_jar(ingestion_job_params.get_main_file_path())
+
+        job_id = _generate_job_id()
+
+        resource = _prepare_job_resource(
+            job_template=self._resource_template,
+            job_id=job_id,
+            job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
+            main_application_file=jar_s3_path,
+            main_class=ingestion_job_params.get_class_name(),
+            packages=[BQ_SPARK_PACKAGE],
+            jars=[],
+            extra_metadata={},
+            arguments=ingestion_job_params.get_arguments(),
+        )
+
+        job_info = _submit_job(
+            api=self._api, resource=resource, namespace=self._namespace,
+        )
+
+        return cast(BatchIngestionJob, self._job_from_job_info(job_info))
+
+    def start_stream_to_online_ingestion(
+        self, ingestion_job_params: StreamIngestionJobParameters
+    ) -> StreamIngestionJob:
+        """
+        Starts a stream ingestion job to a Spark cluster.
+
+        Raises:
+            SparkJobFailure: The spark job submission failed, encountered error
+                during execution, or timeout.
+
+        Returns:
+            StreamIngestionJob: wrapper around remote job.
+        """
+
+        jar_s3_path = self._upload_jar(ingestion_job_params.get_main_file_path())
+
+        extra_jar_paths: List[str] = []
+        for extra_jar in ingestion_job_params.get_extra_jar_paths():
+            extra_jar_paths.append(self._upload_jar(extra_jar))
+
+        job_hash = ingestion_job_params.get_job_hash()
+        job_id = _generate_job_id()
+
+        resource = _prepare_job_resource(
+            job_template=self._resource_template,
+            job_id=job_id,
+            job_type=STREAM_TO_ONLINE_JOB_TYPE,
+            main_application_file=jar_s3_path,
+            main_class=ingestion_job_params.get_class_name(),
+            packages=[BQ_SPARK_PACKAGE],
+            jars=extra_jar_paths,
+            extra_metadata={METADATA_JOBHASH: job_hash},
+            arguments=ingestion_job_params.get_arguments(),
+        )
+
+        job_info = _submit_job(
+            api=self._api, resource=resource, namespace=self._namespace,
+        )
+
+        return cast(StreamIngestionJob, self._job_from_job_info(job_info))
+
+    def get_job_by_id(self, job_id: str) -> SparkJob:
+        job_info = _get_job_by_id(self._api, self._namespace, job_id)
+        if job_info is None:
+            raise KeyError(f"Job with id {job_id} not found")
+        else:
+            return self._job_from_job_info(job_info)
+
+    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+        return [
+            self._job_from_job_info(job)
+            for job in _list_jobs(self._api, self._namespace)
+            if include_terminated
+            or job.state not in (SparkJobStatus.COMPLETED, SparkJobStatus.FAILED)
+        ]
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
new file mode 100644
index 00000000000..7afaa64f65d
--- /dev/null
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -0,0 +1,285 @@
+from copy import deepcopy
+from typing import Any, Dict, List, NamedTuple, Optional, Tuple
+
+from kubernetes import client, config
+from kubernetes.client.api import CustomObjectsApi
+
+from feast.pyspark.abc import SparkJobStatus
+
+__all__ = [
+    "_get_api",
+    "_cancel_job_by_id",
+    "_prepare_job_resource",
+    "_list_jobs",
+    "_get_job_by_id",
+    "STREAM_TO_ONLINE_JOB_TYPE",
+    "OFFLINE_TO_ONLINE_JOB_TYPE",
+    "HISTORICAL_RETRIEVAL_JOB_TYPE",
+    "METADATA_JOBHASH",
+    "METADATA_OUTPUT_URI",
+    "JobInfo",
+]
+
+STREAM_TO_ONLINE_JOB_TYPE = "STREAM_TO_ONLINE_JOB"
+OFFLINE_TO_ONLINE_JOB_TYPE = "OFFLINE_TO_ONLINE_JOB"
+HISTORICAL_RETRIEVAL_JOB_TYPE = "HISTORICAL_RETRIEVAL_JOB"
+
+LABEL_JOBID = "feast.dev/jobid"
+LABEL_JOBTYPE = "feast.dev/type"
+
+# Can't store these bits of info in k8s labels due to 63-character limit, so we store
them as +# sparkConf +METADATA_OUTPUT_URI = "dev.feast.outputuri" +METADATA_JOBHASH = "dev.feast.jobhash" + +METADATA_KEYS = set((METADATA_JOBHASH, METADATA_OUTPUT_URI)) + + +def _append_items(resource: Dict[str, Any], path: Tuple[str, ...], items: List[Any]): + """ A helper function to manipulate k8s resource configs. It updates an array in resource + definition given a jsonpath-like path. Will not update resource if items is empty. + Note that it updates resource dict in-place. + + Examples: + >>> _append_items({}, ("foo", "bar"), ["A", "B"]) + {'foo': {'bar': ['A', 'B']}} + + >>> _append_items({"foo": {"bar" : ["C"]}}, ("foo", "bar"), ["A", "B"]) + {'foo': {'bar': ['C', 'A', 'B']}} + + >>> _append_items({}, ("foo", "bar"), []) + {} + """ + + if not items: + return resource + + obj = resource + for i, p in enumerate(path): + if p not in obj: + if i == len(path) - 1: + obj[p] = [] + else: + obj[p] = {} + obj = obj[p] + assert isinstance(obj, list) + obj.extend(items) + return resource + + +def _add_keys(resource: Dict[str, Any], path: Tuple[str, ...], items: Dict[str, Any]): + """ A helper function to manipulate k8s resource configs. It will update a dict in resource + definition given a path (think jsonpath). Will ignore items set to None. Will not update + resource if all items are None. Note that it updates resource dict in-place. + + Examples: + >>> _add_keys({}, ("foo", "bar"), {"A": 1, "B": 2}) + {'foo': {'bar': {'A': 1, 'B': 2}}} + + >>> _add_keys({}, ("foo", "bar"), {"A": 1, "B": None}) + {'foo': {'bar': {'A': 1}}} + + >>> _add_keys({}, ("foo", "bar"), {"A": None, "B": None}) + {} + """ + + if not any(i is not None for i in items.values()): + return resource + + obj = resource + for p in path: + if p not in obj: + obj[p] = {} + obj = obj[p] + + for k, v in items.items(): + if v is not None: + obj[k] = v + return resource + + +def _job_id_to_resource_name(job_id: str) -> str: + return job_id + + +def _prepare_job_resource( + job_template: Dict[str, Any], + job_id: str, + job_type: str, + main_application_file: str, + main_class: Optional[str], + packages: List[str], + jars: List[str], + extra_metadata: Dict[str, str], + arguments: List[str], +) -> Dict[str, Any]: + """ Prepare SparkApplication custom resource configs """ + job = deepcopy(job_template) + + labels = {LABEL_JOBID: job_id, LABEL_JOBTYPE: job_type} + + _add_keys(job, ("metadata", "labels"), labels) + _add_keys(job, ("metadata",), dict(name=_job_id_to_resource_name(job_id))) + _add_keys(job, ("spec",), dict(mainClass=main_class)) + _add_keys(job, ("spec",), dict(mainApplicationFile=main_application_file)) + _add_keys(job, ("spec",), dict(arguments=arguments)) + + _add_keys(job, ("spec", "sparkConf"), extra_metadata) + + _append_items(job, ("spec", "deps", "packages"), packages) + _append_items(job, ("spec", "deps", "jars"), jars) + + return job + + +def _get_api(incluster: bool) -> CustomObjectsApi: + # Configs can be set in Configuration class directly or using helper utility + if not incluster: + config.load_kube_config() + else: + config.load_incluster_config() + + return client.CustomObjectsApi() + + +def _crd_args(namespace: str) -> Dict[str, str]: + return dict( + group="sparkoperator.k8s.io", + version="v1beta2", + namespace=namespace, + plural="sparkapplications", + ) + + +class JobInfo(NamedTuple): + job_id: str + job_type: str + namespace: str + extra_metadata: Dict[str, str] + state: SparkJobStatus + + +STATE_MAP = { + "": SparkJobStatus.STARTING, + "SUBMITTED": SparkJobStatus.STARTING, + "RUNNING": 
SparkJobStatus.IN_PROGRESS,
+    "COMPLETED": SparkJobStatus.COMPLETED,
+    "FAILED": SparkJobStatus.FAILED,
+    "SUBMISSION_FAILED": SparkJobStatus.FAILED,
+    "PENDING_RERUN": SparkJobStatus.STARTING,
+    "INVALIDATING": SparkJobStatus.STARTING,
+    "SUCCEEDING": SparkJobStatus.IN_PROGRESS,
+    "FAILING": SparkJobStatus.FAILED,
+}
+
+
+def _k8s_state_to_feast(k8s_state: str) -> SparkJobStatus:
+    return STATE_MAP[k8s_state]
+
+
+def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
+    labels = resource["metadata"]["labels"]
+    # sparkConf may be absent from the spec (e.g. jobs submitted without extra metadata)
+    sparkConf = resource["spec"].get("sparkConf") or {}
+
+    if "status" in resource:
+        state = _k8s_state_to_feast(resource["status"]["applicationState"]["state"])
+    else:
+        state = _k8s_state_to_feast("")
+
+    return JobInfo(
+        job_id=labels[LABEL_JOBID],
+        job_type=labels.get(LABEL_JOBTYPE, ""),
+        namespace=resource["metadata"].get("namespace", "default"),
+        extra_metadata={k: v for k, v in sparkConf.items() if k in METADATA_KEYS},
+        state=state,
+    )
+
+
+def _submit_job(api: CustomObjectsApi, resource, namespace: str) -> JobInfo:
+    # create the resource
+    response = api.create_namespaced_custom_object(
+        **_crd_args(namespace), body=resource,
+    )
+    return _resource_to_job_info(response)
+
+
+def _list_jobs(api: CustomObjectsApi, namespace: str) -> List[JobInfo]:
+    response = api.list_namespaced_custom_object(
+        **_crd_args(namespace), label_selector=LABEL_JOBID,
+    )
+
+    result = []
+    for item in response["items"]:
+        result.append(_resource_to_job_info(item))
+    return result
+
+
+def _get_job_by_id(
+    api: CustomObjectsApi, namespace: str, job_id: str
+) -> Optional[JobInfo]:
+    try:
+        response = api.get_namespaced_custom_object(
+            **_crd_args(namespace), name=_job_id_to_resource_name(job_id)
+        )
+
+        return _resource_to_job_info(response)
+    except client.ApiException as e:
+        if e.status == 404:
+            return None
+        else:
+            raise
+
+
+def _cancel_job_by_id(api: CustomObjectsApi, namespace: str, job_id: str):
+    try:
+        api.delete_namespaced_custom_object(
+            **_crd_args(namespace), name=_job_id_to_resource_name(job_id),
+        )
+    except client.ApiException as e:
+        if e.status == 404:
+            return None
+        else:
+            raise
+
+
+DEFAULT_JOB_TEMPLATE = """
+
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: "gcr.io/kf-feast/spark-py:v3.0.1"
+  imagePullPolicy: Always
+  sparkVersion: "3.0.1"
+  timeToLiveSeconds: 3600
+  pythonVersion: "3"
+  restartPolicy:
+    type: Never
+  volumes:
+    - name: "test-volume"
+      hostPath:
+        path: "/tmp"
+        type: Directory
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+    labels:
+      version: 3.0.1
+    serviceAccount: spark
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+  executor:
+    cores: 1
+    instances: 1
+    memory: "512m"
+    labels:
+      version: 3.0.1
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+"""
diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py
index 1dfab4d6770..88a0ee1cae2 100644
--- a/sdk/python/feast/pyspark/launchers/standalone/local.py
+++ b/sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -10,6 +10,7 @@
 from requests.exceptions import RequestException
 
 from feast.pyspark.abc import (
+    BQ_SPARK_PACKAGE,
     BatchIngestionJob,
     BatchIngestionJobParameters,
     JobLauncher,
@@ -222,8 +223,6 @@ class StandaloneClusterLauncher(JobLauncher):
     Submits jobs to a standalone Spark cluster in client mode.
""" - BQ_CONNECTOR_VERSION = "2.12:0.18.0" - def __init__(self, master_url: str, spark_home: str = None): """ This launcher executes the spark-submit script in a subprocess. The subprocess @@ -273,7 +272,7 @@ def spark_submit( "--conf", "spark.sql.session.timeZone=UTC", # ignore local timezone "--packages", - f"com.google.cloud.spark:spark-bigquery-with-dependencies_{self.BQ_CONNECTOR_VERSION}", + BQ_SPARK_PACKAGE, "--jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar," "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar," diff --git a/sdk/python/feast/staging/storage_client.py b/sdk/python/feast/staging/storage_client.py index d7c4055f40f..68406a27572 100644 --- a/sdk/python/feast/staging/storage_client.py +++ b/sdk/python/feast/staging/storage_client.py @@ -29,6 +29,7 @@ GS = "gs" S3 = "s3" +S3A = "s3a" LOCAL_FILE = "file" @@ -213,7 +214,7 @@ class S3Client(AbstractStagingClient): Implementation of AbstractStagingClient for Aws S3 storage """ - def __init__(self, endpoint_url: str = None): + def __init__(self, endpoint_url: str = None, url_scheme="s3"): try: import boto3 except ImportError: @@ -222,6 +223,7 @@ def __init__(self, endpoint_url: str = None): "run ```pip install boto3```" ) self.s3_client = boto3.client("s3", endpoint_url=endpoint_url) + self.url_scheme = url_scheme def download_file(self, uri: ParseResult) -> IO[bytes]: """ @@ -258,12 +260,12 @@ def list_files(self, bucket: str, path: str) -> List[str]: ) # File path should not be in path (file path must be longer than path) return [ - f"{S3}://{bucket}/{file}" + f"{self.url_scheme}://{bucket}/{file}" for file in [x["Key"] for x in blob_list["Contents"]] if re.match(regex, file) and file not in path ] else: - return [f"{S3}://{bucket}/{path.lstrip('/')}"] + return [f"{self.url_scheme}://{bucket}/{path.lstrip('/')}"] def _uri_to_bucket_key(self, remote_path: ParseResult) -> Tuple[str, str]: assert remote_path.hostname is not None @@ -367,6 +369,14 @@ def _s3_client(config: Config = None): return S3Client(endpoint_url=endpoint_url) +def _s3a_client(config: Config = None): + if config is None: + endpoint_url = None + else: + endpoint_url = config.get(opt.S3_ENDPOINT_URL, None) + return S3Client(endpoint_url=endpoint_url, url_scheme="s3a") + + def _gcs_client(config: Config = None): return GCSClient() @@ -375,7 +385,12 @@ def _local_fs_client(config: Config = None): return LocalFSClient() -storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client} +storage_clients = { + GS: _gcs_client, + S3: _s3_client, + S3A: _s3a_client, + LOCAL_FILE: _local_fs_client, +} def get_staging_client(scheme, config: Config = None) -> AbstractStagingClient: diff --git a/sdk/python/requirements-dev.txt b/sdk/python/requirements-dev.txt index 2266ee7c042..84a2ab39046 100644 --- a/sdk/python/requirements-dev.txt +++ b/sdk/python/requirements-dev.txt @@ -40,3 +40,4 @@ boto3 moto pyspark==3.0.1 pyspark-stubs==3.0.0.post1 +kubernetes==12.0.* diff --git a/sdk/python/setup.py b/sdk/python/setup.py index ef62707c128..a1be4b05807 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -44,6 +44,7 @@ "pyarrow==2.0.0", "numpy", "google", + "kubernetes==12.0.*", ] # README file from Feast repo root directory