Skip to content

Commit d3d0e30

Browse files
committed
JVMCBC-1697 Global KV connections ignore negotiated features
Motivation ---------- KeyValueMessageHandler.channelActive() was accessing the "negotiated features" attribute before FeatureNegotiatingHandler set it. Modifications ------------- Insert a barrier that absorbs the channelActive event from pipelined handshake handlers, and defers it until the handshake is complete. Change-Id: I2ad233649e92b7d47eacc416a563e54e7f8a1583 Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/235399 Reviewed-by: Michael Reiche <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 0c7a8c0 commit d3d0e30

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed

core-io/src/main/java/com/couchbase/client/core/endpoint/KeyValueEndpoint.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.couchbase.client.core.io.netty.TrafficCaptureHandler;
2323
import com.couchbase.client.core.io.netty.kv.ErrorMapLoadingHandler;
2424
import com.couchbase.client.core.io.netty.kv.FeatureNegotiatingHandler;
25+
import com.couchbase.client.core.io.netty.kv.HandshakeBarrier;
2526
import com.couchbase.client.core.io.netty.kv.KeyValueMessageHandler;
2627
import com.couchbase.client.core.io.netty.kv.MemcacheProtocolDecodeHandler;
2728
import com.couchbase.client.core.io.netty.kv.MemcacheProtocolVerificationHandler;
@@ -105,6 +106,7 @@ public void init(BaseEndpoint endpoint, ChannelPipeline pipeline) {
105106
authenticator.authKeyValueConnection(ctx, pipeline);
106107

107108
bucketname.ifPresent(s -> pipeline.addLast(new SelectBucketHandler(ctx, s)));
109+
pipeline.addLast(new HandshakeBarrier());
108110
pipeline.addLast(new KeyValueMessageHandler(endpoint, ctx, bucketname));
109111
}
110112

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2025 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.couchbase.client.core.io.netty.kv;
18+
19+
import com.couchbase.client.core.deps.io.netty.channel.ChannelDuplexHandler;
20+
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
21+
import com.couchbase.client.core.deps.io.netty.channel.ChannelPromise;
22+
23+
import java.net.SocketAddress;
24+
25+
import static java.util.Objects.requireNonNull;
26+
27+
/**
28+
* Waits for the pipelined handshake sequence to complete before propagating
29+
* the `channelActive` signal.
30+
* <p>
31+
* Necessary because {@link KeyValueMessageHandler#channelActive}
32+
* depends on channel attributes set by pipelined handshake handlers.
33+
* <p>
34+
* @implNote Some handshake handlers are implicit barriers
35+
* (specifically {@link SelectBucketHandler} and {@link SaslAuthenticationHandler}
36+
* with a multistep mechanism), but they are not always present.
37+
*/
38+
public class HandshakeBarrier extends ChannelDuplexHandler {
39+
40+
private ChannelPromise interceptedConnectPromise;
41+
42+
@Override
43+
public void connect(
44+
ChannelHandlerContext ctx,
45+
SocketAddress remoteAddress,
46+
SocketAddress localAddress,
47+
ChannelPromise promise
48+
) {
49+
interceptedConnectPromise = requireNonNull(promise);
50+
51+
ChannelPromise downstream = ctx.newPromise();
52+
downstream.addListener(f -> {
53+
if (f.isSuccess()) {
54+
interceptedConnectPromise.trySuccess();
55+
ctx.pipeline().remove(this);
56+
ctx.fireChannelActive();
57+
58+
} else {
59+
interceptedConnectPromise.tryFailure(f.cause());
60+
}
61+
});
62+
63+
ctx.connect(remoteAddress, localAddress, downstream);
64+
}
65+
66+
@Override
67+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
68+
interceptedConnectPromise.tryFailure(cause);
69+
ctx.fireExceptionCaught(cause);
70+
}
71+
72+
@Override
73+
public void channelActive(ChannelHandlerContext ctx) {
74+
// Prevent this signal from reaching non-pipelined handlers,
75+
// because the pipelined handshake handlers might still be waiting for responses.
76+
}
77+
}

0 commit comments

Comments
 (0)