Skip to content

Commit 1b7d8b6

Browse files
committed
feat(observability): fix bugs found from product review + negative cases
This change adds recording of retry span annotations and catches cases in which exceptions were thrown but spans were not ended. These issues were found while testing and visually confirming the results.
1 parent 0342e74 commit 1b7d8b6

File tree

4 files changed

+143
-78
lines changed

4 files changed

+143
-78
lines changed

src/database.ts

+65-49
Original file line numberDiff line numberDiff line change
@@ -2815,6 +2815,7 @@ class Database extends common.GrpcServiceObject {
28152815
this.runStream(query, options)
28162816
.on('error', err => {
28172817
setSpanError(span, err);
2818+
span.end();
28182819
callback!(err as grpc.ServiceError, rows, stats, metadata);
28192820
})
28202821
.on('response', response => {
@@ -3060,7 +3061,7 @@ class Database extends common.GrpcServiceObject {
30603061
dataStream
30613062
.once('data', () => (dataReceived = true))
30623063
.once('error', err => {
3063-
setSpanError(span, err);
3064+
setSpanErrorAndException(span, err as Error);
30643065

30653066
if (
30663067
!dataReceived &&
@@ -3222,8 +3223,8 @@ class Database extends common.GrpcServiceObject {
32223223
span.addEvent('No session available', {
32233224
'session.id': session?.id,
32243225
});
3225-
this.runTransaction(options, runFn!);
32263226
span.end();
3227+
this.runTransaction(options, runFn!);
32273228
return;
32283229
}
32293230

@@ -3242,8 +3243,8 @@ class Database extends common.GrpcServiceObject {
32423243
}
32433244

32443245
const release = () => {
3245-
span.end();
32463246
this.pool_.release(session!);
3247+
span.end();
32473248
};
32483249

32493250
const runner = new TransactionRunner(
@@ -3253,28 +3254,34 @@ class Database extends common.GrpcServiceObject {
32533254
if (err) {
32543255
setSpanError(span, err!);
32553256
}
3256-
span.end();
32573257
runFn!(err, resp);
32583258
},
32593259
options
32603260
);
32613261

32623262
runner.run().then(release, err => {
3263-
if (err) {
3264-
setSpanError(span, err!);
3265-
}
3263+
setSpanError(span, err);
32663264

32673265
if (isSessionNotFoundError(err)) {
32683266
span.addEvent('No session available', {
32693267
'session.id': session?.id,
32703268
});
3269+
span.addEvent('Retrying');
32713270
release();
3272-
this.runTransaction(options, runFn!);
3271+
this.runTransaction(
3272+
options,
3273+
(
3274+
err: ServiceError | null,
3275+
txn: Transaction | null | undefined
3276+
) => {
3277+
runFn!(err, txn);
3278+
}
3279+
);
32733280
} else {
3274-
if (!err) {
3275-
span.addEvent('Using Session', {'session.id': session!.id});
3276-
}
3277-
setImmediate(runFn!, err);
3281+
span.addEvent('Using Session', {'session.id': session!.id});
3282+
setImmediate((err: null | ServiceError) => {
3283+
runFn!(err);
3284+
}, err);
32783285
release();
32793286
}
32803287
});
@@ -3363,46 +3370,55 @@ class Database extends common.GrpcServiceObject {
33633370

33643371
let sessionId = '';
33653372
const getSession = this.pool_.getSession.bind(this.pool_);
3366-
const span = getActiveOrNoopSpan();
3367-
// Loop to retry 'Session not found' errors.
3368-
// (and yes, we like while (true) more than for (;;) here)
3369-
// eslint-disable-next-line no-constant-condition
3370-
while (true) {
3371-
try {
3372-
const [session, transaction] = await promisify(getSession)();
3373-
transaction.requestOptions = Object.assign(
3374-
transaction.requestOptions || {},
3375-
options.requestOptions
3376-
);
3377-
if (options.optimisticLock) {
3378-
transaction.useOptimisticLock();
3379-
}
3380-
if (options.excludeTxnFromChangeStreams) {
3381-
transaction.excludeTxnFromChangeStreams();
3382-
}
3383-
sessionId = session?.id;
3384-
span.addEvent('Using Session', {'session.id': sessionId});
3385-
const runner = new AsyncTransactionRunner<T>(
3386-
session,
3387-
transaction,
3388-
runFn,
3389-
options
3390-
);
33913373

3392-
try {
3393-
return await runner.run();
3394-
} finally {
3395-
this.pool_.release(session);
3396-
}
3397-
} catch (e) {
3398-
if (!isSessionNotFoundError(e as ServiceError)) {
3399-
span.addEvent('No session available', {
3400-
'session.id': sessionId,
3401-
});
3402-
throw e;
3374+
return startTrace(
3375+
'Database.runTransactionAsync',
3376+
this._traceConfig,
3377+
span => {
3378+
// Loop to retry 'Session not found' errors.
3379+
// (and yes, we like while (true) more than for (;;) here)
3380+
// eslint-disable-next-line no-constant-condition
3381+
while (true) {
3382+
try {
3383+
const [session, transaction] = await promisify(getSession)();
3384+
transaction.requestOptions = Object.assign(
3385+
transaction.requestOptions || {},
3386+
options.requestOptions
3387+
);
3388+
if (options.optimisticLock) {
3389+
transaction.useOptimisticLock();
3390+
}
3391+
if (options.excludeTxnFromChangeStreams) {
3392+
transaction.excludeTxnFromChangeStreams();
3393+
}
3394+
sessionId = session?.id;
3395+
span.addEvent('Using Session', {'session.id': sessionId});
3396+
const runner = new AsyncTransactionRunner<T>(
3397+
session,
3398+
transaction,
3399+
runFn,
3400+
options
3401+
);
3402+
3403+
try {
3404+
const result = await runner.run();
3405+
span.end();
3406+
return result;
3407+
} finally {
3408+
this.pool_.release(session);
3409+
}
3410+
} catch (e) {
3411+
if (!isSessionNotFoundError(e as ServiceError)) {
3412+
span.addEvent('No session available', {
3413+
'session.id': sessionId,
3414+
});
3415+
setSpanErrorAndException(span, e as Error);
3416+
throw e;
3417+
}
3418+
}
34033419
}
34043420
}
3405-
}
3421+
);
34063422
}
34073423

34083424
/**

src/instrument.ts

+31-5
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ function ensureInitialContextManagerSet() {
118118
}
119119
}
120120

121+
const debugTraces = process.env.SPANNER_DEBUG_TRACES === 'true';
122+
121123
/**
122124
* startTrace begins an active span in the current active context
123125
* and passes it back to the set callback function. Each span will
@@ -142,6 +144,10 @@ export function startTrace<T>(
142144
SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix,
143145
{kind: SpanKind.CLIENT},
144146
span => {
147+
if (debugTraces) {
148+
patchSpanEndForDebugging(span);
149+
}
150+
145151
span.setAttribute(SEMATTRS_DB_SYSTEM, 'spanner');
146152
span.setAttribute(ATTR_OTEL_SCOPE_NAME, TRACER_NAME);
147153
span.setAttribute(ATTR_OTEL_SCOPE_VERSION, TRACER_VERSION);
@@ -165,11 +171,20 @@ export function startTrace<T>(
165171
}
166172
}
167173

168-
if (config.that) {
169-
const fn = cb.bind(config.that);
170-
return fn(span);
171-
} else {
172-
return cb(span);
174+
// If at all the invoked function throws an exception,
175+
// record the exception and then end this span.
176+
try {
177+
if (config.that) {
178+
const fn = cb.bind(config.that);
179+
return fn(span);
180+
} else {
181+
return cb(span);
182+
}
183+
} catch (e) {
184+
setSpanErrorAndException(span, e as Error);
185+
span.end();
186+
// Finally re-throw the exception.
187+
throw e;
173188
}
174189
}
175190
);
@@ -289,3 +304,14 @@ class noopSpan implements Span {
289304
return this;
290305
}
291306
}
307+
308+
/**
 * Replaces `span.end` with a wrapper that prints a stack trace through
 * `console.trace` and then delegates to the original `end`. Debugging aid.
 *
 * @param span The span whose `end` method is wrapped in place.
 * @param spanNameSuffix Optional label printed in the trace output. When
 *     omitted, the span's own `name` property is used if it holds a string,
 *     otherwise the literal 'span'.
 */
function patchSpanEndForDebugging(span: Span, spanNameSuffix?: string) {
  const origSpanEnd = span.end;
  // The span name is not in scope here, so fall back to reading it
  // reflectively from the span object when no label is supplied.
  const ownName: unknown = Reflect.get(span, 'name');
  const label =
    spanNameSuffix ?? (typeof ownName === 'string' ? ownName : 'span');
  const wrapSpanEnd = function (
    this: Span,
    ...args: Parameters<Span['end']>
  ) {
    console.trace(`\x1b[35m${label}.end()\x1b[00m`);
    // Forward every argument (the optional end time) instead of dropping it.
    return origSpanEnd.apply(this, args);
  };
  Object.defineProperty(span, 'end', {
    value: wrapSpanEnd,
    // Without these flags defineProperty creates a read-only,
    // non-configurable property, so a second patch attempt would throw.
    configurable: true,
    writable: true,
  });
}

src/transaction-runner.ts

+28-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {isSessionNotFoundError} from './session-pool';
2626
import {Database} from './database';
2727
import {google} from '../protos/protos';
2828
import IRequestOptions = google.spanner.v1.IRequestOptions;
29+
import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument';
2930

3031
// eslint-disable-next-line @typescript-eslint/no-var-requires
3132
const jsonProtos = require('../protos/protos.json');
@@ -224,6 +225,7 @@ export abstract class Runner<T> {
224225
async run(): Promise<T> {
225226
const start = Date.now();
226227
const timeout = this.options.timeout!;
228+
const span = getActiveOrNoopSpan();
227229

228230
let lastError: grpc.ServiceError;
229231

@@ -233,8 +235,18 @@ export abstract class Runner<T> {
233235
const transaction = await this.getTransaction();
234236

235237
try {
236-
return await this._run(transaction);
238+
const result = await this._run(transaction);
239+
if (this.attempts > 0) {
240+
// No need to annotate if the transaction wasn't retried.
241+
span.addEvent('Transaction Attempt Succeeded', {
242+
attempt: this.attempts + 1,
243+
});
244+
}
245+
return result;
237246
} catch (e) {
247+
span.addEvent('Transaction Attempt Failed', {
248+
attempt: this.attempts + 1,
249+
});
238250
this.session.lastError = e as grpc.ServiceError;
239251
lastError = e as grpc.ServiceError;
240252
}
@@ -243,19 +255,29 @@ export abstract class Runner<T> {
243255
// thrown here. We do this to bubble this error up to the caller who is
244256
// responsible for retrying the transaction on a different session.
245257
if (
246-
!RETRYABLE.includes(lastError.code!) &&
247-
!isRetryableInternalError(lastError)
258+
!RETRYABLE.includes(lastError!.code!) &&
259+
!isRetryableInternalError(lastError!)
248260
) {
249-
throw lastError;
261+
span.addEvent('Transaction Attempt Aborted', {
262+
attempt: this.attempts + 1,
263+
});
264+
setSpanErrorAndException(span, lastError!);
265+
throw lastError!;
250266
}
251267

252268
this.attempts += 1;
253269

254-
const delay = this.getNextDelay(lastError);
270+
const delay = this.getNextDelay(lastError!);
271+
span.addEvent('Backing off', {delay: delay, attempt: this.attempts});
255272
await new Promise(resolve => setTimeout(resolve, delay));
256273
}
257274

258-
throw new DeadlineError(lastError!);
275+
span.addEvent('Transaction Attempt Aborted due to Deadline Error', {
276+
total_attempts: this.attempts + 1,
277+
});
278+
const err = new DeadlineError(lastError!);
279+
setSpanErrorAndException(span, err);
280+
throw err;
259281
}
260282
}
261283

src/transaction.ts

+19-18
Original file line numberDiff line numberDiff line change
@@ -757,12 +757,6 @@ export class Snapshot extends EventEmitter {
757757
this.begin();
758758
}
759759
setSpanError(span, err);
760-
})
761-
.on('end', err => {
762-
if (err) {
763-
setSpanError(span, err);
764-
}
765-
span.end();
766760
});
767761

768762
if (resultStream instanceof Stream) {
@@ -1288,7 +1282,6 @@ export class Snapshot extends EventEmitter {
12881282
} catch (e) {
12891283
const errorStream = new PassThrough();
12901284
setSpanErrorAndException(span, e as Error);
1291-
span.end();
12921285
setImmediate(() => errorStream.destroy(e as Error));
12931286
return errorStream;
12941287
}
@@ -1332,12 +1325,6 @@ export class Snapshot extends EventEmitter {
13321325
) {
13331326
this.begin();
13341327
}
1335-
})
1336-
.on('end', err => {
1337-
if (err) {
1338-
setSpanError(span, err as Error);
1339-
}
1340-
span.end();
13411328
});
13421329

13431330
if (resultStream instanceof Stream) {
@@ -2112,15 +2099,29 @@ export class Transaction extends Dml {
21122099
} else if (!this._useInRunner) {
21132100
reqOpts.singleUseTransaction = this._options;
21142101
} else {
2115-
this.begin().then(() => {
2116-
this.commit(options, (err, resp) => {
2117-
if (err) {
2102+
this.begin()
2103+
.then(
2104+
() => {
2105+
this.commit(options, (err, resp) => {
2106+
if (err) {
2107+
setSpanError(span, err);
2108+
}
2109+
span.end();
2110+
callback(err, resp);
2111+
});
2112+
},
2113+
err => {
21182114
setSpanError(span, err);
2115+
callback(err);
2116+
span.end();
21192117
}
2118+
)
2119+
.catch(err => {
2120+
setSpanErrorAndException(span, err as Error);
21202121
span.end();
2121-
callback(err, resp);
2122+
// Re-throw the exception after recording it.
2123+
throw err;
21222124
});
2123-
}, callback);
21242125
return;
21252126
}
21262127

0 commit comments

Comments
 (0)