Add feature table name filter to jobs list api #1282

Merged · 10 commits · Jan 28, 2021
1 change: 1 addition & 0 deletions protos/feast/core/JobService.proto
@@ -188,6 +188,7 @@ message StartStreamToOnlineIngestionJobResponse {

 message ListJobsRequest {
   bool include_terminated = 1;
+  string table_name = 2;
 }

 message ListJobsResponse {
10 changes: 7 additions & 3 deletions sdk/python/feast/client.py
@@ -1211,11 +1211,15 @@ def start_stream_to_online_ingestion(
             response.log_uri,
         )

-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         if not self._use_job_service:
-            return list_jobs(include_terminated, self)
+            return list_jobs(include_terminated, self, table_name)
         else:
-            request = ListJobsRequest(include_terminated=include_terminated)
+            request = ListJobsRequest(
+                include_terminated=include_terminated, table_name=table_name
+            )
             response = self._job_service.ListJobs(request)
             return [
                 get_remote_job_from_proto(
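For context, a minimal usage sketch of the extended client API; the core URL and table name below are placeholders, not taken from this PR:

from feast import Client

client = Client(core_url="localhost:6565")
# With the new argument, only jobs for the "driver_stats" feature table are
# returned; include_terminated=True also keeps finished jobs in the result.
jobs = client.list_jobs(include_terminated=True, table_name="driver_stats")
for job in jobs:
    print(job.get_id(), job.get_status())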
4 changes: 3 additions & 1 deletion sdk/python/feast/job_service.py
@@ -188,7 +188,9 @@ def StartStreamToOnlineIngestionJob(
     def ListJobs(self, request, context):
         """List all types of jobs"""
         jobs = list_jobs(
-            include_terminated=request.include_terminated, client=self.client
+            include_terminated=request.include_terminated,
+            table_name=request.table_name,
+            client=self.client,
         )
         return ListJobsResponse(jobs=[_job_to_proto(job) for job in jobs])

4 changes: 3 additions & 1 deletion sdk/python/feast/pyspark/abc.py
@@ -608,5 +608,7 @@ def get_job_by_id(self, job_id: str) -> SparkJob:
         raise NotImplementedError

     @abc.abstractmethod
-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str]
+    ) -> List[SparkJob]:
         raise NotImplementedError
10 changes: 7 additions & 3 deletions sdk/python/feast/pyspark/launcher.py
@@ -1,7 +1,7 @@
 import os
 import tempfile
 from datetime import datetime
-from typing import TYPE_CHECKING, List, Union
+from typing import TYPE_CHECKING, List, Optional, Union
 from urllib.parse import urlparse, urlunparse

 from feast.config import Config
@@ -318,9 +318,13 @@ def start_stream_to_online_ingestion(
     )


-def list_jobs(include_terminated: bool, client: "Client") -> List[SparkJob]:
+def list_jobs(
+    include_terminated: bool, client: "Client", table_name: Optional[str] = None
+) -> List[SparkJob]:
     launcher = resolve_launcher(client._config)
-    return launcher.list_jobs(include_terminated=include_terminated)
+    return launcher.list_jobs(
+        include_terminated=include_terminated, table_name=table_name
+    )


 def get_job_by_id(job_id: str, client: "Client") -> SparkJob:
7 changes: 5 additions & 2 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -352,12 +352,15 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
         # We should never get here
         raise ValueError(f"Unknown job type {job_info.job_type}")

-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         """
         Find EMR job by a string id.

         Args:
             include_terminated: whether to include terminated jobs.
+            table_name: FeatureTable name to filter by

         Returns:
             A list of SparkJob instances.
@@ -366,7 +369,7 @@ def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
         jobs = _list_jobs(
             emr_client=self._emr_client(),
             job_type=None,
-            table_name=None,
+            table_name=table_name,
             active_only=not include_terminated,
         )

27 changes: 19 additions & 8 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -24,6 +24,10 @@
 from feast.staging.storage_client import get_staging_client


+def _truncate_label(label: str) -> str:
+    return label[:63]
+
+
 class DataprocJobMixin:
     def __init__(
         self,
@@ -311,16 +315,16 @@ def dataproc_submit(
         }

         if isinstance(job_params, StreamIngestionJobParameters):
-            job_config["labels"][
-                self.FEATURE_TABLE_LABEL_KEY
-            ] = job_params.get_feature_table_name()
+            job_config["labels"][self.FEATURE_TABLE_LABEL_KEY] = _truncate_label(
+                job_params.get_feature_table_name()
+            )
             # Add job hash to labels only for the stream ingestion job
             job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash()

         if isinstance(job_params, BatchIngestionJobParameters):
-            job_config["labels"][
-                self.FEATURE_TABLE_LABEL_KEY
-            ] = job_params.get_feature_table_name()
+            job_config["labels"][self.FEATURE_TABLE_LABEL_KEY] = _truncate_label(
+                job_params.get_feature_table_name()
+            )

             if job_params.get_class_name():
                 scala_job_properties = {
@@ -477,10 +481,17 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:

         raise ValueError(f"Unrecognized job type: {job_type}")

-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         job_filter = f"labels.{self.JOB_TYPE_LABEL_KEY} = * AND clusterName = {self.cluster_name}"
+        if table_name:
+            job_filter = (
+                job_filter
+                + f" AND labels.{self.FEATURE_TABLE_LABEL_KEY} = {_truncate_label(table_name)}"
+            )
         if not include_terminated:
-            job_filter = job_filter + "AND status.state = ACTIVE"
+            job_filter = job_filter + " AND status.state = ACTIVE"
         return [
             self._dataproc_job_to_spark_job(job)
             for job in self.job_client.list_jobs(
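For illustration, a sketch of the filter string list_jobs now builds when the requested table name exceeds Dataproc's 63-character label value limit; the label keys and cluster name below are made-up placeholders standing in for JOB_TYPE_LABEL_KEY, FEATURE_TABLE_LABEL_KEY, and cluster_name:

def _truncate_label(label: str) -> str:
    # Dataproc label values are limited to 63 characters.
    return label[:63]

table_name = "x" * 80  # longer than the label value limit
job_filter = "labels.feast_job_type = * AND clusterName = feast-cluster"
job_filter += f" AND labels.feast_feature_table = {_truncate_label(table_name)}"
# The filter matches because dataproc_submit stored the label with the same
# _truncate_label() applied when the job was created.
print(job_filter)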
30 changes: 26 additions & 4 deletions sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -1,3 +1,4 @@
+import hashlib
 import random
 import string
 import time
@@ -29,6 +30,7 @@
     DEFAULT_JOB_TEMPLATE,
     HISTORICAL_RETRIEVAL_JOB_TYPE,
     LABEL_FEATURE_TABLE,
+    LABEL_FEATURE_TABLE_HASH,
     METADATA_JOBHASH,
     METADATA_OUTPUT_URI,
     OFFLINE_TO_ONLINE_JOB_TYPE,
@@ -54,6 +56,14 @@ def _generate_job_id() -> str:
     )


+def _truncate_label(label: str) -> str:
+    return label[:63]
+
+
+def _generate_table_hash(table_name: str) -> str:
+    return hashlib.md5(table_name.encode()).hexdigest()
+
+
 class KubernetesJobMixin:
     def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str):
         self._api = api
@@ -327,7 +337,12 @@ def offline_to_online_ingestion(
             arguments=ingestion_job_params.get_arguments(),
             namespace=self._namespace,
             extra_labels={
-                LABEL_FEATURE_TABLE: ingestion_job_params.get_feature_table_name()
+                LABEL_FEATURE_TABLE: _truncate_label(
+                    ingestion_job_params.get_feature_table_name()
+                ),
+                LABEL_FEATURE_TABLE_HASH: _generate_table_hash(
+                    ingestion_job_params.get_feature_table_name()
+                ),
             },
         )

@@ -373,7 +388,12 @@ def start_stream_to_online_ingestion(
             arguments=ingestion_job_params.get_arguments(),
             namespace=self._namespace,
             extra_labels={
-                LABEL_FEATURE_TABLE: ingestion_job_params.get_feature_table_name()
+                LABEL_FEATURE_TABLE: _truncate_label(
+                    ingestion_job_params.get_feature_table_name()
+                ),
+                LABEL_FEATURE_TABLE_HASH: _generate_table_hash(
+                    ingestion_job_params.get_feature_table_name()
+                ),
             },
         )

@@ -390,10 +410,12 @@ def get_job_by_id(self, job_id: str) -> SparkJob:
         else:
             return self._job_from_job_info(job_info)

-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         return [
             self._job_from_job_info(job)
-            for job in _list_jobs(self._api, self._namespace)
+            for job in _list_jobs(self._api, self._namespace, table_name)
             if include_terminated
             or job.state not in (SparkJobStatus.COMPLETED, SparkJobStatus.FAILED)
         ]
24 changes: 19 additions & 5 deletions sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -1,3 +1,4 @@
+import hashlib
 from copy import deepcopy
 from datetime import datetime
 from typing import Any, Dict, List, NamedTuple, Optional, Tuple
@@ -28,6 +29,7 @@
 LABEL_JOBID = "feast.dev/jobid"
 LABEL_JOBTYPE = "feast.dev/type"
 LABEL_FEATURE_TABLE = "feast.dev/table"
+LABEL_FEATURE_TABLE_HASH = "feast.dev/tablehash"

 # Can't store these bits of info in k8s labels due to 64-character limit, so we store them as
 # sparkConf
@@ -222,12 +224,24 @@ def _submit_job(api: CustomObjectsApi, resource, namespace: str) -> JobInfo:
     return _resource_to_job_info(response)


-def _list_jobs(api: CustomObjectsApi, namespace: str) -> List[JobInfo]:
-    response = api.list_namespaced_custom_object(
-        **_crd_args(namespace), label_selector=LABEL_JOBID,
-    )
-
+def _list_jobs(
+    api: CustomObjectsApi, namespace: str, table_name: Optional[str] = None
+) -> List[JobInfo]:
+    result = []
+
+    # Batch, Streaming Ingestion jobs
+    if table_name:
+        table_name_hash = hashlib.md5(table_name.encode()).hexdigest()
+        response = api.list_namespaced_custom_object(
+            **_crd_args(namespace),
+            label_selector=f"{LABEL_FEATURE_TABLE_HASH}={table_name_hash}",
+        )
+    else:
+        # Retrieval jobs
+        response = api.list_namespaced_custom_object(
+            **_crd_args(namespace), label_selector=LABEL_JOBID,
+        )
+
     for item in response["items"]:
         result.append(_resource_to_job_info(item))
     return result
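The extra hash label exists because Kubernetes limits label values to 63 characters, so a long feature table name cannot be stored or matched verbatim; filtering therefore uses an MD5 hash of the name, which is always 32 hex characters. A short sketch of the selector this produces, using the long table name from the e2e test added below:

import hashlib

LABEL_FEATURE_TABLE_HASH = "feast.dev/tablehash"

table_name = (
    "just1a2featuretable3with4a5really6really7really8really9really"
    "10really11really12long13name"
)
table_name_hash = hashlib.md5(table_name.encode()).hexdigest()
label_selector = f"{LABEL_FEATURE_TABLE_HASH}={table_name_hash}"
# The digest is 32 hex characters, comfortably under the 63-character limit.
print(label_selector)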
4 changes: 3 additions & 1 deletion sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -355,7 +355,9 @@ def start_stream_to_online_ingestion(
     def get_job_by_id(self, job_id: str) -> SparkJob:
         return global_job_cache.get_job_by_id(job_id)

-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str]
+    ) -> List[SparkJob]:
         if include_terminated is True:
             return global_job_cache.list_jobs()
         else:
2 changes: 1 addition & 1 deletion sdk/python/requirements-ci.txt
@@ -9,7 +9,7 @@ pandas~=1.0.0
 mock==2.0.0
 pandavro==1.5.*
 moto
-mypy
+mypy==0.790
 mypy-protobuf
 avro==1.10.0
 gcsfs
27 changes: 26 additions & 1 deletion tests/e2e/test_online_features.py
@@ -23,6 +23,7 @@
 from feast.data_format import AvroFormat, ParquetFormat
 from feast.pyspark.abc import SparkJobStatus
 from feast.wait import wait_retry_backoff
+from tests.e2e.utils.common import create_schema, start_job, stop_job
 from tests.e2e.utils.kafka import check_consumer_exist, ingest_and_retrieve


@@ -128,7 +129,7 @@ def test_streaming_ingestion(
         job = feast_client.start_stream_to_online_ingestion(feature_table)
         assert job.get_feature_table() == feature_table.name
         wait_retry_backoff(
-            lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
+            lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 180
         )
     else:
         job = None
@@ -191,6 +192,30 @@ def ingest_and_verify(
     )


+def test_list_jobs_long_table_name(feast_client: Client, kafka_server, pytestconfig):
+    kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
+    topic_name = f"avro-{uuid.uuid4()}"
+
+    entity, feature_table = create_schema(
+        kafka_broker,
+        topic_name,
+        "just1a2featuretable3with4a5really6really7really8really9really10really11really12long13name",
+    )
+
+    feast_client.apply(entity)
+    feast_client.apply(feature_table)
+
+    job = start_job(feast_client, feature_table, pytestconfig)
+    all_job_ids = [
+        job.get_id()
+        for job in feast_client.list_jobs(
+            include_terminated=True, table_name=feature_table.name
+        )
+    ]
+    assert job.get_id() in all_job_ids
+    stop_job(job, feast_client, feature_table)
+
+
 def avro_schema():
     return json.dumps(
         {