Skip to content

Commit

Permalink
Convert Pub/Sub Lite spark integration to spark 3.
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed May 10, 2022
1 parent 2b65dc4 commit 9af0319
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 14 deletions.
38 changes: 35 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
<version>1.5.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-spark-sql-streaming</artifactId>
<version>0.3.5-SNAPSHOT</version><!-- {x-version-update:pubsublite-spark-sql-streaming:current} -->
<packaging>jar</packaging>
Expand All @@ -19,7 +18,8 @@
<encoding>UTF-8</encoding>
<scala.version>2.12.15</scala.version>
<scala.version.short>2.12</scala.version.short>
<spark.version>3.2.1</spark.version>
<spark.version>3.1.2</spark.version>
<hadoop.version>3.2.2</hadoop.version>
</properties>
<dependencyManagement>
<dependencies>
Expand All @@ -37,13 +37,23 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.19</version>
</dependency>
<dependency>
<groupId>org.apache.yetus</groupId>
<artifactId>audience-annotations</artifactId>
Expand All @@ -59,6 +69,26 @@
<artifactId>jetty-server</artifactId>
<version>9.4.43.v20210629</version>
</dependency>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>commons.io</groupId>
<artifactId>commons.io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -287,6 +317,8 @@
<ignoreClass>org.apache.hadoop.yarn.*</ignoreClass>
<ignoreClass>javax.ws.rs.*</ignoreClass>
<ignoreClass>javax.annotation.*</ignoreClass>
<ignoreClass>javax.activation.*</ignoreClass>
<ignoreClass>javax.xml.bind.*</ignoreClass>
</ignoreClasses>
<ignoreWhenIdentical>true</ignoreWhenIdentical>
</banDuplicateClasses>
Expand Down
10 changes: 10 additions & 0 deletions renovate.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@
"^com.fasterxml.jackson.core"
],
"groupName": "jackson dependencies"
},
{
"packagePatterns": [
"^spark.version",
"^org.apache.spark",
"^scala.version",
"^org.scala-lang"
],
"enabled": false,
"groupName": "spark and scala pinned dependencies"
}
],
"semanticCommits": true,
Expand Down
12 changes: 3 additions & 9 deletions src/main/java/com/google/cloud/pubsublite/spark/PslWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.apache.spark.sql.types.StructType;

public class PslWrite implements Write, WriteBuilder, BatchWrite, StreamingWrite {
public class PslWrite implements WriteBuilder, BatchWrite, StreamingWrite {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private final StructType inputSchema;
Expand Down Expand Up @@ -86,17 +85,12 @@ public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo
}

@Override
public BatchWrite toBatch() {
public BatchWrite buildForBatch() {
return this;
}

@Override
public StreamingWrite toStreaming() {
return this;
}

@Override
public Write build() {
public StreamingWrite buildForStreaming() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testAbort() {
@Test
public void testCreateFactory() {
PhysicalWriteInfo info = new PhysicalWriteInfoImpl(42);
writer.toBatch().createBatchWriterFactory(info);
writer.toStreaming().createStreamingWriterFactory(info);
writer.createBatchWriterFactory(info);
writer.createStreamingWriterFactory(info);
}
}

0 comments on commit 9af0319

Please # to comment.