market_data_collector.py
import asyncio
import logging

from data_classes import ExchangeDatastore
from market_scrapers import QTradeScraper, BittrexScraper, CCXTScraper

scraper_classes = {
    "qtrade": QTradeScraper,
    "bittrex": BittrexScraper,
    "ccxt": CCXTScraper,
}

log = logging.getLogger('mdc')


class MarketDataCollector:

    def __init__(self, config):
        # load config from yaml file
        self.config = config

        # load scrapers
        self.scrapers = []
        for name, cfg in self.config['scrapers'].items():
            self.scrapers.append(
                scraper_classes[name](exchange_name=name, **cfg))

    def update_tickers(self):
        log.debug("Updating tickers...")
        for s in self.scrapers:
            ExchangeDatastore.tickers[s.exchange_name] = s.scrape_ticker()

    def update_midpoints(self):  # be sure to update tickers first
        log.debug("Updating midpoints...")
        for exchange_name, markets in ExchangeDatastore.tickers.items():
            for market, ticker in markets.items():
                bid = ticker["bid"]
                last = ticker["last"]
                ExchangeDatastore.midpoints.setdefault(exchange_name, {})
                ExchangeDatastore.midpoints[exchange_name][
                    market] = (bid + last) / 2

    async def daemon(self):
        log.info("Starting market data collector; interval period %s sec",
                 self.config['update_period'])
        while True:
            try:
                log.info("Pulling market data...")
                self.update_tickers()
                self.update_midpoints()
            except Exception:
                log.warning("Market scraper loop exploded", exc_info=True)
            # Sleep outside the try block so a failed update still waits out
            # the interval instead of retrying in a tight loop.
            await asyncio.sleep(self.config["update_period"])
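

# A minimal usage sketch, assuming the config is a dict loaded from YAML with
# an "update_period" in seconds and a "scrapers" mapping of exchange name to
# constructor kwargs (as __init__ and daemon() above imply). The per-scraper
# options shown here are placeholders; the real keys depend on what
# QTradeScraper, BittrexScraper, and CCXTScraper actually accept.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    sample_config = {
        "update_period": 30,  # seconds between scrape cycles
        "scrapers": {
            "qtrade": {},     # kwargs forwarded to QTradeScraper(...)
            "bittrex": {},    # kwargs forwarded to BittrexScraper(...)
        },
    }

    collector = MarketDataCollector(sample_config)
    # daemon() loops forever, filling ExchangeDatastore.tickers and
    # ExchangeDatastore.midpoints on each cycle.
    asyncio.run(collector.daemon())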