Skip to content

Commit 3f446de

Browse files
authored
Merge pull request #41 from namjug-kim/develop
Develop
2 parents 29c0ed3 + 667c7b2 commit 3f446de

File tree

6 files changed

+141
-30
lines changed

6 files changed

+141
-30
lines changed

reactive-crypto-coineal/src/main/kotlin/com/njkim/reactivecrypto/coineal/CoinealRawWebsocketClient.kt

+32-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
2222
import com.njkim.reactivecrypto.coineal.model.CoinealMessageFrame
2323
import com.njkim.reactivecrypto.coineal.model.CoinealOrderBook
2424
import com.njkim.reactivecrypto.coineal.model.CoinealTickDataWrapper
25+
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
26+
import com.njkim.reactivecrypto.core.netty.HeartBeatHandler
2527
import io.netty.buffer.ByteBuf
2628
import io.netty.buffer.ByteBufInputStream
2729
import io.netty.channel.ChannelHandlerContext
@@ -34,6 +36,8 @@ import reactor.core.publisher.Flux
3436
import reactor.core.publisher.toFlux
3537
import reactor.netty.http.client.HttpClient
3638
import java.nio.charset.Charset
39+
import java.time.ZonedDateTime
40+
import java.util.concurrent.TimeUnit
3741
import java.util.zip.GZIPInputStream
3842

