diff --git a/algotrader/app/live_ats.py b/algotrader/app/live_ats.py index 45cb196..0f7bf84 100644 --- a/algotrader/app/live_ats.py +++ b/algotrader/app/live_ats.py @@ -1,35 +1,45 @@ +import gevent +from gevent import monkey +monkey.patch_all() from algotrader.app import Application from algotrader.trading.config import Config, load_from_yaml from algotrader.trading.context import ApplicationContext from algotrader.utils.logging import logger - class ATSRunner(Application): def init(self): self.config = self.app_context.config + def run(self): + logger.info("starting ATS") + + self.app_context.start() + self.portfolio = self.app_context.portf_mgr.get_or_new_portfolio(self.config.get_app_config("portfolioId"), self.config.get_app_config( "portfolioInitialcash")) - self.app_context.add_startable(self.portfolio) self.strategy = self.app_context.stg_mgr.get_or_new_stg(self.config.get_app_config("stgId"), - self.config.get_app_config("stgCls")) - self.app_context.add_startable(self.strategy) + self.config.get_app_config("stgCls")) - def run(self): - logger.info("starting ATS") + self.app_context.add_startable(self.portfolio) + self.portfolio.start(self.app_context) - self.app_context.start() + self.app_context.add_startable(self.strategy) self.strategy.start(self.app_context) logger.info("ATS started, presss Ctrl-C to stop") + # TODO: how to handle the Ctrl-C gratefully and make sure all startables has called stop? + while True: + gevent.sleep(1) + def main(): config = Config( load_from_yaml("../../config/live_ib.yaml"), - load_from_yaml("../../config/mvg_avg_force.yaml")) + load_from_yaml("../../config/simple_market_making.yaml")) + # load_from_yaml("../../config/mvg_avg_force.yaml")) # load_from_yaml("../../config/down2%.yaml")) app_context = ApplicationContext(config=config) diff --git a/algotrader/provider/__init__.py b/algotrader/provider/__init__.py index f0532f4..a95cc48 100644 --- a/algotrader/provider/__init__.py +++ b/algotrader/provider/__init__.py @@ -18,6 +18,7 @@ def __init__(self): from algotrader.provider.feed.pandas_web import PandasWebDataFeed from algotrader.provider.feed.pandas_memory import PandasMemoryDataFeed from algotrader.provider.feed.pandas_db import PandaDBDataFeed +from algotrader.provider.feed.high_freq import HighFrequencyFileFeed from algotrader.provider.datastore.inmemory import InMemoryDataStore from algotrader.provider.datastore.mongodb import MongoDBDataStore @@ -36,6 +37,7 @@ def __init__(self): self.add(PandasWebDataFeed()) self.add(PandasMemoryDataFeed()) self.add(PandaDBDataFeed()) + self.add(HighFrequencyFileFeed()) def id(self): return "ProviderManager" diff --git a/algotrader/provider/broker/ib/ib_broker.py b/algotrader/provider/broker/ib/ib_broker.py index a2add27..984f851 100644 --- a/algotrader/provider/broker/ib/ib_broker.py +++ b/algotrader/provider/broker/ib/ib_broker.py @@ -197,12 +197,11 @@ def _start(self, app_context: Context) -> None: # time.sleep(1) def poll(self): - logger.info("poll is called") ok = True - while ok: - logger.info("checkMessages") + while True: + logger.debug("checkMessages") ok = self.tws.checkMessages() - logger.info("ok = %s after tws checkMessage" % ok ) + logger.debug("ok = %s after tws checkMessage" % ok ) if ok and (not self.tws or not self.tws.isConnected()): logger.info("ok is now False") @@ -273,8 +272,8 @@ def unsubscribe_mktdata(self, *sub_reqs): def __req_mktdata(self, req_id, sub_req, contract): self.tws.reqMktData(req_id, contract, '', # genericTicks - False # snapshot - ) + False, # snapshot + swigibpy.TagValueList()) def __cancel_mktdata(self, req_id): self.tws.cancelMktData(req_id) @@ -380,6 +379,7 @@ def tickPrice(self, tickerId, field, price, canAutoExecute): """ TickerId tickerId, TickType field, double price, int canAutoExecute """ + logger.debug("%s,%s,%s,%s" % (tickerId, field, price, canAutoExecute)) record = self.data_sub_reg.get_data_record(tickerId) if record: prev = price @@ -430,7 +430,9 @@ def tickSize(self, tickerId, field, size): def __emit_market_data(self, field, record): if record.quote_req and ( field == swigibpy.BID or field == swigibpy.BID_SIZE or field == swigibpy.ASK or field == swigibpy.ASK_SIZE) and record.bid > 0 and record.ask > 0: - self.data_event_bus.on_next(Quote(inst_id=record.inst_id, timestamp=self.app_context.clock.now(), + self.data_event_bus.on_next(Quote(inst_id=record.inst_id, + timestamp=self.app_context.clock.now(), + provider_id=Broker.IB, bid=record.bid, bid_size=record.bid_size, ask=record.ask, diff --git a/algotrader/provider/broker/ib/ib_model_factory.py b/algotrader/provider/broker/ib/ib_model_factory.py index 2f49362..6b12e44 100644 --- a/algotrader/provider/broker/ib/ib_model_factory.py +++ b/algotrader/provider/broker/ib/ib_model_factory.py @@ -266,4 +266,4 @@ def convert_ib_md_side(self, ib_md_side): return self.ib_md_side_map[ib_md_side] def convert_ib_ord_status(self, ib_ord_status): - return self.ib_ord_status_map.get(ib_ord_status, OrderStatus.UNKNOWN) + return self.ib_ord_status_map.get(ib_ord_status, UnknownStatus) diff --git a/algotrader/provider/feed/__init__.py b/algotrader/provider/feed/__init__.py index 7037076..d39a2d8 100644 --- a/algotrader/provider/feed/__init__.py +++ b/algotrader/provider/feed/__init__.py @@ -19,6 +19,7 @@ class Feed(Provider): Yahoo = "Yahoo" Google = "Google" Quandl = "Quandl" + HF = "HF" __metaclass__ = abc.ABCMeta diff --git a/algotrader/provider/feed/high_freq.py b/algotrader/provider/feed/high_freq.py new file mode 100644 index 0000000..36cda65 --- /dev/null +++ b/algotrader/provider/feed/high_freq.py @@ -0,0 +1,79 @@ +import csv +import os +import re +from datetime import datetime + +import numpy as np +import pandas as pd +from algotrader import Context +from algotrader.model.model_factory import ModelFactory +from algotrader.provider.feed import Feed +from algotrader.utils.date import datestr_to_unixtimemillis, datetime_to_unixtimemillis + +# this is a particular parser +def level_one_parser(row, inst_id, provider_id): + ts = datetime_to_unixtimemillis(datetime.strptime(row[0], '%Y%m%d%H%M%S%f')) + raw_quote_row = row[1:] + + raw_bid_book = [x for x in raw_quote_row if x[0] == 'B'] + raw_ask_book = [x for x in raw_quote_row if x[0] == 'A'] + + bid_split = [re.split('@', x) for x in raw_bid_book] + bid_flattened = list(map(list, zip(*bid_split))) + bid_size = np.array([x[1:] for x in bid_flattened[0]], dtype='float') + bid_array = np.array([x for x in bid_flattened[1]], dtype='float') + + ask_split = [re.split('@', x) for x in raw_ask_book] + ask_flattened = list(map(list, zip(*ask_split))) + ask_size = np.array([x[1:] for x in ask_flattened[0]], dtype='float') + ask_array = np.array([x for x in ask_flattened[1]], dtype='float') + + # TODO review the instrument Id and the idx + return ModelFactory.build_quote(timestamp=ts, + inst_id=inst_id, + provider_id=provider_id, + bid=bid_array[0], bid_size=bid_size[0], + ask=ask_array[0], ask_size=ask_size[0]) + + +class HighFrequencyFileFeed(Feed): + dateparse = lambda x: pd.datetime.strptime(x, '%Y-%m-%d') + + def __init__(self): + super(HighFrequencyFileFeed, self).__init__() + + def _start(self, app_context : Context) -> None: + self.path = self._get_feed_config("path") + + def id(self): + return Feed.HF + + def subscribe_mktdata(self, *sub_reqs): + # self._verify_subscription(*sub_reqs); + sub_req_ranges = {} + insts = {} + for sub_req in sub_reqs: + insts[sub_req.inst_id] = self.app_context.ref_data_mgr.get_inst(inst_id=sub_req.inst_id) + sub_req_ranges[sub_req.inst_id] = ( + datestr_to_unixtimemillis(str(sub_req.from_date)), datestr_to_unixtimemillis(str(sub_req.to_date))) + + dfs = self.__load_csv(insts, *sub_reqs) + + + def unsubscribe_mktdata(self, *sub_reqs): + pass + + def __load_csv(self, insts, *sub_reqs): + dfs = [] + for sub_req in sub_reqs: + inst = insts[sub_req.inst_id] + filename = '%s/%s.csv' % (self.path, inst.symbol.lower()) + with open(filename, 'r') as csvfile: + reader = csv.reader(csvfile, delimiter=' ') + for row in reader: + if row[1][0] == 'B': + # TODO:, this call particular parser, generalize it! + quote = level_one_parser(row, inst_id=inst.inst_id, provider_id=self.id()) + self.app_context.event_bus.data_subject.on_next(quote) + + diff --git a/algotrader/strategy/__init__.py b/algotrader/strategy/__init__.py index d20226b..ee6d80e 100644 --- a/algotrader/strategy/__init__.py +++ b/algotrader/strategy/__init__.py @@ -16,6 +16,7 @@ from algotrader.model.market_data_pb2 import * from algotrader.model.trade_data_pb2 import * +from algotrader.utils.logging import logger class Strategy(HasPositions, ExecutionEventHandler, Startable, HasId): @@ -67,6 +68,7 @@ def _start(self, app_context: Context) -> None: self.feed.subscribe_mktdata(*sub_req) def _stop(self): + logger.debug("[%s] %s" % (self.__class__.__name__, "_stop is called!")) if self.event_subscription: self.event_subscription.dispose() diff --git a/algotrader/strategy/simple_market_making.py b/algotrader/strategy/simple_market_making.py new file mode 100644 index 0000000..5056c2a --- /dev/null +++ b/algotrader/strategy/simple_market_making.py @@ -0,0 +1,54 @@ + +from algotrader import Context +from algotrader.model.trade_data_pb2 import * +from algotrader.model.market_data_pb2 import * + +from algotrader.strategy import Strategy +from algotrader.technical.function_wrapper import sklearn_trasformer_function +from algotrader.technical.talib_wrapper import talib_function +from algotrader.technical.mvg_avg_force import MovingAvgForceProcess +from algotrader.utils.market_data import build_bar_frame_id, M1, D1 +from algotrader.utils.logging import logger + + +class SimpleMarketMaking(Strategy): + def __init__(self, stg_id: str, stg_cls: str, state: StrategyState = None): + super(SimpleMarketMaking, self).__init__(stg_id=stg_id, stg_cls=stg_cls, state=state) + self.qty = None + self.tick = 1 + self.buy_order = None + self.sell_order = None + + def _start(self, app_context, **kwargs): + logger.info("strategy started!") + # self.qty = self.get_stg_config_value("qty", 1) + # self.tick = self.get_stg_config_value("tick", 1) + + self.qty = self._get_stg_config("qty", default=1) + self.tick = self._get_stg_config("tick", default=1) + + # self.close = self.app_context.inst_data_mgr.get_series( + # "Bar.%s.Time.86400" % self.app_context.app_config.instrument_ids[0]) + # self.close.start(app_context) + super(SimpleMarketMaking, self)._start(app_context, **kwargs) + + def _stop(self): + super(SimpleMarketMaking, self)._stop() + + def on_quote(self, quote: Quote): + pass + # logger.info("on_quote is called with %s" % quote) + # multi = 2 + #if self.buy_order is None: + # self.buy_order = self.limit_order(quote.inst_id, Buy, self.qty, quote.bid - multi * self.tick) + + # if self.sell_order is None: + # self.sell_order = self.limit_order(quote.inst_id, Sell, self.qty, quote.ask + multi * self.tick) + + def on_trade(self, trade: Trade): + logger.info("on_quote is called with %s" % trade) + + + + + diff --git a/algotrader/trading/instrument_data.py b/algotrader/trading/instrument_data.py index 197349c..11b81b8 100644 --- a/algotrader/trading/instrument_data.py +++ b/algotrader/trading/instrument_data.py @@ -137,13 +137,12 @@ def on_bar(self, bar): self.store.save_bar(bar) def on_quote(self, quote): - logger.debug("[%s] %s" % (self.__class__.__name__, quote)) + logger.info("[%s] %s" % (self.__class__.__name__, quote)) self.__quote_dict[quote.inst_id] = quote - self.get_series(get_series_id(quote)).add( - timestamp=quote.timestamp, - value={"bid": quote.bid, "ask": quote.ask, "bid_size": quote.bid_size, - "ask_size": quote.ask_size}) + self.get_frame(get_frame_id(quote), inst_id=quote.inst_id, provider_id=quote.provider_id, + columns=['bid', 'ask', 'bid_size', 'ask_size']).append_row(index=quote.timestamp, + value={"bid": quote.bid, "ask": quote.ask, "bid_size": quote.bid_size, "ask_size": quote.ask_size}) if self._is_realtime_persist(): self.store.save_quote(quote) @@ -151,9 +150,10 @@ def on_quote(self, quote): def on_trade(self, trade): logger.debug("[%s] %s" % (self.__class__.__name__, trade)) self.__trade_dict[trade.inst_id] = trade - self.get_series(get_series_id(trade)).add( - timestamp=trade.timestamp, - value={"price": trade.price, "size": trade.size}) + + self.get_frame(get_frame_id(trade), inst_id=trade.inst_id, provider_id=trade.provider_id, + columns=['price', 'size']).append_row(index=trade.timestamp, + value={"price": trade.price, "size": trade.size}) if self._is_realtime_persist(): self.store.save_trade(trade) diff --git a/config/live_ib.yaml b/config/live_ib.yaml index 6746828..7714db5 100644 --- a/config/live_ib.yaml +++ b/config/live_ib.yaml @@ -11,6 +11,7 @@ Application: feedId: "IB" brokerId: "IB" + portfolioId: "test1" DataStore: @@ -44,7 +45,8 @@ Broker: port: 4001 clientId: 0 account: 1 - daemon: true + daemon: false useGevent: true nextRequestId: 1 - nextOrderId: 1 \ No newline at end of file + nextOrderId: 1 + geventSleep: 0 diff --git a/config/simple_market_making.yaml b/config/simple_market_making.yaml new file mode 100644 index 0000000..faf23db --- /dev/null +++ b/config/simple_market_making.yaml @@ -0,0 +1,45 @@ +Application: + stgId: "simplemm" + stgCls: "algotrader.strategy.simple_market_making.SimpleMarketMaking" + instrumentIds: +# - "HSIZ12@HKFE" +# - "HSIF10@HKFE" +# - "HSIG10@HKFE" +# - "HSIH18@HKFE" +# - "NQH18@GLOBEX" +# - "AAPL@NASDAQ" +# - "SPY@ARCA" +# - "EEM@ARCA" +# - "XLF@ARCA" +# - "VXX@ARCA" +# - "QQQ@ARCA" +# - "GCX@ARCA" +# - "IWM@ARCA" +# - "FXI@ARCA" +# - "XLU@ARCA" +# - "EWZ@ARCA" +# - "USO@ARCA" +# - "HYG@ARCA" + - "1@SEHK" + - "2@SEHK" + - "3@SEHK" + - "4@SEHK" + - "5@SEHK" + - "11@SEHK" + - "12@SEHK" + - "16@SEHK" + - "17@SEHK" + - "700@SEHK" + - "2318@SEHK" + - "2628@SEHK" + subscriptionTypes: +# - "Bar.IB.Time.M1" + - "Quote.IB" + - "Trade.IB" +# - "Quote.HF" + subscriptions: + +Strategy: + simplemm: + qty: 1 + tick: 2