Skip to content

Commit

Permalink
Implement Unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
muzzammilshahid committed Oct 23, 2024
1 parent c125a22 commit e4ab1ea
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
23 changes: 22 additions & 1 deletion src/main/kotlin/io/xconn/xconn/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import io.xconn.wampproto.messages.Subscribe
import io.xconn.wampproto.messages.Subscribed
import io.xconn.wampproto.messages.Unregister
import io.xconn.wampproto.messages.Unregistered
import io.xconn.wampproto.messages.Unsubscribe
import io.xconn.wampproto.messages.Unsubscribed
import io.xconn.wampproto.messages.Yield
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -41,6 +43,7 @@ class Session(private val baseSession: BaseSession) {
private val publishRequests: MutableMap<Long, CompletableDeferred<Unit>> = mutableMapOf()
private val subscribeRequests: MutableMap<Long, SubscribeRequest> = mutableMapOf()
private val subscriptions: MutableMap<Long, (Event) -> Unit> = mutableMapOf()
private val unsubscribeRequests: MutableMap<Long, UnsubscribeRequest> = mutableMapOf()

private val goodbyeRequest: CompletableDeferred<Unit> = CompletableDeferred()

Expand Down Expand Up @@ -135,6 +138,13 @@ class Session(private val baseSession: BaseSession) {
endpoint(Event(message.args, message.kwargs, message.details))
}
}
is Unsubscribed -> {
val request = unsubscribeRequests.remove(message.requestID)
if (request != null) {
subscriptions.remove(request.subscriptionID)
request.completable.complete(Unit)
}
}
is Goodbye -> {
goodbyeRequest.complete(Unit)
}
Expand Down Expand Up @@ -183,7 +193,7 @@ class Session(private val baseSession: BaseSession) {
suspend fun register(
procedure: String,
endpoint: (Invocation) -> Result,
options: Map<String, Any>? = null,
options: Map<String, Any>? = emptyMap(),
): CompletableDeferred<Registration> {
val register = Register(nextID, procedure, options)

Expand Down Expand Up @@ -241,4 +251,15 @@ class Session(private val baseSession: BaseSession) {

return completable
}

suspend fun unsubscribe(sub: Subscription): CompletableDeferred<Unit> {
val unsubscribe = Unsubscribe(nextID, sub.subscriptionID)

val completable = CompletableDeferred<Unit>()
unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completable, sub.subscriptionID)

baseSession.send(wampSession.sendMessage(unsubscribe))

return completable
}
}
5 changes: 5 additions & 0 deletions src/main/kotlin/io/xconn/xconn/Types.kt
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,8 @@ data class Event(
val kwargs: Map<String, Any>? = emptyMap(),
val details: Map<String, Any> = emptyMap(),
)

data class UnsubscribeRequest(
val completable: CompletableDeferred<Unit>,
val subscriptionID: Long,
)

0 comments on commit e4ab1ea

Please # to comment.