From afeb4ea2ec32d6482a59c54e3e1d0224c617cfb1 Mon Sep 17 00:00:00 2001 From: Valerian Barbot Date: Sat, 21 Sep 2013 13:17:34 +0200 Subject: [PATCH] server commands (resolve #13) --- src/main/scala/redis/api/Servers.scala | 139 ++++++++++++ src/main/scala/redis/api/api.scala | 9 +- src/main/scala/redis/commands/Server.scala | 82 +++---- .../scala/redis/commands/ServerSpec.scala | 212 ++++++++++-------- 4 files changed, 296 insertions(+), 146 deletions(-) create mode 100644 src/main/scala/redis/api/Servers.scala diff --git a/src/main/scala/redis/api/Servers.scala b/src/main/scala/redis/api/Servers.scala new file mode 100644 index 00000000..8c7ddff8 --- /dev/null +++ b/src/main/scala/redis/api/Servers.scala @@ -0,0 +1,139 @@ +package redis.api.servers + +import redis._ +import akka.util.ByteString +import redis.protocol.{MultiBulk, Bulk} +import redis.api.ShutdownModifier + +case object Bgrewriteaof extends RedisCommandStatusString { + val isMasterOnly = true + val encodedRequest: ByteString = encode("BGREWRITEAOF") +} + +case object Bgsave extends RedisCommandStatusString { + val isMasterOnly = true + val encodedRequest: ByteString = encode("BGSAVE") +} + +case class ClientKill(ip: String, port: Int) extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("KILL"), ByteString(ip + ":" + port))) +} + +case object ClientList extends RedisCommandBulk[Seq[Map[String, String]]] { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("LIST"))) + + def decodeReply(r: Bulk): Seq[Map[String, String]] = r.asOptByteString.map(bs => { + val s = bs.utf8String + val r = s.split('\n').map(line => { + line.split(' ').map(kv => { + val keyValue = kv.split('=') + val value = if (keyValue.length > 1) keyValue(1) else "" + (keyValue(0), value) + }).toMap + }).toSeq + r + }).getOrElse(Seq.empty) +} + +case object ClientGetname extends RedisCommandBulkOptionByteString[String] { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("GETNAME"))) + val deserializer: ByteStringDeserializer[String] = ByteStringDeserializer.String +} + + +case class ClientSetname(connectionName: String) extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("SETNAME"), ByteString(connectionName))) +} + +case class ConfigGet(parameter: String) extends RedisCommandMultiBulk[Map[String, String]] { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("CONFIG", Seq(ByteString("GET"), ByteString(parameter))) + + def decodeReply(r: MultiBulk): Map[String, String] = MultiBulkConverter.toMapString(r) +} + +case class ConfigSet(parameter: String, value: String) extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("CONFIG", Seq(ByteString("SET"), ByteString(parameter), ByteString(value))) +} + +case object ConfigResetstat extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("CONFIG", Seq(ByteString("RESETSTAT"))) +} + +case object Dbsize extends RedisCommandIntegerLong { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("DBSIZE") +} + +case class DebugObject[K](key: K)(implicit redisKey: ByteStringSerializer[K]) extends RedisCommandStatusString { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("DEBUG", Seq(ByteString("OBJECT"), redisKey.serialize(key))) +} + +case object DebugSegfault extends RedisCommandStatusString { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("DEBUG SEGFAULT") +} + +case object Flushall extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("FLUSHALL") +} + +case object Flushdb extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("FLUSHDB") +} + +case class Info(section: Option[String] = None) extends RedisCommandBulk[String] { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("INFO", section.map(s => Seq(ByteString(s))).getOrElse(Seq())) + + def decodeReply(r: Bulk): String = r.toOptString.get +} + +case object Lastsave extends RedisCommandIntegerLong { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("LASTSAVE") +} + +case object Save extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("SAVE") +} + +case class Slaveof(ip: String, port: Int) extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("SLAVEOF", Seq(ByteString(ip), ByteString(port.toString))) +} + +case class Shutdown(modifier: Option[ShutdownModifier] = None) extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("SHUTDOWN", modifier.map(m => Seq(ByteString(m.toString))).getOrElse(Seq.empty)) +} + +case object SlaveofNoOne extends RedisCommandStatusBoolean { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("SLAVEOF NO ONE") +} + +case object Time extends RedisCommandMultiBulk[(Long, Long)] { + val isMasterOnly: Boolean = true + val encodedRequest: ByteString = encode("TIME") + + def decodeReply(mb: MultiBulk): (Long, Long) = { + mb.responses.map(r => { + (r.head.toByteString.utf8String.toLong, r.tail.head.toByteString.utf8String.toLong) + }).get + } +} + +//case class Log(id: Long, timestamp: Long, duration: Long, command: Seq[ByteString]) + +//case class Slowlog(subcommand: String, argurment: String) \ No newline at end of file diff --git a/src/main/scala/redis/api/api.scala b/src/main/scala/redis/api/api.scala index e8cc3892..81da0530 100644 --- a/src/main/scala/redis/api/api.scala +++ b/src/main/scala/redis/api/api.scala @@ -41,4 +41,11 @@ sealed trait ListPivot case object AFTER extends ListPivot -case object BEFORE extends ListPivot \ No newline at end of file +case object BEFORE extends ListPivot + + +sealed trait ShutdownModifier + +case object SAVE extends ShutdownModifier + +case object NOSAVE extends ShutdownModifier \ No newline at end of file diff --git a/src/main/scala/redis/commands/Server.scala b/src/main/scala/redis/commands/Server.scala index 63e72880..bfadf6c1 100644 --- a/src/main/scala/redis/commands/Server.scala +++ b/src/main/scala/redis/commands/Server.scala @@ -1,88 +1,74 @@ package redis.commands import redis.Request +import redis.api.servers._ +import scala.concurrent.Future +import redis.api.ShutdownModifier trait Server extends Request { -/* - def bgrewriteaof[A](): Future[String] = - send("BGREWRITEAOF").mapTo[Status].map(_.toString) + def bgrewriteaof(): Future[String] = send(Bgrewriteaof) - def bgsave(): Future[String] = - send("BGSAVE").mapTo[Status].map(_.toString) + def bgsave(): Future[String] = send(Bgsave) def clientKill(ip: String, port: Int): Future[Boolean] = - send("CLIENT", Seq(ByteString("KILL"), ByteString(ip + ':' + port.toString))).mapTo[Status].map(_.toBoolean) + send(ClientKill(ip, port)) - def clientList(): Future[String] = - send("CLIENT", Seq(ByteString("LIST"))).mapTo[Bulk].map(_.toString) + def clientList(): Future[Seq[Map[String, String]]] = + send(ClientList) def clientGetname(): Future[Option[String]] = - send("CLIENT", Seq(ByteString("GETNAME"))).mapTo[Bulk].map(_.toOptString) + send(ClientGetname) def clientSetname(connectionName: String): Future[Boolean] = - send("CLIENT", Seq(ByteString("SETNAME"), ByteString(connectionName))).mapTo[Status].map(_.toBoolean) + send(ClientSetname(connectionName)) - def configGet(parameter: String): Future[Option[String]] = - send("CONFIG GET", Seq(ByteString(parameter))).mapTo[Bulk].map(_.toOptString) + def configGet(parameter: String): Future[Map[String, String]] = + send(ConfigGet(parameter)) def configSet(parameter: String, value: String): Future[Boolean] = - send("CONFIG SET", Seq(ByteString(parameter), ByteString(value))).mapTo[Status].map(_.toBoolean) + send(ConfigSet(parameter, value)) - def configResetstat(parameter: String, value: String): Future[Boolean] = - send("CONFIG RESETSTAT").mapTo[Status].map(_.toBoolean) + def configResetstat(): Future[Boolean] = + send(ConfigResetstat) def dbsize(): Future[Long] = - send("DBSIZE").mapTo[Integer].map(_.toLong) + send(Dbsize) - def debugObject(key: String): Future[ByteString] = - send("DEBUG OBJECT", Seq(ByteString(key))).mapTo[Status].map(_.toByteString) + def debugObject(key: String): Future[String] = + send(DebugObject(key)) - def debugSegfault(): Future[ByteString] = - send("DEBUG SEGFAULT").mapTo[Status].map(_.toByteString) + def debugSegfault(): Future[String] = + send(DebugSegfault) def flushall(): Future[Boolean] = - send("FLUSHALL").mapTo[Status].map(_.toBoolean) + send(Flushall) def flushdb(): Future[Boolean] = - send("FLUSHDB").mapTo[Status].map(_.toBoolean) + send(Flushdb) def info(): Future[String] = - send("INFO").mapTo[Bulk].map(_.toString) + send(Info()) def info(section: String): Future[String] = - send("INFO", Seq(ByteString(section))).mapTo[Bulk].map(_.toString) + send(Info(Some(section))) def lastsave(): Future[Long] = - send("LASTSAVE").mapTo[Integer].map(_.toLong) - - def monitor(): Future[Long] = ??? // TODO blocking! + send(Lastsave) def save(): Future[Boolean] = - send("SAVE").mapTo[Status].map(_.toBoolean) + send(Save) def shutdown(): Future[Boolean] = - send("SHUTDOWN").mapTo[Status].map(_.toBoolean) + send(Shutdown()) - // timeout on success LOL def shutdown(modifier: ShutdownModifier): Future[Boolean] = - send("SHUTDOWN", Seq(ByteString(modifier.toString))).mapTo[Status].map(_.toBoolean) - - def slaveof(host: String, port: Int): Future[String] = - send("SLAVEOF", Seq(ByteString(host), ByteString(port.toString))).mapTo[Status].map(_.toString) - - def slowlog(subcommand: String, argument: String): Future[String] = - send("SLOWLOG", Seq(ByteString(subcommand), ByteString(argument))).mapTo[Status].map(_.toString) - - def sync(): Future[RedisReply] = - send("SYNC").mapTo[RedisReply] - - def time()(implicit convert: MultiBulkConverter[Seq[ByteString]]): Future[Try[Seq[ByteString]]] = - send("TIME").mapTo[MultiBulk].map(_.asTry[Seq[ByteString]]) -*/ -} + send(Shutdown(Some(modifier))) -sealed trait ShutdownModifier + def slaveof(host: String, port: Int): Future[Boolean] = + send(Slaveof(host, port)) -case object NOSAVE extends ShutdownModifier + def slaveofNoOne(): Future[Boolean] = send(SlaveofNoOne) -case object SAVE extends ShutdownModifier \ No newline at end of file + def time(): Future[(Long, Long)] = + send(Time) +} \ No newline at end of file diff --git a/src/test/scala/redis/commands/ServerSpec.scala b/src/test/scala/redis/commands/ServerSpec.scala index 9c94d116..5795c5b4 100644 --- a/src/test/scala/redis/commands/ServerSpec.scala +++ b/src/test/scala/redis/commands/ServerSpec.scala @@ -2,108 +2,126 @@ package redis.commands import redis._ import scala.concurrent.Await -import akka.util.ByteString -import redis.actors.ReplyErrorException +import redis.actors.{InvalidRedisReply, ReplyErrorException} +import redis.api.NOSAVE + +class ServerSpec extends RedisStandaloneServer { -class ServerSpec extends RedisSpec { -/* sequential "Server commands" should { - withRedisCluster((masterPort, slavePort, sentinelPort) => { - val redis = RedisClient(port = masterPort) - - "BGREWRITEAOF && BGSAVE" in { - val r = for { - bgrewriteaof <- redis.bgrewriteaof() - bgsave <- { - Thread.sleep(2000) // ERR Can't BGSAVE while AOF log rewriting is in progress - redis.bgsave() - } - } yield { - bgrewriteaof mustEqual "Background append only file rewriting started" - bgsave mustEqual "Background saving started" - } - - Await.result(r, timeOut) - } - - "CLIENT KILL" in { - Await.result(redis.clientKill("8.8.8.8", 53), timeOut) must throwA[ReplyErrorException]("ERR No such client") - } - - "CLIENT LIST" in { - Await.result(redis.clientList(), timeOut) must beAnInstanceOf[String] - } - - "CLIENT GETNAME" in { - Await.result(redis.clientGetname(), timeOut) mustEqual None - } - - "CLIENT SETNAME" in { - Await.result(redis.clientSetname("rediscala"), timeOut) mustEqual true - } - - "CONFIG GET" in { - Await.result(redis.configGet("*"), timeOut) must beAnInstanceOf[String] - - } - "CONFIG SET" in { - todo - } - - "CONFIG RESETSTAT" in { - todo - } - - "DBSIZE" in { - Await.result(redis.dbsize(), timeOut) must be_>=(0l) - } - - "DEBUG OBJECT" in { - todo - } - - "DEBUG SEGFAULT" in { - todo - } - - "FLUSHALL" in { - todo - } - - "FLUSHDB" in { - //TODO new redis connection / change db / set / flushdb - todo - } - - "INFO" in { - val r = for { - info <- redis.info() - infoCpu <- redis.info("cpu") - } yield { - info must beAnInstanceOf[String] - infoCpu must beAnInstanceOf[String] - } - Await.result(r, timeOut) - } - - "LASTSAVE" in { - Await.result(redis.lastsave(), timeOut) must be_>=(0l) - } - - "SAVE" in { - //Await.result(redis.save(), timeOut) mustEqual true - todo - } - - "SHUTDOWN" in { - todo - } - + "BGREWRITEAOF" in { + Await.result(redis.bgsave(), timeOut) mustEqual "Background saving started" + } + + "CLIENT KILL" in { + Await.result(redis.clientKill("8.8.8.8", 53), timeOut) must throwA[ReplyErrorException]("ERR No such client") + } + + "CLIENT LIST" in { + val list = Await.result(redis.clientList(), timeOut) + list must beAnInstanceOf[Seq[Map[String, String]]] + list must not beEmpty + } + + "CLIENT GETNAME" in { + Await.result(redis.clientGetname(), timeOut) mustEqual None + } + + "CLIENT SETNAME" in { + Await.result(redis.clientSetname("rediscala"), timeOut) mustEqual true + } + + "CONFIG GET" in { + val map = Await.result(redis.configGet("*"), timeOut) + map must beAnInstanceOf[Map[String, String]] + map must not beEmpty + + } + "CONFIG SET" in { + val r = for { + set <- redis.configSet("loglevel", "warning") + loglevel <- redis.configGet("loglevel") + } yield { + set must beTrue + loglevel.get("loglevel") must beSome("warning") + } + Await.result(r, timeOut) + } + + "CONFIG RESETSTAT" in { + Await.result(redis.configResetstat(), timeOut) must beTrue + } + + "DBSIZE" in { + Await.result(redis.dbsize(), timeOut) must be_>=(0l) + } + + "DEBUG OBJECT" in { + Await.result(redis.debugObject("serverDebugObj"), timeOut) must throwA[ReplyErrorException]("ERR no such key") + } + + "DEBUG SEGFAULT" in { + todo + } + + "FLUSHALL" in { + Await.result(redis.flushall(), timeOut) must beTrue + } + + "FLUSHDB" in { + Await.result(redis.flushdb(), timeOut) must beTrue + } + + "INFO" in { + val r = for { + info <- redis.info() + infoCpu <- redis.info("cpu") + } yield { + info must beAnInstanceOf[String] + infoCpu must beAnInstanceOf[String] + } + Await.result(r, timeOut) + } + + "LASTSAVE" in { + Await.result(redis.lastsave(), timeOut) must be_>=(0l) + } + + "SAVE" in { + Await.result(redis.save(), timeOut) must beTrue + } + + "SLAVE OF" in { + Await.result(redis.slaveof("server", 12345), timeOut) must beTrue + } + + "SLAVE OF NO ONE" in { + Await.result(redis.slaveofNoOne(), timeOut) must beTrue + } + + "TIME" in { + Await.result(redis.time(), timeOut) match { + case (t1: Long, t2: Long) => ok + case x => ko(x.toString()) + } + } + + "BGREWRITEAOF" in { + Await.result(redis.bgrewriteaof(), timeOut) mustEqual "Background append only file rewriting started" + } + + "SHUTDOWN" in { + Await.result(redis.shutdown(), timeOut) must throwA(InvalidRedisReply) + } + + "SHUTDOWN (with modifier)" in { + withRedisServer(port => { + val redis = RedisClient(port = port) + Await.result(redis.shutdown(NOSAVE), timeOut) must throwA(InvalidRedisReply) + }) + } - }) } - */ }