Skip to content

Commit f277d3f

Browse files
committed
Adapt netty quic transport implementation to the latest change in transport API and SIGNIFICANTLY rework it, improving the correctness of the implementation
1 parent fcf9063 commit f277d3f

File tree

9 files changed

+316
-431
lines changed

9 files changed

+316
-431
lines changed

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport.kt

+27-18
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,7 +59,7 @@ private class NettyQuicClientTransportBuilderImpl : NettyQuicClientTransportBuil
5959
private var bootstrap: (Bootstrap.() -> Unit)? = null
6060
private var codec: (QuicClientCodecBuilder.() -> Unit)? = null
6161
private var ssl: (QuicSslContextBuilder.() -> Unit)? = null
62-
private var quicBootstrap: (QuicChannelBootstrap.() -> Unit)? = null
62+
private var quicBootstrap: QuicChannelBootstrap.() -> Unit = { }
6363

6464
override fun channel(cls: KClass<out DatagramChannel>) {
6565
this.channelFactory = ReflectiveChannelFactory(cls.java)
@@ -114,24 +114,18 @@ private class NettyQuicClientTransportBuilderImpl : NettyQuicClientTransportBuil
114114
return NettyQuicClientTransportImpl(
115115
coroutineContext = context.supervisorContext() + bootstrap.config().group().asCoroutineDispatcher(),
116116
bootstrap = bootstrap,
117-
quicBootstrap = quicBootstrap,
118-
manageBootstrap = manageEventLoopGroup
119-
)
117+
quicBootstrap = quicBootstrap
118+
).also {
119+
if (manageEventLoopGroup) it.shutdownOnCancellation(bootstrap.config().group())
120+
}
120121
}
121122
}
122123

