Add spark k8s operator launcher #1225

Merged 3 commits on Dec 16, 2020
9 changes: 9 additions & 0 deletions sdk/python/feast/constants.py
@@ -180,6 +180,15 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Amount of executor memory for Dataproc cluster
DATAPROC_EXECUTOR_MEMORY = "2g"

#: Namespace to use for Spark jobs launched using the k8s spark operator
SPARK_K8S_NAMESPACE = "default"

#: Expect the k8s spark operator to be running in the same cluster as Feast
SPARK_K8S_USE_INCLUSTER_CONFIG = True

#: Path to a SparkApplication resource template
SPARK_K8S_JOB_TEMPLATE_PATH = None

#: File format of historical retrieval features
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"

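As a usage sketch, the new options would typically be supplied like any other Feast Spark option, i.e. as lower-cased config keys on the SDK client. The snippet below is an assumption based on that convention, not part of this diff, and all values are illustrative:

from feast import Client

# Illustrative values only; the keyword names mirror the ConfigOptions
# attributes added above, lower-cased as Feast config keys.
client = Client(
    core_url="core.feast.example.com:6565",
    spark_launcher="k8s",
    spark_k8s_namespace="spark-operator",
    spark_k8s_use_incluster_config="False",
    spark_k8s_job_template_path="/etc/feast/job_template.yaml",
    spark_staging_location="gs://feast-staging/",
)
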
15 changes: 11 additions & 4 deletions sdk/python/feast/pyspark/abc.py
@@ -2,6 +2,7 @@
import hashlib
import json
import os
from base64 import b64encode
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
@@ -15,6 +16,9 @@ class SparkJobFailure(Exception):
pass


BQ_SPARK_PACKAGE = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0"


class SparkJobStatus(Enum):
STARTING = 0
IN_PROGRESS = 1
@@ -243,15 +247,18 @@ def get_main_file_path(self) -> str:
)

def get_arguments(self) -> List[str]:
def json_b64_encode(obj) -> str:
return b64encode(json.dumps(obj).encode("utf8")).decode("ascii")

return [
"--feature-tables",
json.dumps(self._feature_tables),
json_b64_encode(self._feature_tables),
"--feature-tables-sources",
json.dumps(self._feature_tables_sources),
json_b64_encode(self._feature_tables_sources),
"--entity-source",
json.dumps(self._entity_source),
json_b64_encode(self._entity_source),
"--destination",
json.dumps(self._destination),
json_b64_encode(self._destination),
]

def get_destination_path(self) -> str:
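To make the encoding change concrete, here is a minimal round-trip sketch: each job config is JSON-serialized and base64-encoded so it can travel as a plain CLI argument (for example inside a SparkApplication manifest) without shell or YAML quoting issues, and is decoded again by the driver (see json_b64_decode in the next file). The payload shape below is illustrative:

import json
from base64 import b64decode, b64encode

entity_source = {"file": {"path": "gs://bucket/entities", "format": "parquet"}}

# Encode the way RetrievalJobParameters.get_arguments() now does ...
encoded = b64encode(json.dumps(entity_source).encode("utf8")).decode("ascii")
# ... and decode the way the driver script does.
assert json.loads(b64decode(encoded.encode("ascii"))) == entity_source
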
13 changes: 9 additions & 4 deletions sdk/python/feast/pyspark/historical_feature_retrieval_job.py
@@ -1,6 +1,7 @@
import abc
import argparse
import json
from base64 import b64decode
from datetime import timedelta
from typing import Any, Dict, List, NamedTuple, Optional

@@ -794,13 +795,17 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable:
)


def json_b64_decode(s: str) -> Any:
return json.loads(b64decode(s.encode("ascii")))


if __name__ == "__main__":
spark = SparkSession.builder.getOrCreate()
args = _get_args()
feature_tables_conf = json.loads(args.feature_tables)
feature_tables_sources_conf = json.loads(args.feature_tables_sources)
entity_source_conf = json.loads(args.entity_source)
destination_conf = json.loads(args.destination)
feature_tables_conf = json_b64_decode(args.feature_tables)
feature_tables_sources_conf = json_b64_decode(args.feature_tables_sources)
entity_source_conf = json_b64_decode(args.entity_source)
destination_conf = json_b64_decode(args.destination)
start_job(
spark,
entity_source_conf,
12 changes: 12 additions & 0 deletions sdk/python/feast/pyspark/launcher.py
@@ -65,10 +65,22 @@ def _get_optional(option):
)


def _k8s_launcher(config: Config) -> JobLauncher:
from feast.pyspark.launchers import k8s

return k8s.KubernetesJobLauncher(
namespace=config.get(opt.SPARK_K8S_NAMESPACE),
resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None),
staging_location=config.get(opt.SPARK_STAGING_LOCATION),
incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG),
)


_launchers = {
"standalone": _standalone_launcher,
"dataproc": _dataproc_launcher,
"emr": _emr_launcher,
"k8s": _k8s_launcher,
}


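For context, a sketch of how the dispatch table above is typically consumed; the resolver below mirrors the existing launcher module and is shown as an assumption, not as part of this diff:

def resolve_launcher(config: Config) -> JobLauncher:
    # The spark_launcher option selects the factory, so "k8s" now maps to _k8s_launcher.
    return _launchers[config.get(opt.SPARK_LAUNCHER)](config)
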
10 changes: 4 additions & 6 deletions sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -8,6 +8,8 @@

import yaml

from feast.pyspark.abc import BQ_SPARK_PACKAGE

__all__ = [
"FAILED_STEP_STATES",
"HISTORICAL_RETRIEVAL_JOB_TYPE",
@@ -107,7 +109,7 @@ def _sync_offline_to_online_step(
"--class",
"feast.ingestion.IngestionJob",
"--packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0",
BQ_SPARK_PACKAGE,
jar_path,
]
+ args,
@@ -330,11 +332,7 @@ def _stream_ingestion_step(
],
"Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
+ jars_args
+ [
"--packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0",
jar_path,
]
+ ["--packages", BQ_SPARK_PACKAGE, jar_path]
+ args,
"Jar": "command-runner.jar",
},
30 changes: 18 additions & 12 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -1,4 +1,3 @@
import json
import os
import time
import uuid
@@ -265,7 +264,7 @@ def _stage_file(self, file_path: str, job_id: str) -> str:
return blob_uri_str

def dataproc_submit(
self, job_params: SparkJobParameters
self, job_params: SparkJobParameters, extra_properties: Dict[str, str]
Review thread on this line:

@woop (Member), Dec 16, 2020:
I'm not sure if you saw #1198. Should we close that PR after yours has been merged, or first merge that one?

Author (Collaborator):
No, I haven't. It looks like it is solving a slightly different problem, namely user-specified extra options. I don't feel strongly about which one to merge first (and which one to rebase).
) -> Tuple[Job, Callable[[], Job], Callable[[], None]]:
local_job_id = str(uuid.uuid4())
main_file_uri = self._stage_file(job_params.get_main_file_path(), local_job_id)
@@ -280,18 +279,22 @@
job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash()

if job_params.get_class_name():
properties = {
"spark.yarn.user.classpath.first": "true",
"spark.executor.instances": self.executor_instances,
"spark.executor.cores": self.executor_cores,
"spark.executor.memory": self.executor_memory,
}

properties.update(extra_properties)

job_config.update(
{
"spark_job": {
"jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
"main_class": job_params.get_class_name(),
"args": job_params.get_arguments(),
"properties": {
"spark.yarn.user.classpath.first": "true",
"spark.executor.instances": self.executor_instances,
"spark.executor.cores": self.executor_cores,
"spark.executor.memory": self.executor_memory,
},
"properties": properties,
}
}
)
@@ -302,6 +305,7 @@
"main_python_file_uri": main_file_uri,
"jar_file_uris": self.EXTERNAL_JARS,
"args": job_params.get_arguments(),
"properties": extra_properties if extra_properties else {},
}
}
)
@@ -332,21 +336,23 @@ def dataproc_cancel(self, job_id):
def historical_feature_retrieval(
self, job_params: RetrievalJobParameters
) -> RetrievalJob:
job, refresh_fn, cancel_fn = self.dataproc_submit(job_params)
job, refresh_fn, cancel_fn = self.dataproc_submit(
job_params, {"dev.feast.outputuri": job_params.get_destination_path()}
)
return DataprocRetrievalJob(
job, refresh_fn, cancel_fn, job_params.get_destination_path()
)

def offline_to_online_ingestion(
self, ingestion_job_params: BatchIngestionJobParameters
) -> BatchIngestionJob:
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params)
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn)

def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
) -> StreamIngestionJob:
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params)
job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
job_hash = ingestion_job_params.get_job_hash()
return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)

@@ -368,7 +374,7 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:
cancel_fn = partial(self.dataproc_cancel, job_id)

if job_type == SparkJobType.HISTORICAL_RETRIEVAL.name.lower():
output_path = json.loads(job.pyspark_job.args[-1])["path"]
output_path = job.pyspark_job.properties.get("dev.feast.outputuri")
return DataprocRetrievalJob(job, refresh_fn, cancel_fn, output_path)

if job_type == SparkJobType.BATCH_INGESTION.name.lower():
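A short sketch of the round trip that extra_properties enables (object shapes are taken from the surrounding code): the retrieval destination now travels as a Spark property, so it can be read back when reconstructing a SparkJob from a running Dataproc job instead of being re-parsed from the now base64-encoded CLI arguments:

OUTPUT_URI_PROPERTY = "dev.feast.outputuri"

def submit_retrieval(launcher, job_params):
    # At submission time the destination path is attached as a Spark property.
    return launcher.dataproc_submit(
        job_params, {OUTPUT_URI_PROPERTY: job_params.get_destination_path()}
    )

def output_path_of(job):
    # When reattaching to an existing job, the path is read straight back.
    return job.pyspark_job.properties.get(OUTPUT_URI_PROPERTY)
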
13 changes: 13 additions & 0 deletions sdk/python/feast/pyspark/launchers/k8s/__init__.py
@@ -0,0 +1,13 @@
from .k8s import (
KubernetesBatchIngestionJob,
KubernetesJobLauncher,
KubernetesRetrievalJob,
KubernetesStreamIngestionJob,
)

__all__ = [
"KubernetesRetrievalJob",
"KubernetesBatchIngestionJob",
"KubernetesStreamIngestionJob",
"KubernetesJobLauncher",
]
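Finally, a hedged construction sketch for the new launcher, mirroring what _k8s_launcher() in launcher.py derives from Config; the argument names come from this diff, the values are illustrative:

from feast.pyspark.launchers.k8s import KubernetesJobLauncher

launcher = KubernetesJobLauncher(
    namespace="spark-operator",              # SPARK_K8S_NAMESPACE
    resource_template_path=None,             # use the default SparkApplication template
    staging_location="gs://feast-staging/",  # SPARK_STAGING_LOCATION
    incluster=False,                         # connect via local kubeconfig rather than in-cluster config
)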