Skip to content

Commit 8a83f4a

Browse files
committed
chore: wire up x-goog-spanner-request-id for BatchCreateSessions
Wires up x-goog-spanner-request-id for piecemeal addition per gRPC method, starting with BatchCreateSessions. This change involves creating TransactionOption, UpdateOption variants that allow holding the request id. Updates googleapis#3537
1 parent 42cc961 commit 8a83f4a

File tree

8 files changed

+239
-5
lines changed

8 files changed

+239
-5
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import com.google.common.util.concurrent.ListenableFuture;
2828
import com.google.spanner.v1.BatchWriteResponse;
2929
import io.opentelemetry.api.common.Attributes;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.concurrent.atomic.AtomicInteger;
3033
import javax.annotation.Nullable;
3134

3235
class DatabaseClientImpl implements DatabaseClient {
@@ -40,6 +43,8 @@ class DatabaseClientImpl implements DatabaseClient {
4043
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
4144
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
4245
@VisibleForTesting final boolean useMultiplexedSessionForRW;
46+
private final int dbId;
47+
private final AtomicInteger nthRequest;
4348

4449
final boolean useMultiplexedSessionBlindWrite;
4550

@@ -86,6 +91,15 @@ class DatabaseClientImpl implements DatabaseClient {
8691
this.tracer = tracer;
8792
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
8893
this.commonAttributes = commonAttributes;
94+
95+
this.dbId = this.dbIdFromClientId(this.clientId);
96+
this.nthRequest = new AtomicInteger(0);
97+
}
98+
99+
private int dbIdFromClientId(String clientId) {
100+
int i = clientId.indexOf("-");
101+
String strWithValue = clientId.substring(i + 1);
102+
return Integer.parseInt(strWithValue);
89103
}
90104

91105
@VisibleForTesting
@@ -179,8 +193,22 @@ public CommitResponse writeAtLeastOnceWithOptions(
179193
return getMultiplexedSessionDatabaseClient()
180194
.writeAtLeastOnceWithOptions(mutations, options);
181195
}
196+
197+
int nthRequest = this.nextNthRequest();
198+
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
199+
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(this.dbId, channelId, nthRequest, 0);
200+
182201
return runWithSessionRetry(
183-
session -> session.writeAtLeastOnceWithOptions(mutations, options));
202+
(session) -> {
203+
reqId.incrementAttempt();
204+
// TODO: Update the channelId depending on the session that is inferred.
205+
ArrayList<TransactionOption> allOptions = new ArrayList<>();
206+
allOptions.add(new Options.RequestIdOption(reqId));
207+
allOptions.addAll(Arrays.asList(options));
208+
209+
return session.writeAtLeastOnceWithOptions(
210+
mutations, allOptions.toArray(new TransactionOption[0]));
211+
});
184212
} catch (RuntimeException e) {
185213
span.setStatus(e);
186214
throw e;
@@ -189,6 +217,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
189217
}
190218
}
191219

220+
private int nextNthRequest() {
221+
return this.nthRequest.incrementAndGet();
222+
}
223+
192224
@Override
193225
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
194226
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+34
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ void appendToOptions(Options options) {
512512
private RpcOrderBy orderBy;
513513
private RpcLockHint lockHint;
514514
private Boolean lastStatement;
515+
private XGoogSpannerRequestId reqId;
515516

516517
// Construction is via factory methods below.
517518
private Options() {}
@@ -568,6 +569,14 @@ String pageToken() {
568569
return pageToken;
569570
}
570571

572+
boolean hasReqId() {
573+
return reqId != null;
574+
}
575+
576+
XGoogSpannerRequestId reqId() {
577+
return reqId;
578+
}
579+
571580
boolean hasFilter() {
572581
return filter != null;
573582
}
@@ -1018,4 +1027,29 @@ public boolean equals(Object o) {
10181027
return o instanceof LastStatementUpdateOption;
10191028
}
10201029
}
1030+
1031+
static final class RequestIdOption extends InternalOption implements TransactionOption {
1032+
private final XGoogSpannerRequestId reqId;
1033+
1034+
RequestIdOption(XGoogSpannerRequestId reqId) {
1035+
this.reqId = reqId;
1036+
}
1037+
1038+
@Override
1039+
void appendToOptions(Options options) {
1040+
options.reqId = this.reqId;
1041+
}
1042+
1043+
@Override
1044+
public int hashCode() {
1045+
return RequestIdOption.class.hashCode();
1046+
}
1047+
1048+
@Override
1049+
public boolean equals(Object o) {
1050+
// TODO: Examine why the precedent for LastStatementUpdateOption
1051+
// does not check against the actual value.
1052+
return o instanceof RequestIdOption;
1053+
}
1054+
}
10211055
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.atomic.AtomicInteger;
3435
import javax.annotation.concurrent.GuardedBy;
3536

