-
Notifications
You must be signed in to change notification settings - Fork 142
/
Copy pathRedisProtocolReply.scala
188 lines (148 loc) · 5.36 KB
/
RedisProtocolReply.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package redis.protocol
import akka.util.ByteString
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.Try
import redis.MultiBulkConverter
sealed trait RedisReply {
def toByteString: ByteString
def asOptByteString: Option[ByteString]
}
case class Status(status: ByteString) extends RedisReply {
def toBoolean: Boolean = status == Status.okByteString
override def toString = status.utf8String
def toByteString: ByteString = status
def asOptByteString: Option[ByteString] = Some(status)
}
object Status {
val okByteString = ByteString("OK")
}
case class Error(error: ByteString) extends RedisReply {
override def toString = error.utf8String
def toByteString: ByteString = error
def asOptByteString: Option[ByteString] = Some(error)
}
case class Integer(i: ByteString) extends RedisReply {
def toLong: Long = java.lang.Long.parseLong(i.utf8String)
def toInt: Int = ParseNumber.parseInt(i)
def toBoolean = i == Integer.trueByteString
override def toString = i.utf8String
def toByteString: ByteString = i
def asOptByteString: Option[ByteString] = Some(i)
}
object Integer {
val trueByteString = ByteString("1")
}
case class Bulk(response: Option[ByteString]) extends RedisReply {
// looks wrong
override def toString = response.map(_.utf8String).get
def toByteString: ByteString = response.get
def toOptString: Option[String] = response.map(_.utf8String)
def asOptByteString: Option[ByteString] = response
}
case class MultiBulk(responses: Option[Seq[RedisReply]]) extends RedisReply {
def toByteString: ByteString = throw new NoSuchElementException()
def asOptByteString: Option[ByteString] = throw new NoSuchElementException()
def asTry[A](implicit convert: MultiBulkConverter[A]): Try[A] = convert.to(this)
def asOpt[A](implicit convert: MultiBulkConverter[A]): Option[A] = asTry(convert).toOption
}
object RedisProtocolReply {
val ERROR = '-'
val STATUS = '+'
val INTEGER = ':'
val BULK = '$'
val MULTIBULK = '*'
val LS = "\r\n".getBytes("UTF-8")
def decodeReply(bs: ByteString): Option[(RedisReply, ByteString)] = {
if (bs.isEmpty) {
None
} else {
bs.head match {
case ERROR => decodeString(bs.tail).map(r => (Error(r._1), r._2))
case INTEGER => decodeInteger(bs.tail)
case STATUS => decodeString(bs.tail).map(r => (Status(r._1), r._2))
case BULK => decodeBulk(bs.tail)
case MULTIBULK => decodeMultiBulk(bs.tail)
case _ => throw new Exception("Redis Protocol error: Got " + bs.head + " as initial reply byte")
}
}
}
val decodeReplyPF: PartialFunction[ByteString, Option[(RedisReply, ByteString)]] = {
case bs if bs.head == INTEGER => decodeInteger(bs.tail)
case bs if bs.head == STATUS => decodeString(bs.tail).map(r => (Status(r._1), r._2))
case bs if bs.head == BULK => decodeBulk(bs.tail)
case bs if bs.head == MULTIBULK => decodeMultiBulk(bs.tail)
}
val decodeReplyStatus: PartialFunction[ByteString, Option[(Status, ByteString)]] = {
case bs if bs.head == STATUS => decodeString(bs.tail).map(r => (Status(r._1), r._2))
}
val decodeReplyInteger: PartialFunction[ByteString, Option[(Integer, ByteString)]] = {
case bs if bs.head == INTEGER => decodeInteger(bs.tail)
}
val decodeReplyBulk: PartialFunction[ByteString, Option[(Bulk, ByteString)]] = {
case bs if bs.head == BULK => decodeBulk(bs.tail)
}
val decodeReplyMultiBulk: PartialFunction[ByteString, Option[(MultiBulk, ByteString)]] = {
case bs if bs.head == MULTIBULK => decodeMultiBulk(bs.tail)
}
val decodeReplyError: PartialFunction[ByteString, Option[(Error, ByteString)]] = {
case bs if bs.head == ERROR => decodeString(bs.tail).map(r => (Error(r._1), r._2))
}
def decodeInteger(bs: ByteString): Option[(Integer, ByteString)] = {
decodeString(bs).map(r => {
val i = Integer(r._1)
(i, r._2)
})
}
def decodeString(bs: ByteString): Option[(ByteString, ByteString)] = {
val index = bs.indexOf('\n')
if (index >= 0 && bs.length >= index + 1) {
val reply = bs.take(index + 1 - LS.length)
val tail = bs.drop(index + 1)
Some(reply -> tail)
} else {
None
}
}
def decodeBulk(bs: ByteString): Option[(Bulk, ByteString)] = {
decodeInteger(bs).flatMap(r => {
val i = r._1.toInt
val tail = r._2
if (i < 0) {
Some(Bulk(None) -> tail)
} else if (tail.length < (i + LS.length)) {
None
} else {
val data = tail.take(i)
Some(Bulk(Some(data)) -> tail.drop(i).drop(LS.length))
}
})
}
def decodeMultiBulk(bs: ByteString): Option[(MultiBulk, ByteString)] = {
decodeInteger(bs).flatMap(r => {
val i = r._1.toInt
val tail = r._2
if (i < 0) {
Some(MultiBulk(None) -> tail)
} else if (i == 0) {
Some(MultiBulk(Some(Nil)) -> tail)
} else {
@tailrec
def bulks(bs: ByteString, i: Int, acc: mutable.Buffer[RedisReply]): Option[(MultiBulk, ByteString)] = {
if (i > 0) {
val reply = decodeReply(bs)
if (reply.nonEmpty) {
acc.append(reply.get._1)
bulks(reply.get._2, i - 1, acc)
} else {
None
}
} else {
Some(MultiBulk(Some(acc.toSeq)) -> bs)
}
}
bulks(tail, i, mutable.Buffer())
}
})
}
}