[Spark] Remove waiting for a fixed time for Delta Connect Server to be available in Delta Connect testing (#3576)

## Description
For local E2E Delta Connect testing, we designed a [util
class](https://github.com/delta-io/delta/blob/01bf60743b77c47147843e9083129320490f1629/spark-connect/client/src/test/scala-spark-master/io/delta/connect/tables/RemoteSparkSession.scala#L62)
that starts a local server in a separate process, similar to
[SparkConnect](https://github.com/apache/spark/blob/ba208b9ca99990fa329c36b28d0aa2a5f4d0a77e/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala#L37).

We noticed that the server takes a variable amount of time to start up,
and previously the client sometimes failed with the error
`[INVALID_HANDLE.SESSION_NOT_FOUND] The handle
746e6c86-9fa9-4b08-9572-388c20eaed47 is invalid. Session not found.
SQLSTATE: HY000`, so a 10s `Thread.sleep` was added before starting the client.

This is not robust, so we are removing the `Thread.sleep`. This should
be safe because:
1. The SparkSession builder here already uses the default
[Configuration](https://github.com/apache/spark/blob/3edc9c23a723a92c5a951cea0436529de65c640a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala#L891)
of the `SparkConnectClient`, which includes a default retry policy.
2. Spark made the error `INVALID_HANDLE.SESSION_NOT_FOUND` retryable in this
[PR](apache/spark#46971), so the client can retry even if it encounters this error.
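In spirit, the default retry policy behaves like an exponential-backoff loop around the connection attempt. The sketch below is a minimal illustration of that idea; `retryWithBackoff` is a hypothetical helper for this write-up, not the actual `SparkConnectClient` implementation:

```scala
object RetryDemo {
  // Retry a by-name block with exponential backoff, mirroring (in spirit)
  // what a default retry policy does for retryable errors: on failure,
  // wait, double the backoff, and try again until attempts run out.
  def retryWithBackoff[T](maxRetries: Int, backoffMs: Long)(fn: => T): T =
    try fn
    catch {
      case _: Exception if maxRetries > 0 =>
        Thread.sleep(backoffMs) // wait before the next attempt
        retryWithBackoff(maxRetries - 1, backoffMs * 2)(fn)
    }
}
```

With such a policy in place, a client connecting to a server that is still starting up simply retries until the server is ready, which is why the fixed sleep is unnecessary.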

## How was this patch tested?
Existing UTs.
longvu-db authored Aug 21, 2024
1 parent 0c188c2 commit 56d057c
Showing 1 changed file with 1 addition and 4 deletions.
```diff
@@ -97,10 +97,7 @@ trait RemoteSparkSession extends BeforeAndAfterAll { self: Suite =>
   override def beforeAll(): Unit = {
     super.beforeAll()
     server
-    // TODO: Instead of sleeping for a fixed time, which is a bit brittle,
-    // we should repeatedly check when the server is ready.
-    Thread.sleep(10000)
-    spark = SparkSession.builder().remote(s"sc://localhost:$serverPort").build()
+    spark = SparkSession.builder().remote(s"sc://localhost:$serverPort").create()
   }

   override def afterAll(): Unit = {
```
