Skip to content

Commit

Permalink
fix pom version
Browse files Browse the repository at this point in the history
  • Loading branch information
palmere-google committed Jul 16, 2021
1 parent a9eb732 commit e13c4a3
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>1.13-SNAPSHOT</version>
<version>1.13.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void emitRecord(
SourceOutput<T> sourceOutput,
SubscriptionPartitionSplitState subscriptionPartitionSplitState) {
if (record.value().isPresent()) {
sourceOutput.collect(record.value().get(), record.timestamp());
sourceOutput.collect(record.value().get(), record.timestamp().toEpochMilli());
}
subscriptionPartitionSplitState.setCurrent(record.offset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplitState;
import java.time.Instant;
import java.util.Optional;
import org.apache.flink.api.connector.source.SourceOutput;
import org.junit.Test;
Expand All @@ -34,7 +35,7 @@ public class PubsubLiteRecordEmitterTest {
static final SubscriptionPartitionSplit split =
SubscriptionPartitionSplit.create(
exampleSubscriptionPath(), examplePartition(), Offset.of(0));
static final long exampleTime = 1000;
static final Instant exampleTime = Instant.ofEpochMilli(1000);

@Mock SourceOutput<Long> mockOutput;

Expand All @@ -47,7 +48,7 @@ public void testRecordEmitted() {
Record.create(Optional.of(100L), exampleOffset(), exampleTime), mockOutput, state);

assertThat(state.toSplit().start()).isEqualTo(exampleOffset());
verify(mockOutput).collect(100L, exampleTime);
verify(mockOutput).collect(100L, exampleTime.toEpochMilli());
}

@Test
Expand Down

0 comments on commit e13c4a3

Please # to comment.