Advanced WebSocket

voyz edited this page Apr 29, 2024 · 4 revisions

Advanced Lifecycle Management

The IBKR WebSocket API may later behave unexpectedly if previous connections are not closed gracefully. We need to ensure that the IbkrWsClient.shutdown method is always called when the program terminates.

To do so, use the signal module to shut down the client when the program is terminated.

import signal

# assuming we subscribe to Orders channel
ws_client.subscribe(channel='or', data=None, needs_confirmation=False)

# this is a callback used when the program terminates
def stop(_, _1):
    # we unsubscribe from the Orders channel
    ws_client.unsubscribe(channel='or', data=None, needs_confirmation=False)

    # we gracefully close the connection
    ws_client.shutdown()

# register the `stop` callback to be called when the program terminates
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
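
One caveat with a handler like the one above is that an exception raised while unsubscribing would skip the shutdown call, and a second Ctrl+C would run the handler again. A minimal sketch of a more defensive variant follows. The names make_stop_handler, register and FakeWsClient are hypothetical and introduced only for illustration; the fake mimics just the unsubscribe and shutdown calls used above so that the snippet runs without a live connection.

```python
import signal


class FakeWsClient:
    """Hypothetical stand-in for IbkrWsClient; it only records calls."""

    def __init__(self):
        self.calls = []

    def unsubscribe(self, channel, data=None, needs_confirmation=False):
        self.calls.append(('unsubscribe', channel))

    def shutdown(self):
        self.calls.append(('shutdown',))


def make_stop_handler(client, channels):
    """Build a signal handler that unsubscribes, then always shuts down."""
    state = {'stopped': False}

    def stop(signum, frame):
        # ignore repeated signals, e.g. Ctrl+C pressed twice
        if state['stopped']:
            return
        state['stopped'] = True
        try:
            for channel in channels:
                client.unsubscribe(channel=channel, data=None, needs_confirmation=False)
        finally:
            # runs even if unsubscribing raised
            client.shutdown()

    return stop


def register(handler):
    """Attach the handler to the usual termination signals (main thread only)."""
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)


ws_client = FakeWsClient()
stop = make_stop_handler(ws_client, channels=['or'])
stop(signal.SIGINT, None)  # simulate the signal instead of calling register(stop)
stop(signal.SIGINT, None)  # the second call is a no-op
print(ws_client.calls)     # [('unsubscribe', 'or'), ('shutdown',)]
```

In a real program we would call register(stop) once after creating the client, rather than invoking the handler by hand.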

Advanced Subscribing and Unsubscribing

While most IBKR WebSocket API channels follow the payload structure described in the IbkrWsClient - Subscribing and Unsubscribing section, there are some exceptions that need to be handled on a case-by-case basis.

To facilitate this, the subscribe and unsubscribe methods accept an instance of SubscriptionProcessor as an optional argument.

It is an interface allowing the WsClient to translate our channel and data arguments into a payload string. Recall that these arguments are passed to the subscribe and unsubscribe methods.

from abc import ABC


class SubscriptionProcessor(ABC):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()

IbkrWsClient utilises the IbkrSubscriptionProcessor, which for example adds the s and u prefixes depending on whether we subscribe or unsubscribe.

import json


class IbkrSubscriptionProcessor(SubscriptionProcessor):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        payload = f"s{channel}"

        if data is not None or data == {}:
            payload += f"+{json.dumps(data)}"

        return payload

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        data = {} if data is None else data
        return f'u{channel}+{json.dumps(data)}'
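
To see what these two methods produce, the class can be exercised on its own. The snippet below is a sketch that repeats the logic without the base class so that it runs standalone; the {'key': 'value'} data is a placeholder rather than a real IBKR parameter.

```python
import json


class IbkrSubscriptionProcessor:
    # same behaviour as the class above; `data is not None` already covers
    # the empty-dict case handled by `or data == {}` there
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        payload = f"s{channel}"
        if data is not None:
            payload += f"+{json.dumps(data)}"
        return payload

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        data = {} if data is None else data
        return f'u{channel}+{json.dumps(data)}'


processor = IbkrSubscriptionProcessor()

print(processor.make_subscribe_payload('or'))                    # sor
print(processor.make_subscribe_payload('or', {}))                # sor+{}
print(processor.make_subscribe_payload('or', {'key': 'value'}))  # sor+{"key": "value"}
print(processor.make_unsubscribe_payload('or'))                  # uor+{}
```

Note the asymmetry: subscribing without data omits the JSON part entirely, while unsubscribing always appends one, falling back to an empty object.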

We can pass a custom SubscriptionProcessor to the subscribe or unsubscribe methods to override the default IbkrSubscriptionProcessor. Some channels require this to work, and the next section goes over one such use case.

Historical Market Data Channel

Looking at the Historical Market Data WebSocket documentation, we can see that the payloads required for subscribing and unsubscribing differ substantially.

Subscribing:

smh+conid+{"exchange":"exchange", "period":"period", "bar":"bar", "outsideRth":outsideRth, "source":"source", "format":"format"}

Unsubscribing:

umh+{serverId}

Additionally:

NOTE: Only a max of 5 concurrent historical data request available at a time.

NOTE: Historical data will only respond once, though customers will still need to unsubscribe from the endpoint.
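
To make the difference concrete, the two payload shapes quoted above can be assembled with plain string formatting. This only illustrates the wire format: mh_subscribe_payload and mh_unsubscribe_payload are hypothetical helpers, and the conid, parameter values and serverId below are made-up placeholders.

```python
import json


def mh_subscribe_payload(conid, params: dict) -> str:
    # smh+conid+{...}: the contract ID, then the JSON parameters
    return f"smh+{conid}+{json.dumps(params)}"


def mh_unsubscribe_payload(server_id) -> str:
    # umh+serverId: no conid and no JSON body
    return f"umh+{server_id}"


params = {'period': '1d', 'bar': '1hour', 'outsideRth': False}  # placeholder values
print(mh_subscribe_payload(265598, params))
# smh+265598+{"period": "1d", "bar": "1hour", "outsideRth": false}
print(mh_unsubscribe_payload('87567'))
# umh+87567
```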

There are several key challenges with this channel:

  • The parameters change: Note that the second parameter passed in the payloads changes from conid to serverId. This is unlike the majority of other endpoints, where unsubscribing either requires the same conid parameter as subscribing or requires no parameters at all.
  • Acquiring serverId: What's more, the serverId parameter expects the ID of the IBKR server that currently handles our subscription on the IBKR side. This information becomes known to us only after we subscribe and receive the first valid messages through the WebSocket channel, as the serverId field is attached to most Historical Market Data messages.
  • Connection limits: To complicate matters further, we may only have up to five simultaneous Historical Market Data WebSocket servers connected to us, and these stay assigned until we explicitly unsubscribe from them. Hence, it is essential to build reliable unsubscribing logic before we even start testing this channel.

The solution will involve:

  1. Recognising and storing the serverId data.
  2. Building a custom SubscriptionProcessor that adds the serverId to the payload instead of the conid.
  3. Adding a function that loops over the stored serverId data and attempts to unsubscribe from each server.

Let's tackle it step by step.

1. Storing serverId

Since this is a known challenge, the IbkrWsClient handles the first step for us automatically.

All Historical Market Data channel messages are parsed for the serverId field, which is stored internally along with the conid that the server is sending data for.

We may access the list of currently stored serverId/conid pairs for a particular channel by calling the server_ids method, passing the appropriate IbkrWsKey of the channel as the argument.

ws_client.server_ids(IbkrWsKey.MARKET_HISTORY)
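
For intuition, this kind of bookkeeping could look roughly like the sketch below. It is not IbkrWsClient's actual implementation: ServerIdStore is a hypothetical class, and the message shape (a serverId field plus a topic of the form smh+conid) is an assumption made for illustration.

```python
class ServerIdStore:
    """Hypothetical store mapping serverId -> conid for one channel."""

    def __init__(self):
        self._pairs = {}

    def record(self, message: dict) -> None:
        server_id = message.get('serverId')
        topic = message.get('topic', '')
        # only keep messages that carry a serverId and an smh+conid topic
        if server_id is None or not topic.startswith('smh+'):
            return
        conid = topic.split('+', 1)[1]
        self._pairs[server_id] = conid

    def server_ids(self) -> dict:
        # return a copy so callers can iterate while entries are removed
        return dict(self._pairs)

    def discard(self, server_id) -> None:
        self._pairs.pop(server_id, None)


store = ServerIdStore()
store.record({'serverId': '87567', 'topic': 'smh+265598'})  # made-up values
store.record({'topic': 'system'})                            # ignored: no serverId
print(store.server_ids())  # {'87567': '265598'}
```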

2. Custom SubscriptionProcessor

Tackling the second step requires us to write a custom SubscriptionProcessor, which inherits the subscription payload logic from the IbkrSubscriptionProcessor but overrides the unsubscription payload logic:

class MhSubscriptionProcessor(IbkrSubscriptionProcessor):
    # the unsubscribe payload carries the serverId instead of the conid
    def make_unsubscribe_payload(self, channel: str, server_id: str = None) -> str:
        return f'umh+{server_id}'

3. Full unsubscribe logic

Finally, we can write the function that will unsubscribe from the Historical Market Data using our custom MhSubscriptionProcessor:

subscription_processor = MhSubscriptionProcessor()

def unsubscribe_market_history():
    # get serverId/conid pairs
    server_id_conid_pairs = ws_client.server_ids(IbkrWsKey.MARKET_HISTORY)

    # loop over
    for server_id, conid in server_id_conid_pairs.items():

        # unsubscribe using the custom SubscriptionProcessor
        # the server ID travels through the `data` argument of `unsubscribe`
        confirmed = ws_client.unsubscribe(
            channel='mh',
            data=server_id,
            needs_confirmation=False,
            subscription_processor=subscription_processor
        )
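
Before pointing this at a live connection, the loop can be dry-run against a stand-in client to check which payloads would go out. In the sketch below FakeWsClient is hypothetical: it mimics only server_ids and unsubscribe, assumes that the server ID travels in the data argument and is handed to the processor unchanged, and uses invented serverId/conid pairs. The 'mh' string passed to server_ids is likewise a placeholder for the real IbkrWsKey.

```python
class MhSubscriptionProcessor:
    # standalone copy of the unsubscribe override, without the base class
    def make_unsubscribe_payload(self, channel: str, server_id=None) -> str:
        return f'umh+{server_id}'


class FakeWsClient:
    """Hypothetical stand-in that records payloads instead of sending them."""

    def __init__(self, pairs):
        self._pairs = dict(pairs)
        self.sent = []

    def server_ids(self, key):
        # return a copy so the caller can iterate while entries are removed
        return dict(self._pairs)

    def unsubscribe(self, channel, data=None, needs_confirmation=False,
                    subscription_processor=None):
        payload = subscription_processor.make_unsubscribe_payload(channel, data)
        self.sent.append(payload)
        self._pairs.pop(data, None)  # assume the server is released
        return True


ws_client = FakeWsClient({'87567': '265598', '87568': '8314'})
subscription_processor = MhSubscriptionProcessor()

for server_id, conid in ws_client.server_ids('mh').items():
    ws_client.unsubscribe(
        channel='mh',
        data=server_id,
        needs_confirmation=False,
        subscription_processor=subscription_processor,
    )

print(ws_client.sent)              # ['umh+87567', 'umh+87568']
print(ws_client.server_ids('mh'))  # {}
```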

Following the advice from the Advanced Lifecycle Management section, we should ensure that this unsubscribe_market_history function is called every time our program terminates:

import signal

def stop(_, _1):
    unsubscribe_market_history()
    ws_client.shutdown()

signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)

Only once this code is in place is it reasonable to start testing the Historical Market Data channel.