Skip to content

Commit

Permalink
server commands (resolve #13)
Browse files Browse the repository at this point in the history
  • Loading branch information
Valerian Barbot committed Sep 21, 2013
1 parent 5534cfe commit afeb4ea
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 146 deletions.
139 changes: 139 additions & 0 deletions src/main/scala/redis/api/Servers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package redis.api.servers

import redis._
import akka.util.ByteString
import redis.protocol.{MultiBulk, Bulk}
import redis.api.ShutdownModifier

case object Bgrewriteaof extends RedisCommandStatusString {
val isMasterOnly = true
val encodedRequest: ByteString = encode("BGREWRITEAOF")
}

case object Bgsave extends RedisCommandStatusString {
val isMasterOnly = true
val encodedRequest: ByteString = encode("BGSAVE")
}

case class ClientKill(ip: String, port: Int) extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("KILL"), ByteString(ip + ":" + port)))
}

case object ClientList extends RedisCommandBulk[Seq[Map[String, String]]] {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("LIST")))

def decodeReply(r: Bulk): Seq[Map[String, String]] = r.asOptByteString.map(bs => {
val s = bs.utf8String
val r = s.split('\n').map(line => {
line.split(' ').map(kv => {
val keyValue = kv.split('=')
val value = if (keyValue.length > 1) keyValue(1) else ""
(keyValue(0), value)
}).toMap
}).toSeq
r
}).getOrElse(Seq.empty)
}

case object ClientGetname extends RedisCommandBulkOptionByteString[String] {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("GETNAME")))
val deserializer: ByteStringDeserializer[String] = ByteStringDeserializer.String
}


case class ClientSetname(connectionName: String) extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("CLIENT", Seq(ByteString("SETNAME"), ByteString(connectionName)))
}

case class ConfigGet(parameter: String) extends RedisCommandMultiBulk[Map[String, String]] {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("CONFIG", Seq(ByteString("GET"), ByteString(parameter)))

def decodeReply(r: MultiBulk): Map[String, String] = MultiBulkConverter.toMapString(r)
}

case class ConfigSet(parameter: String, value: String) extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("CONFIG", Seq(ByteString("SET"), ByteString(parameter), ByteString(value)))
}

case object ConfigResetstat extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("CONFIG", Seq(ByteString("RESETSTAT")))
}

case object Dbsize extends RedisCommandIntegerLong {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("DBSIZE")
}

case class DebugObject[K](key: K)(implicit redisKey: ByteStringSerializer[K]) extends RedisCommandStatusString {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("DEBUG", Seq(ByteString("OBJECT"), redisKey.serialize(key)))
}

case object DebugSegfault extends RedisCommandStatusString {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("DEBUG SEGFAULT")
}

case object Flushall extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("FLUSHALL")
}

case object Flushdb extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("FLUSHDB")
}

case class Info(section: Option[String] = None) extends RedisCommandBulk[String] {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("INFO", section.map(s => Seq(ByteString(s))).getOrElse(Seq()))

def decodeReply(r: Bulk): String = r.toOptString.get
}

case object Lastsave extends RedisCommandIntegerLong {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("LASTSAVE")
}

case object Save extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("SAVE")
}

case class Slaveof(ip: String, port: Int) extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("SLAVEOF", Seq(ByteString(ip), ByteString(port.toString)))
}

case class Shutdown(modifier: Option[ShutdownModifier] = None) extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("SHUTDOWN", modifier.map(m => Seq(ByteString(m.toString))).getOrElse(Seq.empty))
}

case object SlaveofNoOne extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("SLAVEOF NO ONE")
}

case object Time extends RedisCommandMultiBulk[(Long, Long)] {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("TIME")

def decodeReply(mb: MultiBulk): (Long, Long) = {
mb.responses.map(r => {
(r.head.toByteString.utf8String.toLong, r.tail.head.toByteString.utf8String.toLong)
}).get
}
}

//case class Log(id: Long, timestamp: Long, duration: Long, command: Seq[ByteString])

//case class Slowlog(subcommand: String, argurment: String)
9 changes: 8 additions & 1 deletion src/main/scala/redis/api/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ sealed trait ListPivot

case object AFTER extends ListPivot

case object BEFORE extends ListPivot
case object BEFORE extends ListPivot


sealed trait ShutdownModifier

case object SAVE extends ShutdownModifier

case object NOSAVE extends ShutdownModifier
82 changes: 34 additions & 48 deletions src/main/scala/redis/commands/Server.scala
Original file line number Diff line number Diff line change
@@ -1,88 +1,74 @@
package redis.commands

import redis.Request
import redis.api.servers._
import scala.concurrent.Future
import redis.api.ShutdownModifier

