Skip to content

Commit

Permalink
Perf test: WebSockets (Vert.X) (#3527)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski authored Feb 23, 2024
1 parent ecab9ac commit 3e89449
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 20 deletions.
113 changes: 101 additions & 12 deletions perf-tests/src/main/scala/sttp/tapir/perf/vertx/Vertx.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,115 @@ package sttp.tapir.perf.vertx

import _root_.cats.effect.IO
import _root_.cats.effect.kernel.Resource
import io.vertx.core.http.HttpServerOptions
import io.vertx.core.{Future => VFuture, Vertx}
import io.vertx.core.http.{HttpServerOptions, ServerWebSocket}
import io.vertx.core.streams.ReadStream
import io.vertx.core.{Future => VFuture, Handler, Vertx}
import io.vertx.ext.web.handler.BodyHandler
import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.tapir.perf.Common._
import sttp.tapir.perf.apis.{Endpoints, ServerRunner}
import sttp.tapir.server.vertx.VertxFutureServerInterpreter
import sttp.tapir.server.vertx.VertxFutureServerOptions
import sttp.tapir.server.vertx.streams.VertxStreams
import sttp.tapir.server.vertx.{VertxFutureServerInterpreter, VertxFutureServerOptions}

import scala.concurrent.Future

object Tapir extends Endpoints {
def route(nRoutes: Int, withServerLog: Boolean = false): Router => Route = { router =>
val serverOptions = buildOptions(VertxFutureServerOptions.customiseInterceptors, withServerLog)
val interpreter = VertxFutureServerInterpreter(serverOptions)
genEndpointsFuture(nRoutes).map(interpreter.route(_)(router)).last
import sttp.tapir._
def route(nRoutes: Int, withServerLog: Boolean = false): Vertx => Router => Route = { vertx =>
router =>
val serverOptions = buildOptions(VertxFutureServerOptions.customiseInterceptors, withServerLog)
val interpreter = VertxFutureServerInterpreter(serverOptions)
val wsEndpoint = wsBaseEndpoint
.out(
webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](VertxStreams)
.concatenateFragmentedFrames(false)
)

val laggedTimestampPipe: ReadStream[Long] => ReadStream[Long] = { inputStream =>
new ReadStream[Long] {

override def fetch(amount: Long): ReadStream[Long] = this

private var dataHandler: Handler[Long] = _
private var endHandler: Handler[Void] = _
private var exceptionHandler: Handler[Throwable] = _

inputStream.handler(new Handler[Long] {
override def handle(event: Long): Unit = {
vertx.setTimer(
WebSocketSingleResponseLag.toMillis,
_ => {
if (dataHandler != null) dataHandler.handle(System.currentTimeMillis())
}
): Unit
}
})

inputStream.endHandler(new Handler[Void] {
override def handle(e: Void): Unit = {
if (endHandler != null) endHandler.handle(e)
}
})

inputStream.exceptionHandler(new Handler[Throwable] {
override def handle(e: Throwable): Unit = {
if (exceptionHandler != null) exceptionHandler.handle(e)
}
})

override def handler(handler: Handler[Long]): ReadStream[Long] = {
this.dataHandler = handler
this
}

override def pause(): ReadStream[Long] = this
override def resume(): ReadStream[Long] = this

override def endHandler(endHandler: Handler[Void]): ReadStream[Long] = {
this.endHandler = endHandler
this
}

override def exceptionHandler(exceptionHandler: Handler[Throwable]): ReadStream[Long] = {
this.exceptionHandler = exceptionHandler
this
}
}

}

val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Future] { _ =>
Future.successful {
laggedTimestampPipe
}
}
(wsServerEndpoint :: genEndpointsFuture(nRoutes)).map(interpreter.route(_)(router)).last
}
}
object Vanilla extends Endpoints {

def bodyHandler = BodyHandler.create(false).setBodyLimit(LargeInputSize + 100L)
def webSocketHandler(vertx: Vertx): Router => Route = { router =>
router.get("/ws/ts").handler { ctx =>
val wss = ctx.request().toWebSocket()
wss.map {
ws: ServerWebSocket =>
ws.textMessageHandler(_ => ())

// Set a periodic timer to send timestamps every 100 milliseconds
val timerId = vertx.setPeriodic(
WebSocketSingleResponseLag.toMillis,
{ _ =>
ws.writeTextMessage(System.currentTimeMillis().toString): Unit
}
)

def route: Int => Router => Route = { (nRoutes: Int) => router =>
// Close the timer when the WebSocket is closed
ws.closeHandler(_ => vertx.cancelTimer(timerId): Unit)
}: Unit
}
}
def route: Int => Vertx => Router => Route = { (nRoutes: Int) => _ => router =>
(0 until nRoutes).map { n =>
router.get(s"/path$n/:id").handler {
ctx: RoutingContext =>
Expand Down Expand Up @@ -70,14 +158,15 @@ object Vanilla extends Endpoints {
}

object VertxRunner {
def runServer(route: Router => Route): IO[ServerRunner.KillSwitch] = {
def runServer(route: Vertx => Router => Route, wsRoute: Option[Vertx => Router => Route] = None): IO[ServerRunner.KillSwitch] = {
Resource
.make(IO.delay(Vertx.vertx()))(vertx => IO.delay(vertx.close()).void)
.flatMap { vertx =>
val router = Router.router(vertx)
val server = vertx.createHttpServer(new HttpServerOptions().setPort(Port)).requestHandler(router)
val listenIO = vertxFutureToIo(server.listen(Port))
route.apply(router): Unit
wsRoute.foreach(r => r(vertx).apply(router))
route(vertx).apply(router): Unit
Resource.make(listenIO)(s => vertxFutureToIo(s.close()).void)
}
.allocated
Expand All @@ -100,5 +189,5 @@ object TapirMultiServer extends ServerRunner { override def start = VertxRunner.
object TapirInterceptorMultiServer extends ServerRunner {
override def start = VertxRunner.runServer(Tapir.route(128, withServerLog = true))
}
object VanillaServer extends ServerRunner { override def start = VertxRunner.runServer(Vanilla.route(1)) }
object VanillaServer extends ServerRunner { override def start = VertxRunner.runServer(Vanilla.route(1), Some(Vanilla.webSocketHandler)) }
object VanillaMultiServer extends ServerRunner { override def start = VertxRunner.runServer(Vanilla.route(128)) }
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,36 @@ package sttp.tapir.perf.vertx.cats

import cats.effect.IO
import cats.effect.std.Dispatcher
import io.vertx.ext.web.Route
import io.vertx.ext.web.Router
import fs2.Stream
import io.vertx.core.Vertx
import io.vertx.ext.web.{Route, Router}
import sttp.capabilities.fs2.Fs2Streams
import sttp.tapir.perf.Common._
import sttp.tapir.perf.apis.{Endpoints, ServerRunner}
import sttp.tapir.perf.vertx.VertxRunner
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter
import sttp.tapir.server.vertx.cats.VertxCatsServerOptions
import sttp.tapir.server.vertx.cats.{VertxCatsServerInterpreter, VertxCatsServerOptions}

object Tapir extends Endpoints {
def route(dispatcher: Dispatcher[IO], withServerLog: Boolean): Int => Router => Route = { (nRoutes: Int) => (router: Router) =>
val serverOptions = buildOptions(VertxCatsServerOptions.customiseInterceptors[IO](dispatcher), withServerLog)
val interpreter = VertxCatsServerInterpreter(serverOptions)
genEndpointsIO(nRoutes).map(interpreter.route(_)(router)).last
import sttp.tapir._
// Websocket response is returned with a lag, so that we can have more concurrent users talking to the server.
// This lag is not relevant for measurements, because the server returns a timestamp after having a response ready to send back,
// so the client can measure only the latency of the server stack handling the response.
val wsResponseStream = Stream.fixedRate[IO](WebSocketSingleResponseLag, dampen = false)

def wsLaggedPipe: fs2.Pipe[IO, Long, Long] = { requestStream =>
wsResponseStream.evalMap(_ => IO.realTime.map(_.toMillis)).concurrently(requestStream.as(()))
}
def route(dispatcher: Dispatcher[IO], withServerLog: Boolean): Int => Vertx => Router => Route = {
(nRoutes: Int) => _ => (router: Router) =>
val serverOptions = buildOptions(VertxCatsServerOptions.customiseInterceptors[IO](dispatcher), withServerLog)
val interpreter = VertxCatsServerInterpreter(serverOptions)
val wsServerEndpoint = wsBaseEndpoint
.out(
webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](Fs2Streams[IO])
.concatenateFragmentedFrames(false)
)
.serverLogicSuccess(_ => IO.pure(wsLaggedPipe))
(wsServerEndpoint :: genEndpointsIO(nRoutes)).map(interpreter.route(_)(router)).last
}
}

Expand Down

0 comments on commit 3e89449

Please # to comment.