Skip to content

Commit

Permalink
feat: Restructure flink connector for new internal APIs and reduce API surface (#173)
Browse files Browse the repository at this point in the history

* Restructure flink connector for new internal APIs and reduce API surface.

* Restructure flink connector for new internal APIs and reduce API surface.

* Restructure flink connector for new internal APIs and reduce API surface.

* Restructure flink connector for new internal APIs and reduce API surface.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] authored Jan 20, 2023
1 parent 0cfef6a commit 2d287cb
Showing 53 changed files with 516 additions and 1,222 deletions.
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -5,15 +5,15 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>1.6.3</version>
<version>1.9.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>google-cloud-pubsublite-flink</artifactId>
<version>0.1.0-SNAPSHOT</version>
<properties>
<flink.version>1.13.2</flink.version>
<pubsublite.version>1.2.0</pubsublite.version>
<flink.version>1.12.5</flink.version>
<pubsublite.version>${project.parent.version}</pubsublite.version>
</properties>
<build>
<extensions>
@@ -107,12 +107,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<artifactId>flink-runtime_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
@@ -144,13 +144,13 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<artifactId>flink-test-utils_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<artifactId>flink-runtime_2.12</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Original file line number Diff line number Diff line change
@@ -22,19 +22,17 @@

public interface MessageTimestampExtractor extends Serializable {
static MessageTimestampExtractor publishTimeExtractor() {
return (MessageTimestampExtractor)
m -> Timestamp.fromProto(m.publishTime()).toDate().toInstant();
return m -> Timestamp.fromProto(m.publishTime()).toDate().toInstant();
}

static MessageTimestampExtractor eventTimeExtractor() {
return (MessageTimestampExtractor)
m -> {
if (m.message().eventTime().isPresent()) {
return Timestamp.fromProto(m.message().eventTime().get()).toDate().toInstant();
}
return Timestamp.fromProto(m.publishTime()).toDate().toInstant();
};
return m -> {
if (m.message().eventTime().isPresent()) {
return Timestamp.fromProto(m.message().eventTime().get()).toDate().toInstant();
}
return Timestamp.fromProto(m.publishTime()).toDate().toInstant();
};
}

Instant timestamp(SequencedMessage m) throws Exception;
Instant timestamp(SequencedMessage m);
}
Original file line number Diff line number Diff line change
@@ -22,13 +22,16 @@
import com.google.cloud.pubsublite.flink.internal.sink.SerializingPublisher;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Instant;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PubsubLiteSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {
private static final long serialVersionUID = 849752028745098L;

private final PubsubLiteSinkSettings<T> settings;

@GuardedBy("this")
@@ -39,8 +42,7 @@ public PubsubLiteSink(PubsubLiteSinkSettings<T> settings) {
}

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext)
throws Exception {}
public void initializeState(FunctionInitializationContext functionInitializationContext) {}

