Skip to content

Commit 167979c

Browse files
committed
feat: Rewrite order matching strategy
Refactors the current trading component into a clearly separated orderbook component and a trade execution component. The linking part is the `ExecutableMatch` which can be derived from the matches stored into the database. At the moment we assume optimistically that the trade execution will succeed. However, we should consider that a pending match may never get filled or it fails at execution in such a scenario we would need to rollback the matched orders.
1 parent f2996aa commit 167979c

File tree

20 files changed

+1287
-1315
lines changed

20 files changed

+1287
-1315
lines changed

coordinator/src/bin/coordinator.rs

+24-19
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@ use coordinator::run_migration;
2525
use coordinator::scheduler::NotificationScheduler;
2626
use coordinator::settings::Settings;
2727
use coordinator::storage::CoordinatorTenTenOneStorage;
28+
use coordinator::trade;
2829
use coordinator::trade::websocket::InternalPositionUpdateMessage;
2930
use diesel::r2d2;
3031
use diesel::r2d2::ConnectionManager;
3132
use diesel::PgConnection;
3233
use lnd_bridge::LndBridge;
3334
use rand::thread_rng;
3435
use rand::RngCore;
36+
use rust_decimal::prelude::FromPrimitive;
37+
use rust_decimal::Decimal;
3538
use std::backtrace::Backtrace;
3639
use std::net::IpAddr;
3740
use std::net::Ipv4Addr;
@@ -261,21 +264,22 @@ async fn main() -> Result<()> {
261264

262265
let (tx_orderbook_feed, _rx) = broadcast::channel(100);
263266

264-
let (_handle, trading_sender) = trading::start(
265-
node.clone(),
266-
tx_orderbook_feed.clone(),
267-
auth_users_notifier.clone(),
267+
let trade_executor = trade::spawn_trade_executor(node.clone(), auth_users_notifier.clone())?;
268+
269+
let order_matching_fee_rate =
270+
Decimal::from_f32(node.settings.read().await.order_matching_fee_rate).expect("to fit");
271+
272+
let orderbook_sender = trading::spawn_orderbook(
273+
node.pool.clone(),
268274
notification_service.get_sender(),
269-
network,
270-
node.inner.oracle_pubkey,
271-
);
272-
let _handle = async_match::monitor(
273-
node.clone(),
274-
node_event_handler.subscribe(),
275-
auth_users_notifier.clone(),
276-
network,
277-
node.inner.oracle_pubkey,
278-
);
275+
trade_executor.clone(),
276+
tx_orderbook_feed.clone(),
277+
order_matching_fee_rate,
278+
)?;
279+
280+
let _handle =
281+
async_match::monitor(node.clone(), node_event_handler.subscribe(), trade_executor);
282+
279283
let _handle = rollover::monitor(
280284
pool.clone(),
281285
node_event_handler.subscribe(),
@@ -294,11 +298,12 @@ async fn main() -> Result<()> {
294298

295299
tokio::spawn({
296300
let node = node.clone();
297-
let trading_sender = trading_sender.clone();
301+
let orderbook_sender = orderbook_sender.clone();
298302
async move {
299303
loop {
300304
tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await;
301-
if let Err(e) = expired_positions::close(node.clone(), trading_sender.clone()).await
305+
if let Err(e) =
306+
expired_positions::close(node.clone(), orderbook_sender.clone()).await
302307
{
303308
tracing::error!("Failed to close expired positions! Error: {e:#}");
304309
}
@@ -308,11 +313,11 @@ async fn main() -> Result<()> {
308313

309314
tokio::spawn({
310315
let node = node.clone();
311-
let trading_sender = trading_sender.clone();
316+
let orderbook_sender = orderbook_sender.clone();
312317
async move {
313318
loop {
314319
tokio::time::sleep(LIQUIDATED_POSITION_SYNC_INTERVAL).await;
315-
liquidated_positions::monitor(node.clone(), trading_sender.clone()).await
320+
liquidated_positions::monitor(node.clone(), orderbook_sender.clone()).await
316321
}
317322
}
318323
});
@@ -325,7 +330,7 @@ async fn main() -> Result<()> {
325330
settings.clone(),
326331
exporter,
327332
NODE_ALIAS,
328-
trading_sender,
333+
orderbook_sender,
329334
tx_orderbook_feed,
330335
tx_position_feed,
331336
tx_user_feed,

coordinator/src/node/expired_positions.rs

+13-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
use crate::db;
22
use crate::node::Node;
33
use crate::orderbook;
4-
use crate::orderbook::db::orders;
5-
use crate::orderbook::trading::NewOrderMessage;
4+
use crate::orderbook::trading::OrderbookMessage;
65
use crate::position::models::Position;
76
use crate::position::models::PositionState;
8-
use anyhow::anyhow;
97
use anyhow::Context;
108
use anyhow::Result;
119
use rust_decimal::prelude::FromPrimitive;
@@ -18,14 +16,15 @@ use xxi_node::commons::average_execution_price;
1816
use xxi_node::commons::Match;
1917
use xxi_node::commons::MatchState;
2018
use xxi_node::commons::NewMarketOrder;
19+
use xxi_node::commons::NewOrder;
2120
use xxi_node::commons::OrderReason;
2221
use xxi_node::commons::OrderState;
2322

2423
/// The timeout before we give up on closing an expired position collaboratively. This value should
2524
/// not be larger than our refund transaction time lock.
2625
pub const EXPIRED_POSITION_TIMEOUT: Duration = Duration::days(7);
2726

28-
pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) -> Result<()> {
27+
pub async fn close(node: Node, orderbook_sender: mpsc::Sender<OrderbookMessage>) -> Result<()> {
2928
let mut conn = node.pool.get()?;
3029

3130
let positions = db::positions::Position::get_all_open_positions(&mut conn)
@@ -50,8 +49,9 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->
5049

5150
if order.expiry < OffsetDateTime::now_utc() {
5251
tracing::warn!(trader_id, order_id, "Matched order expired! Giving up on that position, looks like the corresponding dlc channel has to get force closed.");
52+
// TODO(holzeis): It's not ideal that the order and match are updated by the trade
53+
// executor. This should rather get updated by the orderbook.
5354
orderbook::db::orders::set_order_state(&mut conn, order.id, OrderState::Expired)?;
54-
5555
orderbook::db::matches::set_match_state_by_order_id(
5656
&mut conn,
5757
order.id,
@@ -75,11 +75,13 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->
7575

7676
tracing::debug!(trader_pk=%position.trader, %position.expiry_timestamp, "Attempting to close expired position");
7777

78+
let order_id = uuid::Uuid::new_v4();
79+
let trader_pubkey = position.trader;
7880
let new_order = NewMarketOrder {
79-
id: uuid::Uuid::new_v4(),
81+
id: order_id,
8082
contract_symbol: position.contract_symbol,
8183
quantity: Decimal::try_from(position.quantity).expect("to fit into decimal"),
82-
trader_id: position.trader,
84+
trader_id: trader_pubkey,
8385
direction: position.trader_direction.opposite(),
8486
leverage: Decimal::from_f32(position.trader_leverage).expect("to fit into decimal"),
8587
// This order can basically not expire, but if the user does not come back online within
@@ -89,18 +91,13 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->
8991
stable: position.stable,
9092
};
9193

92-
let order = orders::insert_market_order(&mut conn, new_order.clone(), OrderReason::Expired)
93-
.map_err(|e| anyhow!(e))
94-
.context("Failed to insert expired order into DB")?;
95-
96-
let message = NewOrderMessage {
97-
order,
98-
channel_opening_params: None,
94+
let message = OrderbookMessage::NewOrder {
95+
new_order: NewOrder::Market(new_order),
9996
order_reason: OrderReason::Expired,
10097
};
10198

102-
if let Err(e) = trading_sender.send(message).await {
103-
tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}");
99+
if let Err(e) = orderbook_sender.send(message).await {
100+
tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing expired position. Error: {e:#}");
104101
continue;
105102
}
106103
}

coordinator/src/node/liquidated_positions.rs

+13-24
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use crate::db;
22
use crate::node::Node;
33
use crate::orderbook;
4-
use crate::orderbook::db::orders;
5-
use crate::orderbook::trading::NewOrderMessage;
4+
use crate::orderbook::trading::OrderbookMessage;
65
use anyhow::Result;
76
use rust_decimal::prelude::FromPrimitive;
87
use rust_decimal::Decimal;
@@ -17,16 +16,17 @@ use xxi_node::commons::Direction;
1716
use xxi_node::commons::Match;
1817
use xxi_node::commons::MatchState;
1918
use xxi_node::commons::NewMarketOrder;
19+
use xxi_node::commons::NewOrder;
2020
use xxi_node::commons::OrderReason;
2121
use xxi_node::commons::OrderState;
2222

2323
/// The timeout before we give up on closing a liquidated position collaboratively. This value
2424
/// should not be larger than our refund transaction time lock.
2525
pub const LIQUIDATION_POSITION_TIMEOUT: Duration = Duration::days(7);
2626

27-
pub async fn monitor(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) {
27+
pub async fn monitor(node: Node, orderbook_sender: mpsc::Sender<OrderbookMessage>) {
2828
if let Err(e) =
29-
check_if_positions_need_to_get_liquidated(trading_sender.clone(), node.clone()).await
29+
check_if_positions_need_to_get_liquidated(orderbook_sender.clone(), node.clone()).await
3030
{
3131
tracing::error!("Failed to check if positions need to get liquidated. Error: {e:#}");
3232
}
@@ -35,7 +35,7 @@ pub async fn monitor(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>)
3535
/// For all open positions, check if the maintenance margin has been reached. Send a liquidation
3636
/// async match to the traders whose positions have been liquidated.
3737
async fn check_if_positions_need_to_get_liquidated(
38-
trading_sender: mpsc::Sender<NewOrderMessage>,
38+
orderbook_sender: mpsc::Sender<OrderbookMessage>,
3939
node: Node,
4040
) -> Result<()> {
4141
let mut conn = node.pool.get()?;
@@ -121,11 +121,13 @@ async fn check_if_positions_need_to_get_liquidated(
121121
}
122122
}
123123

124+
let trader_pubkey = position.trader;
125+
let order_id = uuid::Uuid::new_v4();
124126
let new_order = NewMarketOrder {
125-
id: uuid::Uuid::new_v4(),
127+
id: order_id,
126128
contract_symbol: position.contract_symbol,
127129
quantity: Decimal::try_from(position.quantity).expect("to fit into decimal"),
128-
trader_id: position.trader,
130+
trader_id: trader_pubkey,
129131
direction: position.trader_direction.opposite(),
130132
leverage: Decimal::from_f32(position.trader_leverage).expect("to fit into decimal"),
131133
// This order can basically not expire, but if the user does not come back online
@@ -140,26 +142,13 @@ async fn check_if_positions_need_to_get_liquidated(
140142
false => OrderReason::CoordinatorLiquidated,
141143
};
142144

143-
let order = match orders::insert_market_order(
144-
&mut conn,
145-
new_order.clone(),
146-
order_reason.clone(),
147-
) {
148-
Ok(order) => order,
149-
Err(e) => {
150-
tracing::error!("Failed to insert liquidation order into DB. Error: {e:#}");
151-
continue;
152-
}
153-
};
154-
155-
let message = NewOrderMessage {
156-
order,
157-
channel_opening_params: None,
145+
let message = OrderbookMessage::NewOrder {
146+
new_order: NewOrder::Market(new_order),
158147
order_reason,
159148
};
160149

161-
if let Err(e) = trading_sender.send(message).await {
162-
tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing liquidated position. Error: {e:#}");
150+
if let Err(e) = orderbook_sender.send(message).await {
151+
tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing liquidated position. Error: {e:#}");
163152
continue;
164153
}
165154
}

0 commit comments

Comments
 (0)