Skip to content

Commit 9d850a7

Browse files
committed
chore: wire up x-goog-spanner-request-id to all
Wires up x-goog-spanner-request-id for piecemeal addition per gRPC method, starting with BatchCreateSessions. This change involves creating TransactionOption, UpdateOption variants that allow holding the request id. Updates googleapis#3537
1 parent 9940b66 commit 9d850a7

15 files changed

+545
-40
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,11 @@ void initTransaction() {
458458

459459
private void initTransactionInternal(BeginTransactionRequest request) {
460460
try {
461+
XGoogSpannerRequestId reqId =
462+
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
461463
Transaction transaction =
462-
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
464+
rpc.beginTransaction(
465+
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
463466
if (!transaction.hasReadTimestamp()) {
464467
throw SpannerExceptionFactory.newSpannerException(
465468
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
@@ -803,7 +806,8 @@ ResultSet executeQueryInternalWithOptions(
803806
tracer.createStatementAttributes(statement, options),
804807
session.getErrorHandler(),
805808
rpc.getExecuteQueryRetrySettings(),
806-
rpc.getExecuteQueryRetryableCodes()) {
809+
rpc.getExecuteQueryRetryableCodes(),
810+
session.getRequestIdCreator()) {
807811
@Override
808812
CloseableIterator<PartialResultSet> startStream(
809813
@Nullable ByteString resumeToken,
@@ -826,11 +830,13 @@ CloseableIterator<PartialResultSet> startStream(
826830
if (selector != null) {
827831
request.setTransaction(selector);
828832
}
833+
834+
this.incrementXGoogRequestIdAttempt();
829835
SpannerRpc.StreamingCall call =
830836
rpc.executeQuery(
831837
request.build(),
832838
stream.consumer(),
833-
getTransactionChannelHint(),
839+
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
834840
isRouteToLeader());
835841
session.markUsed(clock.instant());
836842
stream.setCall(call, request.getTransaction().hasBegin());
@@ -1008,7 +1014,8 @@ ResultSet readInternalWithOptions(
10081014
tracer.createTableAttributes(table, readOptions),
10091015
session.getErrorHandler(),
10101016
rpc.getReadRetrySettings(),
1011-
rpc.getReadRetryableCodes()) {
1017+
rpc.getReadRetryableCodes(),
1018+
session.getRequestIdCreator()) {
10121019
@Override
10131020
CloseableIterator<PartialResultSet> startStream(
10141021
@Nullable ByteString resumeToken,
@@ -1029,11 +1036,12 @@ CloseableIterator<PartialResultSet> startStream(
10291036
builder.setTransaction(selector);
10301037
}
10311038
builder.setRequestOptions(buildRequestOptions(readOptions));
1039+
this.incrementXGoogRequestIdAttempt();
10321040
SpannerRpc.StreamingCall call =
10331041
rpc.read(
10341042
builder.build(),
10351043
stream.consumer(),
1036-
getTransactionChannelHint(),
1044+
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
10371045
isRouteToLeader());
10381046
session.markUsed(clock.instant());
10391047
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,9 @@ private List<Partition> partitionQuery(
315315

316316
final PartitionQueryRequest request = builder.build();
317317
try {
318-
PartitionResponse response = rpc.partitionQuery(request, options);
318+
XGoogSpannerRequestId reqId =
319+
this.session.requestIdCreator.nextRequestId(1 /* channelId */, 0);
320+
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
319321
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
320322
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
321323
Partition partition =

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+64-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
import com.google.common.util.concurrent.ListenableFuture;
2828
import com.google.spanner.v1.BatchWriteResponse;
2929
import io.opentelemetry.api.common.Attributes;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.Objects;
33+
import java.util.concurrent.atomic.AtomicInteger;
3034
import javax.annotation.Nullable;
3135

3236
class DatabaseClientImpl implements DatabaseClient {
@@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
4044
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
4145
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
4246
@VisibleForTesting final boolean useMultiplexedSessionForRW;
47+
private final int dbId;
48+
private final AtomicInteger nthRequest;
4349

4450
final boolean useMultiplexedSessionBlindWrite;
4551

@@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
8692
this.tracer = tracer;
8793
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
8894
this.commonAttributes = commonAttributes;
95+
96+
this.dbId = this.dbIdFromClientId(this.clientId);
97+
this.nthRequest = new AtomicInteger(0);
98+
}
99+
100+
private int dbIdFromClientId(String clientId) {
101+
int i = clientId.indexOf("-");
102+
String strWithValue = clientId.substring(i + 1);
103+
if (Objects.equals(strWithValue, "")) {
104+
strWithValue = "0";
105+
}
106+
return Integer.parseInt(strWithValue);
89107
}
90108

91109
@VisibleForTesting
@@ -183,8 +201,20 @@ public CommitResponse writeAtLeastOnceWithOptions(
183201
return getMultiplexedSessionDatabaseClient()
184202
.writeAtLeastOnceWithOptions(mutations, options);
185203
}
204+
205+
int nthRequest = this.nextNthRequest();
206+
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
207+
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(this.dbId, channelId, nthRequest, 0);
208+
186209
return runWithSessionRetry(
187-
session -> session.writeAtLeastOnceWithOptions(mutations, options));
210+
(session) -> {
211+
reqId.incrementAttempt();
212+
// TODO: Update the channelId depending on the session that is inferred.
213+
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
214+
allOptions.add(new Options.RequestIdOption(reqId));
215+
return session.writeAtLeastOnceWithOptions(
216+
mutations, allOptions.toArray(new TransactionOption[0]));
217+
});
188218
} catch (RuntimeException e) {
189219
span.setStatus(e);
190220
throw e;
@@ -193,16 +223,38 @@ public CommitResponse writeAtLeastOnceWithOptions(
193223
}
194224
}
195225

226+
private int nextNthRequest() {
227+
return this.nthRequest.incrementAndGet();
228+
}
229+
196230
@Override
197231
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
198232
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
199233
throws SpannerException {
200234
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
201235
try (IScope s = tracer.withSpan(span)) {
236+
XGoogSpannerRequestId reqId =
237+
XGoogSpannerRequestId.of(this.dbId, 1 /*TODO:channelId*/, this.nextNthRequest(), 0);
238+
System.out.println("\033[35mbatchWriteAtLeastOnceReq: " + reqId.toString() + "\033[00m");
202239
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
203-
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
240+
reqId.incrementAttempt();
241+
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
242+
allOptions.add(new Options.RequestIdOption(reqId));
243+
return getMultiplexedSessionDatabaseClient()
244+
.batchWriteAtLeastOnce(mutationGroups, allOptions.toArray(new TransactionOption[0]));
204245
}
205-
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
246+
return runWithSessionRetry(
247+
(session) -> {
248+
reqId.incrementAttempt();
249+
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
250+
System.out.println(
251+
"\033[35mbatchWriteAtLeastOnce:: session.class: "
252+
+ session.getClass()
253+
+ "\033[00m");
254+
allOptions.add(new Options.RequestIdOption(reqId));
255+
return session.batchWriteAtLeastOnce(
256+
mutationGroups, allOptions.toArray(new TransactionOption[0]));
257+
});
206258
} catch (RuntimeException e) {
207259
span.setStatus(e);
208260
throw e;
@@ -350,7 +402,15 @@ private long executePartitionedUpdateWithPooledSession(
350402
final Statement stmt, final UpdateOption... options) {
351403
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
352404
try (IScope s = tracer.withSpan(span)) {
353-
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
405+
XGoogSpannerRequestId reqId =
406+
XGoogSpannerRequestId.of(this.dbId, 1 /*TODO: channelId*/, this.nextNthRequest(), 0);
407+
return runWithSessionRetry(
408+
(session) -> {
409+
reqId.incrementAttempt();
410+
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
411+
allOptions.add(new Options.RequestIdOption(reqId));
412+
return session.executePartitionedUpdate(stmt, allOptions.toArray(new UpdateOption[0]));
413+
});
354414
} catch (RuntimeException e) {
355415
span.setStatus(e);
356416
span.end();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+35
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ void appendToOptions(Options options) {
535535
private RpcLockHint lockHint;
536536
private Boolean lastStatement;
537537
private IsolationLevel isolationLevel;
538+
private XGoogSpannerRequestId reqId;
538539

539540
// Construction is via factory methods below.
540541
private Options() {}
@@ -591,6 +592,14 @@ String pageToken() {
591592
return pageToken;
592593
}
593594

595+
boolean hasReqId() {
596+
return reqId != null;
597+
}
598+
599+
XGoogSpannerRequestId reqId() {
600+
return reqId;
601+
}
602+
594603
boolean hasFilter() {
595604
return filter != null;
596605
}
@@ -1052,4 +1061,30 @@ public boolean equals(Object o) {
10521061
return o instanceof LastStatementUpdateOption;
10531062
}
10541063
}
1064+
1065+
static final class RequestIdOption extends InternalOption
1066+
implements TransactionOption, UpdateOption {
1067+
private final XGoogSpannerRequestId reqId;
1068+
1069+
RequestIdOption(XGoogSpannerRequestId reqId) {
1070+
this.reqId = reqId;
1071+
}
1072+
1073+
@Override
1074+
void appendToOptions(Options options) {
1075+
options.reqId = this.reqId;
1076+
}
1077+
1078+
@Override
1079+
public int hashCode() {
1080+
return RequestIdOption.class.hashCode();
1081+
}
1082+
1083+
@Override
1084+
public boolean equals(Object o) {
1085+
// TODO: Examine why the precedent for LastStatementUpdateOption
1086+
// does not check against the actual value.
1087+
return o instanceof RequestIdOption;
1088+
}
1089+
}
10551090
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,20 @@ long executeStreamingPartitionedUpdate(
8181
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
8282
Options options = Options.fromUpdateOptions(updateOptions);
8383

84+
XGoogSpannerRequestId reqId =
85+
session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
86+
8487
try {
8588
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
8689

8790
while (true) {
91+
reqId.incrementAttempt();
8892
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
8993

9094
try {
9195
ServerStream<PartialResultSet> stream =
92-
rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout);
96+
rpc.executeStreamingPartitionedDml(
97+
request, reqId.withOptions(session.getOptions()), remainingTimeout);
9398

9499
for (PartialResultSet rs : stream) {
95100
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
@@ -113,12 +118,17 @@ long executeStreamingPartitionedUpdate(
113118
LOGGER.log(
114119
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
115120
request = resumeOrRestartRequest(resumeToken, statement, request, options);
121+
if (resumeToken.isEmpty()) {
122+
// Create a new xGoogSpannerRequestId.
123+
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
124+
}
116125
} catch (AbortedException e) {
117126
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
118127
resumeToken = ByteString.EMPTY;
119128
foundStats = false;
120129
updateCount = 0L;
121130
request = newTransactionRequestFrom(statement, options);
131+
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
122132
}
123133
}
124134
if (!foundStats) {
@@ -209,7 +219,9 @@ private ByteString initTransaction(final Options options) {
209219
.setExcludeTxnFromChangeStreams(
210220
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
211221
.build();
212-
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
222+
XGoogSpannerRequestId reqId =
223+
session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 1);
224+
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
213225
if (tx.getId().isEmpty()) {
214226
throw SpannerExceptionFactory.newSpannerException(
215227
ErrorCode.INTERNAL,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
7171
private CloseableIterator<PartialResultSet> stream;
7272
private ByteString resumeToken;
7373
private boolean finished;
74+
public XGoogSpannerRequestId xGoogRequestId;
75+
private XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator;
7476
/**
7577
* Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
7678
* reached the maximum buffer size without seeing a restart token; in this case, we will drain the
@@ -85,7 +87,8 @@ protected ResumableStreamIterator(
8587
TraceWrapper tracer,
8688
ErrorHandler errorHandler,
8789
RetrySettings streamingRetrySettings,
88-
Set<Code> retryableCodes) {
90+
Set<Code> retryableCodes,
91+
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
8992
this(
9093
maxBufferSize,
9194
streamName,
@@ -94,7 +97,8 @@ protected ResumableStreamIterator(
9497
Attributes.empty(),
9598
errorHandler,
9699
streamingRetrySettings,
97-
retryableCodes);
100+
retryableCodes,
101+
xGoogRequestIdCreator);
98102
}
99103

100104
protected ResumableStreamIterator(
@@ -105,14 +109,16 @@ protected ResumableStreamIterator(
105109
Attributes attributes,
106110
ErrorHandler errorHandler,
107111
RetrySettings streamingRetrySettings,
108-
Set<Code> retryableCodes) {
112+
Set<Code> retryableCodes,
113+
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
109114
checkArgument(maxBufferSize >= 0);
110115
this.maxBufferSize = maxBufferSize;
111116
this.tracer = tracer;
112117
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
113118
this.errorHandler = errorHandler;
114119
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
115120
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
121+
this.xGoogRequestIdCreator = xGoogRequestIdCreator;
116122
}
117123

118124
private ExponentialBackOff newBackOff() {
@@ -189,6 +195,14 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
189195
}
190196
}
191197

198+
public void incrementXGoogRequestIdAttempt() {
199+
if (this.xGoogRequestId == null) {
200+
this.xGoogRequestId =
201+
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 0 /*attempt*/);
202+
}
203+
this.xGoogRequestId.incrementAttempt();
204+
}
205+
192206
private enum DirectExecutor implements Executor {
193207
INSTANCE;
194208

@@ -281,6 +295,7 @@ protected PartialResultSet computeNext() {
281295
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
282296
stream = null;
283297
try (IScope s = tracer.withSpan(span)) {
298+
incrementXGoogRequestIdAttempt();
284299
long delay = spannerException.getRetryDelayInMillis();
285300
if (delay != -1) {
286301
backoffSleep(context, delay);
@@ -301,6 +316,7 @@ protected PartialResultSet computeNext() {
301316
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
302317
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
303318
stream = null;
319+
xGoogRequestId = null;
304320
continue;
305321
}
306322
}
@@ -326,6 +342,10 @@ private void startGrpcStreaming() {
326342
// When start a new stream set the Span as current to make the gRPC Span a child of
327343
// this Span.
328344
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
345+
if (xGoogRequestId == null) {
346+
xGoogRequestId =
347+
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: retrieve channelId*/, 0);
348+
}
329349
stream.requestPrefetchChunks();
330350
}
331351
}

0 commit comments

Comments
 (0)