@Override
public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext)
@@ -60,11 +62,15 @@ public synchronized void invoke(T value, Context context) throws Exception {
@Override
public synchronized void open(Configuration parameters) throws Exception {
super.open(parameters);
settings
.serializationSchema()
.open(
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
publisher =
new SerializingPublisher<>(
new MessagePublisher(
PerServerPublisherCache.getOrCreate(settings.getPublisherConfig()),
settings.maxBytesOutstanding()),
PerServerPublisherCache.getOrCreate(settings), settings.maxBytesOutstanding()),
settings.serializationSchema());
}

Original file line number Diff line number Diff line change
@@ -18,11 +18,11 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.flink.internal.sink.PublisherOptions;
import java.io.Serializable;

@AutoValue
public abstract class PubsubLiteSinkSettings<InputT> implements Serializable {
private static final long serialVersionUID = 24356890238740987L;
public static final int DEFAULT_MAX_BYTES_OUTSTANDING = 100 * 1024 * 1024;
// Create a builder which will accept messages of type InputT and serialize them using the
// provided serialization schema.
@@ -46,10 +46,6 @@ public static Builder<Message> messagesBuilder() {
// Internal.
abstract PubsubLiteSerializationSchema<InputT> serializationSchema();

PublisherOptions getPublisherConfig() {
return PublisherOptions.create(topicPath());
}

@AutoValue.Builder
public abstract static class Builder<InputT> {
// Required.
Original file line number Diff line number Diff line change
@@ -15,23 +15,26 @@
*/
package com.google.cloud.pubsublite.flink;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.flink.internal.enumerator.PartitionAssigner;
import com.google.cloud.pubsublite.flink.internal.enumerator.PubsubLiteSplitEnumerator;
import com.google.cloud.pubsublite.flink.internal.enumerator.SingleSubscriptionSplitDiscovery;
import com.google.cloud.pubsublite.flink.internal.enumerator.SplitDiscovery;
import com.google.cloud.pubsublite.flink.internal.enumerator.SplitEnumeratorCheckpointSerializer;
import com.google.cloud.pubsublite.flink.internal.enumerator.UniformPartitionAssigner;
import com.google.cloud.pubsublite.flink.internal.reader.PubsubLiteRecordEmitter;
import com.google.cloud.pubsublite.flink.internal.reader.PubsubLiteSourceReader;
import com.google.cloud.pubsublite.flink.internal.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.flink.internal.split.SubscriptionPartitionSplitSerializer;
import com.google.cloud.pubsublite.flink.internal.source.SourceAssembler;
import com.google.cloud.pubsublite.flink.internal.source.enumerator.PartitionAssigner;
import com.google.cloud.pubsublite.flink.internal.source.enumerator.PubsubLiteSplitEnumerator;
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SingleSubscriptionSplitDiscovery;
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SplitDiscovery;
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SplitEnumeratorCheckpointSerializer;
import com.google.cloud.pubsublite.flink.internal.source.enumerator.UniformPartitionAssigner;
import com.google.cloud.pubsublite.flink.internal.source.reader.PubsubLiteRecordEmitter;
import com.google.cloud.pubsublite.flink.internal.source.reader.PubsubLiteSourceReader;
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplitSerializer;
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.*;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -49,7 +52,7 @@ public PubsubLiteSource(PubsubLiteSourceSettings<OutputT> settings) {

@Override
public Boundedness getBoundedness() {
return settings.boundedness();
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
@@ -65,37 +68,30 @@ public MetricGroup getMetricGroup() {

@Override
public UserCodeClassLoader getUserCodeClassLoader() {
return readerContext.getUserCodeClassLoader();
return null;
}
});
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
return new PubsubLiteSourceReader<>(
new PubsubLiteRecordEmitter<>(),
settings.getCursorClient(),
settings.getSplitReaderSupplier(),
assembler.getCursorClientRemoveThis(),
assembler.getSplitReaderSupplier(),
new Configuration(),
readerContext);
}

@Override
public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> createEnumerator(
SplitEnumeratorContext<SubscriptionPartitionSplit> enumContext) {
TopicPath topic;
try (AdminClient adminClient = settings.getAdminClient()) {
topic =
TopicPath.parse(
adminClient.getSubscription(settings.subscriptionPath()).get().getTopic());
} catch (Throwable t) {
throw ExtractStatus.toCanonical(t).underlying;
}
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
return new PubsubLiteSplitEnumerator(
enumContext,
UniformPartitionAssigner.create(),
SingleSubscriptionSplitDiscovery.create(
settings.getAdminClient(),
settings.getCursorClient(),
topic,
settings.subscriptionPath()),
settings.boundedness());
assembler.newAdminClient(),
assembler.getCursorClientRemoveThis(),
assembler.getTopicPath(),
settings.subscriptionPath()));
}

@Override
@@ -104,13 +100,14 @@ public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> re
SplitEnumeratorCheckpoint checkpoint) {
PartitionAssigner assigner =
UniformPartitionAssigner.fromCheckpoint(checkpoint.getAssignmentsList());
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
SplitDiscovery discovery =
SingleSubscriptionSplitDiscovery.fromCheckpoint(
checkpoint.getDiscovery(),
assigner.listSplits(),
settings.getAdminClient(),
settings.getCursorClient());
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery, settings.boundedness());
assembler.newAdminClient(),
assembler.getCursorClientRemoveThis());
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery);
}

@Override
Loading

0 comments on commit 2d287cb

Please sign in to comment.