Skip to content

Commit

Permalink
Merge pull request #18 from ryanlecompte/master
Browse files Browse the repository at this point in the history
Store updated channels/patterns in subscription actor
  • Loading branch information
etaty committed Oct 26, 2013
2 parents 631c37f + ccf6f08 commit 5eab94b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
19 changes: 7 additions & 12 deletions src/main/scala/redis/actors/RedisSubscriberActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import akka.util.ByteString
import redis.protocol.{MultiBulk, RedisReply}
import redis.api.pubsub._
import java.net.InetSocketAddress
import scala.collection.mutable
import redis.api.connection.Auth

class RedisSubscriberActorWithCallback(
Expand Down Expand Up @@ -37,27 +36,23 @@ abstract class RedisSubscriberActor(
/**
* Keep states of channels and actor in case of connection reset
*/
val channelsSubscribed = mutable.MutableList(channels: _*)
val patternsSubscribed = mutable.MutableList(patterns: _*)
var channelsSubscribed = channels.toSet
var patternsSubscribed = patterns.toSet

override def preStart() {
super.preStart()
write(SUBSCRIBE(channelsSubscribed: _*).toByteString)
write(PSUBSCRIBE(patternsSubscribed: _*).toByteString)
write(SUBSCRIBE(channelsSubscribed.toSeq: _*).toByteString)
write(PSUBSCRIBE(patternsSubscribed.toSeq: _*).toByteString)
}

def writing: Receive = {
case message: SubscribeMessage => {
write(message.toByteString)
message match {
case s: SUBSCRIBE => channelsSubscribed ++= s.channel
case u: UNSUBSCRIBE => channelsSubscribed.filter(c => {
u.channel.exists(_ == c)
})
case u: UNSUBSCRIBE => channelsSubscribed --= u.channel
case ps: PSUBSCRIBE => patternsSubscribed ++= ps.pattern
case pu: PUNSUBSCRIBE => patternsSubscribed.filter(c => {
pu.pattern.exists(_ == c)
})
case pu: PUNSUBSCRIBE => patternsSubscribed --= pu.pattern
}
}
}
Expand Down Expand Up @@ -101,4 +96,4 @@ abstract class RedisSubscriberActor(
def onDataReceivedOnClosingConnection(dataByteString: ByteString): Unit = decodeReplies(dataByteString)

def onClosingConnectionClosed(): Unit = {}
}
}
24 changes: 23 additions & 1 deletion src/test/scala/redis/RedisPubSubSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import redis.actors.RedisSubscriberActor
import java.net.InetSocketAddress
import akka.actor.{Props, ActorRef}
import akka.testkit.{TestActorRef, TestProbe}
import akka.util.ByteString

class RedisPubSubSpec extends RedisSpec {

Expand Down Expand Up @@ -72,6 +71,17 @@ class RedisPubSubSpec extends RedisSpec {
})
probeMock.expectMsgType[Message](5 seconds) mustEqual Message("channel2", "value")

subscriberActor.underlyingActor.unsubscribe("channel2")
system.scheduler.scheduleOnce(1 second)({
redis.publish("channel2", "value")
})
probeMock.expectNoMsg(3 seconds)

subscriberActor.underlyingActor.subscribe("channel2")
system.scheduler.scheduleOnce(1 second)({
redis.publish("channel2", "value")
})
probeMock.expectMsgType[Message](5 seconds) mustEqual Message("channel2", "value")

subscriberActor.underlyingActor.psubscribe("pattern2.*")
subscriberActor.underlyingActor.punsubscribe("pattern.*")
Expand All @@ -81,6 +91,18 @@ class RedisPubSubSpec extends RedisSpec {
redis.publish("pattern.*", "value")
})
probeMock.expectMsgType[PMessage](5 seconds) mustEqual PMessage("pattern2.*", "pattern2.match", "value")

subscriberActor.underlyingActor.punsubscribe("pattern2.*")
system.scheduler.scheduleOnce(2 seconds)({
redis.publish("pattern2.match", "value")
})
probeMock.expectNoMsg(3 seconds)

subscriberActor.underlyingActor.psubscribe("pattern.*")
system.scheduler.scheduleOnce(2 seconds)({
redis.publish("pattern.*", "value")
})
probeMock.expectMsgType[PMessage](5 seconds) mustEqual PMessage("pattern.*", "pattern.*", "value")
}
}

Expand Down

0 comments on commit 5eab94b

Please # to comment.