Skip to content

Commit

Permalink
add support for reading from multiple kafka topics
Browse files Browse the repository at this point in the history
  • Loading branch information
baskaranz committed Dec 27, 2018
1 parent 034149d commit 85c853f
Showing 1 changed file with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package feast.ingestion.transform;

import com.google.common.base.Preconditions;
Expand All @@ -15,12 +32,22 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;

public class FeatureRowKafkaIO {

static final String KAFKA_TYPE = "kafka";


/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow}
* proto messages from kafka one or more kafka topics.
*
*/
public static Read read(ImportSpec importSpec) {
return new Read(importSpec);
}
Expand All @@ -43,14 +70,16 @@ public PCollection<FeatureRow> expand(PInput input) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(bootstrapServer), "kafka bootstrap server must be set");

String topic = importSpec.getOptionsMap().get("topic");
String topics = importSpec.getOptionsMap().get("topics");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(topic), "kafka topic must be set");
!Strings.isNullOrEmpty(topics), "kafka topic(s) must be set");

List<String> topicsList = new ArrayList<>(Arrays.asList(topics.split(",")));

KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader = KafkaIO.<FeatureRowKey, FeatureRow>read()
.withBootstrapServers(bootstrapServer)
.withTopic(topic)
.withTopics(topicsList)
.withKeyDeserializer(FeatureRowKeyDeserializer.class)
.withValueDeserializer(FeatureRowDeserializer.class);

Expand Down

0 comments on commit 85c853f

Please # to comment.