Skip to content

Advanced WebSocket

voyz edited this page Apr 29, 2024 · 4 revisions

Advanced WebSocket Functionalities

Advanced Lifecycle Management

IBKR WebSocket API may behave unexpectedly in the future if the previous connections are not closed gracefully. We need to ensure to always call IbkrWsClient.shutdown method when the program stops.

To do so, use signal module to shutdown when the program is terminated.

import signal

# assuming we subscribe to Orders channel
ws_client.subscribe(channel='or', data=None, needs_confirmation=False)

# this is a callback used when the program stops
def stop(_, _1):
    # we unsubscribe from the Orders channel
    ws_client.unsubscribe(channel='or', data=None, needs_confirmation=False)

    # we gracefully close the connection
    ws_client.shutdown()

# register the `stop` callback to be called when the program stops
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)

Advanced Subscribing and Unsubscribing

While most IBKR WebSocket API channels follow the payload structure described in the IbkrWsClient - Subscribing and Unsubscribing section, there are some exceptions that need to be handled on a case-by-case basis.

To facilitate this, the subscribe and unsubscribe methods accept the subscription_processor argument of type SubscriptionProcessor.

The SubscriptionProcessor is an interface allowing the WsClient to translate our channel and data arguments into a payload string. Recall that these arguments are passed to the subscribe and unsubscribe methods.

class SubscriptionProcessor(ABC):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()

IbkrWsClient utilises the IbkrSubscriptionProcessor concrete implementation, which is responsible for preparing the payload string, for example by adding s and u prefixes depending on whether we subscribe or unsubscribe.

class IbkrSubscriptionProcessor(SubscriptionProcessor):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        payload = f"s{channel}"

        if data is not None or data == {}:
            payload += f"+{json.dumps(data)}"

        return payload

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        data = {} if data is None else data
        return f'u{channel}+{json.dumps(data)}'