Skip to content

Commit

Permalink
Merge pull request #3 from muzzammilshahid/implement-pubsub
Browse files Browse the repository at this point in the history
Implement pubsub
  • Loading branch information
muzzammilshahid authored Oct 23, 2024
2 parents 6782de9 + e4ab1ea commit d7b5ccd
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
86 changes: 85 additions & 1 deletion src/main/kotlin/io/xconn/xconn/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ import io.xconn.wampproto.messages.Call
import io.xconn.wampproto.messages.Error
import io.xconn.wampproto.messages.Goodbye
import io.xconn.wampproto.messages.Message
import io.xconn.wampproto.messages.Publish
import io.xconn.wampproto.messages.Published
import io.xconn.wampproto.messages.Register
import io.xconn.wampproto.messages.Registered
import io.xconn.wampproto.messages.Subscribe
import io.xconn.wampproto.messages.Subscribed
import io.xconn.wampproto.messages.Unregister
import io.xconn.wampproto.messages.Unregistered
import io.xconn.wampproto.messages.Unsubscribe
import io.xconn.wampproto.messages.Unsubscribed
import io.xconn.wampproto.messages.Yield
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
Expand All @@ -20,6 +26,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import kotlin.coroutines.cancellation.CancellationException
import io.xconn.wampproto.messages.Event as EventMsg
import io.xconn.wampproto.messages.Invocation as InvocationMsg
import io.xconn.wampproto.messages.Result as ResultMsg

Expand All @@ -32,6 +39,12 @@ class Session(private val baseSession: BaseSession) {
private val registerRequests: MutableMap<Long, RegisterRequest> = mutableMapOf()
private val registrations: MutableMap<Long, (Invocation) -> Result> = mutableMapOf()
private val unregisterRequests: MutableMap<Long, UnregisterRequest> = mutableMapOf()

private val publishRequests: MutableMap<Long, CompletableDeferred<Unit>> = mutableMapOf()
private val subscribeRequests: MutableMap<Long, SubscribeRequest> = mutableMapOf()
private val subscriptions: MutableMap<Long, (Event) -> Unit> = mutableMapOf()
private val unsubscribeRequests: MutableMap<Long, UnsubscribeRequest> = mutableMapOf()

private val goodbyeRequest: CompletableDeferred<Unit> = CompletableDeferred()

init {
Expand Down Expand Up @@ -108,6 +121,30 @@ class Session(private val baseSession: BaseSession) {
request.completable.complete(Unit)
}
}
is Published -> {
val request = publishRequests.remove(message.requestID)
request?.complete(Unit)
}
is Subscribed -> {
val request = subscribeRequests.remove(message.requestID)
if (request != null) {
subscriptions[message.subscriptionID] = request.endpoint
request.completable.complete(Subscription(message.subscriptionID))
}
}
is EventMsg -> {
val endpoint = subscriptions[message.subscriptionID]
if (endpoint != null) {
endpoint(Event(message.args, message.kwargs, message.details))
}
}
is Unsubscribed -> {
val request = unsubscribeRequests.remove(message.requestID)
if (request != null) {
subscriptions.remove(request.subscriptionID)
request.completable.complete(Unit)
}
}
is Goodbye -> {
goodbyeRequest.complete(Unit)
}
Expand Down Expand Up @@ -156,7 +193,7 @@ class Session(private val baseSession: BaseSession) {
suspend fun register(
procedure: String,
endpoint: (Invocation) -> Result,
options: Map<String, Any>? = null,
options: Map<String, Any>? = emptyMap(),
): CompletableDeferred<Registration> {
val register = Register(nextID, procedure, options)

Expand All @@ -178,4 +215,51 @@ class Session(private val baseSession: BaseSession) {

return completable
}

suspend fun publish(
topic: String,
args: List<Any>? = null,
kwargs: Map<String, Any>? = null,
options: Map<String, Any> = emptyMap(),
): CompletableDeferred<Unit>? {
val publish = Publish(nextID, topic, args, kwargs, options)

baseSession.send(wampSession.sendMessage(publish))

val ack = options["acknowledge"] as? Boolean ?: false
if (ack) {
val completer = CompletableDeferred<Unit>()
publishRequests[publish.requestID] = completer

return completer
}

return null
}

suspend fun subscribe(
topic: String,
endpoint: (Event) -> Unit,
options: Map<String, Any>? = emptyMap(),
): CompletableDeferred<Subscription> {
val subscribe = Subscribe(nextID, topic, options)

val completable = CompletableDeferred<Subscription>()
subscribeRequests[subscribe.requestID] = SubscribeRequest(completable, endpoint)

baseSession.send(wampSession.sendMessage(subscribe))

return completable
}

suspend fun unsubscribe(sub: Subscription): CompletableDeferred<Unit> {
val unsubscribe = Unsubscribe(nextID, sub.subscriptionID)

val completable = CompletableDeferred<Unit>()
unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completable, sub.subscriptionID)

baseSession.send(wampSession.sendMessage(unsubscribe))

return completable
}
}
18 changes: 18 additions & 0 deletions src/main/kotlin/io/xconn/xconn/Types.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,21 @@ data class UnregisterRequest(
val completable: CompletableDeferred<Unit>,
val registrationID: Long,
)

data class Subscription(val subscriptionID: Long)

data class SubscribeRequest(
val completable: CompletableDeferred<Subscription>,
val endpoint: (Event) -> Unit,
)

data class Event(
val args: List<Any>? = emptyList(),
val kwargs: Map<String, Any>? = emptyMap(),
val details: Map<String, Any> = emptyMap(),
)

data class UnsubscribeRequest(
val completable: CompletableDeferred<Unit>,
val subscriptionID: Long,
)

0 comments on commit d7b5ccd

Please # to comment.