Skip to content

Commit

Permalink
0.3 dev serving api change (#253)
Browse files Browse the repository at this point in the history
* [WIP] Simplify serving API signature

* Add noop job service for online serving deployments

* Fix generated job name

* Ignore staleness if maxAge not set

* Add comments to new types
  • Loading branch information
Chen Zhiling authored and woop committed Oct 3, 2019
1 parent 5bca082 commit 40bdd06
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 390 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,6 @@ public void updateJobStatus(String jobId, JobStatus status) {
public String createJobId(String featureSetName, String storeName) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId = String.format("%s-to-%s", featureSetName, storeName) + dateSuffix;
return jobId.replaceAll("-", "_");
return jobId.replaceAll("_", "-");
}
}
70 changes: 14 additions & 56 deletions protos/feast/serving/ServingService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ message GetFeaturesRequest {
// List of feature sets and their features that are being retrieved
repeated FeatureSet feature_sets = 1;

// Dataset containing timestamp and entity id data. Used during retrieval of feature rows
// and for joining feature rows into a final dataset
EntityDataset entity_dataset = 2;
// List of entity rows, containing entity id and timestamp data.
// Used during retrieval of feature rows and for joining feature
// rows into a final dataset
repeated EntityRow entity_rows = 2;

message FeatureSet {
// Feature set name
Expand All @@ -89,67 +90,24 @@ message GetFeaturesRequest {
google.protobuf.Duration max_age = 4;
}

message EntityDataset {
// List of entity names contained within this incoming request. Each entity name is globally
// unique within Feast. The user is assumed to have used the exact column name in their
// EntityDataset if they are providing this dataset through a batch process.
repeated string entity_names = 1;

// List of Unix epoch entity_timestamp and entity_id values
repeated EntityDatasetRow entity_dataset_rows = 2;
}

// EntityDatasetRow specifies:
// - the timestamp range over which feature values should be retrieved (required for batch serving)
// - the specific entity ids that should be retrieved (required for online serving)
//
// If there are duplicate entity ids for the same timestamp range, only the
// one with the latest event_timestamp will be retrieved.
//
// Entity ids may be ommitted for batch features retrieval. In this case,
// all entities with distinct entity ids within the valid timestamp range
// will be retrieved.
message EntityDatasetRow {
// entity_timestamp is the upper bound of the timestamp range over
// which the feature values should be retrieved.
//
// For online serving entity_timestamp is optional (ignored), as the
// latest is always retrieved.
//
// The timestamp range is defined as follows:
// entity_timestamp - max_age <= event_timestamp <= entity_timestamp
message EntityRow {
// Request timestamp of this row. This value will be used, together with maxAge,
// to determine feature staleness.
google.protobuf.Timestamp entity_timestamp = 1;

// The entity ids for which the feature values should be retrieved.
//
// The order of the values should follow that in entity_names in EntityDataset.
// For online serving, it is required to specify entity_ids.
// For batch serving, it is optional.
repeated feast.types.Value entity_ids = 2;
// Map containing mapping of entity name to entity value.
map<string,feast.types.Value> fields = 2;
}
}

message GetOnlineFeaturesResponse {
// A FeatureDataSet is returned for each feature set in the incoming request
repeated FeatureDataset feature_datasets = 2;

// The FeatureDataSet contains information about the Feature Set in the incoming request,
// as well as feature data that can be joined to the incoming EntityDataSet. The row count
// for the returning FeatureDataSet will match that of the row count for the incoming
// EntityDataSet.
// If any of the keys do not have values, empty feature rows will be returned.
message FeatureDataset {
// Feature set name
string name = 1;

// Feature set version
int32 version = 2;
repeated FieldValues field_values = 1;

// Each feature data set contains a list of feature rows. The timestamps within the row
// are the original event timestamps from when that row was written to the backing store.
// When these FeatureRows are joined to the EntityDataSetRows, the FeatureRow timestamps
// will be dropped in favour of the EntityDataSetRow timestamp.
repeated feast.types.FeatureRow feature_rows = 3;
// TODO: update this comment
// does not include timestamp, includes features and entities
message FieldValues {
map<string,feast.types.Value> fields = 1;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import feast.core.StoreProto.Store.StoreType;
import feast.serving.FeastProperties;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
import feast.serving.service.RedisBackedJobService;
import feast.serving.service.SpecService;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -27,23 +28,15 @@ public JobServiceConfig(FeastProperties feastProperties) {

@Bean
public JobService jobService(SpecService specService) {
String jobStoreName = feastProperties.getJobStoreName();
GetStoresResponse storesResponse =
specService.getStores(
GetStoresRequest.newBuilder()
.setFilter(Filter.newBuilder().setName(jobStoreName).build())
.build());

if (storesResponse.getStoreCount() < 1) {
throw new IllegalArgumentException(
String.format(
"Cannot resolve Store from store name '%s'. Ensure the store name exists in Feast.",
jobStoreName));
String storeName = feastProperties.getStoreName();
Store store = getStore(specService, storeName);
if (store.getType() == StoreType.REDIS) {
return new NoopJobService();
}

assert storesResponse.getStoreCount() == 1;
Store store = storesResponse.getStore(0);
StoreType storeType = store.getType();
String jobStoreName = feastProperties.getJobStoreName();
Store jobStore = getStore(specService, jobStoreName);
StoreType storeType = jobStore.getType();
JobService jobService = null;

switch (storeType) {
Expand All @@ -63,4 +56,22 @@ public JobService jobService(SpecService specService) {

return jobService;
}

private Store getStore(SpecService specService, String jobStoreName) {
GetStoresResponse storesResponse =
specService.getStores(
GetStoresRequest.newBuilder()
.setFilter(Filter.newBuilder().setName(jobStoreName).build())
.build());

if (storesResponse.getStoreCount() < 1) {
throw new IllegalArgumentException(
String.format(
"Cannot resolve Store from store name '%s'. Ensure the store name exists in Feast.",
jobStoreName));
}

assert storesResponse.getStoreCount() == 1;
return storesResponse.getStore(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public ServingService servingService(
poolConfig.setMaxIdle(feastProperties.getRedisPoolMaxIdle());
JedisPool jedisPool =
new JedisPool(
poolConfig, store.getRedisConfig().getHost(), store.getRedisConfig().getPort());
poolConfig, redisConfig.getHost(), redisConfig.getPort());
servingService = new RedisServingService(jedisPool, specService, tracer);
break;
case BIGQUERY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void getBatchFeatures(
responseObserver.onError(e);
}
}

@Override
public void reloadJob(
ReloadJobRequest request, StreamObserver<ReloadJobResponse> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.common.collect.Lists;
import feast.core.CoreServiceProto.GetFeatureSetsRequest;
import feast.core.CoreServiceProto.GetFeatureSetsRequest.Filter;
import feast.core.FeatureSetProto.FeatureSetSpec;
Expand All @@ -20,7 +21,7 @@
import feast.serving.ServingAPIProto.GetFeastServingTypeRequest;
import feast.serving.ServingAPIProto.GetFeastServingTypeResponse;
import feast.serving.ServingAPIProto.GetFeaturesRequest;
import feast.serving.ServingAPIProto.GetFeaturesRequest.EntityDatasetRow;
import feast.serving.ServingAPIProto.GetFeaturesRequest.EntityRow;
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
import feast.serving.ServingAPIProto.JobStatus;
import feast.serving.ServingAPIProto.JobType;
Expand Down Expand Up @@ -102,16 +103,16 @@ public GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesR
.asRuntimeException();
}

if (getFeaturesRequest.getEntityDataset().getEntityDatasetRowsCount() < 1) {
if (getFeaturesRequest.getEntityRowsCount() < 1) {
throw Status.INVALID_ARGUMENT
.withDescription(
"entity_dataset_rows is required for batch retrieval in order to filter the retrieved entities.")
.asRuntimeException();
}

for (EntityDatasetRow entityDatasetRow :
getFeaturesRequest.getEntityDataset().getEntityDatasetRowsList()) {
if (entityDatasetRow.getEntityTimestamp().getSeconds() == 0) {
for (EntityRow entityRow :
getFeaturesRequest.getEntityRowsList()) {
if (entityRow.getEntityTimestamp().getSeconds() == 0) {
throw Status.INVALID_ARGUMENT
.withDescription(
"entity_timestamp field in entity_dataset_row is required for batch retrieval.")
Expand All @@ -123,8 +124,8 @@ public GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesR
BigQueryUtil.createQuery(
getFeaturesRequest.getFeatureSetsList(),
featureSetSpecs,
getFeaturesRequest.getEntityDataset().getEntityNamesList(),
getFeaturesRequest.getEntityDataset().getEntityDatasetRowsList(),
Lists.newArrayList(getFeaturesRequest.getEntityRows(0).getFieldsMap().keySet()),
getFeaturesRequest.getEntityRowsList(),
datasetId);
log.debug("Running BigQuery query: {}", query);

Expand Down
4 changes: 2 additions & 2 deletions serving/src/main/java/feast/serving/service/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ public interface JobService {
* @param id job id
* @return feast.serving.ServingAPIProto.Job
*/
public Optional<Job> get(String id);
Optional<Job> get(String id);

/**
* Update or create a job (if not exists)
*
* @param job feast.serving.ServingAPIProto.Job
*/
public void upsert(Job job);
void upsert(Job job);
}
18 changes: 18 additions & 0 deletions serving/src/main/java/feast/serving/service/NoopJobService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package feast.serving.service;

import feast.serving.ServingAPIProto.Job;
import java.util.Optional;

// No-op implementation of the JobService, for online serving stores.
public class NoopJobService implements JobService {

@Override
public Optional<Job> get(String id) {
return Optional.empty();
}

@Override
public void upsert(Job job) {

}
}
Loading

0 comments on commit 40bdd06

Please # to comment.