Skip to content

Commit b025509

Browse files
authored
Merge pull request #44 from namjug-kim/develop
fix: change duplicate event time in orderBook stream
2 parents adfa23d + 37e7857 commit b025509

File tree

9 files changed

+39
-13
lines changed

9 files changed

+39
-13
lines changed

reactive-crypto-bitmax/src/main/kotlin/com/njkim/reactivecrypto/bitmax/BitmaxWebsocketClient.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import mu.KotlinLogging
2828
import reactor.core.publisher.Flux
2929
import reactor.core.scheduler.Schedulers
3030
import java.math.BigDecimal
31+
import java.time.ZonedDateTime
3132
import java.util.concurrent.ConcurrentHashMap
3233
import java.util.concurrent.Executors
3334
import java.util.concurrent.atomic.AtomicInteger
@@ -74,10 +75,11 @@ class BitmaxWebsocketClient : ExchangeWebsocketClient {
7475

7576
return Flux.merge(targetWebsockets)
7677
.map { bitmaxOrderBookDataWrapper ->
78+
val now = ZonedDateTime.now()
7779
OrderBook(
7880
"${bitmaxOrderBookDataWrapper.seqnum}",
7981
bitmaxOrderBookDataWrapper.s,
80-
bitmaxOrderBookDataWrapper.ts,
82+
now,
8183
ExchangeVendor.BITMAX,
8284
bitmaxOrderBookDataWrapper.bids.map {
8385
OrderBookUnit(
@@ -142,6 +144,7 @@ class BitmaxWebsocketClient : ExchangeWebsocketClient {
142144
}
143145

144146
val currentOrderBook = prevOrderBook.copy(
147+
eventTime = orderBook.eventTime,
145148
asks = askMap.values.sortedBy { orderBookUnit -> orderBookUnit.price },
146149
bids = bidMap.values.sortedByDescending { orderBookUnit -> orderBookUnit.price }
147150
)

reactive-crypto-bitmax/src/test/java/com/njkim/reactivecrypto/bitmax/BitmaxWebsocketClientJavaTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class BitmaxWebsocketClientJavaTest {
3333
@Test
3434
public void tick_data_subscribe() {
3535
// given
36-
CurrencyPair targetCurrencyPair = CurrencyPair.parse("PAX", "USDT");
36+
CurrencyPair targetCurrencyPair = CurrencyPair.parse("BTC", "USDT");
3737
Flux<TickData> tickDataFlux = new BitmaxWebsocketClient()
3838
.createTradeWebsocket(Collections.singletonList(targetCurrencyPair));
3939

@@ -69,7 +69,7 @@ public void tick_data_subscribe() {
6969
@Test
7070
public void bitmax_orderBook_subscribe() {
7171
// given
72-
CurrencyPair targetCurrencyPair = CurrencyPair.parse("PAX", "USDT");
72+
CurrencyPair targetCurrencyPair = CurrencyPair.parse("BTC", "USDT");
7373
Flux<OrderBook> orderBookFlux = new BitmaxWebsocketClient()
7474
.createDepthSnapshot(Collections.singletonList(targetCurrencyPair));
7575

reactive-crypto-bitmax/src/test/kotlin/com/njkim/reactivecrypto/bitmax/BitmaxRawWebsocketClientTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class BitmaxRawWebsocketClientTest {
2828
@Test
2929
fun `bitmax tick data subscribe`() {
3030
// given
31-
val symbol = CurrencyPair(Currency.PAX, Currency.USDT)
31+
val symbol = CurrencyPair(Currency.BTC, Currency.USDT)
3232
val recentTradeMaxCount = 20
3333
val bitmaxTradeDataFlux = BitmaxRawWebsocketClient()
3434
.createTradeDataFlux(symbol, recentTradeMaxCount)
@@ -63,7 +63,7 @@ class BitmaxRawWebsocketClientTest {
6363
@Test
6464
fun `bitmax orderBook data subscribe`() {
6565
// given
66-
val symbol = CurrencyPair(Currency.PAX, Currency.USDT)
66+
val symbol = CurrencyPair(Currency.BTC, Currency.USDT)
6767
val marketDepthLevel = 20
6868
val bitmaxTradeDataFlux = BitmaxRawWebsocketClient()
6969
.createOrderBookFlux(symbol, marketDepthLevel)

reactive-crypto-bitmax/src/test/kotlin/com/njkim/reactivecrypto/bitmax/BitmaxWebsocketClientTest.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.njkim.reactivecrypto.bitmax
1818

1919
import com.njkim.reactivecrypto.core.common.model.ExchangeVendor
2020
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
21+
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
2122
import mu.KotlinLogging
2223
import org.assertj.core.api.Assertions
2324
import org.junit.Test
@@ -30,7 +31,7 @@ class BitmaxWebsocketClientTest {
3031
@Test
3132
fun `tick data subscribe`() {
3233
// given
33-
val targetCurrencyPair = CurrencyPair.parse("PAX", "USDT")
34+
val targetCurrencyPair = CurrencyPair.parse("BTC", "USDT")
3435
val tickDataFlux = BitmaxWebsocketClient()
3536
.createTradeWebsocket(listOf(targetCurrencyPair))
3637

@@ -66,16 +67,18 @@ class BitmaxWebsocketClientTest {
6667
@Test
6768
fun `orderBook subscribe`() {
6869
// given
69-
val targetCurrencyPair = CurrencyPair.parse("PAX", "USDT")
70+
val targetCurrencyPair = CurrencyPair.parse("BTC", "USDT")
7071
val orderBookFlux = BitmaxWebsocketClient()
7172
.createDepthSnapshot(listOf(targetCurrencyPair))
7273
.doOnNext { log.info { it } }
74+
var prevTimestamp = 0L
7375

7476
// when
7577
StepVerifier.create(orderBookFlux.limitRequest(5))
7678
.expectNextCount(3)
7779
// then
7880
.assertNext {
81+
prevTimestamp = it.eventTime.toEpochMilli()
7982
Assertions.assertThat(it).isNotNull
8083
Assertions.assertThat(it.currencyPair)
8184
.isEqualTo(targetCurrencyPair)
@@ -103,6 +106,8 @@ class BitmaxWebsocketClientTest {
103106
.isGreaterThan(it.bids[1].price)
104107
}
105108
.assertNext {
109+
Assertions.assertThat(prevTimestamp)
110+
.isNotEqualTo(it.eventTime.toEpochMilli())
106111
Assertions.assertThat(it).isNotNull
107112
Assertions.assertThat(it.currencyPair)
108113
.isEqualTo(targetCurrencyPair)

reactive-crypto-idax/src/main/kotlin/com/njkim/reactivecrypto/idax/IdaxWebsocketClient.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import com.njkim.reactivecrypto.core.common.util.toEpochMilli
2626
import mu.KotlinLogging
2727
import reactor.core.publisher.Flux
2828
import java.math.BigDecimal
29+
import java.time.ZonedDateTime
2930
import java.util.concurrent.ConcurrentHashMap
3031
import java.util.concurrent.atomic.AtomicInteger
3132
import java.util.concurrent.atomic.AtomicLong
@@ -62,10 +63,11 @@ class IdaxWebsocketClient : ExchangeWebsocketClient {
6263
return idaxRawWebsocketClient.createOrderBookChangeFlux(subscribeTargets)
6364
.flatMapIterable { idaxMessageFrame ->
6465
idaxMessageFrame.data.map {
66+
val now = ZonedDateTime.now()
6567
OrderBook(
66-
createTickDataUniqueId(it.timestamp.toEpochMilli()),
68+
createTickDataUniqueId(now.toEpochMilli()),
6769
idaxMessageFrame.currencyPair,
68-
it.timestamp,
70+
now,
6971
ExchangeVendor.IDAX,
7072
it.bids,
7173
it.asks
@@ -117,6 +119,7 @@ class IdaxWebsocketClient : ExchangeWebsocketClient {
117119
}
118120

119121
val currentOrderBook = prevOrderBook.copy(
122+
eventTime = orderBook.eventTime,
120123
asks = askMap.values.sortedBy { orderBookUnit -> orderBookUnit.price },
121124
bids = bidMap.values.sortedByDescending { orderBookUnit -> orderBookUnit.price }
122125
)

reactive-crypto-idax/src/test/kotlin/com/njkim/reactivecrypto/idax/IdaxWebsocketClientTest.kt

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.njkim.reactivecrypto.idax
1818

1919
import com.njkim.reactivecrypto.core.common.model.ExchangeVendor
2020
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
21+
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
2122
import mu.KotlinLogging
2223
import org.assertj.core.api.Assertions
2324
import org.junit.Test
@@ -70,12 +71,14 @@ class IdaxWebsocketClientTest {
7071
val orderBookFlux = IdaxWebsocketClient()
7172
.createDepthSnapshot(listOf(targetCurrencyPair))
7273
.doOnNext { log.info { it } }
74+
var prevTimestamp = 0L
7375

7476
// when
7577
StepVerifier.create(orderBookFlux.limitRequest(5))
7678
.expectNextCount(3)
7779
// then
7880
.assertNext {
81+
prevTimestamp = it.eventTime.toEpochMilli()
7982
Assertions.assertThat(it).isNotNull
8083
Assertions.assertThat(it.currencyPair)
8184
.isEqualTo(targetCurrencyPair)
@@ -103,6 +106,8 @@ class IdaxWebsocketClientTest {
103106
.isGreaterThan(it.bids[1].price)
104107
}
105108
.assertNext {
109+
Assertions.assertThat(prevTimestamp)
110+
.isNotEqualTo(it.eventTime.toEpochMilli())
106111
Assertions.assertThat(it).isNotNull
107112
Assertions.assertThat(it.currencyPair)
108113
.isEqualTo(targetCurrencyPair)

reactive-crypto-kraken/src/main/kotlin/com/njkim/reactivecrypto/kraken/KrakenWebsocketClient.kt

+1
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class KrakenWebsocketClient : AbstractExchangeWebsocketClient() {
146146
}
147147

148148
val currentOrderBook = prevOrderBook.copy(
149+
eventTime = orderBook.eventTime,
149150
asks = askMap.values.sortedBy { orderBookUnit -> orderBookUnit.price },
150151
bids = bidMap.values.sortedByDescending { orderBookUnit -> orderBookUnit.price }
151152
)

reactive-crypto-okex/src/main/kotlin/com/njkim/reactivecrypto/okex/OkexWebsocketClient.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import reactor.core.publisher.Flux
3939
import reactor.netty.http.client.HttpClient
4040
import java.math.BigDecimal
4141
import java.nio.charset.Charset
42+
import java.time.ZonedDateTime
4243
import java.util.concurrent.ConcurrentHashMap
4344
import kotlin.streams.toList
4445

@@ -77,10 +78,11 @@ class OkexWebsocketClient : AbstractExchangeWebsocketClient() {
7778
.map { it.data }
7879
.flatMapIterable {
7980
it.map { okexTickData ->
81+
val now = ZonedDateTime.now()
8082
OrderBook(
81-
"${okexTickData.instrumentId}${okexTickData.timestamp.toEpochMilli()}",
83+
"${okexTickData.instrumentId}${now.toEpochMilli()}",
8284
okexTickData.instrumentId,
83-
okexTickData.timestamp,
85+
now,
8486
ExchangeVendor.OKEX,
8587
okexTickData.getBids().toMutableList(),
8688
okexTickData.getAsks().toMutableList()
@@ -132,6 +134,7 @@ class OkexWebsocketClient : AbstractExchangeWebsocketClient() {
132134
}
133135

134136
val currentOrderBook = prevOrderBook.copy(
137+
eventTime = orderBook.eventTime,
135138
asks = askMap.values.sortedBy { orderBookUnit -> orderBookUnit.price },
136139
bids = bidMap.values.sortedByDescending { orderBookUnit -> orderBookUnit.price }
137140
)

reactive-crypto-okex/src/test/kotlin/com/njkim/reactivecrypto/okex/OkexWebsocketClientTest.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.njkim.reactivecrypto.okex
22

33
import com.njkim.reactivecrypto.core.common.model.ExchangeVendor
44
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
5+
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
56
import mu.KotlinLogging
67
import org.assertj.core.api.Assertions
78
import org.junit.Test
@@ -50,13 +51,16 @@ class OkexWebsocketClientTest {
5051
fun `okex orderBook subscribe`() {
5152
// given
5253
val targetCurrencyPair = CurrencyPair.parse("BTC", "USDT")
53-
val okexOrderBookFlux = OkexWebsocketClient()
54+
val orderBookFlux = OkexWebsocketClient()
5455
.createDepthSnapshot(listOf(targetCurrencyPair))
56+
var prevTimestamp = 0L
5557

5658
// when
57-
StepVerifier.create(okexOrderBookFlux.limitRequest(2))
59+
StepVerifier.create(orderBookFlux.limitRequest(5))
60+
.expectNextCount(3)
5861
// then
5962
.assertNext {
63+
prevTimestamp = it.eventTime.toEpochMilli()
6064
Assertions.assertThat(it).isNotNull
6165
Assertions.assertThat(it.currencyPair)
6266
.isEqualTo(targetCurrencyPair)
@@ -84,6 +88,8 @@ class OkexWebsocketClientTest {
8488
.isGreaterThan(it.bids[1].price)
8589
}
8690
.assertNext {
91+
Assertions.assertThat(prevTimestamp)
92+
.isNotEqualTo(it.eventTime.toEpochMilli())
8793
Assertions.assertThat(it).isNotNull
8894
Assertions.assertThat(it.currencyPair)
8995
.isEqualTo(targetCurrencyPair)

0 commit comments

Comments
 (0)