Add spark k8s operator launcher (#1225)
* support spark k8s operator launcher

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* make bq package path constant

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* address more comments

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
oavdeev authored Dec 16, 2020
1 parent d1b1aeb commit 92800d9
Showing 13 changed files with 725 additions and 33 deletions.
9 changes: 9 additions & 0 deletions sdk/python/feast/constants.py
@@ -180,6 +180,15 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Amount of executor memory for the Dataproc cluster
DATAPROC_EXECUTOR_MEMORY = "2g"

#: Namespace to use for Spark jobs launched via the k8s Spark Operator
SPARK_K8S_NAMESPACE = "default"

#: Whether the k8s Spark Operator is expected to be running in the same cluster as Feast
SPARK_K8S_USE_INCLUSTER_CONFIG = True

#: Path to a SparkApplication resource template for jobs launched via the k8s Spark Operator
SPARK_K8S_JOB_TEMPLATE_PATH = None

#: File format of historical retrieval features
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"

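The three new options follow the existing ConfigOptions pattern, so they can be overridden the same way as the Dataproc and EMR settings. A minimal usage sketch, assuming the usual convention of lower-cased option keys and a client that accepts an options dict (neither is part of this diff):

    from feast import Client

    # Hypothetical illustration: option keys are assumed to mirror the ConfigOptions
    # attribute names, lower-cased, as with the existing Spark settings.
    client = Client(
        options={
            "spark_launcher": "k8s",
            "spark_k8s_namespace": "spark-operator",          # SPARK_K8S_NAMESPACE
            "spark_k8s_use_incluster_config": "False",        # run against the cluster from outside
            "spark_k8s_job_template_path": "/path/to/sparkapp_template.yaml",
            "spark_staging_location": "gs://my-bucket/staging/",
        }
    )
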
15 changes: 11 additions & 4 deletions sdk/python/feast/pyspark/abc.py
@@ -2,6 +2,7 @@
import hashlib
import json
import os
from base64 import b64encode
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
@@ -15,6 +16,9 @@ class SparkJobFailure(Exception):
pass


BQ_SPARK_PACKAGE = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0"


class SparkJobStatus(Enum):
STARTING = 0
IN_PROGRESS = 1
@@ -243,15 +247,18 @@ def get_main_file_path(self) -> str:
)

def get_arguments(self) -> List[str]:
def json_b64_encode(obj) -> str:
return b64encode(json.dumps(obj).encode("utf8")).decode("ascii")

return [
"--feature-tables",
json.dumps(self._feature_tables),
json_b64_encode(self._feature_tables),
"--feature-tables-sources",
json.dumps(self._feature_tables_sources),
json_b64_encode(self._feature_tables_sources),
"--entity-source",
json.dumps(self._entity_source),
json_b64_encode(self._entity_source),
"--destination",
json.dumps(self._destination),
json_b64_encode(self._destination),
]

def get_destination_path(self) -> str:
13 changes: 9 additions & 4 deletions sdk/python/feast/pyspark/historical_feature_retrieval_job.py
@@ -1,6 +1,7 @@
import abc
import argparse
import json
from base64 import b64decode
from datetime import timedelta
from typing import Any, Dict, List, NamedTuple, Optional

Expand Down Expand Up @@ -794,13 +795,17 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable:
)


def json_b64_decode(s: str) -> Any:
return json.loads(b64decode(s.encode("ascii")))


if __name__ == "__main__":
spark = SparkSession.builder.getOrCreate()
args = _get_args()
feature_tables_conf = json.loads(args.feature_tables)
feature_tables_sources_conf = json.loads(args.feature_tables_sources)
entity_source_conf = json.loads(args.entity_source)
destination_conf = json.loads(args.destination)
feature_tables_conf = json_b64_decode(args.feature_tables)
feature_tables_sources_conf = json_b64_decode(args.feature_tables_sources)
entity_source_conf = json_b64_decode(args.entity_source)
destination_conf = json_b64_decode(args.destination)
start_job(
spark,
entity_source_conf,
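Together, json_b64_encode on the launcher side and json_b64_decode in the driver replace the earlier json.dumps/json.loads pair, so each configuration blob travels as a single shell-safe argv token. A self-contained round-trip sketch using only the standard library:

    import json
    from base64 import b64decode, b64encode

    def json_b64_encode(obj) -> str:
        return b64encode(json.dumps(obj).encode("utf8")).decode("ascii")

    def json_b64_decode(s: str):
        return json.loads(b64decode(s.encode("ascii")))

    # Illustrative source config only; the real dicts are built by the launcher.
    source = {"file": {"path": "gs://bucket/entities", "format": "parquet"}}
    token = json_b64_encode(source)        # one base64 token, no quotes or spaces to escape
    assert json_b64_decode(token) == source
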
12 changes: 12 additions & 0 deletions sdk/python/feast/pyspark/launcher.py
@@ -65,10 +65,22 @@ def _get_optional(option):
)


def _k8s_launcher(config: Config) -> JobLauncher:
from feast.pyspark.launchers import k8s

return k8s.KubernetesJobLauncher(
namespace=config.get(opt.SPARK_K8S_NAMESPACE),
resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None),
staging_location=config.get(opt.SPARK_STAGING_LOCATION),
incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG),
)


_launchers = {
"standalone": _standalone_launcher,
"dataproc": _dataproc_launcher,
"emr": _emr_launcher,
"k8s": _k8s_launcher,
}


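With the new entry registered, launcher selection works the same way as for the existing backends: the spark_launcher option keys into _launchers and the matching factory builds the JobLauncher. A sketch of that resolution, assuming the module's resolver keeps its existing shape (not shown in this diff):

    # Hypothetical resolver; the real function in launcher.py may differ in name or details.
    def resolve_launcher(config: Config) -> JobLauncher:
        return _launchers[config.get(opt.SPARK_LAUNCHER)](config)

    # With spark_launcher set to "k8s", this calls _k8s_launcher(config),
    # which returns a KubernetesJobLauncher configured from the new options.
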
10 changes: 4 additions & 6 deletions sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -8,6 +8,8 @@

import yaml

from feast.pyspark.abc import BQ_SPARK_PACKAGE

__all__ = [
"FAILED_STEP_STATES",
"HISTORICAL_RETRIEVAL_JOB_TYPE",
@@ -107,7 +109,7 @@ def _sync_offline_to_online_step(
"--class",
"feast.ingestion.IngestionJob",
"--packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0",
BQ_SPARK_PACKAGE,
jar_path,
]
+ args,
@@ -330,11 +332,7 @@ def _stream_ingestion_step(
],
"Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
+ jars_args
+ [
"--packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0",
jar_path,
]
+ ["--packages", BQ_SPARK_PACKAGE, jar_path]
+ args,
"Jar": "command-runner.jar",
},
30 changes: 18 additions & 12 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -1,4 +1,3 @@
import json
import os
import time
import uuid
@@ -265,7 +264,7 @@ def _stage_file(self, file_path: str, job_id: str) -> str:
return blob_uri_str

def dataproc_submit(
self, job_params: SparkJobParameters
self, job_params: SparkJobParameters, extra_properties: Dict[str, str]
) -> Tuple[Job, Callable[[], Job], Callable[[], None]]:
local_job_id = str(uuid.uuid4())
main_file_uri = self._stage_file(job_params.get_main_file_path(), local_job_id)
@@ -280,18 +279,22 @@ def dataproc_submit(
job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash()

if job_params.get_class_name():
properties = {
"spark.yarn.user.classpath.first": "true",
"spark.executor.instances": self.executor_instances,
"spark.executor.cores": self.executor_cores,
"spark.executor.memory": self.executor_memory,
}

properties.update(extra_properties)

job_config.update(
{
"spark_job": {
"jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
"main_class": job_params.get_class_name(),
"args": job_params.get_arguments(),
"properties": {
"spark.yarn.user.classpath.first": "true",
"spark.executor.instances": self.executor_instances,
"spark.executor.cores": self.executor_cores,
"spark.executor.memory": self.executor_memory,
},
"properties": properties,
}
}
)
@@ -302,6 +305,7 @@
"main_python_file_uri": main_file_uri,
"jar_file_uris": self.EXTERNAL_JARS,
"args": job_params.get_arguments(),
"properties": extra_properties if extra_properties else {},
}
}
)
@@ -332,21 +336,23 @@ def dataproc_cancel(self, job_id):
def historical_feature_retrieval(
self, job_params: RetrievalJobParameters
) -> RetrievalJob:
job, refresh_fn, cancel_fn = self.dataproc_submit(job_params)
job, refresh_fn, cancel_fn = self.dataproc_submit(
job_params, {"dev.feast.outputuri": job_params.get_destination_path()}
)
return DataprocRetrievalJob(
job, refresh_fn, cancel_fn, job_params.get_destination_path()
)

def offline_to_online_ingestion(
self, ingestion_job_params: BatchIngestionJobParameters
) -> BatchIngestionJob:
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params)
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn)

def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
) -> StreamIngestionJob:
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params)
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
job_hash = ingestion_job_params.get_job_hash()
return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)

@@ -368,7 +374,7 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:
cancel_fn = partial(self.dataproc_cancel, job_id)

if job_type == SparkJobType.HISTORICAL_RETRIEVAL.name.lower():
output_path = json.loads(job.pyspark_job.args[-1])["path"]
output_path = job.pyspark_job.properties.get("dev.feast.outputuri")
return DataprocRetrievalJob(job, refresh_fn, cancel_fn, output_path)

if job_type == SparkJobType.BATCH_INGESTION.name.lower():
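Because the job arguments are now base64-encoded, the retrieval destination can no longer be recovered with json.loads on the last pyspark argument; it is instead round-tripped through a Spark property. A small stand-in sketch of the two ends of that hand-off, using plain dicts in place of the Dataproc job objects:

    # Submit side: dataproc_submit merges this into the job's Spark properties.
    destination_path = "gs://bucket/historical/output/"
    submit_properties = {"dev.feast.outputuri": destination_path}

    # Listing side: _dataproc_job_to_spark_job reads the property back from the job.
    reported_properties = dict(submit_properties)   # what Dataproc reports for the job
    output_path = reported_properties.get("dev.feast.outputuri")
    assert output_path == destination_path
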
13 changes: 13 additions & 0 deletions sdk/python/feast/pyspark/launchers/k8s/__init__.py
@@ -0,0 +1,13 @@
from .k8s import (
KubernetesBatchIngestionJob,
KubernetesJobLauncher,
KubernetesRetrievalJob,
KubernetesStreamIngestionJob,
)

__all__ = [
"KubernetesRetrievalJob",
"KubernetesBatchIngestionJob",
"KubernetesStreamIngestionJob",
"KubernetesJobLauncher",
]
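
The new package exports the launcher and its three job wrappers. A construction sketch mirroring the keyword arguments _k8s_launcher passes above (the values shown are placeholders, not recommendations):

    from feast.pyspark.launchers.k8s import KubernetesJobLauncher

    launcher = KubernetesJobLauncher(
        namespace="default",              # SPARK_K8S_NAMESPACE
        resource_template_path=None,      # SPARK_K8S_JOB_TEMPLATE_PATH; None presumably falls back to a default template
        staging_location="gs://my-bucket/staging/",
        incluster=True,                   # SPARK_K8S_USE_INCLUSTER_CONFIG
    )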
