Skip to content

Commit

Permalink
Reporting metrics from validation UDF (#1256)
Browse files Browse the repository at this point in the history
* report metrics from ge validation udf

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* e2e tests for validation metrics

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* parse metrics from prometheus

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* docker-compose test: pull js logs

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* better error message

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* enable statsd in docker compose

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* enable statsd in docker compose

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* enable statsd in docker compose

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* unique ft name

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* cleanup

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Jan 11, 2021
1 parent fae8b65 commit 1c4ba12
Show file tree
Hide file tree
Showing 12 changed files with 364 additions and 24 deletions.
11 changes: 10 additions & 1 deletion infra/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ services:
FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT: parquet
FEAST_REDIS_HOST: redis
FEAST_SPARK_INGESTION_JAR: ${INGESTION_JAR_PATH}
FEAST_STATSD_ENABLED: "true"
FEAST_STATSD_HOST: prometheus_statsd
FEAST_STATSD_PORT: 9125

jupyter:
image: gcr.io/kf-feast/feast-jupyter:${FEAST_VERSION}
Expand Down Expand Up @@ -106,4 +109,10 @@ services:
redis:
image: redis:5-alpine
ports:
- "6379:6379"
- "6379:6379"

prometheus_statsd:
image: prom/statsd-exporter:v0.12.1
ports:
- "9125:9125"
- "9102:9102"
3 changes: 2 additions & 1 deletion infra/scripts/test-docker-compose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ clean_up () {
ARG=$?

# Shut down docker-compose images

docker-compose down

exit $ARG
Expand Down Expand Up @@ -69,4 +70,4 @@ docker exec \
-e DISABLE_FEAST_SERVICE_FIXTURES=true \
--user root \
feast_jupyter_1 bash \
-c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --feast-version develop'
-c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --statsd-url prometheus_statsd:9125 --prometheus-url prometheus_statsd:9102 --feast-version develop'
69 changes: 66 additions & 3 deletions sdk/python/feast/contrib/validation/ge.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import json
import os
from typing import TYPE_CHECKING
from urllib.parse import urlparse

Expand All @@ -10,7 +11,7 @@
from feast.staging.storage_client import get_staging_client

try:
from great_expectations.core import ExpectationSuite
from great_expectations.core import ExpectationConfiguration, ExpectationSuite
from great_expectations.dataset import PandasDataset
except ImportError:
raise ImportError(
Expand Down Expand Up @@ -41,7 +42,28 @@ def __init__(self, name: str, pickled_code: bytes):
self.pickled_code = pickled_code


def create_validation_udf(name: str, expectations: ExpectationSuite) -> ValidationUDF:
def drop_feature_table_prefix(
    expectation_configuration: ExpectationConfiguration, prefix
):
    """
    Strip ``prefix`` from the column references of a single expectation.

    The ``kwargs`` mapping of ``expectation_configuration`` is updated in place:
    for each of the ``column``, ``column_A`` and ``column_B`` arguments that is
    present, holds a string and starts with ``prefix``, the prefix is removed.
    All other arguments are left untouched.

    :param expectation_configuration: expectation whose ``kwargs`` are rewritten
    :param prefix: string to remove from the beginning of column names
    :return: None
    """
    kwargs = expectation_configuration.kwargs
    for arg_name in ("column", "column_A", "column_B"):
        column = kwargs.get(arg_name)
        # A column reference may be missing or may not be a string (for example
        # an integer column label); only string names can carry the prefix, and
        # calling ``startswith`` on anything else would raise AttributeError.
        if isinstance(column, str) and column.startswith(prefix):
            kwargs[arg_name] = column[len(prefix) :]


def prepare_expectations(suite: ExpectationSuite, feature_table: "FeatureTable"):
    """
    Drop the ``<feature table name>__`` prefix from the column arguments of
    every expectation in ``suite``.

    Each expectation is passed to ``drop_feature_table_prefix`` together with
    the prefix built from ``feature_table.name``.

    :param suite: expectation suite to process
    :param feature_table: feature table whose name forms the prefix
    :return: the same ``suite`` object that was passed in
    """
    for config in suite.expectations:
        table_prefix = f"{feature_table.name}__"
        drop_feature_table_prefix(config, table_prefix)

    return suite


def create_validation_udf(
name: str, expectations: ExpectationSuite, feature_table: "FeatureTable",
) -> ValidationUDF:
"""
Wraps your expectations into Spark UDF.
Expand All @@ -60,10 +82,25 @@ def create_validation_udf(name: str, expectations: ExpectationSuite) -> Validati
:param name
:param expectations: collection of expectation gathered on training dataset
:param feature_table
:return: ValidationUDF with serialized code
"""

expectations = prepare_expectations(expectations, feature_table)

def udf(df: pd.DataFrame) -> pd.Series:
from datadog.dogstatsd import DogStatsd

reporter = (
DogStatsd(
host=os.environ["STATSD_HOST"],
port=int(os.environ["STATSD_PORT"]),
telemetry_min_flush_interval=0,
)
if os.getenv("STATSD_HOST") and os.getenv("STATSD_PORT")
else DogStatsd()
)

ds = PandasDataset.from_dataset(df)
result = ds.validate(expectations, result_format="COMPLETE")
valid_rows = pd.Series([True] * df.shape[0])
Expand All @@ -72,6 +109,32 @@ def udf(df: pd.DataFrame) -> pd.Series:
if check.success:
continue

unexpected_count = (
check.result["unexpected_count"]
if "unexpected_count" in check.result
else df.shape[0]
)

check_kwargs = check.expectation_config.kwargs
check_kwargs.pop("result_format", None)
check_name = "_".join(
[check.expectation_config.expectation_type]
+ [
str(v)
for v in check_kwargs.values()
if isinstance(v, (str, int, float))
]
)

reporter.increment(
"feast_feature_validation_check_failed",
value=unexpected_count,
tags=[
f"feature_table:{os.getenv('FEAST_INGESTION_FEATURE_TABLE', 'unknown')}",
f"check:{check_name}",
],
)

if check.exception_info["raised_exception"]:
# ToDo: probably we should mark all rows as invalid
continue
Expand Down Expand Up @@ -106,7 +169,7 @@ def apply_validation(
staging_client = get_staging_client(staging_scheme, client._config)

pickled_code_fp = io.BytesIO(udf.pickled_code)
remote_path = f"{staging_location}/udfs/{udf.name}.pickle"
remote_path = f"{staging_location}/udfs/{feature_table.name}/{udf.name}.pickle"
staging_client.upload_fileobj(
pickled_code_fp, f"{udf.name}.pickle", remote_uri=urlparse(remote_path)
)
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/feature_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,6 @@ def _update_from_feature_table(self, feature_table):
self.stream_source = feature_table.stream_source
self._created_timestamp = feature_table.created_timestamp
self._last_updated_timestamp = feature_table.last_updated_timestamp

# Short debug representation: the literal "FeatureTable" followed by this table's name in angle brackets.
def __repr__(self):
return f"FeatureTable <{self.name}>"
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,19 @@ object StreamingPipeline extends BasePipeline with Serializable {
val fileName = validationConfig.pickledCodePath.split("/").last
val pickledCode = FileUtils.readFileToByteArray(new File(SparkFiles.get(fileName)))

val env = config.metrics match {
case Some(c: StatsDConfig) =>
Map(
"STATSD_HOST" -> c.host,
"STATSD_PORT" -> c.port.toString,
"FEAST_INGESTION_FEATURE_TABLE" -> config.featureTable.name
)
case _ => Map.empty[String, String]
}

UserDefinedPythonFunction(
validationConfig.name,
DynamicPythonFunction.create(pickledCode),
DynamicPythonFunction.create(pickledCode, env),
BooleanType,
pythonEvalType = 200, // SQL_SCALAR_PANDAS_UDF (original constant is in private object)
udfDeterministic = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ object DynamicPythonFunction {
)
}

def create(pickledCode: Array[Byte], includePath: String = "libs/"): PythonFunction = {
val envVars = new JHashMap[String, String]()
def create(
pickledCode: Array[Byte],
env: Map[String, String] = Map.empty,
includePath: String = "libs/"
): PythonFunction = {
val envVars = new JHashMap[String, String](env.asJava)
val broadcasts = new JArrayList[Broadcast[PythonBroadcast]]()

if (!sys.env.contains("SPARK_HOME")) {
Expand Down
9 changes: 8 additions & 1 deletion tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def pytest_addoption(parser):
parser.addoption("--feast-version", action="store")
parser.addoption("--bq-project", action="store")
parser.addoption("--feast-project", action="store", default="default")
parser.addoption("--statsd-url", action="store", default="localhost:8125")
parser.addoption("--prometheus-url", action="store", default="localhost:9102")


def pytest_runtest_setup(item):
Expand All @@ -51,10 +53,15 @@ def pytest_runtest_setup(item):
kafka_port,
kafka_server,
redis_server,
statsd_server,
zookeeper_server,
)
else:
from .fixtures.external_services import kafka_server, redis_server # noqa
from .fixtures.external_services import ( # type: ignore # noqa
kafka_server,
redis_server,
statsd_server,
)

if not os.environ.get("DISABLE_FEAST_SERVICE_FIXTURES"):
from .fixtures.feast_services import * # type: ignore # noqa
Expand Down
5 changes: 5 additions & 0 deletions tests/e2e/fixtures/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
from pytest_redis.executor import RedisExecutor

from feast import Client
from tests.e2e.fixtures.statsd_stub import StatsDServer


@pytest.fixture
def feast_client(
pytestconfig,
ingestion_job_jar,
redis_server: RedisExecutor,
statsd_server: StatsDServer,
feast_core: Tuple[str, int],
feast_serving: Tuple[str, int],
local_staging_path,
Expand Down Expand Up @@ -44,6 +46,9 @@ def feast_client(
local_staging_path, "historical_output"
),
ingestion_drop_invalid_rows=True,
statsd_enabled=True,
statsd_host=statsd_server.host,
statsd_port=statsd_server.port,
**job_service_env,
)

Expand Down
12 changes: 12 additions & 0 deletions tests/e2e/fixtures/external_services.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import pytest
from pytest_redis.executor import NoopRedis

from tests.e2e.fixtures.statsd_stub import PrometheusStatsDServer

__all__ = (
"feast_core",
"feast_serving",
"redis_server",
"kafka_server",
"enable_auth",
"feast_jobservice",
"statsd_server",
)


Expand Down Expand Up @@ -44,3 +47,12 @@ def enable_auth():
def feast_jobservice(pytestconfig):
host, port = pytestconfig.getoption("job_service_url").split(":")
return host, port


@pytest.fixture(scope="session")
def statsd_server(pytestconfig):
    """
    Session-scoped fixture describing an externally running StatsD exporter.

    Reads the ``statsd_url`` and ``prometheus_url`` pytest options, splits each
    on ``":"`` into a host and a port, and wraps the four values in a
    ``PrometheusStatsDServer``. The ports are passed on as strings, exactly as
    they were split from the option values.
    """
    statsd_host, statsd_port = pytestconfig.getoption("statsd_url").split(":")
    metrics_host, metrics_port = pytestconfig.getoption("prometheus_url").split(":")
    return PrometheusStatsDServer(
        statsd_host, statsd_port, metrics_host, metrics_port
    )
29 changes: 23 additions & 6 deletions tests/e2e/fixtures/services.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import pathlib
import shutil
import tempfile

import port_for
import pytest
import requests
from pytest_kafka import make_kafka_server, make_zookeeper_process
Expand All @@ -14,16 +15,23 @@
"zookeeper_server",
"postgres_server",
"redis_server",
"statsd_server",
)

from tests.e2e.fixtures.statsd_stub import StatsDStub


def download_kafka(version="2.12-2.6.0"):
r = requests.get(f"https://downloads.apache.org/kafka/2.6.0/kafka_{version}.tgz")
temp_dir = pathlib.Path(tempfile.mkdtemp())
local_path = temp_dir / "kafka.tgz"
temp_dir = pathlib.Path("/tmp")
local_path = temp_dir / f"kafka_{version}.tgz"

if not os.path.isfile(local_path):
r = requests.get(
f"https://downloads.apache.org/kafka/2.6.0/kafka_{version}.tgz"
)

with open(local_path, "wb") as f:
f.write(r.content)
with open(local_path, "wb") as f:
f.write(r.content)

shutil.unpack_archive(str(local_path), str(temp_dir))
return temp_dir / f"kafka_{version}" / "bin"
Expand All @@ -35,6 +43,15 @@ def kafka_server(kafka_port):
return "localhost", port


@pytest.fixture
def statsd_server():
    """
    Start a ``StatsDStub`` on a port chosen by ``port_for.select_random`` and
    yield it to the test; the stub is stopped once the fixture is torn down.
    """
    stub = StatsDStub(port=port_for.select_random(None))
    stub.start()
    yield stub
    stub.stop()


postgres_server = pg_factories.postgresql_proc(password="password")
redis_server = redis_factories.redis_proc(
executable=shutil.which("redis-server"), timeout=3600
Expand Down
Loading

0 comments on commit 1c4ba12

Please sign in to comment.