Skip to content

Commit

Permalink
Fix batch client call missing request, fix multi-entity lookup (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chen Zhiling authored and feast-ci-bot committed Nov 10, 2019
1 parent b90facd commit 43a69da
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
3 changes: 2 additions & 1 deletion sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ def get_batch_features(

# Retrieve serving information to determine store type and staging location
serving_info = (
self._serving_service_stub.GetFeastServingInfo()
self._serving_service_stub.GetFeastServingInfo(
GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT)
) # type: GetFeastServingInfoResponse

if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
Expand Down
65 changes: 44 additions & 21 deletions serving/src/main/resources/templates/bq_featureset_query.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
SELECT {{ fullEntitiesList | join(', ')}}, {% for featureSet in featureSets %}{% for featureName in featureSet.features %}{{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }}, {% endfor %}{% endfor %}event_timestamp FROM (WITH union_features AS (
SELECT
WITH union_features AS (SELECT
event_timestamp,
{% for featureSet in featureSets %}NULL as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% endfor %}{{ fullEntitiesList | join(', ')}},
Expand All @@ -24,38 +23,62 @@ SELECT
{% endif %}
{% endfor %}
false AS is_entity_table
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= '{{minTimestamp}}'
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
{% endfor %}
)
SELECT
event_timestamp,
{% for featureSet in featureSets %}
IF(event_timestamp >= {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, NULL) as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% endfor %}
{{ fullEntitiesList | join(', ')}}
FROM (
), ts_union AS (
{% for featureSet in featureSets %}
SELECT * FROM (
SELECT
event_timestamp,
{{ fullEntitiesList | join(', ')}},
{% for otherFeatureSet in featureSets %}
{% if otherFeatureSet.id == featureSet.id %}
LAST_VALUE({{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp IGNORE NULLS) over w AS {{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp,
{% else %}
{{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp,
{% endif %}
{% endfor %}
is_entity_table
FROM union_features
WINDOW w AS (PARTITION BY {{ featureSet.entities | join(', ') }} ORDER BY event_timestamp, is_entity_table ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
) WHERE is_entity_table
{% if loop.last %}
{% else %}
UNION ALL
{% endif %}
{% endfor %}
), ts_coalesce AS (
SELECT
event_timestamp,
{{ fullEntitiesList | join(', ')}},
{% for featureSet in featureSets %}
LAST_VALUE({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp IGNORE NULLS) over w AS {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% endfor %}
is_entity_table
FROM union_features
WINDOW w AS (PARTITION BY source ORDER BY event_timestamp ASC)
) WHERE is_entity_table)
ROW_NUMBER() over w as rn
FROM ts_union
WINDOW w AS (PARTITION BY {{ fullEntitiesList | join(', ')}}, event_timestamp ORDER BY event_timestamp)
), ts_final AS (
SELECT
event_timestamp,
{{ fullEntitiesList | join(', ')}},
{% for featureSet in featureSets %}
IF(event_timestamp >= {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, NULL) as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM ts_coalesce WHERE rn = 1
)
SELECT * FROM ts_final
{% for featureSet in featureSets %}
LEFT JOIN
(SELECT {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
LEFT JOIN
(SELECT {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% for featureName in featureSet.features %}{{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }},
{% endfor %}{{ featureSet.entities | join(', ') }}
FROM (SELECT
FROM (SELECT
event_timestamp as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{{ featureSet.entities | join(', ') }},
{% for featureName in featureSet.features %}
{{ featureName }} as {{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }},
{% endfor %}ROW_NUMBER() OVER(PARTITION BY event_timestamp, {{ featureSet.entities | join(', ') }} ORDER BY created_timestamp DESC) as {{ featureSet.name }}_v{{ featureSet.version }}_rown
FROM `{{ projectId }}.{{ datasetId }}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= '{{minTimestamp}}'
FROM `{{ projectId }}.{{ datasetId }}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
) WHERE {{ featureSet.name }}_v{{ featureSet.version }}_rown = 1
) USING ({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, source)
{% endfor %}
) USING ({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.entities | join(', ') }})
{% endfor %}

0 comments on commit 43a69da

Please # to comment.