Skip to content

Discussion: data/stream/rpc reliability #702

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions Sources/LiveKit/Core/Room+SendingPolicy.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

extension Room {
func isParticipantConnected(_ identity: Participant.Identity) -> Bool {
allParticipants.contains { $0.key == identity }
}

func isParticipantActive(_ identity: Participant.Identity) -> Bool {
allParticipants.contains { $0.key == identity && $0.value.state == .active }
}

func waitUntilActive(_ identity: Participant.Identity) async throws {
try await activeParticipantCompleters.completer(for: identity.stringValue).wait()
}

func validate(_ identity: Participant.Identity, against policy: SendingPolicy?) async throws {
let isActive = isParticipantActive(identity)
let policy = policy ?? _state.roomOptions.defaultSendingPolicy
switch (isActive, policy) {
case (true, _):
break
case (false, .disabled):
log("Sending to inactive Participant: \(identity.stringValue)", .warning)
case (false, .throwIfInactive):
throw LiveKitError(.participantInactive, message: "Participant inactive: \(identity.stringValue)")
case (false, .waitUntilActive):
guard isParticipantConnected(identity) else {
throw LiveKitError(.participantRemoved, message: "Participant removed: \(identity.stringValue)")
}
fallthrough
case (false, .waitUntilConnectedAndActive):
log("Waiting for inactive Participant: \(identity.stringValue)", .info)
try await waitUntilActive(identity)
log("Participant active: \(identity.stringValue)", .info)
try Task.checkCancellation()
}
}
}
2 changes: 2 additions & 0 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
let primaryTransportConnectedCompleter = AsyncCompleter<Void>(label: "Primary transport connect", defaultTimeout: .defaultTransportState)
let publisherTransportConnectedCompleter = AsyncCompleter<Void>(label: "Publisher transport connect", defaultTimeout: .defaultTransportState)

let activeParticipantCompleters = CompleterMapActor<Void>(label: "Participant active", defaultTimeout: .defaultParticipantActiveTimeout)

let signalClient = SignalClient()

// MARK: - DataChannels
Expand Down
1 change: 1 addition & 0 deletions Sources/LiveKit/Errors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum LiveKitErrorType: Int, Sendable {
case stateMismatch = 504
case joinFailure = 505
case insufficientPermissions = 506
case participantInactive = 507

//
case serverPingTimedOut = 601
Expand Down
2 changes: 2 additions & 0 deletions Sources/LiveKit/Extensions/TimeInterval.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public extension TimeInterval {
static let defaultPublish: Self = 10
static let defaultCaptureStart: Self = 10

static let defaultParticipantActiveTimeout: Self = 10

/// Computes a retry delay based on an "easeOutCirc" curve between baseDelay and maxDelay.
///
/// The easeOutCirc curve provides a dramatic early acceleration followed by a gentler approach to the maximum,
Expand Down
3 changes: 3 additions & 0 deletions Sources/LiveKit/Participant/LocalParticipant+RPC.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ public extension LocalParticipant {
/// - Returns: The response payload
/// - Throws: RpcError on failure. Details in RpcError.message
func performRpc(destinationIdentity: Identity,
sendingPolicy: SendingPolicy? = nil,
method: String,
payload: String,
responseTimeout: TimeInterval = 10) async throws -> String
{
let room = try requireRoom()

try await room.validate(destinationIdentity, against: sendingPolicy)

guard payload.byteLength <= MAX_RPC_PAYLOAD_BYTES else {
throw RpcError.builtIn(.requestPayloadTooLarge)
}
Expand Down
11 changes: 11 additions & 0 deletions Sources/LiveKit/Participant/Participant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga
room.delegates.notify(label: { "room.didUpdate state: \(newState.state)" }) {
$0.room?(room, participant: self, didUpdateState: newState.state)
}

guard let identity = identity?.stringValue else { return }
if oldState.state != .active, newState.state == .active {
Task {
await room.activeParticipantCompleters.resume(returning: (), for: identity)
}
} else if oldState.state == .active, newState.state != .active {
Task {
await room.activeParticipantCompleters.resume(throwing: LiveKitError(.participantRemoved, message: "Participant removed \(identity)"), for: identity)
}
}
}

// connection quality updated
Expand Down
8 changes: 8 additions & 0 deletions Sources/LiveKit/Types/Options/RoomOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public final class RoomOptions: NSObject, Sendable {
@objc
public let defaultDataPublishOptions: DataPublishOptions

@objc
public let defaultSendingPolicy: SendingPolicy

/// AdaptiveStream lets LiveKit automatically manage quality of subscribed
/// video tracks to optimize for bandwidth and CPU.
/// When attached video elements are visible, it'll choose an appropriate
Expand Down Expand Up @@ -76,6 +79,7 @@ public final class RoomOptions: NSObject, Sendable {
defaultVideoPublishOptions = VideoPublishOptions()
defaultAudioPublishOptions = AudioPublishOptions()
defaultDataPublishOptions = DataPublishOptions()
defaultSendingPolicy = .disabled
adaptiveStream = false
dynacast = false
stopLocalTrackOnUnpublish = true
Expand All @@ -91,6 +95,7 @@ public final class RoomOptions: NSObject, Sendable {
defaultVideoPublishOptions: VideoPublishOptions = VideoPublishOptions(),
defaultAudioPublishOptions: AudioPublishOptions = AudioPublishOptions(),
defaultDataPublishOptions: DataPublishOptions = DataPublishOptions(),
defaultSendingPolicy: SendingPolicy = .disabled,
adaptiveStream: Bool = false,
dynacast: Bool = false,
stopLocalTrackOnUnpublish: Bool = true,
Expand All @@ -104,6 +109,7 @@ public final class RoomOptions: NSObject, Sendable {
self.defaultVideoPublishOptions = defaultVideoPublishOptions
self.defaultAudioPublishOptions = defaultAudioPublishOptions
self.defaultDataPublishOptions = defaultDataPublishOptions
self.defaultSendingPolicy = defaultSendingPolicy
self.adaptiveStream = adaptiveStream
self.dynacast = dynacast
self.stopLocalTrackOnUnpublish = stopLocalTrackOnUnpublish
Expand All @@ -122,6 +128,7 @@ public final class RoomOptions: NSObject, Sendable {
defaultVideoPublishOptions == other.defaultVideoPublishOptions &&
defaultAudioPublishOptions == other.defaultAudioPublishOptions &&
defaultDataPublishOptions == other.defaultDataPublishOptions &&
defaultSendingPolicy == other.defaultSendingPolicy &&
adaptiveStream == other.adaptiveStream &&
dynacast == other.dynacast &&
stopLocalTrackOnUnpublish == other.stopLocalTrackOnUnpublish &&
Expand All @@ -138,6 +145,7 @@ public final class RoomOptions: NSObject, Sendable {
hasher.combine(defaultVideoPublishOptions)
hasher.combine(defaultAudioPublishOptions)
hasher.combine(defaultDataPublishOptions)
hasher.combine(defaultSendingPolicy)
hasher.combine(adaptiveStream)
hasher.combine(dynacast)
hasher.combine(stopLocalTrackOnUnpublish)
Expand Down
25 changes: 25 additions & 0 deletions Sources/LiveKit/Types/Options/SendingPolicy.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

@objc
public enum SendingPolicy: Int, Sendable {
case disabled
case throwIfInactive
case waitUntilActive // this should be the default in most cases?
case waitUntilConnectedAndActive // this does not make sense without a long timeout?
}
Loading