Skip to content

Added example for async websocket and rest calls #589

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 4 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import asyncio
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Union
from polygon import RESTClient, WebSocketClient
from polygon.websocket.models import Market, Feed


class ApiCallHandler:
def __init__(self):
self.api_call_queue = asyncio.Queue()
self.executor = ThreadPoolExecutor() # Thread pool for running synchronous code

async def enqueue_api_call(self, options_ticker):
await self.api_call_queue.put(options_ticker)

async def start_processing_api_calls(self):
while True:
options_ticker = await self.api_call_queue.get()
try:
# TODO:
# Here, you can process the rest api requets as needed
# Example: Get the options contract for this
contract = await asyncio.get_running_loop().run_in_executor(
self.executor, self.get_options_contract, options_ticker
)
print(contract) # Or process the contract data as needed
except Exception as e:
logging.error(f"Error processing API call for {options_ticker}: {e}")
finally:
self.api_call_queue.task_done()

def get_options_contract(self, options_ticker):
client = RESTClient() # Assumes POLYGON_API_KEY is set in the environment
return client.get_options_contract(options_ticker)


class MessageHandler:
def __init__(self, api_call_handler):
self.handler_queue = asyncio.Queue()
self.api_call_handler = api_call_handler

async def add(self, message_response: Optional[Union[str, bytes]]) -> None:
await self.handler_queue.put(message_response)

async def start_handling(self) -> None:
while True:
message_response = await self.handler_queue.get()
logging.info(f"Received message: {message_response}")
try:
# TODO:
# Here, you can process the websocket messages as needed
# Example: Extract ticker symbol and enqueue REST API call
# to get the options contract for this trade (non-blocking)
for trade in message_response:
ticker = self.extract_symbol(trade.symbol)
if ticker == "NVDA":
asyncio.create_task(
self.api_call_handler.enqueue_api_call(trade.symbol)
)
except Exception as e:
logging.error(f"Error handling message: {e}")
finally:
self.handler_queue.task_done()

def extract_symbol(self, input_string):
match = re.search(r"O:([A-Z]+)", input_string)
if match:
return match.group(1)
else:
return None


class MyClient:
def __init__(self, feed, market, subscriptions):
api_key = os.getenv("POLYGON_API_KEY")
self.polygon_websocket_client = WebSocketClient(
api_key=api_key,
feed=feed,
market=market,
verbose=True,
subscriptions=subscriptions,
)
self.api_call_handler = ApiCallHandler()
self.message_handler = MessageHandler(self.api_call_handler)

async def start_event_stream(self):
try:
await asyncio.gather(
self.polygon_websocket_client.connect(self.message_handler.add),
self.message_handler.start_handling(),
self.api_call_handler.start_processing_api_calls(),
)
except Exception as e:
logging.error(f"Error in event stream: {e}")


async def main():
logging.basicConfig(level=logging.INFO)
my_client = MyClient(
feed=Feed.RealTime, market=Market.Options, subscriptions=["T.*"]
)
await my_client.start_event_stream()


# Entry point for the asyncio program
asyncio.run(main())
16 changes: 16 additions & 0 deletions examples/tools/async_websocket_rest_handler/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Pattern for Non-Blocking WebSocket and REST Calls in Python

This script demonstrates a non-blocking pattern for handling WebSocket streams and REST API calls in Python using asyncio. It focuses on efficient, concurrent processing of real-time financial data and asynchronous fetching of additional information, ensuring that real-time data streams are managed without delays or blockages. The tutorial provides both theoretical insights and a practical, adaptable example, ideal for applications in financial data processing and similar real-time data handling scenarios.

Please see the [tutorial](https://polygon.io/blog/pattern-for-non-blocking-websocket-and-rest-calls-in-python) for more details.

### Prerequisites

- Python 3.x
- Polygon.io account and Options API key

### Running the Example

```
python3 async_websocket_rest_handler.py
```