Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions algotrader/app/live_ats.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
import gevent
from gevent import monkey
monkey.patch_all()
from algotrader.app import Application
from algotrader.trading.config import Config, load_from_yaml
from algotrader.trading.context import ApplicationContext
from algotrader.utils.logging import logger


class ATSRunner(Application):
def init(self):
self.config = self.app_context.config

def run(self):
logger.info("starting ATS")

self.app_context.start()

self.portfolio = self.app_context.portf_mgr.get_or_new_portfolio(self.config.get_app_config("portfolioId"),
self.config.get_app_config(
"portfolioInitialcash"))
self.app_context.add_startable(self.portfolio)

self.strategy = self.app_context.stg_mgr.get_or_new_stg(self.config.get_app_config("stgId"),
self.config.get_app_config("stgCls"))
self.app_context.add_startable(self.strategy)
self.config.get_app_config("stgCls"))

def run(self):
logger.info("starting ATS")
self.app_context.add_startable(self.portfolio)
self.portfolio.start(self.app_context)

self.app_context.start()
self.app_context.add_startable(self.strategy)
self.strategy.start(self.app_context)

logger.info("ATS started, presss Ctrl-C to stop")

# TODO: how to handle the Ctrl-C gratefully and make sure all startables has called stop?
while True:
gevent.sleep(1)


def main():
config = Config(
load_from_yaml("../../config/live_ib.yaml"),
load_from_yaml("../../config/mvg_avg_force.yaml"))
load_from_yaml("../../config/simple_market_making.yaml"))
# load_from_yaml("../../config/mvg_avg_force.yaml"))
# load_from_yaml("../../config/down2%.yaml"))

app_context = ApplicationContext(config=config)
Expand Down
2 changes: 2 additions & 0 deletions algotrader/provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self):
from algotrader.provider.feed.pandas_web import PandasWebDataFeed
from algotrader.provider.feed.pandas_memory import PandasMemoryDataFeed
from algotrader.provider.feed.pandas_db import PandaDBDataFeed
from algotrader.provider.feed.high_freq import HighFrequencyFileFeed
from algotrader.provider.datastore.inmemory import InMemoryDataStore
from algotrader.provider.datastore.mongodb import MongoDBDataStore

Expand All @@ -36,6 +37,7 @@ def __init__(self):
self.add(PandasWebDataFeed())
self.add(PandasMemoryDataFeed())
self.add(PandaDBDataFeed())
self.add(HighFrequencyFileFeed())

def id(self):
return "ProviderManager"
16 changes: 9 additions & 7 deletions algotrader/provider/broker/ib/ib_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,11 @@ def _start(self, app_context: Context) -> None:
# time.sleep(1)

def poll(self):
logger.info("poll is called")
ok = True
while ok:
logger.info("checkMessages")
while True:
logger.debug("checkMessages")
ok = self.tws.checkMessages()
logger.info("ok = %s after tws checkMessage" % ok )
logger.debug("ok = %s after tws checkMessage" % ok )

if ok and (not self.tws or not self.tws.isConnected()):
logger.info("ok is now False")
Expand Down Expand Up @@ -273,8 +272,8 @@ def unsubscribe_mktdata(self, *sub_reqs):
def __req_mktdata(self, req_id, sub_req, contract):
self.tws.reqMktData(req_id, contract,
'', # genericTicks
False # snapshot
)
False, # snapshot
swigibpy.TagValueList())