3637
/** Client for creating single sessions and batches of sessions. */
@@ -174,6 +175,12 @@ interface SessionConsumer {
174175
private final DatabaseId db;
175176
private final Attributes commonAttributes;
176177

178+
// SessionClient is created long before a DatabaseClientImpl is created,
179+
// as batch sessions are firstly created then later attached to each Client.
180+
private static AtomicInteger NTH_ID = new AtomicInteger(0);
181+
private final int nthId;
182+
private final AtomicInteger nthRequest;
183+
177184
@GuardedBy("this")
178185
private volatile long sessionChannelCounter;
179186

@@ -186,6 +193,8 @@ interface SessionConsumer {
186193
this.executorFactory = executorFactory;
187194
this.executor = executorFactory.get();
188195
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
196+
this.nthId = SessionClient.NTH_ID.incrementAndGet();
197+
this.nthRequest = new AtomicInteger(0);
189198
}
190199

191200
@Override
@@ -206,19 +215,23 @@ SessionImpl createSession() {
206215
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
207216
// which is also a valid channel hint.
208217
final Map<SpannerRpc.Option, ?> options;
218+
final long channelId;
209219
synchronized (this) {
210220
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
221+
channelId = sessionChannelCounter;
211222
}
212223
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
213224
try (IScope s = spanner.getTracer().withSpan(span)) {
225+
XGoogSpannerRequestId reqId =
226+
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1);
214227
com.google.spanner.v1.Session session =
215228
spanner
216229
.getRpc()
217230
.createSession(
218231
db.getName(),
219232
spanner.getOptions().getDatabaseRole(),
220233
spanner.getOptions().getSessionLabels(),
221-
options);
234+
reqId.withOptions(options));
222235
SessionReference sessionReference =
223236
new SessionReference(
224237
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
@@ -387,6 +400,8 @@ private List<SessionImpl> internalBatchCreateSessions(
387400
.spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
388401
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
389402
try (IScope s = spanner.getTracer().withSpan(span)) {
403+
XGoogSpannerRequestId reqId =
404+
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
390405
List<com.google.spanner.v1.Session> sessions =
391406
spanner
392407
.getRpc()
@@ -395,7 +410,7 @@ private List<SessionImpl> internalBatchCreateSessions(
395410
sessionCount,
396411
spanner.getOptions().getDatabaseRole(),
397412
spanner.getOptions().getSessionLabels(),
398-
options);
413+
reqId.withOptions(options));
399414
span.addAnnotation(
400415
String.format(
401416
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));

google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java

+19
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.api.core.InternalApi;
20+
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2021
import com.google.common.annotations.VisibleForTesting;
22+
import io.grpc.Metadata;
2123
import java.math.BigInteger;
2224
import java.security.SecureRandom;
25+
import java.util.HashMap;
26+
import java.util.Map;
2327
import java.util.Objects;
2428

2529
@InternalApi
@@ -28,6 +32,9 @@ public class XGoogSpannerRequestId {
2832
@VisibleForTesting
2933
static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId();
3034

35+
public static final Metadata.Key<String> REQUEST_HEADER_KEY =
36+
Metadata.Key.of("x-goog-spanner-request-id", Metadata.ASCII_STRING_MARSHALLER);
37+
3138
@VisibleForTesting
3239
static final long VERSION = 1; // The version of the specification being implemented.
3340

@@ -81,6 +88,18 @@ public boolean equals(Object other) {
8188
&& Objects.equals(this.attempt, otherReqId.attempt);
8289
}
8390

91+
public void incrementAttempt() {
92+
this.attempt++;
93+
}
94+
95+
@SuppressWarnings("unchecked")
96+
public Map withOptions(Map options) {
97+
Map copyOptions = new HashMap<>();
98+
copyOptions.putAll(options);
99+
copyOptions.put(SpannerRpc.Option.REQUEST_ID, this.toString());
100+
return copyOptions;
101+
}
102+
84103
@Override
85104
public int hashCode() {
86105
return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+18
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import com.google.cloud.spanner.SpannerOptions;
7272
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
7373
import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider;
74+
import com.google.cloud.spanner.XGoogSpannerRequestId;
7475
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
7576
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
7677
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory;
@@ -88,6 +89,7 @@
8889
import com.google.common.base.Supplier;
8990
import com.google.common.base.Suppliers;
9091
import com.google.common.collect.ImmutableList;
92+
import com.google.common.collect.ImmutableMap;
9193
import com.google.common.collect.ImmutableSet;
9294
import com.google.common.io.Resources;
9395
import com.google.common.util.concurrent.RateLimiter;
@@ -2023,6 +2025,8 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20232025
// Set channel affinity in GAX.
20242026
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
20252027
}
2028+
String methodName = method.getFullMethodName();
2029+
context = withRequestId(context, options, methodName);
20262030
}
20272031
if (compressorName != null) {
20282032
// This sets the compressor for Client -> Server.
@@ -2046,6 +2050,7 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20462050
context
20472051
.withStreamWaitTimeoutDuration(waitTimeout)
20482052
.withStreamIdleTimeoutDuration(idleTimeout);
2053+
20492054
CallContextConfigurator configurator = SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY.get();
20502055
ApiCallContext apiCallContextFromContext = null;
20512056
if (configurator != null) {
@@ -2054,6 +2059,19 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20542059
return (GrpcCallContext) context.merge(apiCallContextFromContext);
20552060
}
20562061

2062+
GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) {
2063+
String reqId = (String) options.get(Option.REQUEST_ID);
2064+
if (reqId == null) {
2065+
return context;
2066+
}
2067+
2068+
System.out.println("\033[32moptions.reqId: " + reqId + "\033[00m " + methodName);
2069+
Map<String, List<String>> withReqId =
2070+
ImmutableMap.of(
2071+
XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(), Collections.singletonList(reqId));
2072+
return context.withExtraHeaders(withReqId);
2073+
}
2074+
20572075
void registerResponseObserver(SpannerResponseObserver responseObserver) {
20582076
responseObservers.add(responseObserver);
20592077
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@
7878
public interface SpannerRpc extends ServiceRpc {
7979
/** Options passed in {@link SpannerRpc} methods to control how an RPC is issued. */
8080
enum Option {
81-
CHANNEL_HINT("Channel Hint");
81+
CHANNEL_HINT("Channel Hint"),
82+
REQUEST_ID("Request Id");
8283

8384
private final String value;
8485

0 commit comments

Comments
 (0)