KafkaIO implementation for feast #19

Merged: 2 commits, Dec 28, 2018
174 changes: 163 additions & 11 deletions ingestion/pom.xml
@@ -34,6 +34,7 @@
<com.google.cloud.version>1.35.0</com.google.cloud.version>
<grpcVersion>1.2.0</grpcVersion>
<guice.version>4.1.0</guice.version>
<spring.kafka.version>2.2.2.RELEASE</spring.kafka.version>
</properties>

<build>
@@ -66,6 +67,125 @@
</plugins>
</build>

<profiles>
<profile>
<id>direct-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${org.apache.beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-direct</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>feast.ingestion.ImportJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>flink-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-flink</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>feast.ingestion.ImportJob</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>dataflow-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-dataflow</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>feast.ingestion.ImportJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.hibernate.validator</groupId>
@@ -214,20 +334,19 @@

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

<!-- Used for local execution (so not in test scope) -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

@@ -306,12 +425,6 @@
<version>42.2.5</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
@@ -325,6 +438,45 @@
<version>1.9.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>26.0-jre</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.1.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${org.apache.beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
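
Not part of this diff: the kafka-clients and spring-kafka-test dependencies added above are typically used to run an embedded Kafka broker in tests. A minimal sketch of such a test, assuming JUnit 4 (available via spring-boot-starter-test) and an illustrative topic and group id:

import java.util.Map;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class EmbeddedKafkaSketchTest {

  // Starts one in-memory Kafka broker for the whole test class; the topic name is illustrative.
  @ClassRule
  public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "feast-test-topic");

  @Test
  public void embeddedBrokerExposesBootstrapServers() {
    EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();
    // Consumer properties pointing at the embedded broker; a real test would produce
    // FeatureRow messages and read them back with the deserializers added later in this PR.
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);
    Assert.assertNotNull(consumerProps.get("bootstrap.servers"));
  }
}
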
17 changes: 10 additions & 7 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -18,6 +18,8 @@
package feast.ingestion;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.dataflow.DataflowScopes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
@@ -39,7 +41,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
@@ -60,6 +61,7 @@
import org.joda.time.Duration;
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

@@ -104,13 +106,16 @@ public static void main(String[] args) {

public static PipelineResult mainWithResult(String[] args) {
log.info("Arguments: " + Arrays.toString(args));
ImportJobOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
ImportJobOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
if (options.getJobName().isEmpty()) {
options.setJobName(generateName());
}
log.info(options.toString());

try {
options.setGcpCredential(GoogleCredentials.getApplicationDefault().createScoped(DataflowScopes.all()));
} catch (IOException e) {
log.error("Exception while setting GCP credential manually", e);
}
log.info("options: " + options.toString());
Review comment (Member): This doesn't print out the credentials, right?

ImportSpec importSpec = new ImportSpecSupplier(options).get();
Injector injector =
Guice.createInjector(new ImportJobModule(options, importSpec), new PipelineModule());
@@ -206,8 +211,6 @@ private String retrieveId(PipelineResult result) {
Class<? extends PipelineRunner<?>> runner = options.getRunner();
if (runner.isAssignableFrom(DataflowRunner.class)) {
return ((DataflowPipelineJob) result).getJobId();
} else if (runner.isAssignableFrom(FlinkRunner.class)) {
throw new UnsupportedOperationException("Runner not yet supported.");
} else {
return this.options.getJobName();
}
33 changes: 33 additions & 0 deletions ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java
@@ -0,0 +1,33 @@
package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
* Kafka {@link Deserializer} for {@link FeatureRow} Protocol Buffers messages.
*/
public class FeatureRowDeserializer implements Deserializer<FeatureRow> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public FeatureRow deserialize(String topic, byte[] data) {
try {
return FeatureRow.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error deserializing FeatureRow from Protobuf message", e);
}
}

@Override
public void close() {
}
}
33 changes: 33 additions & 0 deletions ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java
@@ -0,0 +1,33 @@
package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.FeatureRowKey;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
* Kafka {@link Deserializer} for {@link FeatureRowKey} Protocol Buffers messages.
*/
public class FeatureRowKeyDeserializer implements Deserializer<FeatureRowKey> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public FeatureRowKey deserialize(String topic, byte[] data) {
try {
return FeatureRowKey.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error deserializing FeatureRowKey from Protobuf message", e);
}
}

@Override
public void close() {
}
}
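
Not shown in this PR's diff: a sketch of how these two deserializers would typically be plugged into Beam's KafkaIO read transform. The bootstrap server address and topic name below are placeholders, not values from this PR:

import feast.ingestion.deserializer.FeatureRowDeserializer;
import feast.ingestion.deserializer.FeatureRowKeyDeserializer;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class FeatureRowKafkaReadSketch {

  // Reads FeatureRowKey/FeatureRow pairs from Kafka using the deserializers above.
  public static PCollection<KV<FeatureRowKey, FeatureRow>> readFeatureRows(Pipeline pipeline) {
    return pipeline.apply(
        KafkaIO.<FeatureRowKey, FeatureRow>read()
            .withBootstrapServers("localhost:9092")            // placeholder broker address
            .withTopic("feast-features")                        // placeholder topic name
            .withKeyDeserializer(FeatureRowKeyDeserializer.class)
            .withValueDeserializer(FeatureRowDeserializer.class)
            .withoutMetadata());                                // keep only the KV pairs
  }
}

Depending on the Beam version, explicit coders for the protobuf types may also need to be registered (e.g. via withKeyDeserializerAndCoder), which this sketch omits.
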
@@ -19,21 +19,23 @@

import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.options.Validation.Required;

public interface ImportJobOptions extends PipelineOptions {
public interface ImportJobOptions extends PipelineOptions, FlinkPipelineOptions, GcpOptions {
@Description("Import spec yaml file path")
@Required(groups = {"importSpec"})
String getImportSpecYamlFile();

void setImportSpecYamlFile(String value);

@Description("Import spec as native proto binary encoding conveted to Base64 string")
@Description("Import spec as native proto binary encoding converted to Base64 string")
@Required(groups = {"importSpec"})
String getImportSpecBase64();

15 changes: 15 additions & 0 deletions ingestion/src/main/java/feast/ingestion/transform/FeatureEnums.java
@@ -0,0 +1,15 @@
package feast.ingestion.transform;

public class FeatureEnums {
public enum InputSource {
FILE,
BIGQUERY,
PUBSUB,
KAFKA
}

public enum FileFormat {
CSV,
JSON
}
}
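
The KAFKA entry added to InputSource suggests the import pipeline dispatches on the configured source type. A hypothetical sketch of such a dispatch; the method and the way the type string is obtained are assumptions, not code from this PR:

import feast.ingestion.transform.FeatureEnums.InputSource;

public class InputSourceDispatchSketch {

  // Maps an import spec "type" string (e.g. "kafka") onto the InputSource enum
  // and describes the read path that would be chosen for it.
  public static String describeSource(String specType) {
    InputSource source = InputSource.valueOf(specType.trim().toUpperCase());
    switch (source) {
      case KAFKA:
        return "stream FeatureRows from Kafka via KafkaIO";
      case PUBSUB:
        return "stream FeatureRows from Cloud Pub/Sub";
      case BIGQUERY:
        return "batch-read rows from BigQuery";
      case FILE:
        return "batch-read files (CSV or JSON, per FileFormat)";
      default:
        throw new IllegalArgumentException("Unsupported source: " + specType);
    }
  }
}
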