Skip to content

Commit 9c1e93d

Browse files
authored
Finalize new transport API and improve stability of QUIC implementation (#290)
* Finalize new Transport API * Adapt core implementation to the latest change in transport API and improve code-organization a bit * Adapt transport implementations to the latest change in transport API, simplify and improve some parts * Enable all transport tests * Fix old ktor tcp transport implementation * Ignore tests for old transports except for local
1 parent 10be4ae commit 9c1e93d

File tree

73 files changed

+1406
-1665
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+1406
-1665
lines changed

ktor-plugins/ktor-client-rsocket/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,7 +26,6 @@ import io.rsocket.kotlin.*
2626
import io.rsocket.kotlin.core.*
2727
import io.rsocket.kotlin.transport.*
2828
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
29-
import kotlinx.coroutines.*
3029
import kotlin.coroutines.*
3130

3231
private val RSocketSupportConfigKey = AttributeKey<RSocketSupportConfig.Internal>("RSocketSupportConfig")
@@ -66,9 +65,7 @@ private class RSocketSupportTarget(
6665
override val coroutineContext: CoroutineContext get() = client.coroutineContext
6766

6867
@RSocketTransportApi
69-
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
70-
client.webSocket(request) {
71-
handler.handleKtorWebSocketConnection(this)
72-
}
68+
override suspend fun connectClient(): RSocketConnection {
69+
return KtorWebSocketConnection(client.webSocketSession(request))
7370
}
7471
}

ktor-plugins/ktor-server-rsocket/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import io.rsocket.kotlin.*
2424
import io.rsocket.kotlin.core.*
2525
import io.rsocket.kotlin.transport.*
2626
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
27+
import kotlinx.coroutines.*
2728

2829
private val RSocketSupportConfigKey = AttributeKey<RSocketSupportConfig.Internal>("RSocketSupportConfig")
2930

@@ -54,8 +55,8 @@ internal fun Route.rSocketHandler(acceptor: ConnectionAcceptor): suspend Default
5455
val config = application.attributes.getOrNull(RSocketSupportConfigKey)
5556
?: error("Plugin RSocketSupport is not installed. Consider using `install(RSocketSupport)` in server config first.")
5657

57-
val handler = config.server.createHandler(acceptor)
5858
return {
59-
handler.handleKtorWebSocketConnection(this)
59+
config.server.acceptConnection(acceptor, KtorWebSocketConnection(this))
60+
awaitCancellation()
6061
}
6162
}

rsocket-core/api/rsocket-core.api

+6-13
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,9 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
222222
}
223223

