Skip to content

Commit 1b4b4d3

Browse files
chore(spanner): Implement fallback for multiplexed session on Partitioned Operations (#3631)
* chore(spanner): Implement fallback for multiplexed session on Partitioned Operations - handle server side kill switch for multiplexed sessions with Partitioned Ops. * fix(spanner): Updated multiplexed session fallback to use status code for narrowing down initial validation. * lint(spanner): Fixes formatting issues and variable categorization. * fix(spanner): Updated the error message validation for Multiplexed Session fallback in Partitioned Operations. * lint(spanner): Fixed formatting issues.
1 parent b04ea80 commit 1b4b4d3

File tree

4 files changed

+180
-30
lines changed

4 files changed

+180
-30
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

+53-17
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.spanner.Options.QueryOption;
2323
import com.google.cloud.spanner.Options.ReadOption;
2424
import com.google.cloud.spanner.spi.v1.SpannerRpc;
25+
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Preconditions;
2627
import com.google.common.collect.ImmutableList;
2728
import com.google.protobuf.Struct;
@@ -35,6 +36,7 @@
3536
import java.time.Instant;
3637
import java.util.List;
3738
import java.util.Map;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3840
import java.util.concurrent.atomic.AtomicReference;
3941
import java.util.concurrent.locks.ReentrantLock;
4042
import javax.annotation.Nullable;
@@ -59,6 +61,13 @@ public class BatchClientImpl implements BatchClient {
5961
@GuardedBy("multiplexedSessionLock")
6062
private final AtomicReference<SessionImpl> multiplexedSessionReference;
6163

64+
/**
65+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
66+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
67+
*/
68+
@VisibleForTesting
69+
static final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
70+
6271
BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
6372
this.sessionClient = checkNotNull(sessionClient);
6473
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
@@ -85,7 +94,7 @@ public String getDatabaseRole() {
8594
@Override
8695
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
8796
SessionImpl session;
88-
if (isMultiplexedSessionEnabled) {
97+
if (canUseMultiplexedSession()) {
8998
session = getMultiplexedSession();
9099
} else {
91100
session = sessionClient.createSession();
@@ -131,6 +140,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
131140
batchTransactionId);
132141
}
133142

143+
private boolean canUseMultiplexedSession() {
144+
return isMultiplexedSessionEnabled && !unimplementedForPartitionedOps.get();
145+
}
146+
134147
private SessionImpl getMultiplexedSession() {
135148
this.multiplexedSessionLock.lock();
136149
try {
@@ -216,15 +229,26 @@ public List<Partition> partitionReadUsingIndex(
216229
builder.setPartitionOptions(pbuilder.build());
217230

218231
final PartitionReadRequest request = builder.build();
219-
PartitionResponse response = rpc.partitionRead(request, options);
220-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
221-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
222-
Partition partition =
223-
Partition.createReadPartition(
224-
p.getPartitionToken(), partitionOptions, table, index, keys, columns, readOptions);
225-
partitions.add(partition);
232+
try {
233+
PartitionResponse response = rpc.partitionRead(request, options);
234+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
235+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
236+
Partition partition =
237+
Partition.createReadPartition(
238+
p.getPartitionToken(),
239+
partitionOptions,
240+
table,
241+
index,
242+
keys,
243+
columns,
244+
readOptions);
245+
partitions.add(partition);
246+
}
247+
return partitions.build();
248+
} catch (SpannerException e) {
249+
maybeMarkUnimplementedForPartitionedOps(e);
250+
throw e;
226251
}
227-
return partitions.build();
228252
}
229253

230254
@Override
@@ -256,15 +280,27 @@ public List<Partition> partitionQuery(
256280
builder.setPartitionOptions(pbuilder.build());
257281

258282
final PartitionQueryRequest request = builder.build();
259-
PartitionResponse response = rpc.partitionQuery(request, options);
260-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
261-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
262-
Partition partition =
263-
Partition.createQueryPartition(
264-
p.getPartitionToken(), partitionOptions, statement, queryOptions);
265-
partitions.add(partition);
283+
try {
284+
PartitionResponse response = rpc.partitionQuery(request, options);
285+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
286+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
287+
Partition partition =
288+
Partition.createQueryPartition(
289+
p.getPartitionToken(), partitionOptions, statement, queryOptions);
290+
partitions.add(partition);
291+
}
292+
return partitions.build();
293+
} catch (SpannerException e) {
294+
maybeMarkUnimplementedForPartitionedOps(e);
295+
throw e;
296+
}
297+
}
298+
299+
void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
300+
if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
301+
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
302+
unimplementedForPartitionedOps.set(true);
266303
}
267-
return partitions.build();
268304
}
269305

270306
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ private boolean canUseMultiplexedSessionsForRW() {
124124
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
125125
}
126126

127+
private boolean canUseMultiplexedSessionsForPartitionedOps() {
128+
return this.useMultiplexedSessionPartitionedOps
129+
&& this.multiplexedSessionDatabaseClient != null
130+
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForPartitionedOpsSupported();
131+
}
132+
127133
@Override
128134
public Dialect getDialect() {
129135
return pool.getDialect();
@@ -323,8 +329,15 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
323329

324330
@Override
325331
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
326-
if (useMultiplexedSessionPartitionedOps) {
327-
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
332+
333+
if (canUseMultiplexedSessionsForPartitionedOps()) {
334+
try {
335+
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
336+
} catch (SpannerException e) {
337+
if (!multiplexedSessionDatabaseClient.maybeMarkUnimplementedForPartitionedOps(e)) {
338+
throw e;
339+
}
340+
}
328341
}
329342
return executePartitionedUpdateWithPooledSession(stmt, options);
330343
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ void onError(SpannerException spannerException) {
104104
// UNIMPLEMENTED with error message "Transaction type read_write not supported with
105105
// multiplexed sessions" is returned.
106106
this.client.maybeMarkUnimplementedForRW(spannerException);
107+
// Mark multiplexed sessions for Partitioned Ops as unimplemented and fall back to regular
108+
// sessions if
109+
// UNIMPLEMENTED with error message "Partitioned operations are not supported with multiplexed
110+
// sessions".
111+
this.client.maybeMarkUnimplementedForPartitionedOps(spannerException);
107112
}
108113

109114
@Override
@@ -214,6 +219,12 @@ public void close() {
214219
*/
215220
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);
216221

222+
/**
223+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
224+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
225+
*/
226+
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
227+
217228
MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
218229
this(sessionClient, Clock.systemUTC());
219230
}
@@ -316,7 +327,18 @@ && verifyErrorMessage(
316327
}
317328
}
318329

319-
private boolean verifyErrorMessage(SpannerException spannerException, String message) {
330+
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
331+
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
332+
&& verifyErrorMessage(
333+
spannerException,
334+
"Transaction type partitioned_dml not supported with multiplexed sessions")) {
335+
unimplementedForPartitionedOps.set(true);
336+
return true;
337+
}
338+
return false;
339+
}
340+
341+
static boolean verifyErrorMessage(SpannerException spannerException, String message) {
320342
if (spannerException.getCause() == null) {
321343
return false;
322344
}
@@ -391,6 +413,10 @@ boolean isMultiplexedSessionsForRWSupported() {
391413
return !this.unimplementedForRW.get();
392414
}
393415

416+
boolean isMultiplexedSessionsForPartitionedOpsSupported() {
417+
return !this.unimplementedForPartitionedOps.get();
418+
}
419+
394420
void close() {
395421
synchronized (this) {
396422
if (!this.isClosed) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

+85-10
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,12 @@
4646
import com.google.common.collect.Lists;
4747
import com.google.common.util.concurrent.MoreExecutors;
4848
import com.google.protobuf.ByteString;
49-
import com.google.spanner.v1.BatchWriteRequest;
50-
import com.google.spanner.v1.BatchWriteResponse;
51-
import com.google.spanner.v1.BeginTransactionRequest;
52-
import com.google.spanner.v1.CommitRequest;
53-
import com.google.spanner.v1.ExecuteSqlRequest;
49+
import com.google.spanner.v1.*;
5450
import com.google.spanner.v1.RequestOptions.Priority;
5551
import com.google.spanner.v1.Session;
56-
import com.google.spanner.v1.Transaction;
5752
import io.grpc.Status;
5853
import java.time.Duration;
59-
import java.util.Collections;
60-
import java.util.List;
61-
import java.util.Set;
62-
import java.util.UUID;
54+
import java.util.*;
6355
import java.util.concurrent.CountDownLatch;
6456
import java.util.concurrent.TimeUnit;
6557
import java.util.concurrent.atomic.AtomicInteger;
@@ -1540,6 +1532,89 @@ public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToR
15401532
assertFalse(session2.getMultiplexed());
15411533
}
15421534

1535+
// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
1536+
@Test
1537+
public void
1538+
testInitialBeginTransactionWithPDML_receivesUnimplemented_fallsBackToRegularSession() {
1539+
mockSpanner.setBeginTransactionExecutionTime(
1540+
SimulatedExecutionTime.ofExceptions(
1541+
Arrays.asList(
1542+
Status.UNIMPLEMENTED
1543+
.withDescription(
1544+
"Transaction type partitioned_dml not supported with multiplexed sessions")
1545+
.asRuntimeException(),
1546+
Status.UNIMPLEMENTED
1547+
.withDescription(
1548+
"Transaction type partitioned_dml not supported with multiplexed sessions")
1549+
.asRuntimeException())));
1550+
DatabaseClientImpl client =
1551+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1552+
1553+
assertNotNull(client.multiplexedSessionDatabaseClient);
1554+
1555+
// Partitioned Ops transaction should fallback to regular sessions
1556+
assertEquals(UPDATE_COUNT, client.executePartitionedUpdate(UPDATE_STATEMENT));
1557+
1558+
// Verify that we received one ExecuteSqlRequest, and it uses a regular session due to fallback.
1559+
List<ExecuteSqlRequest> executeSqlRequests =
1560+
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
1561+
assertEquals(1, executeSqlRequests.size());
1562+
// Verify the requests are not executed using multiplexed sessions
1563+
Session session2 = mockSpanner.getSession(executeSqlRequests.get(0).getSession());
1564+
assertNotNull(session2);
1565+
assertFalse(session2.getMultiplexed());
1566+
assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForPartitionedOps.get());
1567+
}
1568+
1569+
// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
1570+
@Test
1571+
public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession() {
1572+
mockSpanner.setPartitionQueryExecutionTime(
1573+
SimulatedExecutionTime.ofException(
1574+
Status.INVALID_ARGUMENT
1575+
.withDescription(
1576+
"Partitioned operations are not supported with multiplexed sessions")
1577+
.asRuntimeException()));
1578+
BatchClientImpl client = (BatchClientImpl) spanner.getBatchClient(DatabaseId.of("p", "i", "d"));
1579+
1580+
try (BatchReadOnlyTransaction transaction =
1581+
client.batchReadOnlyTransaction(TimestampBound.strong())) {
1582+
// Partitioned Query should fail
1583+
SpannerException spannerException =
1584+
assertThrows(
1585+
SpannerException.class,
1586+
() -> {
1587+
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
1588+
});
1589+
assertEquals(ErrorCode.INVALID_ARGUMENT, spannerException.getErrorCode());
1590+
1591+
// Verify that we received one PartitionQueryRequest.
1592+
List<PartitionQueryRequest> partitionQueryRequests =
1593+
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1594+
assertEquals(1, partitionQueryRequests.size());
1595+
// Verify the requests were executed using multiplexed sessions
1596+
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
1597+
assertNotNull(session2);
1598+
assertTrue(session2.getMultiplexed());
1599+
assertTrue(client.unimplementedForPartitionedOps.get());
1600+
}
1601+
try (BatchReadOnlyTransaction transaction =
1602+
client.batchReadOnlyTransaction(TimestampBound.strong())) {
1603+
// Partitioned Query should fail
1604+
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
1605+
1606+
// // Verify that we received two PartitionQueryRequest. and it uses a regular session due to
1607+
// fallback.
1608+
List<PartitionQueryRequest> partitionQueryRequests =
1609+
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1610+
assertEquals(2, partitionQueryRequests.size());
1611+
// Verify the requests are not executed using multiplexed sessions
1612+
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
1613+
assertNotNull(session2);
1614+
assertFalse(session2.getMultiplexed());
1615+
}
1616+
}
1617+
15431618
@Test
15441619
public void
15451620
testReadWriteUnimplementedErrorDuringInitialBeginTransactionRPC_firstReceivesError_secondFallsBackToRegularSessions() {

0 commit comments

Comments
 (0)