@@ -1131,9 +1131,7 @@ class Database extends common.GrpcServiceObject {
1131
1131
setSpanErrorAndException ( span , e as Error ) ;
1132
1132
this . emit ( 'error' , e ) ;
1133
1133
} finally {
1134
- if ( span . isRecording ( ) ) {
1135
- span . end ( ) ;
1136
- }
1134
+ span . end ( ) ;
1137
1135
}
1138
1136
} ) ;
1139
1137
}
@@ -2104,11 +2102,8 @@ class Database extends common.GrpcServiceObject {
2104
2102
} ) ;
2105
2103
session ! . lastError = err ;
2106
2104
this . pool_ . release ( session ! ) ;
2107
- this . getSnapshot ( options , callback ! ) ;
2108
- // Explicitly requested in code review that this span.end() be
2109
- // moved out of this.getSnapshot, and that there will a later refactor,
2110
- // similar to https://github.com/googleapis/nodejs-spanner/issues/2159
2111
2105
span . end ( ) ;
2106
+ this . getSnapshot ( options , callback ! ) ;
2112
2107
} else {
2113
2108
span . addEvent ( 'Using Session' , { 'session.id' : session ?. id } ) ;
2114
2109
this . pool_ . release ( session ! ) ;
@@ -3084,12 +3079,10 @@ class Database extends common.GrpcServiceObject {
3084
3079
dataStream . removeListener ( 'end' , endListener ) ;
3085
3080
dataStream . end ( ) ;
3086
3081
snapshot . end ( ) ;
3082
+ span . end ( ) ;
3087
3083
// Create a new data stream and add it to the end user stream.
3088
3084
dataStream = this . runStream ( query , options ) ;
3089
3085
dataStream . pipe ( proxyStream ) ;
3090
- // Explicitly invoking span.end() here,
3091
- // instead of inside dataStream.on('error').
3092
- span . end ( ) ;
3093
3086
} else {
3094
3087
proxyStream . destroy ( err ) ;
3095
3088
snapshot . end ( ) ;
@@ -3219,26 +3212,6 @@ class Database extends common.GrpcServiceObject {
3219
3212
? ( optionsOrRunFn as RunTransactionOptions )
3220
3213
: { } ;
3221
3214
3222
- const retry = ( span : Span ) => {
3223
- this . runTransaction ( options , ( err , txn ) => {
3224
- if ( err ) {
3225
- setSpanError ( span , err ) ;
3226
- span . end ( ) ;
3227
- runFn ! ( err , null ) ;
3228
- return ;
3229
- }
3230
-
3231
- txn ! . once ( 'end' , ( ) => {
3232
- span . end ( ) ;
3233
- } ) ;
3234
- txn ! . once ( 'error' , err => {
3235
- setSpanError ( span , err ! ) ;
3236
- span . end ( ) ;
3237
- } ) ;
3238
- runFn ! ( null , txn ! ) ;
3239
- } ) ;
3240
- } ;
3241
-
3242
3215
startTrace ( 'Database.runTransaction' , this . _traceConfig , span => {
3243
3216
this . pool_ . getSession ( ( err , session ?, transaction ?) => {
3244
3217
if ( err ) {
@@ -3249,7 +3222,8 @@ class Database extends common.GrpcServiceObject {
3249
3222
span . addEvent ( 'No session available' , {
3250
3223
'session.id' : session ?. id ,
3251
3224
} ) ;
3252
- retry ( span ) ;
3225
+ span . end ( ) ;
3226
+ this . runTransaction ( options , runFn ! ) ;
3253
3227
return ;
3254
3228
}
3255
3229
@@ -3267,19 +3241,9 @@ class Database extends common.GrpcServiceObject {
3267
3241
transaction ! . excludeTxnFromChangeStreams ( ) ;
3268
3242
}
3269
3243
3270
- // Our span should only be ended if the
3271
- // transaction either errored or was ended.
3272
- transaction ! . once ( 'error' , err => {
3273
- setSpanError ( span , err ! ) ;
3274
- span . end ( ) ;
3275
- } ) ;
3276
-
3277
- transaction ! . once ( 'end' , ( ) => {
3278
- span . end ( ) ;
3279
- } ) ;
3280
-
3281
3244
const release = ( ) => {
3282
3245
this . pool_ . release ( session ! ) ;
3246
+ span . end ( ) ;
3283
3247
} ;
3284
3248
3285
3249
const runner = new TransactionRunner (
@@ -3289,21 +3253,26 @@ class Database extends common.GrpcServiceObject {
3289
3253
options
3290
3254
) ;
3291
3255
3292
- runner . run ( ) . then ( release , err => {
3293
- setSpanError ( span , err ) ;
3256
+ runner
3257
+ . run ( )
3258
+ . then ( release , err => {
3259
+ setSpanError ( span , err ) ;
3294
3260
3295
- if ( isSessionNotFoundError ( err ) ) {
3296
- span . addEvent ( 'No session available' , {
3297
- 'session.id' : session ?. id ,
3298
- } ) ;
3299
- release ( ) ;
3300
- retry ( span ) ;
3301
- } else {
3302
- setImmediate ( runFn ! , err ) ;
3303
- release ( ) ;
3304
- span . end ( ) ;
3305
- }
3306
- } ) ;
3261
+ if ( isSessionNotFoundError ( err ) ) {
3262
+ span . addEvent ( 'No session available' , {
3263
+ 'session.id' : session ?. id ,
3264
+ } ) ;
3265
+ release ( ) ;
3266
+ this . runTransaction ( options , runFn ! ) ;
3267
+ } else {
3268
+ release ( ) ;
3269
+ setImmediate ( runFn ! , err ) ;
3270
+ }
3271
+ } )
3272
+ . catch ( e => {
3273
+ setSpanErrorAndException ( span , e as Error ) ;
3274
+ throw e ;
3275
+ } ) ;
3307
3276
} ) ;
3308
3277
} ) ;
3309
3278
}
@@ -3557,13 +3526,13 @@ class Database extends common.GrpcServiceObject {
3557
3526
// Remove the current data stream from the end user stream.
3558
3527
dataStream . unpipe ( proxyStream ) ;
3559
3528
dataStream . end ( ) ;
3529
+ span . end ( ) ;
3560
3530
// Create a new stream and add it to the end user stream.
3561
3531
dataStream = this . batchWriteAtLeastOnce (
3562
3532
mutationGroups ,
3563
3533
options
3564
3534
) ;
3565
3535
dataStream . pipe ( proxyStream ) ;
3566
- span . end ( ) ;
3567
3536
} else {
3568
3537
span . end ( ) ;
3569
3538
proxyStream . destroy ( err ) ;
@@ -3662,14 +3631,8 @@ class Database extends common.GrpcServiceObject {
3662
3631
span . addEvent ( 'No session available' , {
3663
3632
'session.id' : session ?. id ,
3664
3633
} ) ;
3665
- // Retry this method.
3666
- this . writeAtLeastOnce ( mutations , options , ( err , resp ) => {
3667
- if ( err ) {
3668
- setSpanError ( span , err ) ;
3669
- }
3670
- span . end ( ) ;
3671
- cb ! ( err , resp ) ;
3672
- } ) ;
3634
+ span . end ( ) ;
3635
+ this . writeAtLeastOnce ( mutations , options , cb ! ) ;
3673
3636
return ;
3674
3637
}
3675
3638
if ( err ) {
0 commit comments