224224
public final class io/rsocket/kotlin/core/RSocketServer {
225+
public final fun acceptConnection (Lio/rsocket/kotlin/ConnectionAcceptor;Lio/rsocket/kotlin/transport/RSocketConnection;)V
225226
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
226227
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
227-
public final fun createHandler (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketConnectionHandler;
228228
public final fun startServer (Lio/rsocket/kotlin/transport/RSocketServerTarget;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
229229
}
230230

@@ -751,31 +751,24 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
751751
}
752752

753753
public abstract interface class io/rsocket/kotlin/transport/RSocketClientTarget : kotlinx/coroutines/CoroutineScope {
754-
public abstract fun connectClient (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;)Lkotlinx/coroutines/Job;
754+
public abstract fun connectClient (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
755755
}
756756

757-
public abstract interface class io/rsocket/kotlin/transport/RSocketConnection {
758-
}
759-
760-
public abstract interface class io/rsocket/kotlin/transport/RSocketConnectionHandler {
761-
public abstract fun handleConnection (Lio/rsocket/kotlin/transport/RSocketConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
757+
public abstract interface class io/rsocket/kotlin/transport/RSocketConnection : kotlinx/coroutines/CoroutineScope {
762758
}
763759

764760
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection : io/rsocket/kotlin/transport/RSocketConnection {
765761
public abstract fun acceptStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
766762
public abstract fun createStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
767763
}
768764

769-
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : java/lang/AutoCloseable {
770-
public abstract fun close ()V
771-
public abstract fun isClosedForSend ()Z
765+
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : kotlinx/coroutines/CoroutineScope {
772766
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
773767
public abstract fun sendFrame (Lkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
774768
public abstract fun setSendPriority (I)V
775769
}
776770

777771
public abstract interface class io/rsocket/kotlin/transport/RSocketSequentialConnection : io/rsocket/kotlin/transport/RSocketConnection {
778-
public abstract fun isClosedForSend ()Z
779772
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
780773
public abstract fun sendFrame (ILkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
781774
}
@@ -784,7 +777,7 @@ public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstanc
784777
}
785778

786779
public abstract interface class io/rsocket/kotlin/transport/RSocketServerTarget : kotlinx/coroutines/CoroutineScope {
787-
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
780+
public abstract fun startServer (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
788781
}
789782

790783
public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {
@@ -809,7 +802,7 @@ public abstract interface class io/rsocket/kotlin/transport/ServerTransport {
809802
}
810803

811804
public final class io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue {
812-
public fun <init> (I)V
805+
public fun <init> ()V
813806
public final fun cancel ()V
814807
public final fun close ()V
815808
public final fun dequeueFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionEstablishmentContext.kt

+12-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,23 +20,16 @@ import io.rsocket.kotlin.frame.*
2020
import io.rsocket.kotlin.frame.io.*
2121
import io.rsocket.kotlin.keepalive.*
2222
import io.rsocket.kotlin.payload.*
23-
import io.rsocket.kotlin.transport.*
2423
import kotlinx.io.*
2524

2625
// send/receive setup, resume, resume ok, lease, error
27-
@RSocketTransportApi
2826
internal abstract class ConnectionEstablishmentContext(
29-
private val frameCodec: FrameCodec,
27+
protected val frameCodec: FrameCodec,
3028
) {
31-
protected abstract suspend fun receiveFrameRaw(): Buffer?
32-
protected abstract suspend fun sendFrame(frame: Buffer)
33-
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))
29+
protected abstract suspend fun receiveConnectionFrameRaw(): Buffer?
30+
protected abstract suspend fun sendConnectionFrameRaw(frame: Buffer)
3431

35-
// only setup|lease|resume|resume_ok|error frames
36-
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
37-
expectedStreamId = 0,
38-
frame = receiveFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
39-
)
32+
protected suspend fun sendFrameConnectionFrame(frame: Frame): Unit = sendConnectionFrameRaw(frameCodec.encodeFrame(frame))
4033

4134
suspend fun sendSetup(
4235
version: Version,
@@ -45,5 +38,11 @@ internal abstract class ConnectionEstablishmentContext(
4538
resumeToken: Buffer?,
4639
payloadMimeType: PayloadMimeType,
4740
payload: Payload,
48-
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
41+
): Unit = sendFrameConnectionFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
42+
43+
// only setup|lease|resume|resume_ok|error frames
44+
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
45+
expectedStreamId = 0,
46+
frame = receiveConnectionFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
47+
)
4948
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionEstablishmentHandler.kt

-107
This file was deleted.

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt

+5-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,20 +18,15 @@ package io.rsocket.kotlin.connection
1818

1919
import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.frame.*
21-
import io.rsocket.kotlin.keepalive.*
2221
import io.rsocket.kotlin.operation.*
23-
import io.rsocket.kotlin.transport.*
2422
import kotlinx.coroutines.*
2523
import kotlinx.io.*
26-
import kotlin.coroutines.*
2724

28-
@RSocketTransportApi
2925
internal class ConnectionInbound(
30-
// requestContext
31-
override val coroutineContext: CoroutineContext,
26+
private val requestsScope: CoroutineScope,
3227
private val responder: RSocket,
3328
private val keepAliveHandler: KeepAliveHandler,
34-
) : CoroutineScope {
29+
) {
3530
fun handleFrame(frame: Frame): Unit = when (frame) {
3631
is MetadataPushFrame -> receiveMetadataPush(frame.metadata)
3732
is KeepAliveFrame -> receiveKeepAlive(frame.respond, frame.data, frame.lastPosition)
@@ -42,9 +37,9 @@ internal class ConnectionInbound(
4237
}
4338

4439
private fun receiveMetadataPush(metadata: Buffer) {
45-
launch {
40+
requestsScope.launch {
4641
responder.metadataPush(metadata)
47-
}.invokeOnCompletion { metadata.close() }
42+
}.invokeOnCompletion { metadata.clear() }
4843
}
4944

5045
@Suppress("UNUSED_PARAMETER") // will be used later

0 commit comments

Comments
 (0)