@@ -3212,8 +3212,26 @@ class Database extends common.GrpcServiceObject {
3212
3212
? ( optionsOrRunFn as RunTransactionOptions )
3213
3213
: { } ;
3214
3214
3215
+ const retry = ( span : Span ) => {
3216
+ this . runTransaction ( options , ( err , txn ) => {
3217
+ if ( err ) {
3218
+ setSpanError ( span , err ) ;
3219
+ runFn ! ( err , null ) ;
3220
+ return ;
3221
+ }
3222
+
3223
+ txn ! . once ( 'end' , ( ) => {
3224
+ span . end ( ) ;
3225
+ } ) ;
3226
+ txn ! . once ( 'error' , ( ) => {
3227
+ span . end ( ) ;
3228
+ } ) ;
3229
+ runFn ! ( null , txn ! ) ;
3230
+ } ) ;
3231
+ } ;
3232
+
3215
3233
startTrace ( 'Database.runTransaction' , this . _traceConfig , span => {
3216
- this . pool_ . getSession ( async ( err , session ?, transaction ?) => {
3234
+ this . pool_ . getSession ( ( err , session ?, transaction ?) => {
3217
3235
if ( err ) {
3218
3236
setSpanError ( span , err ) ;
3219
3237
}
@@ -3222,11 +3240,7 @@ class Database extends common.GrpcServiceObject {
3222
3240
span . addEvent ( 'No session available' , {
3223
3241
'session.id' : session ?. id ,
3224
3242
} ) ;
3225
- // In this case we are invoking runTransaction afresh
3226
- // hence we have to wait for this call to complete before
3227
- // ending the span.
3228
- await this . runTransaction ( options , runFn ! ) ;
3229
- span . end ( ) ;
3243
+ retry ( span ) ;
3230
3244
return ;
3231
3245
}
3232
3246
@@ -3246,24 +3260,26 @@ class Database extends common.GrpcServiceObject {
3246
3260
transaction ! . excludeTxnFromChangeStreams ( ) ;
3247
3261
}
3248
3262
3263
+ // Our span should only be ended if the
3264
+ // transaction either errored or was ended.
3265
+ transaction ! . once ( 'error' , err => {
3266
+ setSpanError ( span , err ! ) ;
3267
+ span . end ( ) ;
3268
+ } ) ;
3269
+
3270
+ transaction ! . once ( 'end' , err => {
3271
+ setSpanError ( span , err ! ) ;
3272
+ span . end ( ) ;
3273
+ } ) ;
3274
+
3249
3275
const release = ( ) => {
3250
3276
this . pool_ . release ( session ! ) ;
3251
3277
} ;
3252
3278
3253
3279
const runner = new TransactionRunner (
3254
3280
session ! ,
3255
3281
transaction ! ,
3256
- async ( err , resp ) => {
3257
- if ( err ) {
3258
- setSpanError ( span , err ! ) ;
3259
- }
3260
- // It is paramount that we await
3261
- // the caller to return before
3262
- // exiting this function otherwise the span
3263
- // order will not be correct.
3264
- await runFn ! ( err , resp ) ;
3265
- span . end ( ) ;
3266
- } ,
3282
+ runFn ! ,
3267
3283
options
3268
3284
) ;
3269
3285
@@ -3275,13 +3291,12 @@ class Database extends common.GrpcServiceObject {
3275
3291
'session.id' : session ?. id ,
3276
3292
} ) ;
3277
3293
release ( ) ;
3278
- await this . runTransaction ( options , runFn ! ) ;
3294
+ retry ( span ) ;
3279
3295
} else {
3280
3296
setImmediate ( runFn ! , err ) ;
3281
3297
release ( ) ;
3298
+ span . end ( ) ;
3282
3299
}
3283
-
3284
- span . end ( ) ;
3285
3300
} ) ;
3286
3301
} ) ;
3287
3302
} ) ;
0 commit comments