def __cancel_mktdata(self, req_id):
self.tws.cancelMktData(req_id)
Expand Down Expand Up @@ -380,6 +379,7 @@ def tickPrice(self, tickerId, field, price, canAutoExecute):
"""
TickerId tickerId, TickType field, double price, int canAutoExecute
"""
logger.debug("%s,%s,%s,%s" % (tickerId, field, price, canAutoExecute))
record = self.data_sub_reg.get_data_record(tickerId)
if record:
prev = price
Expand Down Expand Up @@ -430,7 +430,9 @@ def tickSize(self, tickerId, field, size):
def __emit_market_data(self, field, record):
if record.quote_req and (
field == swigibpy.BID or field == swigibpy.BID_SIZE or field == swigibpy.ASK or field == swigibpy.ASK_SIZE) and record.bid > 0 and record.ask > 0:
self.data_event_bus.on_next(Quote(inst_id=record.inst_id, timestamp=self.app_context.clock.now(),
self.data_event_bus.on_next(Quote(inst_id=record.inst_id,
timestamp=self.app_context.clock.now(),
provider_id=Broker.IB,
bid=record.bid,
bid_size=record.bid_size,
ask=record.ask,
Expand Down
2 changes: 1 addition & 1 deletion algotrader/provider/broker/ib/ib_model_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,4 @@ def convert_ib_md_side(self, ib_md_side):
return self.ib_md_side_map[ib_md_side]

def convert_ib_ord_status(self, ib_ord_status):
return self.ib_ord_status_map.get(ib_ord_status, OrderStatus.UNKNOWN)
return self.ib_ord_status_map.get(ib_ord_status, UnknownStatus)
1 change: 1 addition & 0 deletions algotrader/provider/feed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Feed(Provider):
Yahoo = "Yahoo"
Google = "Google"
Quandl = "Quandl"
HF = "HF"

__metaclass__ = abc.ABCMeta

Expand Down
79 changes: 79 additions & 0 deletions algotrader/provider/feed/high_freq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import csv
import os
import re
from datetime import datetime

import numpy as np
import pandas as pd
from algotrader import Context
from algotrader.model.model_factory import ModelFactory
from algotrader.provider.feed import Feed
from algotrader.utils.date import datestr_to_unixtimemillis, datetime_to_unixtimemillis

# this is a particular parser
def level_one_parser(row, inst_id, provider_id):
ts = datetime_to_unixtimemillis(datetime.strptime(row[0], '%Y%m%d%H%M%S%f'))
raw_quote_row = row[1:]

raw_bid_book = [x for x in raw_quote_row if x[0] == 'B']
raw_ask_book = [x for x in raw_quote_row if x[0] == 'A']

bid_split = [re.split('@', x) for x in raw_bid_book]
bid_flattened = list(map(list, zip(*bid_split)))
bid_size = np.array([x[1:] for x in bid_flattened[0]], dtype='float')
bid_array = np.array([x for x in bid_flattened[1]], dtype='float')

ask_split = [re.split('@', x) for x in raw_ask_book]
ask_flattened = list(map(list, zip(*ask_split)))
ask_size = np.array([x[1:] for x in ask_flattened[0]], dtype='float')
ask_array = np.array([x for x in ask_flattened[1]], dtype='float')

# TODO review the instrument Id and the idx
return ModelFactory.build_quote(timestamp=ts,
inst_id=inst_id,
provider_id=provider_id,
bid=bid_array[0], bid_size=bid_size[0],
ask=ask_array[0], ask_size=ask_size[0])


class HighFrequencyFileFeed(Feed):
dateparse = lambda x: pd.datetime.strptime(x, '%Y-%m-%d')

def __init__(self):
super(HighFrequencyFileFeed, self).__init__()

def _start(self, app_context : Context) -> None:
self.path = self._get_feed_config("path")

def id(self):
return Feed.HF

def subscribe_mktdata(self, *sub_reqs):
# self._verify_subscription(*sub_reqs);
sub_req_ranges = {}
insts = {}
for sub_req in sub_reqs:
insts[sub_req.inst_id] = self.app_context.ref_data_mgr.get_inst(inst_id=sub_req.inst_id)
sub_req_ranges[sub_req.inst_id] = (
datestr_to_unixtimemillis(str(sub_req.from_date)), datestr_to_unixtimemillis(str(sub_req.to_date)))

dfs = self.__load_csv(insts, *sub_reqs)


def unsubscribe_mktdata(self, *sub_reqs):
pass

def __load_csv(self, insts, *sub_reqs):
dfs = []
for sub_req in sub_reqs:
inst = insts[sub_req.inst_id]
filename = '%s/%s.csv' % (self.path, inst.symbol.lower())
with open(filename, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=' ')
for row in reader:
if row[1][0] == 'B':
# TODO:, this call particular parser, generalize it!
quote = level_one_parser(row, inst_id=inst.inst_id, provider_id=self.id())
self.app_context.event_bus.data_subject.on_next(quote)


2 changes: 2 additions & 0 deletions algotrader/strategy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from algotrader.model.market_data_pb2 import *
from algotrader.model.trade_data_pb2 import *
from algotrader.utils.logging import logger


class Strategy(HasPositions, ExecutionEventHandler, Startable, HasId):
Expand Down Expand Up @@ -67,6 +68,7 @@ def _start(self, app_context: Context) -> None:
self.feed.subscribe_mktdata(*sub_req)

def _stop(self):
logger.debug("[%s] %s" % (self.__class__.__name__, "_stop is called!"))
if self.event_subscription:
self.event_subscription.dispose()

Expand Down
54 changes: 54 additions & 0 deletions algotrader/strategy/simple_market_making.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@

from algotrader import Context
from algotrader.model.trade_data_pb2 import *
from algotrader.model.market_data_pb2 import *

from algotrader.strategy import Strategy
from algotrader.technical.function_wrapper import sklearn_trasformer_function
from algotrader.technical.talib_wrapper import talib_function
from algotrader.technical.mvg_avg_force import MovingAvgForceProcess
from algotrader.utils.market_data import build_bar_frame_id, M1, D1
from algotrader.utils.logging import logger


class SimpleMarketMaking(Strategy):
def __init__(self, stg_id: str, stg_cls: str, state: StrategyState = None):
super(SimpleMarketMaking, self).__init__(stg_id=stg_id, stg_cls=stg_cls, state=state)
self.qty = None
self.tick = 1
self.buy_order = None
self.sell_order = None

def _start(self, app_context, **kwargs):
logger.info("strategy started!")
# self.qty = self.get_stg_config_value("qty", 1)
# self.tick = self.get_stg_config_value("tick", 1)

self.qty = self._get_stg_config("qty", default=1)
self.tick = self._get_stg_config("tick", default=1)

# self.close = self.app_context.inst_data_mgr.get_series(
# "Bar.%s.Time.86400" % self.app_context.app_config.instrument_ids[0])
# self.close.start(app_context)
super(SimpleMarketMaking, self)._start(app_context, **kwargs)

def _stop(self):
super(SimpleMarketMaking, self)._stop()

def on_quote(self, quote: Quote):
pass
# logger.info("on_quote is called with %s" % quote)
# multi = 2
#if self.buy_order is None:
# self.buy_order = self.limit_order(quote.inst_id, Buy, self.qty, quote.bid - multi * self.tick)

# if self.sell_order is None:
# self.sell_order = self.limit_order(quote.inst_id, Sell, self.qty, quote.ask + multi * self.tick)

def on_trade(self, trade: Trade):
logger.info("on_quote is called with %s" % trade)





16 changes: 8 additions & 8 deletions algotrader/trading/instrument_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,23 @@ def on_bar(self, bar):
self.store.save_bar(bar)

def on_quote(self, quote):
logger.debug("[%s] %s" % (self.__class__.__name__, quote))
logger.info("[%s] %s" % (self.__class__.__name__, quote))
self.__quote_dict[quote.inst_id] = quote

self.get_series(get_series_id(quote)).add(
timestamp=quote.timestamp,
value={"bid": quote.bid, "ask": quote.ask, "bid_size": quote.bid_size,
"ask_size": quote.ask_size})
self.get_frame(get_frame_id(quote), inst_id=quote.inst_id, provider_id=quote.provider_id,
columns=['bid', 'ask', 'bid_size', 'ask_size']).append_row(index=quote.timestamp,
value={"bid": quote.bid, "ask": quote.ask, "bid_size": quote.bid_size, "ask_size": quote.ask_size})

if self._is_realtime_persist():
self.store.save_quote(quote)

def on_trade(self, trade):
logger.debug("[%s] %s" % (self.__class__.__name__, trade))
self.__trade_dict[trade.inst_id] = trade
self.get_series(get_series_id(trade)).add(
timestamp=trade.timestamp,
value={"price": trade.price, "size": trade.size})

self.get_frame(get_frame_id(trade), inst_id=trade.inst_id, provider_id=trade.provider_id,
columns=['price', 'size']).append_row(index=trade.timestamp,
value={"price": trade.price, "size": trade.size})

if self._is_realtime_persist():
self.store.save_trade(trade)
Expand Down
6 changes: 4 additions & 2 deletions config/live_ib.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Application:

feedId: "IB"
brokerId: "IB"
portfolioId: "test1"


DataStore:
Expand Down Expand Up @@ -44,7 +45,8 @@ Broker:
port: 4001
clientId: 0
account: 1
daemon: true
daemon: false
useGevent: true
nextRequestId: 1
nextOrderId: 1
nextOrderId: 1
geventSleep: 0
45 changes: 45 additions & 0 deletions config/simple_market_making.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
Application:
stgId: "simplemm"
stgCls: "algotrader.strategy.simple_market_making.SimpleMarketMaking"
instrumentIds:
# - "HSIZ12@HKFE"
# - "HSIF10@HKFE"
# - "HSIG10@HKFE"
# - "HSIH18@HKFE"
# - "NQH18@GLOBEX"
# - "AAPL@NASDAQ"
# - "SPY@ARCA"
# - "EEM@ARCA"
# - "XLF@ARCA"
# - "VXX@ARCA"
# - "QQQ@ARCA"
# - "GCX@ARCA"
# - "IWM@ARCA"
# - "FXI@ARCA"
# - "XLU@ARCA"
# - "EWZ@ARCA"
# - "USO@ARCA"
# - "HYG@ARCA"
- "1@SEHK"
- "2@SEHK"
- "3@SEHK"
- "4@SEHK"
- "5@SEHK"
- "11@SEHK"
- "12@SEHK"
- "16@SEHK"
- "17@SEHK"
- "700@SEHK"
- "2318@SEHK"
- "2628@SEHK"
subscriptionTypes:
# - "Bar.IB.Time.M1"
- "Quote.IB"
- "Trade.IB"
# - "Quote.HF"
subscriptions:

Strategy:
simplemm:
qty: 1
tick: 2