123124
private class NettyQuicClientTransportImpl(
124125
override val coroutineContext: CoroutineContext,
125126
private val bootstrap: Bootstrap,
126-
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
127-
manageBootstrap: Boolean,
127+
private val quicBootstrap: QuicChannelBootstrap.() -> Unit,
128128
) : NettyQuicClientTransport {
129-
init {
130-
if (manageBootstrap) callOnCancellation {
131-
bootstrap.config().group().shutdownGracefully().awaitFuture()
132-
}
133-
}
134-
135129
override fun target(remoteAddress: InetSocketAddress): NettyQuicClientTargetImpl = NettyQuicClientTargetImpl(
136130
coroutineContext = coroutineContext.supervisorContext(),
137131
bootstrap = bootstrap,
@@ -146,14 +140,29 @@ private class NettyQuicClientTransportImpl(
146140
private class NettyQuicClientTargetImpl(
147141
override val coroutineContext: CoroutineContext,
148142
private val bootstrap: Bootstrap,
149-
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
143+
private val quicBootstrap: QuicChannelBootstrap.() -> Unit,
150144
private val remoteAddress: SocketAddress,
151145
) : RSocketClientTarget {
152146
@RSocketTransportApi
153-
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
154-
QuicChannel.newBootstrap(bootstrap.bind().awaitChannel()).also { quicBootstrap?.invoke(it) }
147+
override suspend fun connectClient(): RSocketConnection {
148+
currentCoroutineContext().ensureActive()
149+
coroutineContext.ensureActive()
150+
151+
val channel = QuicChannel.newBootstrap(
152+
bootstrap.bind().awaitChannel()
153+
)
154+
.apply(quicBootstrap)
155155
.handler(
156-
NettyQuicConnectionInitializer(handler, coroutineContext, isClient = true)
157-
).remoteAddress(remoteAddress).connect().awaitFuture()
156+
NettyQuicConnectionInitializer(
157+
parentContext = coroutineContext,
158+
onConnection = null
159+
)
160+
)
161+
.streamHandler(NettyQuicStreamInitializer)
162+
.remoteAddress(remoteAddress)
163+
.connect()
164+
.awaitFuture()
165+
166+
return channel.attr(NettyQuicConnection.ATTRIBUTE).get()
158167
}
159168
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2015-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.netty.quic
18+
19+
import io.netty.channel.*
20+
import io.netty.incubator.codec.quic.*
21+
import io.netty.util.*
22+
import io.rsocket.kotlin.internal.io.*
23+
import io.rsocket.kotlin.transport.*
24+
import io.rsocket.kotlin.transport.netty.internal.*
25+
import kotlinx.coroutines.*
26+
import kotlinx.coroutines.channels.Channel
27+
import kotlin.coroutines.*
28+
29+
@RSocketTransportApi
30+
internal class NettyQuicConnection(
31+
parentContext: CoroutineContext,
32+
private val channel: QuicChannel,
33+
private val isClient: Boolean,
34+
) : RSocketMultiplexedConnection, ChannelInboundHandlerAdapter() {
35+
override val coroutineContext: CoroutineContext = parentContext.childContext() + channel.eventLoop().asCoroutineDispatcher()
36+
private val streamsContext = coroutineContext.supervisorContext()
37+
38+
private val inboundStreams = Channel<RSocketMultiplexedConnection.Stream>(Channel.UNLIMITED) {
39+
it.cancel("Connection closed")
40+
}
41+
42+
init {
43+
@OptIn(DelicateCoroutinesApi::class)
44+
launch(start = CoroutineStart.ATOMIC) {
45+
try {
46+
awaitCancellation()
47+
} finally {
48+
nonCancellable {
49+
inboundStreams.cancel()
50+
// stop streams first
51+
streamsContext.job.cancelAndJoin()
52+
channel.close().awaitFuture()
53+
// close UDP channel
54+
if (isClient) channel.parent().close().awaitFuture()
55+
}
56+
}
57+
}
58+
}
59+
60+
fun initStreamChannel(streamChannel: QuicStreamChannel) {
61+
val stream = NettyQuicStream(streamsContext, streamChannel)
62+
streamChannel.attr(NettyQuicStream.ATTRIBUTE).set(stream)
63+
streamChannel.pipeline().addLast("rsocket-quic-stream", stream)
64+
65+
if (streamChannel.isLocalCreated) return
66+
67+
if (inboundStreams.trySend(stream).isFailure) stream.cancel("Connection closed")
68+
}
69+
70+
override fun channelInactive(ctx: ChannelHandlerContext) {
71+
cancel("Channel is not active")
72+
ctx.fireChannelInactive()
73+
}
74+
75+
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable?) {
76+
cancel("exceptionCaught", cause)
77+
}
78+
79+
override suspend fun createStream(): RSocketMultiplexedConnection.Stream {
80+
val streamChannel = channel.createStream(QuicStreamType.BIDIRECTIONAL, NettyQuicStreamInitializer).awaitFuture()
81+
return streamChannel.attr(NettyQuicStream.ATTRIBUTE).get()
82+
}
83+
84+
override suspend fun acceptStream(): RSocketMultiplexedConnection.Stream? {
85+
return inboundStreams.receiveCatching().getOrNull()
86+
}
87+
88+
companion object {
89+
val ATTRIBUTE: AttributeKey<NettyQuicConnection> = AttributeKey.newInstance<NettyQuicConnection>("rsocket-quic-connection")
90+
}
91+
}
92+
93+
@RSocketTransportApi
94+
internal class NettyQuicConnectionInitializer(
95+
private val parentContext: CoroutineContext,
96+
private val onConnection: ((RSocketConnection) -> Unit)?,
97+
) : ChannelInitializer<QuicChannel>() {
98+
override fun initChannel(channel: QuicChannel) {
99+
val connection = NettyQuicConnection(
100+
parentContext = parentContext,
101+
channel = channel,
102+
isClient = onConnection == null
103+
)
104+
channel.attr(NettyQuicConnection.ATTRIBUTE).set(connection)
105+
106+
channel.pipeline().addLast(
107+
"rsocket-connection",
108+
connection
109+
)
110+
111+
onConnection?.invoke(connection)
112+
}
113+
}

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicConnectionHandler.kt

-134
This file was deleted.

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicConnectionInitializer.kt

-37
This file was deleted.

0 commit comments

Comments
 (0)