From c64ce96e3b3b5c5ed8716e4b22ab8cb25e78214d Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 4 Mar 2025 23:48:18 +0200 Subject: [PATCH 1/2] chore: wire up x-goog-spanner-request-id generation to RPCs Wire up the request-id generation with the respective RPCs as a first phase before applying grpc.CallSettings. Updates #3537 --- .../cloud/spanner/DatabaseClientImpl.java | 34 +++++++++- .../com/google/cloud/spanner/Options.java | 34 ++++++++++ .../google/cloud/spanner/SessionClient.java | 7 ++- .../com/google/cloud/spanner/SessionImpl.java | 15 ++++- .../cloud/spanner/XGoogSpannerRequestId.java | 19 ++++++ .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 16 +++++ .../cloud/spanner/spi/v1/SpannerRpc.java | 3 +- .../cloud/spanner/DatabaseClientImplTest.java | 62 ++++++++++++++++++- .../spanner/XGoogSpannerRequestIdTest.java | 43 +++++++++++++ 9 files changed, 227 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index ed5b017934..fadd16513d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -27,6 +27,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.spanner.v1.BatchWriteResponse; import io.opentelemetry.api.common.Attributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; class DatabaseClientImpl implements DatabaseClient { @@ -40,6 +43,8 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; @VisibleForTesting final boolean useMultiplexedSessionPartitionedOps; @VisibleForTesting final boolean useMultiplexedSessionForRW; + private final int dbId; + private final AtomicInteger nthRequest; final boolean useMultiplexedSessionBlindWrite; @@ -86,6 +91,15 @@ class DatabaseClientImpl implements DatabaseClient { this.tracer = tracer; this.useMultiplexedSessionForRW = useMultiplexedSessionForRW; this.commonAttributes = commonAttributes; + + this.dbId = this.dbIdFromClientId(this.clientId); + this.nthRequest = new AtomicInteger(0); + } + + private int dbIdFromClientId(String clientId) { + int i = clientId.indexOf("-"); + String strWithValue = clientId.substring(i + 1); + return Integer.parseInt(strWithValue); } @VisibleForTesting @@ -179,8 +193,22 @@ public CommitResponse writeAtLeastOnceWithOptions( return getMultiplexedSessionDatabaseClient() .writeAtLeastOnceWithOptions(mutations, options); } + + int nthRequest = this.nextNthRequest(); + int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */ + XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(this.dbId, channelId, nthRequest, 0); + return runWithSessionRetry( - session -> session.writeAtLeastOnceWithOptions(mutations, options)); + (session) -> { + reqId.incrementAttempt(); + // TODO: Update the channelId depending on the session that is inferred. + ArrayList allOptions = new ArrayList<>(); + allOptions.add(new Options.RequestIdOption(reqId)); + allOptions.addAll(Arrays.asList(options)); + + return session.writeAtLeastOnceWithOptions( + mutations, allOptions.toArray(new TransactionOption[0])); + }); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -189,6 +217,10 @@ public CommitResponse writeAtLeastOnceWithOptions( } } + private int nextNthRequest() { + return this.nthRequest.addAndGet(1); + } + @Override public ServerStream batchWriteAtLeastOnce( final Iterable mutationGroups, final TransactionOption... options) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index c8c588f813..630684130f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -512,6 +512,7 @@ void appendToOptions(Options options) { private RpcOrderBy orderBy; private RpcLockHint lockHint; private Boolean lastStatement; + private XGoogSpannerRequestId reqId; // Construction is via factory methods below. private Options() {} @@ -568,6 +569,14 @@ String pageToken() { return pageToken; } + boolean hasReqId() { + return reqId != null; + } + + XGoogSpannerRequestId reqId() { + return reqId; + } + boolean hasFilter() { return filter != null; } @@ -1018,4 +1027,29 @@ public boolean equals(Object o) { return o instanceof LastStatementUpdateOption; } } + + static final class RequestIdOption extends InternalOption implements TransactionOption { + private final XGoogSpannerRequestId reqId; + + RequestIdOption(XGoogSpannerRequestId reqId) { + this.reqId = reqId; + } + + @Override + void appendToOptions(Options options) { + options.reqId = this.reqId; + } + + @Override + public int hashCode() { + return RequestIdOption.class.hashCode(); + } + + @Override + public boolean equals(Object o) { + // TODO: Examine why the precedent for LastStatementUpdateOption + // does not check against the actual value. + return o instanceof RequestIdOption; + } + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 2edfb66d89..b2c8e943fa 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -133,6 +133,7 @@ public void run() { .getTracer() .getCurrentSpan() .addAnnotation(String.format("Creating %d sessions", sessionCount)); + while (remainingSessionsToCreate > 0) { try { sessions = internalBatchCreateSessions(remainingSessionsToCreate, channelHint); @@ -387,6 +388,10 @@ private List internalBatchCreateSessions( .spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent); span.addAnnotation(String.format("Requesting %d sessions", sessionCount)); try (IScope s = spanner.getTracer().withSpan(span)) { + // TODO: Infer the caller client if possible else create separate + // outside counter for such asynchronous operations and then also + // increment the operations for each asynchronous operation. + XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 1, 1, 1); List sessions = spanner .getRpc() @@ -395,7 +400,7 @@ private List internalBatchCreateSessions( sessionCount, spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - options); + reqId.withOptions(options)); span.addAnnotation( String.format( "Request for %d sessions returned %d sessions", sessionCount, sessions.size())); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 454709275f..0d561e0a65 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -261,6 +261,7 @@ public CommitResponse writeAtLeastOnceWithOptions( .setNanos(options.maxCommitDelay().getNano()) .build()); } + RequestOptions commitRequestOptions = getRequestOptions(transactionOptions); if (commitRequestOptions != null) { @@ -269,8 +270,14 @@ public CommitResponse writeAtLeastOnceWithOptions( CommitRequest request = requestBuilder.build(); ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT); try (IScope s = tracer.withSpan(span)) { + // Inject the request id into the request options. + XGoogSpannerRequestId reqId = options.reqId(); return SpannerRetryHelper.runTxWithRetriesOnAborted( - () -> new CommitResponse(spanner.getRpc().commit(request, getOptions()))); + () -> { + reqId.incrementAttempt(); + return new CommitResponse( + spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); + }); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -423,7 +430,11 @@ public ApiFuture asyncClose() { public void close() { ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION); try (IScope s = tracer.withSpan(span)) { - spanner.getRpc().deleteSession(getName(), getOptions()); + // TODO: Infer the caller client if possible else create separate + // outside counter for such asynchronous operations and then also + // increment the operations for each asynchronous operation. + XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 2, 1, 1); + spanner.getRpc().deleteSession(getName(), reqId.withOptions(options)); } catch (RuntimeException e) { span.setStatus(e); throw e; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index 4f6c011475..0da80ec366 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -17,9 +17,13 @@ package com.google.cloud.spanner; import com.google.api.core.InternalApi; +import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; +import io.grpc.Metadata; import java.math.BigInteger; import java.security.SecureRandom; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; @InternalApi @@ -28,6 +32,9 @@ public class XGoogSpannerRequestId { @VisibleForTesting static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId(); + public static final Metadata.Key REQUEST_HEADER_KEY = + Metadata.Key.of("x-goog-spanner-request-id", Metadata.ASCII_STRING_MARSHALLER); + @VisibleForTesting static final long VERSION = 1; // The version of the specification being implemented. @@ -81,6 +88,18 @@ public boolean equals(Object other) { && Objects.equals(this.attempt, otherReqId.attempt); } + public void incrementAttempt() { + this.attempt++; + } + + @SuppressWarnings("unchecked") + public Map withOptions(Map options) { + Map copyOptions = new HashMap<>(); + copyOptions.putAll(options); + copyOptions.put(SpannerRpc.Option.REQUEST_ID, this.toString()); + return copyOptions; + } + @Override public int hashCode() { return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 0e540ea792..d8023c2ba2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -71,6 +71,7 @@ import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider; +import com.google.cloud.spanner.XGoogSpannerRequestId; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory; @@ -88,6 +89,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; import com.google.common.util.concurrent.RateLimiter; @@ -193,6 +195,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -2018,6 +2021,8 @@ GrpcCallContext newCallContext( // Set channel affinity in GAX. context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); } + String methodName = method.getFullMethodName(); + context = withRequestId(context, options, methodName); } if (compressorName != null) { // This sets the compressor for Client -> Server. @@ -2041,6 +2046,8 @@ GrpcCallContext newCallContext( context .withStreamWaitTimeoutDuration(waitTimeout) .withStreamIdleTimeoutDuration(idleTimeout); + + // TODO: Infer the x-goog-spanner-request-id header and inject it in accordingly. CallContextConfigurator configurator = SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY.get(); ApiCallContext apiCallContextFromContext = null; if (configurator != null) { @@ -2049,6 +2056,15 @@ GrpcCallContext newCallContext( return (GrpcCallContext) context.merge(apiCallContextFromContext); } + GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) { + String reqId = (String) options.get(Option.REQUEST_ID); + System.out.println("\033[32moptions.reqId: " + reqId + "\033[00m " + methodName); + Map> withReqId = + ImmutableMap.of( + XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(), Collections.singletonList(reqId)); + return context.withExtraHeaders(withReqId); + } + void registerResponseObserver(SpannerResponseObserver responseObserver) { responseObservers.add(responseObserver); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 9ad9420474..d029084477 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -78,7 +78,8 @@ public interface SpannerRpc extends ServiceRpc { /** Options passed in {@link SpannerRpc} methods to control how an RPC is issued. */ enum Option { - CHANNEL_HINT("Channel Hint"); + CHANNEL_HINT("Channel Hint"), + REQUEST_ID("Request Id"); private final String value; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 87ea5c19ce..cd44e14114 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -105,6 +105,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; +import io.grpc.ServerInterceptors; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessServerBuilder; @@ -152,6 +153,7 @@ public class DatabaseClientImplTest { private static final String DATABASE_NAME = String.format( "projects/%s/instances/%s/databases/%s", TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE); + private static XGoogSpannerRequestIdTest.ServerHeaderEnforcer xGoogReqIdInterceptor; private static MockSpannerServiceImpl mockSpanner; private static Server server; private static LocalChannelProvider channelProvider; @@ -220,13 +222,14 @@ public static void startStaticServer() throws IOException { StatementResult.query(SELECT1_FROM_TABLE, MockSpannerTestUtil.SELECT1_RESULTSET)); mockSpanner.setBatchWriteResult(BATCH_WRITE_RESPONSES); + xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer(); executor = Executors.newSingleThreadExecutor(); String uniqueName = InProcessServerBuilder.generateName(); server = InProcessServerBuilder.forName(uniqueName) // We need to use a real executor for timeouts to occur. .scheduledExecutorService(new ScheduledThreadPoolExecutor(1)) - .addService(mockSpanner) + .addService(ServerInterceptors.intercept(mockSpanner, xGoogReqIdInterceptor)) .build() .start(); channelProvider = LocalChannelProvider.create(uniqueName); @@ -5168,6 +5171,63 @@ public void testRetryOnResourceExhausted() { } } + @Test + public void testSelect1HasXGoogRequestIdHeader() { + SingerInfo info = SingerInfo.newBuilder().setSingerId(1).build(); + Statement statement = Statement.of("SELECT * FROM FOO"); + mockSpanner.putStatementResult( + StatementResult.query( + statement, + com.google.spanner.v1.ResultSet.newBuilder() + .setMetadata( + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + Field.newBuilder() + .setName("a1") + .setType( + Type.newBuilder() + .setCodeValue(Integer.MAX_VALUE) + .build()) + .build()) + .addFields( + Field.newBuilder() + .setName("b1") + .setType( + Type.newBuilder() + .setCodeValue(Integer.MAX_VALUE) + .build()) + .build()) + .build()) + .build()) + .addRows( + ListValue.newBuilder() + .addValues( + com.google.protobuf.Value.newBuilder() + .setListValue( + ListValue.newBuilder() + .addValues( + com.google.protobuf.Value.newBuilder() + .setNumberValue(6.626) + .build()) + .addValues( + com.google.protobuf.Value.newBuilder() + .setNumberValue(-6.626) + .build()) + .build()) + .build()) + .build()) + .build())); + + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = client.singleUse().executeQuery(statement)) { + assertTrue(resultSet.next()); + assertAsString(ImmutableList.of("6.626", "-6.626"), resultSet, 0); + } + } + @Test public void testSessionPoolExhaustedError_containsStackTraces() { assumeFalse( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java index 12c9213c7d..e1a027482b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java @@ -18,8 +18,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import java.util.ArrayList; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.junit.Test; @@ -51,4 +58,40 @@ public void testEnsureHexadecimalFormatForRandProcessID() { Matcher m = XGoogSpannerRequestIdTest.REGEX_RAND_PROCESS_ID.matcher(str); assertTrue(m.matches()); } + + public static class ServerHeaderEnforcer implements ServerInterceptor { + private List gotValues; + + ServerHeaderEnforcer() { + this.gotValues = new ArrayList(); + } + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + final Metadata requestHeaders, + ServerCallHandler next) { + // Firstly assert and validate that at least we've got a requestId. + String gotReqId = requestHeaders.get(XGoogSpannerRequestId.REQUEST_HEADER_KEY); + assertNotNull(gotReqId); + Matcher m = XGoogSpannerRequestIdTest.REGEX_RAND_PROCESS_ID.matcher(gotReqId); + String message = + String.format( + "%s lacks %s in %s", + call.getMethodDescriptor().getFullMethodName(), + XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(), + gotReqId); + System.out.println("\033[32mMessage: " + message + "\033[00m"); + assertTrue(m.matches()); + + this.gotValues.add(gotReqId); + + // Finally proceed with the call. + return next.startCall(call, requestHeaders); + } + + public String[] accumulatedValues() { + return this.gotValues.toArray(new String[0]); + } + } } From d66e013d3a91be0157646bebd2023208d7333584 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 6 Mar 2025 19:39:25 +0200 Subject: [PATCH 2/2] Wire up more x-goog-spanner-request-id propagations --- .../cloud/spanner/DatabaseClientImpl.java | 60 +++++++++++++++---- .../com/google/cloud/spanner/Options.java | 3 +- .../spanner/ResumableStreamIterator.java | 1 + .../google/cloud/spanner/SessionClient.java | 21 +++++-- .../com/google/cloud/spanner/SessionImpl.java | 3 +- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 5 +- .../cloud/spanner/DatabaseClientImplTest.java | 5 +- .../spanner/XGoogSpannerRequestIdTest.java | 27 ++++++--- 8 files changed, 94 insertions(+), 31 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index fadd16513d..4e41416a2a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -27,8 +27,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.spanner.v1.BatchWriteResponse; import io.opentelemetry.api.common.Attributes; -import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -169,7 +169,15 @@ public CommitResponse writeWithOptions( if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options); } - return runWithSessionRetry(session -> session.writeWithOptions(mutations, options)); + int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */ + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0); + return runWithSessionRetry( + session -> { + reqId.incrementAttempt(); + // TODO: Update the channelId depending on the session that is inferred. + return session.writeWithOptions(mutations, appendReqIdToOptions(reqId, options)); + }); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -194,20 +202,15 @@ public CommitResponse writeAtLeastOnceWithOptions( .writeAtLeastOnceWithOptions(mutations, options); } - int nthRequest = this.nextNthRequest(); int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */ - XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(this.dbId, channelId, nthRequest, 0); - + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0); return runWithSessionRetry( (session) -> { reqId.incrementAttempt(); // TODO: Update the channelId depending on the session that is inferred. - ArrayList allOptions = new ArrayList<>(); - allOptions.add(new Options.RequestIdOption(reqId)); - allOptions.addAll(Arrays.asList(options)); - return session.writeAtLeastOnceWithOptions( - mutations, allOptions.toArray(new TransactionOption[0])); + mutations, appendReqIdToOptions(reqId, options)); }); } catch (RuntimeException e) { span.setStatus(e); @@ -217,8 +220,22 @@ public CommitResponse writeAtLeastOnceWithOptions( } } + private TransactionOption[] appendReqIdToOptions( + XGoogSpannerRequestId reqId, TransactionOption... options) { + List allOptions = Arrays.asList(options); + allOptions.add(new Options.RequestIdOption(reqId)); + return allOptions.toArray(new TransactionOption[0]); + } + + private UpdateOption[] appendReqIdToOptions( + XGoogSpannerRequestId reqId, UpdateOption... options) { + List allOptions = Arrays.asList(options); + allOptions.add(new Options.RequestIdOption(reqId)); + return allOptions.toArray(new UpdateOption[0]); + } + private int nextNthRequest() { - return this.nthRequest.addAndGet(1); + return this.nthRequest.incrementAndGet(); } @Override @@ -230,7 +247,17 @@ public ServerStream batchWriteAtLeastOnce( if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options); } - return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options)); + + int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */ + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0); + + return runWithSessionRetry( + (session) -> { + reqId.incrementAttempt(); + return session.batchWriteAtLeastOnce( + mutationGroups, appendReqIdToOptions(reqId, options)); + }); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -378,7 +405,14 @@ private long executePartitionedUpdateWithPooledSession( final Statement stmt, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { - return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); + int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */ + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0); + return runWithSessionRetry( + session -> { + reqId.incrementAttempt(); + return session.executePartitionedUpdate(stmt, appendReqIdToOptions(reqId, options)); + }); } catch (RuntimeException e) { span.setStatus(e); span.end(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 630684130f..10b89fface 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -1028,7 +1028,8 @@ public boolean equals(Object o) { } } - static final class RequestIdOption extends InternalOption implements TransactionOption { + static final class RequestIdOption extends InternalOption + implements UpdateOption, TransactionOption { private final XGoogSpannerRequestId reqId; RequestIdOption(XGoogSpannerRequestId reqId) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 793f3bcbe3..ab400e7cd9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -243,6 +243,7 @@ protected PartialResultSet computeNext() { && (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) { return buffer.pop(); } + try { if (stream.hasNext()) { PartialResultSet next = stream.next(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index b2c8e943fa..4376a2ff6f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.concurrent.GuardedBy; /** Client for creating single sessions and batches of sessions. */ @@ -175,6 +176,12 @@ interface SessionConsumer { private final DatabaseId db; private final Attributes commonAttributes; + // SessionClient is created long before a DatabaseClientImpl is created, + // as batch sessions are firstly created then later attached to each Client. + private static AtomicInteger NTH_ID = new AtomicInteger(0); + private final int nthId; + private final AtomicInteger nthRequest; + @GuardedBy("this") private volatile long sessionChannelCounter; @@ -187,6 +194,8 @@ interface SessionConsumer { this.executorFactory = executorFactory; this.executor = executorFactory.get(); this.commonAttributes = spanner.getTracer().createCommonAttributes(db); + this.nthId = SessionClient.NTH_ID.incrementAndGet(); + this.nthRequest = new AtomicInteger(0); } @Override @@ -207,11 +216,15 @@ SessionImpl createSession() { // The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE, // which is also a valid channel hint. final Map options; + final long channelId; synchronized (this) { options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); + channelId = sessionChannelCounter; } ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1); com.google.spanner.v1.Session session = spanner .getRpc() @@ -219,7 +232,7 @@ SessionImpl createSession() { db.getName(), spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - options); + reqId.withOptions(options)); SessionReference sessionReference = new SessionReference( session.getName(), session.getCreateTime(), session.getMultiplexed(), options); @@ -388,10 +401,8 @@ private List internalBatchCreateSessions( .spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent); span.addAnnotation(String.format("Requesting %d sessions", sessionCount)); try (IScope s = spanner.getTracer().withSpan(span)) { - // TODO: Infer the caller client if possible else create separate - // outside counter for such asynchronous operations and then also - // increment the operations for each asynchronous operation. - XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 1, 1, 1); + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1); List sessions = spanner .getRpc() diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 0d561e0a65..a674b0a3a9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -423,7 +423,8 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... @Override public ApiFuture asyncClose() { - return spanner.getRpc().asyncDeleteSession(getName(), getOptions()); + XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 2, 1, 1); + return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(options)); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index d8023c2ba2..9594785673 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -2047,7 +2047,6 @@ GrpcCallContext newCallContext( .withStreamWaitTimeoutDuration(waitTimeout) .withStreamIdleTimeoutDuration(idleTimeout); - // TODO: Infer the x-goog-spanner-request-id header and inject it in accordingly. CallContextConfigurator configurator = SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY.get(); ApiCallContext apiCallContextFromContext = null; if (configurator != null) { @@ -2058,6 +2057,10 @@ GrpcCallContext newCallContext( GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) { String reqId = (String) options.get(Option.REQUEST_ID); + if (reqId == null) { + return context; + } + System.out.println("\033[32moptions.reqId: " + reqId + "\033[00m " + methodName); Map> withReqId = ImmutableMap.of( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index cd44e14114..0092014f3d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -120,6 +120,7 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; @@ -222,7 +223,9 @@ public static void startStaticServer() throws IOException { StatementResult.query(SELECT1_FROM_TABLE, MockSpannerTestUtil.SELECT1_RESULTSET)); mockSpanner.setBatchWriteResult(BATCH_WRITE_RESPONSES); - xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer(); + Set checkMethods = + new HashSet(Arrays.asList("google.spanner.v1.Spanner/BatchCreateSessions")); + xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer(checkMethods); executor = Executors.newSingleThreadExecutor(); String uniqueName = InProcessServerBuilder.generateName(); server = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java index e1a027482b..38a5c47612 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java @@ -27,6 +27,7 @@ import io.grpc.ServerInterceptor; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.junit.Test; @@ -61,9 +62,11 @@ public void testEnsureHexadecimalFormatForRandProcessID() { public static class ServerHeaderEnforcer implements ServerInterceptor { private List gotValues; + private Set checkMethods; - ServerHeaderEnforcer() { + ServerHeaderEnforcer(Set checkMethods) { this.gotValues = new ArrayList(); + this.checkMethods = checkMethods; } @Override @@ -71,17 +74,23 @@ public ServerCall.Listener interceptCall( ServerCall call, final Metadata requestHeaders, ServerCallHandler next) { + String methodName = call.getMethodDescriptor().getFullMethodName(); + if (!this.checkMethods.contains(methodName)) { + return next.startCall(call, requestHeaders); + } + // Firstly assert and validate that at least we've got a requestId. String gotReqId = requestHeaders.get(XGoogSpannerRequestId.REQUEST_HEADER_KEY); - assertNotNull(gotReqId); Matcher m = XGoogSpannerRequestIdTest.REGEX_RAND_PROCESS_ID.matcher(gotReqId); - String message = - String.format( - "%s lacks %s in %s", - call.getMethodDescriptor().getFullMethodName(), - XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(), - gotReqId); - System.out.println("\033[32mMessage: " + message + "\033[00m"); + if (!m.matches()) { + String message = + String.format( + "%s lacks %s", methodName, XGoogSpannerRequestId.REQUEST_HEADER_KEY.name()); + System.out.println("\033[31mMessage: " + message + "\033[00m"); + } else { + System.out.println("\033[32mMessage: " + methodName + " has " + gotReqId + "\033[00m"); + } + assertNotNull(gotReqId); assertTrue(m.matches()); this.gotValues.add(gotReqId);