trait Server extends Request {
/*
def bgrewriteaof[A](): Future[String] =
send("BGREWRITEAOF").mapTo[Status].map(_.toString)
def bgrewriteaof(): Future[String] = send(Bgrewriteaof)

def bgsave(): Future[String] =
send("BGSAVE").mapTo[Status].map(_.toString)
def bgsave(): Future[String] = send(Bgsave)

def clientKill(ip: String, port: Int): Future[Boolean] =
send("CLIENT", Seq(ByteString("KILL"), ByteString(ip + ':' + port.toString))).mapTo[Status].map(_.toBoolean)
send(ClientKill(ip, port))

def clientList(): Future[String] =
send("CLIENT", Seq(ByteString("LIST"))).mapTo[Bulk].map(_.toString)
def clientList(): Future[Seq[Map[String, String]]] =
send(ClientList)

def clientGetname(): Future[Option[String]] =
send("CLIENT", Seq(ByteString("GETNAME"))).mapTo[Bulk].map(_.toOptString)
send(ClientGetname)

def clientSetname(connectionName: String): Future[Boolean] =
send("CLIENT", Seq(ByteString("SETNAME"), ByteString(connectionName))).mapTo[Status].map(_.toBoolean)
send(ClientSetname(connectionName))

def configGet(parameter: String): Future[Option[String]] =
send("CONFIG GET", Seq(ByteString(parameter))).mapTo[Bulk].map(_.toOptString)
def configGet(parameter: String): Future[Map[String, String]] =
send(ConfigGet(parameter))

def configSet(parameter: String, value: String): Future[Boolean] =
send("CONFIG SET", Seq(ByteString(parameter), ByteString(value))).mapTo[Status].map(_.toBoolean)
send(ConfigSet(parameter, value))

def configResetstat(parameter: String, value: String): Future[Boolean] =
send("CONFIG RESETSTAT").mapTo[Status].map(_.toBoolean)
def configResetstat(): Future[Boolean] =
send(ConfigResetstat)

def dbsize(): Future[Long] =
send("DBSIZE").mapTo[Integer].map(_.toLong)
send(Dbsize)

def debugObject(key: String): Future[ByteString] =
send("DEBUG OBJECT", Seq(ByteString(key))).mapTo[Status].map(_.toByteString)
def debugObject(key: String): Future[String] =
send(DebugObject(key))

def debugSegfault(): Future[ByteString] =
send("DEBUG SEGFAULT").mapTo[Status].map(_.toByteString)
def debugSegfault(): Future[String] =
send(DebugSegfault)

def flushall(): Future[Boolean] =
send("FLUSHALL").mapTo[Status].map(_.toBoolean)
send(Flushall)

def flushdb(): Future[Boolean] =
send("FLUSHDB").mapTo[Status].map(_.toBoolean)
send(Flushdb)

def info(): Future[String] =
send("INFO").mapTo[Bulk].map(_.toString)
send(Info())

def info(section: String): Future[String] =
send("INFO", Seq(ByteString(section))).mapTo[Bulk].map(_.toString)
send(Info(Some(section)))

def lastsave(): Future[Long] =
send("LASTSAVE").mapTo[Integer].map(_.toLong)
def monitor(): Future[Long] = ??? // TODO blocking!
send(Lastsave)

def save(): Future[Boolean] =
send("SAVE").mapTo[Status].map(_.toBoolean)
send(Save)

def shutdown(): Future[Boolean] =
send("SHUTDOWN").mapTo[Status].map(_.toBoolean)
send(Shutdown())

// timeout on success LOL
def shutdown(modifier: ShutdownModifier): Future[Boolean] =
send("SHUTDOWN", Seq(ByteString(modifier.toString))).mapTo[Status].map(_.toBoolean)
def slaveof(host: String, port: Int): Future[String] =
send("SLAVEOF", Seq(ByteString(host), ByteString(port.toString))).mapTo[Status].map(_.toString)
def slowlog(subcommand: String, argument: String): Future[String] =
send("SLOWLOG", Seq(ByteString(subcommand), ByteString(argument))).mapTo[Status].map(_.toString)
def sync(): Future[RedisReply] =
send("SYNC").mapTo[RedisReply]
def time()(implicit convert: MultiBulkConverter[Seq[ByteString]]): Future[Try[Seq[ByteString]]] =
send("TIME").mapTo[MultiBulk].map(_.asTry[Seq[ByteString]])
*/
}
send(Shutdown(Some(modifier)))

sealed trait ShutdownModifier
def slaveof(host: String, port: Int): Future[Boolean] =
send(Slaveof(host, port))

case object NOSAVE extends ShutdownModifier
def slaveofNoOne(): Future[Boolean] = send(SlaveofNoOne)

case object SAVE extends ShutdownModifier
def time(): Future[(Long, Long)] =
send(Time)
}
Loading

0 comments on commit afeb4ea

Please # to comment.