Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: wire up x-goog-spanner-request-id generation to RPCs #3675

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
Expand All @@ -40,6 +43,8 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;
private final int dbId;
private final AtomicInteger nthRequest;

final boolean useMultiplexedSessionBlindWrite;

Expand Down Expand Up @@ -86,6 +91,15 @@ class DatabaseClientImpl implements DatabaseClient {
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;

this.dbId = this.dbIdFromClientId(this.clientId);
this.nthRequest = new AtomicInteger(0);
}

private int dbIdFromClientId(String clientId) {
int i = clientId.indexOf("-");
String strWithValue = clientId.substring(i + 1);
return Integer.parseInt(strWithValue);
}

@VisibleForTesting
Expand Down Expand Up @@ -155,7 +169,15 @@ public CommitResponse writeWithOptions(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);
return runWithSessionRetry(
session -> {
reqId.incrementAttempt();
// TODO: Update the channelId depending on the session that is inferred.
return session.writeWithOptions(mutations, appendReqIdToOptions(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -179,8 +201,17 @@ public CommitResponse writeAtLeastOnceWithOptions(
return getMultiplexedSessionDatabaseClient()
.writeAtLeastOnceWithOptions(mutations, options);
}

int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);
return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
(session) -> {
reqId.incrementAttempt();
// TODO: Update the channelId depending on the session that is inferred.
return session.writeAtLeastOnceWithOptions(
mutations, appendReqIdToOptions(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -189,6 +220,24 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private TransactionOption[] appendReqIdToOptions(
XGoogSpannerRequestId reqId, TransactionOption... options) {
List<TransactionOption> allOptions = Arrays.asList(options);
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new TransactionOption[0]);
}

private UpdateOption[] appendReqIdToOptions(
XGoogSpannerRequestId reqId, UpdateOption... options) {
List<UpdateOption> allOptions = Arrays.asList(options);
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new UpdateOption[0]);
}

private int nextNthRequest() {
return this.nthRequest.incrementAndGet();
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
Expand All @@ -198,7 +247,17 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
}
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));

int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);

return runWithSessionRetry(
(session) -> {
reqId.incrementAttempt();
return session.batchWriteAtLeastOnce(
mutationGroups, appendReqIdToOptions(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -346,7 +405,14 @@ private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);
return runWithSessionRetry(
session -> {
reqId.incrementAttempt();
return session.executePartitionedUpdate(stmt, appendReqIdToOptions(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ void appendToOptions(Options options) {
private RpcOrderBy orderBy;
private RpcLockHint lockHint;
private Boolean lastStatement;
private XGoogSpannerRequestId reqId;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -568,6 +569,14 @@ String pageToken() {
return pageToken;
}

boolean hasReqId() {
return reqId != null;
}

XGoogSpannerRequestId reqId() {
return reqId;
}

boolean hasFilter() {
return filter != null;
}
Expand Down Expand Up @@ -1018,4 +1027,30 @@ public boolean equals(Object o) {
return o instanceof LastStatementUpdateOption;
}
}

static final class RequestIdOption extends InternalOption
implements UpdateOption, TransactionOption {
private final XGoogSpannerRequestId reqId;

RequestIdOption(XGoogSpannerRequestId reqId) {
this.reqId = reqId;
}

@Override
void appendToOptions(Options options) {
options.reqId = this.reqId;
}

@Override
public int hashCode() {
return RequestIdOption.class.hashCode();
}

@Override
public boolean equals(Object o) {
// TODO: Examine why the precedent for LastStatementUpdateOption
// does not check against the actual value.
return o instanceof RequestIdOption;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ protected PartialResultSet computeNext() {
&& (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) {
return buffer.pop();
}

try {
if (stream.hasNext()) {
PartialResultSet next = stream.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;

/** Client for creating single sessions and batches of sessions. */
Expand Down Expand Up @@ -133,6 +134,7 @@ public void run() {
.getTracer()
.getCurrentSpan()
.addAnnotation(String.format("Creating %d sessions", sessionCount));

while (remainingSessionsToCreate > 0) {
try {
sessions = internalBatchCreateSessions(remainingSessionsToCreate, channelHint);
Expand Down Expand Up @@ -174,6 +176,12 @@ interface SessionConsumer {
private final DatabaseId db;
private final Attributes commonAttributes;

// SessionClient is created long before a DatabaseClientImpl is created,
// as batch sessions are firstly created then later attached to each Client.
private static AtomicInteger NTH_ID = new AtomicInteger(0);
private final int nthId;
private final AtomicInteger nthRequest;

@GuardedBy("this")
private volatile long sessionChannelCounter;

Expand All @@ -186,6 +194,8 @@ interface SessionConsumer {
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
this.nthId = SessionClient.NTH_ID.incrementAndGet();
this.nthRequest = new AtomicInteger(0);
}

@Override
Expand All @@ -206,19 +216,23 @@ SessionImpl createSession() {
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
// which is also a valid channel hint.
final Map<SpannerRpc.Option, ?> options;
final long channelId;
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
channelId = sessionChannelCounter;
}
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1);
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
reqId.withOptions(options));
SessionReference sessionReference =
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
Expand Down Expand Up @@ -387,6 +401,8 @@ private List<SessionImpl> internalBatchCreateSessions(
.spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
try (IScope s = spanner.getTracer().withSpan(span)) {
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
List<com.google.spanner.v1.Session> sessions =
spanner
.getRpc()
Expand All @@ -395,7 +411,7 @@ private List<SessionImpl> internalBatchCreateSessions(
sessionCount,
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
reqId.withOptions(options));
span.addAnnotation(
String.format(
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
.setNanos(options.maxCommitDelay().getNano())
.build());
}

RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);

if (commitRequestOptions != null) {
Expand All @@ -269,8 +270,14 @@ public CommitResponse writeAtLeastOnceWithOptions(
CommitRequest request = requestBuilder.build();
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
try (IScope s = tracer.withSpan(span)) {
// Inject the request id into the request options.
XGoogSpannerRequestId reqId = options.reqId();
return SpannerRetryHelper.runTxWithRetriesOnAborted(
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
() -> {
reqId.incrementAttempt();
return new CommitResponse(
spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -416,14 +423,19 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...

@Override
public ApiFuture<Empty> asyncClose() {
return spanner.getRpc().asyncDeleteSession(getName(), getOptions());
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 2, 1, 1);
return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(options));
}

@Override
public void close() {
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
try (IScope s = tracer.withSpan(span)) {
spanner.getRpc().deleteSession(getName(), getOptions());
// TODO: Infer the caller client if possible else create separate
// outside counter for such asynchronous operations and then also
// increment the operations for each asynchronous operation.
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 2, 1, 1);
spanner.getRpc().deleteSession(getName(), reqId.withOptions(options));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package com.google.cloud.spanner;

import com.google.api.core.InternalApi;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Metadata;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

@InternalApi
Expand All @@ -28,6 +32,9 @@ public class XGoogSpannerRequestId {
@VisibleForTesting
static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId();

public static final Metadata.Key<String> REQUEST_HEADER_KEY =
Metadata.Key.of("x-goog-spanner-request-id", Metadata.ASCII_STRING_MARSHALLER);

@VisibleForTesting
static final long VERSION = 1; // The version of the specification being implemented.

Expand Down Expand Up @@ -81,6 +88,18 @@ public boolean equals(Object other) {
&& Objects.equals(this.attempt, otherReqId.attempt);
}

public void incrementAttempt() {
this.attempt++;
}

@SuppressWarnings("unchecked")
public Map withOptions(Map options) {
Map copyOptions = new HashMap<>();
copyOptions.putAll(options);
copyOptions.put(SpannerRpc.Option.REQUEST_ID, this.toString());
return copyOptions;
}

@Override
public int hashCode() {
return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt);
Expand Down
Loading