3943
class CoinealRawWebsocketClient(
@@ -51,7 +55,20 @@ class CoinealRawWebsocketClient(
5155
.toFlux()
5256

5357
return HttpClient.create()
54-
.tcpConfiguration { tcp -> tcp.doOnConnected { connection -> connection.addHandler(GzipDecoder()) } }
58+
.tcpConfiguration { tcp ->
59+
tcp.doOnConnected { connection ->
60+
connection.addHandler(GzipDecoder())
61+
connection.addHandler(
62+
"heartBeat",
63+
HeartBeatHandler(
64+
false,
65+
10,
66+
TimeUnit.SECONDS,
67+
5
68+
) { "{\"ping\": ${ZonedDateTime.now().toEpochMilli()}}" }
69+
)
70+
}
71+
}
5572
.websocket()
5673
.uri(baseUri)
5774
.handle { inbound, outbound ->
@@ -73,7 +90,20 @@ class CoinealRawWebsocketClient(
7390
.toFlux()
7491

7592
return HttpClient.create()
76-
.tcpConfiguration { tcp -> tcp.doOnConnected { connection -> connection.addHandler(GzipDecoder()) } }
93+
.tcpConfiguration { tcp ->
94+
tcp.doOnConnected { connection ->
95+
connection.addHandler(GzipDecoder())
96+
connection.addHandler(
97+
"heartBeat",
98+
HeartBeatHandler(
99+
false,
100+
10,
101+
TimeUnit.SECONDS,
102+
5
103+
) { "{\"ping\": ${ZonedDateTime.now().toEpochMilli()}}" }
104+
)
105+
}
106+
}
77107
.websocket()
78108
.uri(baseUri)
79109
.handle { inbound, outbound ->
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright 2019 namjug-kim
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.njkim.reactivecrypto.core.common.exception
18+
19+
class HeartBeatFailException(message: String) : RuntimeException(message)

reactive-crypto-core/src/main/kotlin/com/njkim/reactivecrypto/core/common/model/currency/Currency.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ enum class Currency {
3535
SWC, LINDA, FML, GUNTHY, VIN, GOB, EVY, MGO, FUNDZ, UQC, CTC, MRPH, PCCM,
3636
PXL, IOTW, NRC, ZAT, POE, APL, VEX, VITAE, FTX, ABTC, TSL, SCRL, NIX, TMC, MCAN,
3737
LGD, ZEST, ALP, CXP, ZING, UCN, RET, WIRE, GUBI, LAD, EVNS, CAPP, ZBB, KZE, DBX, NTY,
38-
GFC, GMB, FLC, BTD,
38+
GFC, GMB, FLC, BTD, SKM, MALL, RFOX, FN, IZI, NVL, YTA, IPWT, AT, HVCC, CL,
39+
OA, VSS, STCC, CTA, CPX, KDH, LAMB, IFX, GCS, BRC, O2P, BHD, IVO, BTMC, BKBT,
40+
TCLB, NEAL,
3941

4042
M19,
4143

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2019 namjug-kim
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.njkim.reactivecrypto.core.netty
18+
19+
import com.njkim.reactivecrypto.core.common.exception.HeartBeatFailException
20+
import io.netty.channel.ChannelHandlerContext
21+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
22+
import io.netty.handler.timeout.IdleState
23+
import io.netty.handler.timeout.IdleStateEvent
24+
import io.netty.handler.timeout.IdleStateHandler
25+
import java.util.concurrent.TimeUnit
26+
27+
/**
28+
* @property pingTime ping message will be triggered when no write was performed for the specified
29+
* period of time. Specify {@code 0} to disable.
30+
*/
31+
class HeartBeatHandler(
32+
observeOutput: Boolean,
33+
private val pingTime: Long,
34+
unit: TimeUnit,
35+
pongTimeout: Long,
36+
private val pingMessage: () -> String
37+
) : IdleStateHandler(observeOutput, pingTime + pongTimeout, pingTime, 0, unit) {
38+
39+
@Throws(Exception::class)
40+
override fun channelIdle(ctx: ChannelHandlerContext, evt: IdleStateEvent) {
41+
if (evt.state() == IdleState.READER_IDLE) {
42+
ctx.close()
43+
throw HeartBeatFailException("")
44+
} else if (evt.state() == IdleState.WRITER_IDLE || evt.state() == IdleState.ALL_IDLE) {
45+
ctx.channel().writeAndFlush(TextWebSocketFrame(pingMessage()))
46+
}
47+
}
48+
}

reactive-crypto-huobikorea/src/main/kotlin/com/njkim/reactivecrypto/huobikorea/HuobiKoreaWebsocketClient.kt

+30-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
2525
import com.njkim.reactivecrypto.core.common.model.order.OrderBook
2626
import com.njkim.reactivecrypto.core.common.model.order.TickData
2727
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
28+
import com.njkim.reactivecrypto.core.netty.HeartBeatHandler
2829
import com.njkim.reactivecrypto.huobikorea.model.HuobiKoreaTickDataWrapper
2930
import com.njkim.reactivecrypto.huobikorea.model.HuobiOrderBook
3031
import com.njkim.reactivecrypto.huobikorea.model.HuobiSubscribeResponse
@@ -40,6 +41,7 @@ import reactor.core.publisher.Flux
4041
import reactor.netty.http.client.HttpClient
4142
import java.nio.charset.Charset
4243
import java.time.ZonedDateTime
44+
import java.util.concurrent.TimeUnit
4345
import java.util.zip.GZIPInputStream
4446
import kotlin.streams.toList
4547

@@ -62,7 +64,20 @@ class HuobiKoreaWebsocketClient : AbstractExchangeWebsocketClient() {
6264

6365
return HttpClient.create()
6466
.wiretap(log.isDebugEnabled)
65-
.tcpConfiguration { tcp -> tcp.doOnConnected { connection -> connection.addHandler(GzipDecoder()) } }
67+
.tcpConfiguration { tcp ->
68+
tcp.doOnConnected { connection ->
69+
connection.addHandler(GzipDecoder())
70+
connection.addHandler(
71+
"heartBeat",
72+
HeartBeatHandler(
73+
false,
74+
2000,
75+
TimeUnit.MILLISECONDS,
76+
1000
77+
) { "{\"ping\": ${ZonedDateTime.now().toEpochMilli()}}" }
78+
)
79+
}
80+
}
6681
.websocket()
6782
.uri(baseUri)
6883
.handle { inbound, outbound ->
@@ -93,7 +108,20 @@ class HuobiKoreaWebsocketClient : AbstractExchangeWebsocketClient() {
93108

94109
return HttpClient.create()
95110
.wiretap(log.isDebugEnabled)
96-
.tcpConfiguration { tcp -> tcp.doOnConnected { connection -> connection.addHandler(GzipDecoder()) } }
111+
.tcpConfiguration { tcp ->
112+
tcp.doOnConnected { connection ->
113+
connection.addHandler(GzipDecoder())
114+
connection.addHandler(
115+
"heartBeat",
116+
HeartBeatHandler(
117+
false,
118+
2000,
119+
TimeUnit.MILLISECONDS,
120+
1000
121+
) { "{\"ping\": ${ZonedDateTime.now().toEpochMilli()}}" }
122+
)
123+
}
124+
}
97125
.websocket()
98126
.uri(baseUri)
99127
.handle { inbound, outbound ->

reactive-crypto-idax/src/main/kotlin/com/njkim/reactivecrypto/idax/IdaxRawWebsocketClient.kt

+9-25
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@ package com.njkim.reactivecrypto.idax
1919
import com.fasterxml.jackson.databind.ObjectMapper
2020
import com.fasterxml.jackson.module.kotlin.readValue
2121
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
22+
import com.njkim.reactivecrypto.core.netty.HeartBeatHandler
2223
import com.njkim.reactivecrypto.idax.model.IdaxMessageFrame
2324
import com.njkim.reactivecrypto.idax.model.IdaxOrderBook
2425
import com.njkim.reactivecrypto.idax.model.IdaxTickData
25-
import io.netty.channel.ChannelHandlerContext
26-
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
27-
import io.netty.handler.timeout.IdleState
28-
import io.netty.handler.timeout.IdleStateEvent
29-
import io.netty.handler.timeout.IdleStateHandler
3026
import mu.KotlinLogging
3127
import reactor.core.publisher.Flux
3228
import reactor.core.publisher.toFlux
@@ -62,8 +58,10 @@ class IdaxRawWebsocketClient(
6258
return HttpClient.create()
6359
.tcpConfiguration { tcp ->
6460
tcp.doOnConnected {
65-
// okex websocket 은 30초동안 응답이 없는경우 해당 연결을 끊는다. 이를 막기 위해서 20초마다 ping 요청을 보낸다.
66-
it.addHandlerFirst("heartBeat", HeartBeatHandler(false, 2, 0, 0, TimeUnit.SECONDS))
61+
it.addHandlerFirst(
62+
"heartBeat",
63+
HeartBeatHandler(false, 2, TimeUnit.SECONDS, 1) { "{\"event\":\"ping\"}" }
64+
)
6765
}
6866
}
6967
.websocket()
@@ -96,8 +94,10 @@ class IdaxRawWebsocketClient(
9694
return HttpClient.create()
9795
.tcpConfiguration { tcp ->
9896
tcp.doOnConnected {
99-
// okex websocket 은 30초동안 응답이 없는경우 해당 연결을 끊는다. 이를 막기 위해서 20초마다 ping 요청을 보낸다.
100-
it.addHandlerFirst("heartBeat", HeartBeatHandler(false, 2, 0, 0, TimeUnit.SECONDS))
97+
it.addHandlerFirst(
98+
"heartBeat",
99+
HeartBeatHandler(false, 2, TimeUnit.SECONDS, 1) { "{\"event\":\"ping\"}" }
100+
)
101101
}
102102
}
103103
.websocket()
@@ -138,20 +138,4 @@ class IdaxRawWebsocketClient(
138138
}
139139
.map { objectMapper.readValue<IdaxMessageFrame<List<IdaxOrderBook>>>(it) }
140140
}
141-
142-
private inner class HeartBeatHandler(
143-
observeOutput: Boolean,
144-
readerIdleTime: Long,
145-
writerIdleTime: Long,
146-
allIdleTime: Long,
147-
unit: TimeUnit
148-
) : IdleStateHandler(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit) {
149-
150-
@Throws(Exception::class)
151-
override fun channelIdle(ctx: ChannelHandlerContext, evt: IdleStateEvent) {
152-
if (evt.state() == IdleState.READER_IDLE) {
153-
ctx.channel().writeAndFlush(TextWebSocketFrame("{\"event\":\"ping\"}"))
154-
}
155-
}
156-
}
157141
}

0 commit comments

Comments
 (0)