27
27
import com .google .common .util .concurrent .ListenableFuture ;
28
28
import com .google .spanner .v1 .BatchWriteResponse ;
29
29
import io .opentelemetry .api .common .Attributes ;
30
+ import java .util .ArrayList ;
31
+ import java .util .Arrays ;
32
+ import java .util .Objects ;
33
+ import java .util .concurrent .atomic .AtomicInteger ;
30
34
import javax .annotation .Nullable ;
31
35
32
36
class DatabaseClientImpl implements DatabaseClient {
@@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
40
44
@ VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient ;
41
45
@ VisibleForTesting final boolean useMultiplexedSessionPartitionedOps ;
42
46
@ VisibleForTesting final boolean useMultiplexedSessionForRW ;
47
+ private final int dbId ;
48
+ private final AtomicInteger nthRequest ;
43
49
44
50
final boolean useMultiplexedSessionBlindWrite ;
45
51
@@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
86
92
this .tracer = tracer ;
87
93
this .useMultiplexedSessionForRW = useMultiplexedSessionForRW ;
88
94
this .commonAttributes = commonAttributes ;
95
+
96
+ this .dbId = this .dbIdFromClientId (this .clientId );
97
+ this .nthRequest = new AtomicInteger (0 );
98
+ }
99
+
100
+ private int dbIdFromClientId (String clientId ) {
101
+ int i = clientId .indexOf ("-" );
102
+ String strWithValue = clientId .substring (i + 1 );
103
+ if (Objects .equals (strWithValue , "" )) {
104
+ strWithValue = "0" ;
105
+ }
106
+ return Integer .parseInt (strWithValue );
89
107
}
90
108
91
109
@ VisibleForTesting
@@ -183,8 +201,20 @@ public CommitResponse writeAtLeastOnceWithOptions(
183
201
return getMultiplexedSessionDatabaseClient ()
184
202
.writeAtLeastOnceWithOptions (mutations , options );
185
203
}
204
+
205
+ int nthRequest = this .nextNthRequest ();
206
+ int channelId = 1 ; /* TODO: infer the channelId from the gRPC channel of the session */
207
+ XGoogSpannerRequestId reqId = XGoogSpannerRequestId .of (this .dbId , channelId , nthRequest , 0 );
208
+
186
209
return runWithSessionRetry (
187
- session -> session .writeAtLeastOnceWithOptions (mutations , options ));
210
+ (session ) -> {
211
+ reqId .incrementAttempt ();
212
+ // TODO: Update the channelId depending on the session that is inferred.
213
+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
214
+ allOptions .add (new Options .RequestIdOption (reqId ));
215
+ return session .writeAtLeastOnceWithOptions (
216
+ mutations , allOptions .toArray (new TransactionOption [0 ]));
217
+ });
188
218
} catch (RuntimeException e ) {
189
219
span .setStatus (e );
190
220
throw e ;
@@ -193,16 +223,38 @@ public CommitResponse writeAtLeastOnceWithOptions(
193
223
}
194
224
}
195
225
226
+ private int nextNthRequest () {
227
+ return this .nthRequest .incrementAndGet ();
228
+ }
229
+
196
230
@ Override
197
231
public ServerStream <BatchWriteResponse > batchWriteAtLeastOnce (
198
232
final Iterable <MutationGroup > mutationGroups , final TransactionOption ... options )
199
233
throws SpannerException {
200
234
ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
201
235
try (IScope s = tracer .withSpan (span )) {
236
+ XGoogSpannerRequestId reqId =
237
+ XGoogSpannerRequestId .of (this .dbId , 1 /*TODO:channelId*/ , this .nextNthRequest (), 0 );
238
+ System .out .println ("\033 [35mbatchWriteAtLeastOnceReq: " + reqId .toString () + "\033 [00m" );
202
239
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
203
- return getMultiplexedSessionDatabaseClient ().batchWriteAtLeastOnce (mutationGroups , options );
240
+ reqId .incrementAttempt ();
241
+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
242
+ allOptions .add (new Options .RequestIdOption (reqId ));
243
+ return getMultiplexedSessionDatabaseClient ()
244
+ .batchWriteAtLeastOnce (mutationGroups , allOptions .toArray (new TransactionOption [0 ]));
204
245
}
205
- return runWithSessionRetry (session -> session .batchWriteAtLeastOnce (mutationGroups , options ));
246
+ return runWithSessionRetry (
247
+ (session ) -> {
248
+ reqId .incrementAttempt ();
249
+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
250
+ System .out .println (
251
+ "\033 [35mbatchWriteAtLeastOnce:: session.class: "
252
+ + session .getClass ()
253
+ + "\033 [00m" );
254
+ allOptions .add (new Options .RequestIdOption (reqId ));
255
+ return session .batchWriteAtLeastOnce (
256
+ mutationGroups , allOptions .toArray (new TransactionOption [0 ]));
257
+ });
206
258
} catch (RuntimeException e ) {
207
259
span .setStatus (e );
208
260
throw e ;
@@ -350,7 +402,15 @@ private long executePartitionedUpdateWithPooledSession(
350
402
final Statement stmt , final UpdateOption ... options ) {
351
403
ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION , commonAttributes );
352
404
try (IScope s = tracer .withSpan (span )) {
353
- return runWithSessionRetry (session -> session .executePartitionedUpdate (stmt , options ));
405
+ XGoogSpannerRequestId reqId =
406
+ XGoogSpannerRequestId .of (this .dbId , 1 /*TODO: channelId*/ , this .nextNthRequest (), 0 );
407
+ return runWithSessionRetry (
408
+ (session ) -> {
409
+ reqId .incrementAttempt ();
410
+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
411
+ allOptions .add (new Options .RequestIdOption (reqId ));
412
+ return session .executePartitionedUpdate (stmt , allOptions .toArray (new UpdateOption [0 ]));
413
+ });
354
414
} catch (RuntimeException e ) {
355
415
span .setStatus (e );
356
416
span .end ();
0 commit comments