From 590e1d8d0bf6417a681e3c0f5f3e66506ded1edd Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 29 Mar 2024 12:39:44 -0700 Subject: [PATCH 1/8] Encode Multiple Timeout Approaches --- .../rediculous/RedisConnection.scala | 233 +++++++++++------- .../rediculous/RedisError.scala | 10 + .../rediculous/RedisPubSub.scala | 10 +- .../rediculous/BufferedSocket.scala | 8 +- .../rediculous/RedisConnectionSpec.scala | 4 +- 5 files changed, 164 insertions(+), 101 deletions(-) diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 72db947..1c22ed9 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -28,9 +28,9 @@ trait RedisConnection[F[_]]{ } object RedisConnection{ - private[rediculous] case class Queued[F[_]: Concurrent](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Resp)]], usePool: Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]{ + private[rediculous] case class Queued[F[_]: Temporal](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Resp)]], usePool: Resource[F, Managed[F, Socket[F]]], commandTimeout: Duration) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, resp)))).flatMap{ c => queue.offer(c.map(_._2)) >> { val x: F[Chunk[Either[Throwable, Resp]]] = c.traverse{ case (d, _) => d.get } @@ -38,57 +38,64 @@ object RedisConnection{ y } } - } + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } - private[rediculous] case class PooledConnection[F[_]: Concurrent]( - pool: KeyPool[F, Unit, Socket[F]] + private[rediculous] case class PooledConnection[F[_]: Temporal]( + pool: KeyPool[F, Unit, Socket[F]], + commandTimeout: Duration, + redisRequestTimeout: Duration, ) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) - def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk) + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) + def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout) pool.take(()).use{ - m => withSocket(m.value).attempt.flatTap{ + m => withSocket(m.value).timeout(redisRequestTimeout).attempt.flatTap{ case Left(_) => m.canBeReused.set(Reusable.DontReuse) case _ => Applicative[F].unit } }.rethrow - } + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } - private[rediculous] case class DirectConnection[F[_]: Concurrent](socket: Socket[F]) extends RedisConnection[F]{ + private[rediculous] case class DirectConnection[F[_]: Temporal](socket: Socket[F], commandTimeout: Duration, redisRequestTimeout: Duration) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) - def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk) + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) + def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout) withSocket(socket) - } + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } private[rediculous] case class Cluster[F[_]: Concurrent](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { - val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) - chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c => + val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) + chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c => queue.offer(c.map(_._2)) >> { c.traverse(_._1.get).flatMap(_.sequence.liftTo[F].adaptError{case e => RedisError.QueuedExceptionError(e)}) } - } + } } } // Guarantees With Socket That Each Call Receives a Response // Chunk must be non-empty but to do so incurs a penalty - private[rediculous] def explicitPipelineRequest[F[_]: Concurrent](socket: Socket[F], calls: Chunk[Resp], maxBytes: Int = 16 * 1024 * 1024): F[Chunk[Resp]] = { + private[rediculous] def explicitPipelineRequest[F[_]: Temporal](socket: Socket[F], calls: Chunk[Resp], maxBytes: Int, redisRequestTimeout: Duration): F[Chunk[Resp]] = { val out = calls.flatMap(resp => Resp.CodecUtils.codec.encode(resp).toEither.traverse(bits => Chunk.byteVector(bits.bytes)) ).sequence.leftMap(err => new Throwable(s"Failed To Encode Response $err")).liftTo[F] - out.flatMap(socket.write) >> - Stream.eval(socket.read(maxBytes)) - .repeat - .unNoneTerminate - .unchunks - .through(fs2.interop.scodec.StreamDecoder.many(Resp.CodecUtils.codec).toPipeByte) - .take(calls.size.toLong) - .compile - .to(Chunk) + out.flatMap{bytes => + + val request = socket.write(bytes) >> + Stream.eval(socket.read(maxBytes)) + .repeat + .unNoneTerminate + .unchunks + .through(fs2.interop.scodec.StreamDecoder.many(Resp.CodecUtils.codec).toPipeByte) + .take(calls.size.toLong) + .compile + .to(Chunk) + + request.timeoutTo(redisRequestTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.RedisRequestTimeoutException(redisRequestTimeout)))) + } } def runRequestInternal[F[_]: Concurrent](connection: RedisConnection[F])( @@ -131,11 +138,21 @@ object RedisConnection{ val clusterUseDynamicRefreshSource: Boolean = true // Set to false to only use initially provided host for topology refresh val clusterCacheTopologySeconds: FiniteDuration = 1.second // How long topology will not be rechecked for after a succesful refresh val useTLS: Boolean = false - val requestTimeout: Duration = 60.seconds + // same as KeyPool.Builder.Defaults val idleTimeAllowedInPool: Duration = 30.seconds val maxIdle: Int = 100 val maxTotal: Int = 100 + + val commandTimeout: Duration = 30.seconds // If using a blocking operation this is likely inappropriate. + @deprecated("0.5.2", "Use Defaults.commandTimeout instead") + val requestTimeout: Duration = commandTimeout + + val redisRequestTimeout = 20.seconds // If using a blocking operation this is likely inappropriate. + + + // TODO config + private[rediculous] val maxBytes = 16 * 1024 * 1024 } def direct[F[_]: Temporal: Network]: DirectConnectionBuilder[F] = @@ -147,7 +164,8 @@ object RedisConnection{ TLSParameters.Default, None, Defaults.useTLS, - Defaults.requestTimeout + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -162,8 +180,9 @@ object RedisConnection{ private val tlsParameters: TLSParameters, private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, - ) { self => + private val commandTimeout: Duration, + private val redisRequestTimeout: Duration + ) { self => private def copy( sg: SocketGroup[F] = self.sg, @@ -173,7 +192,8 @@ object RedisConnection{ tlsParameters: TLSParameters = self.tlsParameters, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout, ): DirectConnectionBuilder[F] = new DirectConnectionBuilder( sg, host, @@ -182,7 +202,8 @@ object RedisConnection{ tlsParameters, auth, useTLS, - defaultTimeout + commandTimeout, + redisRequestTimeout, ) def withHost(host: Host) = copy(host = host) @@ -195,9 +216,12 @@ object RedisConnection{ def withoutAuth = copy(auth = None) def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + @deprecated("0.5.2", "Use withCommandTimeout") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) - def build: Resource[F,RedisConnection[F]] = + def build: Resource[F,RedisConnection[F]] = for { socket <- sg.client(SocketAddress(host,port), Nil) tlsContextOptWithDefault <- @@ -209,11 +233,11 @@ object RedisConnection{ _ <- Resource.eval(auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(out)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(out, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(out)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(out, commandTimeout, redisRequestTimeout)).void }) - } yield new TimeoutConnection(RedisConnection.DirectConnection(out), defaultTimeout) + } yield RedisConnection.DirectConnection(out, commandTimeout, redisRequestTimeout) } def pool[F[_]: Temporal: Network]: PooledConnectionBuilder[F] = @@ -225,10 +249,11 @@ object RedisConnection{ TLSParameters.Default, None, Defaults.useTLS, - Defaults.requestTimeout, Defaults.idleTimeAllowedInPool, Defaults.maxIdle, - Defaults.maxTotal + Defaults.maxTotal, + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -243,10 +268,12 @@ object RedisConnection{ private val tlsParameters: TLSParameters, private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, private val idleTimeAllowedInPool: Duration, private val maxIdle: Int, - private val maxTotal: Int + private val maxTotal: Int, + private val commandTimeout: Duration, + private val redisRequestTimeout: Duration, + ) { self => private def copy( @@ -257,10 +284,12 @@ object RedisConnection{ tlsParameters: TLSParameters = self.tlsParameters, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout, + idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool, maxIdle: Int = self.maxIdle, - maxTotal: Int = self.maxTotal + maxTotal: Int = self.maxTotal, + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout ): PooledConnectionBuilder[F] = new PooledConnectionBuilder( sg, host, @@ -269,10 +298,11 @@ object RedisConnection{ tlsParameters, auth, useTLS, - defaultTimeout, idleTimeAllowedInPool, maxIdle, - maxTotal + maxTotal, + commandTimeout, + redisRequestTimeout, ) def withHost(host: Host) = copy(host = host) @@ -285,7 +315,11 @@ object RedisConnection{ def withoutAuth = copy(auth = None) def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + + @deprecated("0.5.2", "Use withCommandTimeout") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration) def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle) @@ -304,9 +338,9 @@ object RedisConnection{ auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void } ) } @@ -315,7 +349,7 @@ object RedisConnection{ .withMaxTotal(maxTotal) .withMaxPerKey(Function.const(maxTotal)) .build - } yield new TimeoutConnection(PooledConnection[F](kp), defaultTimeout) + } yield new PooledConnection[F](kp, commandTimeout, redisRequestTimeout) } @@ -331,10 +365,11 @@ object RedisConnection{ Defaults.chunkSizeLimit, None, Defaults.useTLS, - Defaults.requestTimeout, Defaults.idleTimeAllowedInPool, Defaults.maxIdle, - Defaults.maxTotal + Defaults.maxTotal, + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -352,11 +387,13 @@ object RedisConnection{ private val chunkSizeLimit: Int, private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, private val idleTimeAllowedInPool: Duration, private val maxIdle: Int, - private val maxTotal: Int - ) { self => + private val maxTotal: Int, + + private val commandTimeout: Duration, // Command Timeout + private val redisRequestTimeout: Duration, // Redis Interaction Timeout + ) { self => private def copy( sg: SocketGroup[F] = self.sg, @@ -369,10 +406,12 @@ object RedisConnection{ chunkSizeLimit: Int = self.chunkSizeLimit, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout, + idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool, maxIdle: Int = self.maxIdle, - maxTotal: Int = self.maxTotal + maxTotal: Int = self.maxTotal, + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout, ): QueuedConnectionBuilder[F] = new QueuedConnectionBuilder( sg, host, @@ -384,10 +423,11 @@ object RedisConnection{ chunkSizeLimit, auth, useTLS, - defaultTimeout, idleTimeAllowedInPool, maxIdle, - maxTotal + maxTotal, + commandTimeout, + redisRequestTimeout ) def withHost(host: Host) = copy(host = host) @@ -405,7 +445,10 @@ object RedisConnection{ def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + @deprecated("0.5.2", "Use withCommandTimeout instaead") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration) def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle) @@ -427,9 +470,9 @@ object RedisConnection{ auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void } ) } @@ -444,10 +487,13 @@ object RedisConnection{ keypool.take(()).attempt.use{ case Right(m) => val out = chunk.map(_._2) - explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize - case Left(_) => m.canBeReused.set(Reusable.DontReuse) - case _ => Applicative[F].unit - } + explicitPipelineRequest(m.value, out, Defaults.maxBytes, redisRequestTimeout) + .attempt + .timeout(redisRequestTimeout) // Apply Timeout To Call to Redis, this is independent of the timeout on individual calls + .flatTap{// Currently Guarantee Chunk.size === returnSize + case Left(_) => m.canBeReused.set(Reusable.DontReuse) + case _ => Applicative[F].unit + } case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] }.flatMap{ case Right(n) => @@ -459,7 +505,7 @@ object RedisConnection{ } case e@Left(_) => chunk.traverse_{ case (deff, _) => deff(e.asInstanceOf[Either[Throwable, Resp]])} - }) + }) } else { Stream.empty } @@ -468,7 +514,7 @@ object RedisConnection{ .compile .drain .background - } yield new TimeoutConnection(Queued(queue, keypool.take(())), defaultTimeout) + } yield new Queued(queue, keypool.take(()), commandTimeout) } } @@ -487,10 +533,12 @@ object RedisConnection{ Defaults.clusterCacheTopologySeconds, None, Defaults.useTLS, - Defaults.requestTimeout, + Defaults.idleTimeAllowedInPool, Defaults.maxIdle, - Defaults.maxTotal + Defaults.maxTotal, + Defaults.commandTimeout, + Defaults.redisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -511,10 +559,13 @@ object RedisConnection{ private val cacheTopologySeconds: FiniteDuration, // How long topology will not be rechecked for after a succesful refresh private val auth: Option[(Option[String], String)], private val useTLS: Boolean, - private val defaultTimeout: Duration, + private val idleTimeAllowedInPool: Duration, private val maxIdle: Int, - private val maxTotal: Int + private val maxTotal: Int, + + private val commandTimeout: Duration, + private val redisRequestTimeout: Duration, ) { self => private def copy( @@ -531,10 +582,11 @@ object RedisConnection{ cacheTopologySeconds: FiniteDuration = self.cacheTopologySeconds, auth: Option[(Option[String], String)] = self.auth, useTLS: Boolean = self.useTLS, - defaultTimeout: Duration = self.defaultTimeout, idleTimeAllowedInPool: Duration = self.idleTimeAllowedInPool, maxIdle: Int = self.maxIdle, - maxTotal: Int = self.maxTotal + maxTotal: Int = self.maxTotal, + commandTimeout: Duration = self.commandTimeout, + redisRequestTimeout: Duration = self.redisRequestTimeout, ): ClusterConnectionBuilder[F] = new ClusterConnectionBuilder( sg, host, @@ -549,10 +601,11 @@ object RedisConnection{ cacheTopologySeconds, auth, useTLS, - defaultTimeout, idleTimeAllowedInPool, maxIdle, - maxTotal + maxTotal, + commandTimeout, + redisRequestTimeout, ) def withHost(host: Host) = copy(host = host) @@ -575,7 +628,11 @@ object RedisConnection{ def withTLS = copy(useTLS = true) def withoutTLS = copy(useTLS = false) - def withRequestTimeout(timeout: Duration) = copy(defaultTimeout = timeout) + + def withCommandTimeout(timeout: Duration) = copy(commandTimeout = timeout) + @deprecated("0.5.2", "Use withCommandTimeout") + def withRequestTimeout(timeout: Duration) = withCommandTimeout(timeout) + def withRedisRequestTimeout(timeout: Duration) = copy(redisRequestTimeout = timeout) def withIdleTimeAllowedInPool(duration: Duration) = copy(idleTimeAllowedInPool = duration) def withMaxIdle(maxIdle: Int) = copy(maxIdle = maxIdle) @@ -598,9 +655,9 @@ object RedisConnection{ auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void } ) } @@ -610,7 +667,7 @@ object RedisConnection{ .withMaxPerKey(Function.const(maxTotal)).build // Cluster Topology Acquisition and Management - sockets <- Resource.eval(keypool.take((host, port)).map(_.value).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))) + sockets <- Resource.eval(keypool.take((host, port)).map(_.value).map(DirectConnection(_, commandTimeout, redisRequestTimeout)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))) now <- Resource.eval(Temporal[F].realTime.map(_.toMillis)) refreshLock <- Resource.eval(Semaphore[F](1L)) refTopology <- Resource.eval(Ref[F].of((sockets, now))) @@ -629,7 +686,7 @@ object RedisConnection{ case ((_, setAt), now) if setAt >= (now - cacheTopologySeconds.toMillis) => Applicative[F].unit case ((l, _), _) => val nelActions: NonEmptyList[F[ClusterSlots]] = l.map{ case (host, port) => - keypool.take((host, port)).map(_.value).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) + keypool.take((host, port)).map(_.value).map(DirectConnection(_, commandTimeout, redisRequestTimeout)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) } raceNThrowFirst(nelActions) .flatMap(s => Clock[F].realTime.map(_.toMillis).flatMap(now => refTopology.set((s,now)))) @@ -650,8 +707,8 @@ object RedisConnection{ case (server, rest) => keypool.take(server).attempt.use{ case Right(m) => - val out = Chunk.seq(rest.map(_._5)) - explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize + val out = Chunk.from(rest.map(_._5)) + explicitPipelineRequest(m.value, out, Defaults.maxBytes, commandTimeout).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize case Left(_) => m.canBeReused.set(Reusable.DontReuse) case _ => Applicative[F].unit } @@ -667,7 +724,7 @@ object RedisConnection{ // Offer To Have it reprocessed. // If the queue is full return the error to the user cluster.queue.tryOffer(Chunk.singleton((toSet, key, extractServer(s), retries + 1, initialCommand))) - .ifM( + .ifM( Applicative[F].unit, toSet(Either.right(e)).void ) @@ -707,7 +764,7 @@ object RedisConnection{ .compile .drain .background - } yield new TimeoutConnection(cluster, defaultTimeout) + } yield cluster } } @@ -733,10 +790,8 @@ object RedisConnection{ private def raceNThrowFirst[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[A] = Stream(Stream.emits(nel.toList).evalMap(identity)).covary[F].parJoinUnbounded.take(1).compile.lastOrError - private[rediculous] case class TimeoutConnection[F[_]: Temporal](rC: RedisConnection[F], duration: Duration) extends RedisConnection[F] { - - def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = - rC.runRequest(inputs, key).timeout(duration) - + // We create this to create custom Timeouts in timeO + private implicit def deferFromMonad[F[_]: cats.Monad]: cats.Defer[F] = new cats.Defer[F] { + def defer[A](fa: => F[A]): F[A] = Monad[F].unit.flatMap(_ => fa) } } diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala index 27e40c1..33d7a0d 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala @@ -1,5 +1,8 @@ package io.chrisdavenport.rediculous +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeoutException + /** Indicates a Error while processing for Rediculous */ trait RedisError extends RuntimeException { @@ -23,4 +26,11 @@ object RedisError { override val message: String = s"Error encountered in queue: ${baseCase.getMessage()}" override val cause: Option[Throwable] = Some(baseCase) } + + // TODO + trait RedisTimeoutException + + final case class CommandTimeoutException(timeout: Duration) extends TimeoutException(s"Redis Command Timed Out: $timeout") with RedisTimeoutException + final case class RedisRequestTimeoutException(timeout: Duration) extends TimeoutException(s"Redis Request Timed Out: $timeout") with RedisTimeoutException + } \ No newline at end of file diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala index fcb8f13..aaffdd0 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala @@ -220,20 +220,20 @@ object RedisPubSub { * connections to all nodes. **/ def fromConnection[F[_]: Concurrent](connection: RedisConnection[F], maxBytes: Int = 8096, clusterBroadcast: Boolean = false): Resource[F, RedisPubSub[F]] = connection match { - case RedisConnection.TimeoutConnection(conn, _) => fromConnection(conn, maxBytes, clusterBroadcast) - case RedisConnection.Queued(_, sockets) => + // case RedisConnection.TimeoutConnection(conn, _, _, _) => fromConnection(conn, maxBytes, clusterBroadcast) + case RedisConnection.Queued(_, sockets, _) => sockets.flatMap{managed => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) val onUnhandledMessageR = Concurrent[F].ref((_: PubSubMessage) => Applicative[F].unit) - Resource.eval((messagesR, onNonMessageR, onUnhandledMessageR).tupled).flatMap{case (ref, onNonMessage, onUnhandledMessage) => + Resource.eval((messagesR, onNonMessageR, onUnhandledMessageR).tupled).flatMap{case (ref, onNonMessage, onUnhandledMessage) => Resource.makeCase(socket(connection, managed.value :: Nil, maxBytes, onNonMessage, onUnhandledMessage, ref).pure[F]){ case (_, Resource.ExitCase.Errored(_)) | (_, Resource.ExitCase.Canceled) => managed.canBeReused.set(Reusable.DontReuse) case (pubsub, Resource.ExitCase.Succeeded) => pubsub.unsubscribeAll } } } - case RedisConnection.PooledConnection(pool) => + case RedisConnection.PooledConnection(pool, _, _) => pool.take(()).flatMap{managed => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) @@ -245,7 +245,7 @@ object RedisPubSub { } } } - case RedisConnection.DirectConnection(s) => + case RedisConnection.DirectConnection(s, _, _) => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) val onUnhandledMessageR = Concurrent[F].ref((_: PubSubMessage) => Applicative[F].unit) diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala index 3e44fcd..59378d8 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/BufferedSocket.scala @@ -1,11 +1,9 @@ package io.chrisdavenport.rediculous.util -import cats._ import cats.syntax.all._ import fs2._ import fs2.io.net.Socket import cats.effect._ -import cats.effect.std.Queue import com.comcast.ip4s.{IpAddress, SocketAddress} private[rediculous] trait BufferedSocket[F[_]] extends Socket[F]{ @@ -30,12 +28,12 @@ private[rediculous] object BufferedSocket{ // This can return more bytes than max bytes, may want to refine this later def read(maxBytes: Int): F[Option[Chunk[Byte]]] = takeBuffer.flatMap{ - case s@Some(value) => value.some.pure[F] + case Some(value) => value.some.pure[F] case None => socket.read(maxBytes) } def readN(numBytes: Int): F[Chunk[Byte]] = takeBuffer.flatMap{ - case s@Some(value) => value.pure[F] + case Some(value) => value.pure[F] case None => socket.readN(numBytes) } @@ -53,7 +51,7 @@ private[rediculous] object BufferedSocket{ def write(bytes: Chunk[Byte]): F[Unit] = socket.write(bytes) - def writes: Pipe[F,Byte,INothing] = socket.writes + def writes: Pipe[F,Byte,Nothing] = socket.writes } diff --git a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala index 6606863..db8ac38 100644 --- a/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala +++ b/core/shared/src/test/scala/io/chrisdavenport/rediculous/RedisConnectionSpec.scala @@ -18,7 +18,7 @@ class RedisConnectionSpec extends RediculousCrossSuite { def endOfInput: IO[Unit] = ??? def endOfOutput: IO[Unit] = ??? - + def isOpen: IO[Boolean] = ??? def remoteAddress: IO[SocketAddress[IpAddress]] = ??? @@ -37,7 +37,7 @@ class RedisConnectionSpec extends RediculousCrossSuite { def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): fs2.Stream[IO,Socket[IO]] = ??? def serverResource(address: Option[Host], port: Option[Port], options: List[SocketOption]): Resource[IO,(SocketAddress[IpAddress], fs2.Stream[IO,Socket[IO]])] = ??? - + } RedisConnection.queued[IO].withSocketGroup(sg).build From 88735a9046ed62452888fafef3dc0ee72aa494aa Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 29 Mar 2024 12:47:06 -0700 Subject: [PATCH 2/8] Updates --- .../io/chrisdavenport/rediculous/RedisConnection.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 1c22ed9..cb1aabd 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -65,7 +65,7 @@ object RedisConnection{ }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } - private[rediculous] case class Cluster[F[_]: Concurrent](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]{ + private[rediculous] case class Cluster[F[_]: Temporal](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]], commandTimeout: Duration) extends RedisConnection[F]{ def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = { val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c => @@ -73,7 +73,7 @@ object RedisConnection{ c.traverse(_._1.get).flatMap(_.sequence.liftTo[F].adaptError{case e => RedisError.QueuedExceptionError(e)}) } } - } + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) } // Guarantees With Socket That Each Call Receives a Response @@ -693,7 +693,7 @@ object RedisConnection{ } ) queue <- Resource.eval(Queue.bounded[F, Chunk[(Either[Throwable,Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]](maxQueued)) - cluster = Cluster(queue, refTopology.get.map(_._1), {case(host, port) => keypool.take((host, port))}) + cluster = Cluster(queue, refTopology.get.map(_._1), {case(host, port) => keypool.take((host, port))}, commandTimeout) _ <- Stream.fromQueueUnterminatedChunk(queue, chunkSizeLimit).chunks.map{chunk => val s = if (chunk.nonEmpty) { @@ -708,7 +708,7 @@ object RedisConnection{ keypool.take(server).attempt.use{ case Right(m) => val out = Chunk.from(rest.map(_._5)) - explicitPipelineRequest(m.value, out, Defaults.maxBytes, commandTimeout).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize + explicitPipelineRequest(m.value, out, Defaults.maxBytes, redisRequestTimeout).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize case Left(_) => m.canBeReused.set(Reusable.DontReuse) case _ => Applicative[F].unit } From d02575cb1c407813e45792b46d6d0f8737864595 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 29 Mar 2024 12:53:15 -0700 Subject: [PATCH 3/8] Fix matcher --- .../main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala index aaffdd0..b194b54 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisPubSub.scala @@ -245,7 +245,7 @@ object RedisPubSub { } } } - case RedisConnection.DirectConnection(s, _, _) => + case RedisConnection.DirectConnection(s, _, _) => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) val onUnhandledMessageR = Concurrent[F].ref((_: PubSubMessage) => Applicative[F].unit) @@ -254,7 +254,7 @@ object RedisPubSub { pubsub => pubsub.unsubscribeAll } } - case RedisConnection.Cluster(_, topology, sockets) => + case RedisConnection.Cluster(_, topology, sockets, _) => val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]()) val onNonMessageR = Concurrent[F].ref((_: PubSubReply) => Applicative[F].unit) val onUnhandledMessageR = Concurrent[F].ref((_: PubSubMessage) => Applicative[F].unit) From 05ad767dc2ca8a768309ac043ffbf751428d171f Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 29 Mar 2024 13:03:51 -0700 Subject: [PATCH 4/8] Fix Inference --- .../io/chrisdavenport/rediculous/RedisConnection.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index cb1aabd..98bdc29 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -38,7 +38,7 @@ object RedisConnection{ y } } - }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError[Chunk[Resp]](RedisError.CommandTimeoutException(commandTimeout)))) } private[rediculous] case class PooledConnection[F[_]: Temporal]( pool: KeyPool[F, Unit, Socket[F]], @@ -54,7 +54,7 @@ object RedisConnection{ case _ => Applicative[F].unit } }.rethrow - }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError[Chunk[Resp]](RedisError.CommandTimeoutException(commandTimeout)))) } private[rediculous] case class DirectConnection[F[_]: Temporal](socket: Socket[F], commandTimeout: Duration, redisRequestTimeout: Duration) extends RedisConnection[F]{ @@ -62,7 +62,7 @@ object RedisConnection{ val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest)) def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout) withSocket(socket) - }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError[Chunk[Resp]](RedisError.CommandTimeoutException(commandTimeout)))) } private[rediculous] case class Cluster[F[_]: Temporal](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]], commandTimeout: Duration) extends RedisConnection[F]{ @@ -73,7 +73,7 @@ object RedisConnection{ c.traverse(_._1.get).flatMap(_.sequence.liftTo[F].adaptError{case e => RedisError.QueuedExceptionError(e)}) } } - }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.CommandTimeoutException(commandTimeout)))) + }.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError[Chunk[Resp]](RedisError.CommandTimeoutException(commandTimeout)))) } // Guarantees With Socket That Each Call Receives a Response @@ -94,7 +94,7 @@ object RedisConnection{ .compile .to(Chunk) - request.timeoutTo(redisRequestTimeout, Defer[F].defer(Temporal[F].raiseError(RedisError.RedisRequestTimeoutException(redisRequestTimeout)))) + request.timeoutTo(redisRequestTimeout, Defer[F].defer(Temporal[F].raiseError[Chunk[Resp]](RedisError.RedisRequestTimeoutException(redisRequestTimeout)))) } } From 186f69f58a4c73a78fb02ca0ee2dc021de2ab5e6 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 29 Mar 2024 13:10:35 -0700 Subject: [PATCH 5/8] Add Mima Exclusions --- build.sbt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/build.sbt b/build.sbt index 36ca251..6d481ed 100644 --- a/build.sbt +++ b/build.sbt @@ -61,6 +61,10 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#ClusterConnectionBuilder.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#PooledConnectionBuilder.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#QueuedConnectionBuilder.this"), + + ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#DirectConnectionBuilder.this"), + ProblemFilters.exclude[MissingClassProblem]("io.chrisdavenport.rediculous.RedisConnection$TimeoutConnection"), + ProblemFilters.exclude[MissingClassProblem]("io.chrisdavenport.rediculous.RedisConnection$TimeoutConnection$") ) ).jsSettings( scalaJSLinkerConfig ~= { _.withModuleKind(ModuleKind.CommonJSModule)} From 93643a68ee52924f739b2672673507c3062d0989 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 29 Mar 2024 13:17:46 -0700 Subject: [PATCH 6/8] More Mima --- build.sbt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 6d481ed..b2fce90 100644 --- a/build.sbt +++ b/build.sbt @@ -64,7 +64,11 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#DirectConnectionBuilder.this"), ProblemFilters.exclude[MissingClassProblem]("io.chrisdavenport.rediculous.RedisConnection$TimeoutConnection"), - ProblemFilters.exclude[MissingClassProblem]("io.chrisdavenport.rediculous.RedisConnection$TimeoutConnection$") + ProblemFilters.exclude[MissingClassProblem]("io.chrisdavenport.rediculous.RedisConnection$TimeoutConnection$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection.explicitPipelineRequest"), + ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection.explicitPipelineRequest$default$3"), + ProblemFilters.exclude[MissingFieldProblem]("io.chrisdavenport.rediculous.RedisConnection.TimeoutConnection"), + ) ).jsSettings( scalaJSLinkerConfig ~= { _.withModuleKind(ModuleKind.CommonJSModule)} From b64d82da04a270dd1589cf075a06e5f6ed2a3c6c Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 29 Mar 2024 13:53:05 -0700 Subject: [PATCH 7/8] Split defaults for owned vs shared connections --- .../rediculous/RedisConnection.scala | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 98bdc29..537b39e 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -144,11 +144,21 @@ object RedisConnection{ val maxIdle: Int = 100 val maxTotal: Int = 100 - val commandTimeout: Duration = 30.seconds // If using a blocking operation this is likely inappropriate. - @deprecated("0.5.2", "Use Defaults.commandTimeout instead") - val requestTimeout: Duration = commandTimeout + // If using a blocking operation these is likely inappropriate. + // You want Command Timeout to be higher than RedisRequestTimeout + val ownedConnectionCommandTimeout: Duration = 10.seconds + val ownedConnectionRedisRequestTimeout: Duration = 5.seconds + + // If using a blocking operation this is likely inappropriate. + val sharedConnectionCommandTimeout: Duration = 5.seconds + val sharedConnectionRedisRequestTimeout: Duration = 5.seconds + + + @deprecated("0.5.2", "Use Defaults.ownedConnectionCommandTimeout or Defaults. instead") + val requestTimeout: Duration = 30.seconds + + - val redisRequestTimeout = 20.seconds // If using a blocking operation this is likely inappropriate. // TODO config @@ -164,8 +174,8 @@ object RedisConnection{ TLSParameters.Default, None, Defaults.useTLS, - Defaults.commandTimeout, - Defaults.redisRequestTimeout, + Defaults.ownedConnectionCommandTimeout, + Defaults.ownedConnectionRedisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -233,9 +243,9 @@ object RedisConnection{ _ <- Resource.eval(auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(out, commandTimeout, redisRequestTimeout)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(out, Duration.Inf, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(out, commandTimeout, redisRequestTimeout)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(out, Duration.Inf, redisRequestTimeout)).void }) } yield RedisConnection.DirectConnection(out, commandTimeout, redisRequestTimeout) } @@ -252,8 +262,8 @@ object RedisConnection{ Defaults.idleTimeAllowedInPool, Defaults.maxIdle, Defaults.maxTotal, - Defaults.commandTimeout, - Defaults.redisRequestTimeout, + Defaults.ownedConnectionCommandTimeout, + Defaults.ownedConnectionRedisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -338,9 +348,9 @@ object RedisConnection{ auth match { case None => ().pure[F] case Some((Some(username), password)) => - RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void + RedisCommands.auth[Redis[F, *]](username, password).run(DirectConnection(socket, Duration.Inf, redisRequestTimeout)).void case Some((None, password)) => - RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, commandTimeout, redisRequestTimeout)).void + RedisCommands.auth[Redis[F, *]](password).run(DirectConnection(socket, Duration.Inf, redisRequestTimeout)).void } ) } @@ -368,8 +378,8 @@ object RedisConnection{ Defaults.idleTimeAllowedInPool, Defaults.maxIdle, Defaults.maxTotal, - Defaults.commandTimeout, - Defaults.redisRequestTimeout, + Defaults.sharedConnectionCommandTimeout, + Defaults.sharedConnectionRedisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") @@ -537,8 +547,8 @@ object RedisConnection{ Defaults.idleTimeAllowedInPool, Defaults.maxIdle, Defaults.maxTotal, - Defaults.commandTimeout, - Defaults.redisRequestTimeout, + Defaults.sharedConnectionCommandTimeout, + Defaults.sharedConnectionRedisRequestTimeout, ) @deprecated("Use overload that takes a Network", "0.4.1") From 52768a81912313f900b96dba604ad0f3fc9e77e4 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 9 Apr 2024 12:13:28 -0700 Subject: [PATCH 8/8] Minor Version Bump for Stability --- build.sbt | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/build.sbt b/build.sbt index b2fce90..9dccc22 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ import com.typesafe.tools.mima.core._ -ThisBuild / tlBaseVersion := "0.5" // your current series x.y +ThisBuild / tlBaseVersion := "0.6" // your current series x.y ThisBuild / organization := "io.chrisdavenport" ThisBuild / organizationName := "Christopher Davenport" @@ -56,20 +56,6 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "org.scalameta" %%% "munit-scalacheck" % "1.0.0-M10" % Test, ), libraryDependencies += "org.scodec" %%% "scodec-core" % (if (scalaVersion.value.startsWith("2.")) "1.11.10" else "2.2.2"), - - mimaBinaryIssueFilters ++= Seq( - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#ClusterConnectionBuilder.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#PooledConnectionBuilder.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#QueuedConnectionBuilder.this"), - - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection#DirectConnectionBuilder.this"), - ProblemFilters.exclude[MissingClassProblem]("io.chrisdavenport.rediculous.RedisConnection$TimeoutConnection"), - ProblemFilters.exclude[MissingClassProblem]("io.chrisdavenport.rediculous.RedisConnection$TimeoutConnection$"), - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection.explicitPipelineRequest"), - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.rediculous.RedisConnection.explicitPipelineRequest$default$3"), - ProblemFilters.exclude[MissingFieldProblem]("io.chrisdavenport.rediculous.RedisConnection.TimeoutConnection"), - - ) ).jsSettings( scalaJSLinkerConfig ~= { _.withModuleKind(ModuleKind.CommonJSModule)} ).jvmSettings(