diff --git a/protos/feast/core/JobService.proto b/protos/feast/core/JobService.proto
index baeae3ee884..d79cb9e104a 100644
--- a/protos/feast/core/JobService.proto
+++ b/protos/feast/core/JobService.proto
@@ -188,6 +188,7 @@ message StartStreamToOnlineIngestionJobResponse {
 
 message ListJobsRequest {
   bool include_terminated = 1;
+  string table_name = 2;
 }
 
 message ListJobsResponse {
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 9b7dedf9c28..31af5f56f2f 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -1211,11 +1211,15 @@ def start_stream_to_online_ingestion(
             response.log_uri,
         )
 
-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         if not self._use_job_service:
-            return list_jobs(include_terminated, self)
+            return list_jobs(include_terminated, self, table_name)
         else:
-            request = ListJobsRequest(include_terminated=include_terminated)
+            request = ListJobsRequest(
+                include_terminated=include_terminated, table_name=table_name
+            )
             response = self._job_service.ListJobs(request)
             return [
                 get_remote_job_from_proto(
diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py
index 786e6ea2fe6..bf9f67df584 100644
--- a/sdk/python/feast/job_service.py
+++ b/sdk/python/feast/job_service.py
@@ -188,7 +188,9 @@ def StartStreamToOnlineIngestionJob(
 
     def ListJobs(self, request, context):
         """List all types of jobs"""
         jobs = list_jobs(
-            include_terminated=request.include_terminated, client=self.client
+            include_terminated=request.include_terminated,
+            table_name=request.table_name,
+            client=self.client,
         )
         return ListJobsResponse(jobs=[_job_to_proto(job) for job in jobs])
diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py
index 04aa697e3ee..5fc90e0ec43 100644
--- a/sdk/python/feast/pyspark/abc.py
+++ b/sdk/python/feast/pyspark/abc.py
@@ -608,5 +608,7 @@ def get_job_by_id(self, job_id: str) -> SparkJob:
         raise NotImplementedError
 
     @abc.abstractmethod
-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str]
+    ) -> List[SparkJob]:
         raise NotImplementedError
diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py
index 89c5a0e942f..6363ebcaffb 100644
--- a/sdk/python/feast/pyspark/launcher.py
+++ b/sdk/python/feast/pyspark/launcher.py
@@ -1,7 +1,7 @@
 import os
 import tempfile
 from datetime import datetime
-from typing import TYPE_CHECKING, List, Union
+from typing import TYPE_CHECKING, List, Optional, Union
 from urllib.parse import urlparse, urlunparse
 
 from feast.config import Config
@@ -318,9 +318,13 @@ def start_stream_to_online_ingestion(
     )
 
 
-def list_jobs(include_terminated: bool, client: "Client") -> List[SparkJob]:
+def list_jobs(
+    include_terminated: bool, client: "Client", table_name: Optional[str] = None
+) -> List[SparkJob]:
     launcher = resolve_launcher(client._config)
-    return launcher.list_jobs(include_terminated=include_terminated)
+    return launcher.list_jobs(
+        include_terminated=include_terminated, table_name=table_name
+    )
 
 
 def get_job_by_id(job_id: str, client: "Client") -> SparkJob:
diff --git a/sdk/python/feast/pyspark/launchers/aws/emr.py b/sdk/python/feast/pyspark/launchers/aws/emr.py
index 486135d95d6..d9ddb7cdae2 100644
--- a/sdk/python/feast/pyspark/launchers/aws/emr.py
+++ b/sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -352,12 +352,15 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
             # We should never get here
             raise ValueError(f"Unknown job type {job_info.job_type}")
 
-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         """
         Find EMR job by a string id.
 
         Args:
             include_terminated: whether to include terminated jobs.
+            table_name: FeatureTable name to filter by
 
         Returns:
             A list of SparkJob instances.
@@ -366,7 +369,7 @@ def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
         jobs = _list_jobs(
             emr_client=self._emr_client(),
             job_type=None,
-            table_name=None,
+            table_name=table_name,
             active_only=not include_terminated,
         )
 
diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
index ea6ecdaae5a..e4e9b167fc6 100644
--- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
+++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -24,6 +24,10 @@
 from feast.staging.storage_client import get_staging_client
 
 
+def _truncate_label(label: str) -> str:
+    return label[:63]
+
+
 class DataprocJobMixin:
     def __init__(
         self,
@@ -311,16 +315,16 @@ def dataproc_submit(
         }
 
         if isinstance(job_params, StreamIngestionJobParameters):
-            job_config["labels"][
-                self.FEATURE_TABLE_LABEL_KEY
-            ] = job_params.get_feature_table_name()
+            job_config["labels"][self.FEATURE_TABLE_LABEL_KEY] = _truncate_label(
+                job_params.get_feature_table_name()
+            )
             # Add job hash to labels only for the stream ingestion job
             job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash()
 
         if isinstance(job_params, BatchIngestionJobParameters):
-            job_config["labels"][
-                self.FEATURE_TABLE_LABEL_KEY
-            ] = job_params.get_feature_table_name()
+            job_config["labels"][self.FEATURE_TABLE_LABEL_KEY] = _truncate_label(
+                job_params.get_feature_table_name()
+            )
 
             if job_params.get_class_name():
                 scala_job_properties = {
@@ -477,10 +481,17 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:
 
         raise ValueError(f"Unrecognized job type: {job_type}")
 
-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         job_filter = f"labels.{self.JOB_TYPE_LABEL_KEY} = * AND clusterName = {self.cluster_name}"
+        if table_name:
+            job_filter = (
+                job_filter
+                + f" AND labels.{self.FEATURE_TABLE_LABEL_KEY} = {_truncate_label(table_name)}"
+            )
         if not include_terminated:
-            job_filter = job_filter + "AND status.state = ACTIVE"
+            job_filter = job_filter + " AND status.state = ACTIVE"
         return [
             self._dataproc_job_to_spark_job(job)
             for job in self.job_client.list_jobs(
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s.py b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
index 5a98a184171..398ebd49373 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -1,3 +1,4 @@
+import hashlib
 import random
 import string
 import time
@@ -29,6 +30,7 @@
     DEFAULT_JOB_TEMPLATE,
     HISTORICAL_RETRIEVAL_JOB_TYPE,
     LABEL_FEATURE_TABLE,
+    LABEL_FEATURE_TABLE_HASH,
     METADATA_JOBHASH,
     METADATA_OUTPUT_URI,
     OFFLINE_TO_ONLINE_JOB_TYPE,
@@ -54,6 +56,14 @@ def _generate_job_id() -> str:
     )
 
 
+def _truncate_label(label: str) -> str:
+    return label[:63]
+
+
+def _generate_table_hash(table_name: str) -> str:
+    return hashlib.md5(table_name.encode()).hexdigest()
+
+
 class KubernetesJobMixin:
     def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str):
         self._api = api
@@ -327,7 +337,12 @@ def offline_to_online_ingestion(
             arguments=ingestion_job_params.get_arguments(),
             namespace=self._namespace,
             extra_labels={
-                LABEL_FEATURE_TABLE: ingestion_job_params.get_feature_table_name()
+                LABEL_FEATURE_TABLE: _truncate_label(
+                    ingestion_job_params.get_feature_table_name()
+                ),
+                LABEL_FEATURE_TABLE_HASH: _generate_table_hash(
+                    ingestion_job_params.get_feature_table_name()
+                ),
             },
         )
 
@@ -373,7 +388,12 @@ def start_stream_to_online_ingestion(
             arguments=ingestion_job_params.get_arguments(),
             namespace=self._namespace,
             extra_labels={
-                LABEL_FEATURE_TABLE: ingestion_job_params.get_feature_table_name()
+                LABEL_FEATURE_TABLE: _truncate_label(
+                    ingestion_job_params.get_feature_table_name()
+                ),
+                LABEL_FEATURE_TABLE_HASH: _generate_table_hash(
+                    ingestion_job_params.get_feature_table_name()
+                ),
             },
         )
 
@@ -390,10 +410,12 @@ def get_job_by_id(self, job_id: str) -> SparkJob:
         else:
             return self._job_from_job_info(job_info)
 
-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str] = None
+    ) -> List[SparkJob]:
         return [
             self._job_from_job_info(job)
-            for job in _list_jobs(self._api, self._namespace)
+            for job in _list_jobs(self._api, self._namespace, table_name)
             if include_terminated
             or job.state not in (SparkJobStatus.COMPLETED, SparkJobStatus.FAILED)
         ]
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
index 9630d082661..19338bc313f 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -1,3 +1,4 @@
+import hashlib
 from copy import deepcopy
 from datetime import datetime
 from typing import Any, Dict, List, NamedTuple, Optional, Tuple
@@ -28,6 +29,7 @@
 LABEL_JOBID = "feast.dev/jobid"
 LABEL_JOBTYPE = "feast.dev/type"
 LABEL_FEATURE_TABLE = "feast.dev/table"
+LABEL_FEATURE_TABLE_HASH = "feast.dev/tablehash"
 
 # Can't store these bits of info in k8s labels due to 64-character limit, so we store them as
 # sparkConf
@@ -222,12 +224,24 @@ def _submit_job(api: CustomObjectsApi, resource, namespace: str) -> JobInfo:
     return _resource_to_job_info(response)
 
 
-def _list_jobs(api: CustomObjectsApi, namespace: str) -> List[JobInfo]:
-    response = api.list_namespaced_custom_object(
-        **_crd_args(namespace), label_selector=LABEL_JOBID,
-    )
-
+def _list_jobs(
+    api: CustomObjectsApi, namespace: str, table_name: Optional[str] = None
+) -> List[JobInfo]:
     result = []
+
+    # Batch, Streaming Ingestion jobs
+    if table_name:
+        table_name_hash = hashlib.md5(table_name.encode()).hexdigest()
+        response = api.list_namespaced_custom_object(
+            **_crd_args(namespace),
+            label_selector=f"{LABEL_FEATURE_TABLE_HASH}={table_name_hash}",
+        )
+    else:
+        # Retrieval jobs
+        response = api.list_namespaced_custom_object(
+            **_crd_args(namespace), label_selector=LABEL_JOBID,
+        )
+
     for item in response["items"]:
         result.append(_resource_to_job_info(item))
     return result
diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py
index 645b3e7ff16..11f2f8fa4ed 100644
--- a/sdk/python/feast/pyspark/launchers/standalone/local.py
+++ b/sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -355,7 +355,9 @@ def start_stream_to_online_ingestion(
     def get_job_by_id(self, job_id: str) -> SparkJob:
         return global_job_cache.get_job_by_id(job_id)
 
-    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+    def list_jobs(
+        self, include_terminated: bool, table_name: Optional[str]
+    ) -> List[SparkJob]:
         if include_terminated is True:
             return global_job_cache.list_jobs()
         else:
diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt
index 88970210996..e3a7fd2d95f 100644
--- a/sdk/python/requirements-ci.txt
+++ b/sdk/python/requirements-ci.txt
@@ -9,7 +9,7 @@ pandas~=1.0.0
 mock==2.0.0
 pandavro==1.5.*
 moto
-mypy
+mypy==0.790
 mypy-protobuf
 avro==1.10.0
 gcsfs
diff --git a/tests/e2e/test_online_features.py b/tests/e2e/test_online_features.py
index 6c2df4391a1..358c918da46 100644
--- a/tests/e2e/test_online_features.py
+++ b/tests/e2e/test_online_features.py
@@ -23,6 +23,7 @@
 from feast.data_format import AvroFormat, ParquetFormat
 from feast.pyspark.abc import SparkJobStatus
 from feast.wait import wait_retry_backoff
+from tests.e2e.utils.common import create_schema, start_job, stop_job
 from tests.e2e.utils.kafka import check_consumer_exist, ingest_and_retrieve
 
 
@@ -128,7 +129,7 @@ def test_streaming_ingestion(
         job = feast_client.start_stream_to_online_ingestion(feature_table)
         assert job.get_feature_table() == feature_table.name
         wait_retry_backoff(
-            lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
+            lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 180
        )
     else:
         job = None
@@ -191,6 +192,30 @@ def ingest_and_verify(
     )
 
 
+def test_list_jobs_long_table_name(feast_client: Client, kafka_server, pytestconfig):
+    kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
+    topic_name = f"avro-{uuid.uuid4()}"
+
+    entity, feature_table = create_schema(
+        kafka_broker,
+        topic_name,
+        "just1a2featuretable3with4a5really6really7really8really9really10really11really12long13name",
+    )
+
+    feast_client.apply(entity)
+    feast_client.apply(feature_table)
+
+    job = start_job(feast_client, feature_table, pytestconfig)
+    all_job_ids = [
+        job.get_id()
+        for job in feast_client.list_jobs(
+            include_terminated=True, table_name=feature_table.name
+        )
+    ]
+    assert job.get_id() in all_job_ids
+    stop_job(job, feast_client, feature_table)
+
+
 def avro_schema():
     return json.dumps(
         {
diff --git a/tests/e2e/test_validation.py b/tests/e2e/test_validation.py
index dcb7ac8a1eb..48bc5611b86 100644
--- a/tests/e2e/test_validation.py
+++ b/tests/e2e/test_validation.py
@@ -1,4 +1,3 @@
-import json
 import time
 import uuid
 
@@ -7,20 +6,11 @@
 import pytest
 from great_expectations.dataset import PandasDataset
 
-from feast import (
-    Client,
-    Entity,
-    Feature,
-    FeatureTable,
-    FileSource,
-    KafkaSource,
-    ValueType,
-)
+from feast import Client
 from feast.contrib.validation.ge import apply_validation, create_validation_udf
-from feast.data_format import AvroFormat, ParquetFormat
-from feast.pyspark.abc import SparkJobStatus
 from feast.wait import wait_retry_backoff
 from tests.e2e.fixtures.statsd_stub import StatsDServer
+from tests.e2e.utils.common import avro_schema, create_schema, start_job, stop_job
 from tests.e2e.utils.kafka import check_consumer_exist, ingest_and_retrieve
 
 
@@ -44,45 +34,6 @@ def generate_test_data():
     return df
 
 
-def create_schema(kafka_broker, topic_name, feature_table_name):
-    entity = Entity(name="key", description="Key", value_type=ValueType.INT64)
-    feature_table = FeatureTable(
-        name=feature_table_name,
-        entities=["key"],
-        features=[Feature("num", ValueType.INT64), Feature("set", ValueType.STRING)],
-        batch_source=FileSource(
-            event_timestamp_column="event_timestamp",
-            file_format=ParquetFormat(),
-            file_url="/dev/null",
-        ),
-        stream_source=KafkaSource(
-            event_timestamp_column="event_timestamp",
-            bootstrap_servers=kafka_broker,
-            message_format=AvroFormat(avro_schema()),
-            topic=topic_name,
-        ),
-    )
-    return entity, feature_table
-
-
-def start_job(feast_client: Client, feature_table: FeatureTable, pytestconfig):
-    if pytestconfig.getoption("scheduled_streaming_job"):
-        return
-
-    job = feast_client.start_stream_to_online_ingestion(feature_table)
-    wait_retry_backoff(
-        lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
-    )
-    return job
-
-
-def stop_job(job, feast_client: Client, feature_table: FeatureTable):
-    if job:
-        job.cancel()
-    else:
-        feast_client.delete_feature_table(feature_table.name)
-
-
 def test_validation_with_ge(feast_client: Client, kafka_server, pytestconfig):
     kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
     topic_name = f"avro-{uuid.uuid4()}"
@@ -227,21 +178,3 @@ def test_validation_reports_metrics(
         + "\n"
         "Actual received metrics" + str(statsd_server.metrics),
     )
-
-
-def avro_schema():
-    return json.dumps(
-        {
-            "type": "record",
-            "name": "TestMessage",
-            "fields": [
-                {"name": "key", "type": "long"},
-                {"name": "num", "type": "long"},
-                {"name": "set", "type": "string"},
-                {
-                    "name": "event_timestamp",
-                    "type": {"type": "long", "logicalType": "timestamp-micros"},
-                },
-            ],
-        }
-    )
diff --git a/tests/e2e/utils/common.py b/tests/e2e/utils/common.py
new file mode 100644
index 00000000000..4663a610fde
--- /dev/null
+++ b/tests/e2e/utils/common.py
@@ -0,0 +1,71 @@
+import json
+
+from feast import (
+    Client,
+    Entity,
+    Feature,
+    FeatureTable,
+    FileSource,
+    KafkaSource,
+    ValueType,
+)
+from feast.data_format import AvroFormat, ParquetFormat
+from feast.pyspark.abc import SparkJobStatus
+from feast.wait import wait_retry_backoff
+
+
+def create_schema(kafka_broker, topic_name, feature_table_name):
+    entity = Entity(name="key", description="Key", value_type=ValueType.INT64)
+    feature_table = FeatureTable(
+        name=feature_table_name,
+        entities=["key"],
+        features=[Feature("num", ValueType.INT64), Feature("set", ValueType.STRING)],
+        batch_source=FileSource(
+            event_timestamp_column="event_timestamp",
+            file_format=ParquetFormat(),
+            file_url="/dev/null",
+        ),
+        stream_source=KafkaSource(
+            event_timestamp_column="event_timestamp",
+            bootstrap_servers=kafka_broker,
+            message_format=AvroFormat(avro_schema()),
+            topic=topic_name,
+        ),
+    )
+    return entity, feature_table
+
+
+def start_job(feast_client: Client, feature_table: FeatureTable, pytestconfig):
+    if pytestconfig.getoption("scheduled_streaming_job"):
+        return
+
+    job = feast_client.start_stream_to_online_ingestion(feature_table)
+    wait_retry_backoff(
+        lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 180
+    )
+    return job
+
+
+def stop_job(job, feast_client: Client, feature_table: FeatureTable):
+    if job:
+        job.cancel()
+    else:
+        feast_client.delete_feature_table(feature_table.name)
+
+
+def avro_schema():
+    return json.dumps(
+        {
+            "type": "record",
+            "name": "TestMessage",
+            "fields": [
+                {"name": "key", "type": "long"},
+                {"name": "num", "type": "long"},
+                {"name": "set", "type": "string"},
+                {
+                    "name": "event_timestamp",
+                    "type": {"type": "long", "logicalType": "timestamp-micros"},
+                },
+            ],
+        }
+    )
diff --git a/tests/e2e/utils/kafka.py b/tests/e2e/utils/kafka.py
index c6ae836c1db..130a59d50ad 100644
--- a/tests/e2e/utils/kafka.py
+++ b/tests/e2e/utils/kafka.py
@@ -84,5 +84,5 @@ def get_online_features():
         out_df = pd.DataFrame.from_dict(features)
         return out_df, out_df[feature_names].count().min() >= expected_ingested_count
 
-    ingested = wait_retry_backoff(get_online_features, 120)
+    ingested = wait_retry_backoff(get_online_features, 180)
     return ingested