diff --git a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java
index 27cca2ffb2e..4b744d0fe6b 100644
--- a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java
+++ b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java
@@ -24,8 +24,9 @@ import feast.store.serving.redis.RedisCustomIO.RedisMutation;
 import feast.types.FeatureRowProto.FeatureRow;
 import feast.types.FieldProto.Field;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.slf4j.Logger;
@@ -42,17 +43,24 @@ public FeatureRowToRedisMutationDoFn(Map<String, FeatureSet> featureSets) {
 
   private RedisKey getKey(FeatureRow featureRow) {
     FeatureSet featureSet = featureSets.get(featureRow.getFeatureSet());
-    Set<String> entityNames =
+    List<String> entityNames =
         featureSet.getSpec().getEntitiesList().stream()
             .map(EntitySpec::getName)
-            .collect(Collectors.toSet());
+            .sorted()
+            .collect(Collectors.toList());
+    Map<String, Field> entityFields = new HashMap<>();
 
     Builder redisKeyBuilder = RedisKey.newBuilder().setFeatureSet(featureRow.getFeatureSet());
     for (Field field : featureRow.getFieldsList()) {
       if (entityNames.contains(field.getName())) {
-        redisKeyBuilder.addEntities(field);
+        entityFields.putIfAbsent(
+            field.getName(),
+            Field.newBuilder().setName(field.getName()).setValue(field.getValue()).build());
       }
     }
+    for (String entityName : entityNames) {
+      redisKeyBuilder.addEntities(entityFields.get(entityName));
+    }
     return redisKeyBuilder.build();
   }
 
diff --git a/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java
new file mode 100644
index 00000000000..92bb6e41c38
--- /dev/null
+++ b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java
@@ -0,0 +1,183 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.store.serving.redis;
+
+import static org.junit.Assert.*;
+
+import com.google.protobuf.Timestamp;
+import feast.core.FeatureSetProto;
+import feast.core.FeatureSetProto.EntitySpec;
+import feast.core.FeatureSetProto.FeatureSetSpec;
+import feast.core.FeatureSetProto.FeatureSpec;
+import feast.storage.RedisProto.RedisKey;
+import feast.store.serving.redis.RedisCustomIO.RedisMutation;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.Value;
+import feast.types.ValueProto.ValueType.Enum;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class FeatureRowToRedisMutationDoFnTest {
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  private FeatureSetProto.FeatureSet fs =
+      FeatureSetProto.FeatureSet.newBuilder()
+          .setSpec(
+              FeatureSetSpec.newBuilder()
+                  .setName("feature_set")
+                  .setVersion(1)
+                  .addEntities(
+                      EntitySpec.newBuilder()
+                          .setName("entity_id_primary")
+                          .setValueType(Enum.INT32)
+                          .build())
+                  .addEntities(
+                      EntitySpec.newBuilder()
+                          .setName("entity_id_secondary")
+                          .setValueType(Enum.STRING)
+                          .build())
+                  .addFeatures(
+                      FeatureSpec.newBuilder()
+                          .setName("feature_1")
+                          .setValueType(Enum.STRING)
+                          .build())
+                  .addFeatures(
+                      FeatureSpec.newBuilder()
+                          .setName("feature_2")
+                          .setValueType(Enum.INT64)
+                          .build()))
+          .build();
+
+  @Test
+  public void shouldConvertRowWithDuplicateEntitiesToValidKey() {
+    Map<String, FeatureSetProto.FeatureSet> featureSets = new HashMap<>();
+    featureSets.put("feature_set", fs);
+
+    FeatureRow offendingRow =
+        FeatureRow.newBuilder()
+            .setFeatureSet("feature_set")
+            .setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(2)))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .build();
+
+    PCollection<RedisMutation> output =
+        p.apply(Create.of(Collections.singletonList(offendingRow)))
+            .setCoder(ProtoCoder.of(FeatureRow.class))
+            .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSets)));
+
+    RedisKey expectedKey =
+        RedisKey.newBuilder()
+            .setFeatureSet("feature_set")
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .build();
+
+    PAssert.that(output)
+        .satisfies(
+            (SerializableFunction<Iterable<RedisMutation>, Void>)
+                input -> {
+                  input.forEach(
+                      rm -> {
+                        assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
+                        assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
+                      });
+                  return null;
+                });
+    p.run();
+  }
+
+  @Test
+  public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() {
+    Map<String, FeatureSetProto.FeatureSet> featureSets = new HashMap<>();
+    featureSets.put("feature_set", fs);
+
+    FeatureRow offendingRow =
+        FeatureRow.newBuilder()
+            .setFeatureSet("feature_set")
+            .setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .addFields(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .build();
+
+    PCollection<RedisMutation> output =
+        p.apply(Create.of(Collections.singletonList(offendingRow)))
+            .setCoder(ProtoCoder.of(FeatureRow.class))
+            .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSets)));
+
+    RedisKey expectedKey =
+        RedisKey.newBuilder()
+            .setFeatureSet("feature_set")
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_primary")
+                    .setValue(Value.newBuilder().setInt32Val(1)))
+            .addEntities(
+                Field.newBuilder()
+                    .setName("entity_id_secondary")
+                    .setValue(Value.newBuilder().setStringVal("a")))
+            .build();
+
+    PAssert.that(output)
+        .satisfies(
+            (SerializableFunction<Iterable<RedisMutation>, Void>)
+                input -> {
+                  input.forEach(
+                      rm -> {
+                        assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
+                        assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
+                      });
+                  return null;
+                });
+    p.run();
+  }
+}
diff --git a/protos/feast/storage/Redis.proto b/protos/feast/storage/Redis.proto
index ae287f4e6bf..f58b137e9c1 100644
--- a/protos/feast/storage/Redis.proto
+++ b/protos/feast/storage/Redis.proto
@@ -32,6 +32,7 @@ message RedisKey {
     string feature_set = 2;
 
     // List of fields containing entity names and their respective values
-    // contained within this feature row.
+    // contained within this feature row. The entities should be sorted
+    // by the entity name alphabetically in ascending order.
     repeated feast.types.Field entities = 3;
 }
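
Note on the change above: getKey() now produces the same serialized RedisKey no matter in which order entity fields arrive in a FeatureRow, and regardless of accidental duplicates, because entity names are sorted and only the first value per entity is kept. The standalone sketch below mirrors that ordering and deduplication rule with plain Java strings instead of the Feast Field/RedisKey protos and a Beam pipeline, so it can be compiled and run on its own; the class and method names (RedisKeyOrderingSketch, buildKeyFields) are illustrative only and are not part of the Feast codebase.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RedisKeyOrderingSketch {

  // Keeps the first value seen per entity and returns entities in sorted-name order,
  // mirroring the List/HashMap logic added to getKey() above.
  static List<String> buildKeyFields(List<String> entityNames, List<String[]> rowFields) {
    // Sort entity names so the key layout does not depend on feature-set declaration order.
    List<String> sortedNames = entityNames.stream().sorted().collect(Collectors.toList());

    // First occurrence wins, as with Map.putIfAbsent in the DoFn; non-entity fields are skipped.
    Map<String, String> entityFields = new HashMap<>();
    for (String[] field : rowFields) { // field[0] = name, field[1] = value
      if (sortedNames.contains(field[0])) {
        entityFields.putIfAbsent(field[0], field[1]);
      }
    }

    // Emit entities in sorted order regardless of how the row listed them.
    List<String> key = new ArrayList<>();
    for (String name : sortedNames) {
      key.add(name + "=" + entityFields.get(name));
    }
    return key;
  }

  public static void main(String[] args) {
    List<String> entities = Arrays.asList("entity_id_primary", "entity_id_secondary");

    // Out-of-order and duplicated fields, like the "offending" rows in the tests above.
    List<String[]> row =
        Arrays.asList(
            new String[] {"entity_id_secondary", "a"},
            new String[] {"entity_id_primary", "1"},
            new String[] {"entity_id_primary", "2"}, // duplicate entity: ignored
            new String[] {"feature_1", "x"}); // feature field: never part of the key

    // Prints [entity_id_primary=1, entity_id_secondary=a].
    System.out.println(buildKeyFields(entities, row));
  }
}

The printed order matches the expectedKey built in both tests: entity_id_primary first, entity_id_secondary second, with the duplicate primary value dropped.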