Skip to content

Commit 3b3ab72

Browse files
docs: documented how to use the subscriber based instrumentation (#3432)
1 parent 964543b commit 3b3ab72

File tree

11 files changed

+224
-39
lines changed

11 files changed

+224
-39
lines changed

lib/subscribers/README.md

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Subscriber-based Instrumentation
2+
3+
As of v13.2.0, we have begun to refactor our traditional instrumentation (`Shim`-based monkey-patching) to instead subscribe to events emitted by Node's [`diagnostic_channel TracingChannel`](https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel). This is done through [`@apm-js-collab/tracing-hooks`](https://github.com/apm-js-collab/tracing-hooks), a helper for [`orchestrion-js`](https://github.com/apm-js-collab/orchestrion-js) which injects the relevant tracing channels into the instrumented package. We then define a `Subscriber` that listens to these channels for specific events (`asyncEnd`, `asyncStart`, `start`, `end`, and/or `error`) and record what we need from the event data and context (which is preserved through `AsyncLocalStorage`).
4+
5+
## How to Implement
6+
7+
Like `Shim`-based instrumentation, subscriber-based instrumentation largely relies on the specific way the package you're instrumenting is written. However, all packages will follow the below template/guidelines.
8+
9+
### Disable Shim-based Instrumentation
10+
11+
1. While you are testing your new instrumentation, it's important that you're not also testing the old instrumentation. However, you likely want to keep the old instrumentation around while you're refactoring for reference. The easiest way to do this is to just remove the instrumentation reference in `lib/instrumentations.js`.
12+
2. When you are done refactoring, make sure to delete all files in `lib/instrumentation/<package_name>` (or `lib/instrumentation/<package_name>.js`), the tests in `test/unit/instrumentation/<package_name> `that rely on `Shim`-based wrapping, and the instrumentation reference in `instrumentations.js` if you haven't already.
13+
14+
### Instrumentation Config
15+
16+
Now, it is time to look at the internals of the package you're instrumenting. Again, the `Shim`-based instrumentation you're replacing should be helpful here to get the gist of the package internals.
17+
18+
1. Create a folder within `lib/subscribers` with the name of the package. If the package is not a new instrumentation, use the same name as the one in `test/versioned`. If it is a new instrumentation and the package name is exceptionally long or complicated or is prefixed with `@`, you may provide a shortened version (e.g. `@modelcontextprotocol/sdk `->`mcp-sdk `). Remember to name the versioned test folder with the same name (`test/versioned/<package_name|shortened_package_name>`).
19+
2. Create a `config.js` within that folder.
20+
3. Add a reference to the new config file in [`lib/subscriber-configs.js`](../subscriber-configs.js):
21+
1. ```javascript
22+
...require('./subscribers/<package_name>/config')
23+
```
24+
4. Identify one function to start with and find where this function lives in the package i.e. the relative file path.
25+
5. Once you have found where the function you're instrumenting is, you need to determine how it is defined in [AST](https://astexplorer.net/), so that `orchestrion` can properly wrap it. You can then add the proper instrumentation object to your `config.js`.
26+
27+
#### Config Template
28+
29+
```javascript
30+
// in lib/subscribers/<package_name>/config.js
31+
32+
const config = {
33+
path: './<package_name>/<subscriber_name>.js',
34+
instrumentations: [
35+
{
36+
/**
37+
* By convention, we prefix channelNames with `nr_` and include at least the expressionName or methodName.
38+
* It could also contain the moduleName or className to further differentiate between subscribers.
39+
*/
40+
channelName: 'nr_functionName',
41+
/**
42+
* <version_range> should be the same as the old instrumentation.
43+
* However, you may need to break apart that range across different configs
44+
* because code can differ from version to version.
45+
*
46+
* <relative_path_to_file> is the relative path from the instrumented package
47+
* to the file that contains the code that you want to instrument
48+
*/
49+
module: { name: '<package_name>', versionRange: '<version_range>', filePath: '<relative_path_to_file>' },
50+
functionQuery: {
51+
className: 'ClassName',
52+
methodName: 'methodName',
53+
// If the function is `async`, specify `Async` here. Callback functions are typically `Sync`.
54+
kind: 'Sync' | 'Async'
55+
},
56+
// OR
57+
// if not a Class
58+
functionQuery: {
59+
moduleName: 'ModuleName',
60+
expressionName: 'expressionName',
61+
kind: 'Sync' | 'Async'
62+
},
63+
// OR
64+
// if the module is not defined
65+
functionQuery: {
66+
expressionName: 'expressionName',
67+
kind: 'Sync' | 'Async'
68+
}
69+
}
70+
/**
71+
* If you need to use the same instrumentation/subscriber for differently structured code
72+
* (e.g. an older version of the package uses moduleName/expressionName, but now the
73+
* same function is className/methodName), you'd add another instrumentation object
74+
* to the array of `instrumentations`.
75+
*/
76+
]
77+
}
78+
79+
module.exports = {
80+
// Note: config(s) must be in an array, even if there's just one
81+
'<package_name>': [
82+
config
83+
]
84+
}
85+
```
86+
87+
### Creating the Subscribers
88+
89+
Now that you have the config specified for the function that you are instrumenting, you'll then need to create a subscriber for it. All subscribers should at least inherit from the base [`Subscriber`](./base.js) with the exception of subscribers that do not rely on `orchestrion` to create their tracing channels (they inherit from the `node:diagnostics_channel` `Subscriber` in [`dc-base.js`](./dc-base.js)).
90+
91+
#### Datastore Subscribers
92+
93+
For datastore queries, inherit from `DbQuerySubscriber`. For datastore operations, inherit from `DbOperationSubscriber`.
94+
95+
#### Messaging Subscribers
96+
97+
For messaging queues, inherit from `MessageConsumerSubscriber` or `MessageProducerSubscriber.`
98+
99+
#### Propagation Subscriber
100+
101+
Many packages are written in a way that causes `AsyncLocalStorage` to lose context. A common instance of this is multiple nestled callbacks. To solve this, create `PropagationSubscriber`s for inner functions within the one you are instrumenting. You may have to experiment a few times to know which function is losing context; in most cases, you should only need one `PropagationSubscriber` to support another subscriber.

lib/subscribers/amqplib/consume.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
const MessageConsumer = require('../message-consumer')
6+
const MessageConsumerSubscriber = require('../message-consumer')
77
const { getParameters, getParametersFromMessage, TEMP_RE } = require('./utils')
88

9-
class ConsumeSubscriber extends MessageConsumer {
9+
class ConsumeSubscriber extends MessageConsumerSubscriber {
1010
constructor({ agent, logger, channelName = 'nr_consume' }) {
1111
super({ agent, logger, packageName: 'amqplib', channelName, system: 'RabbitMQ', type: 'Exchange', callback: 1, transport: 'AMQP' })
1212
this.segmentName = 'amqplib.Channel#consume'

lib/subscribers/application-logs.js

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
*/
55

66
const Subscriber = require('./base')
7-
const { isApplicationLoggingEnabled, isLocalDecoratingEnabled, isLogForwardingEnabled, isMetricsEnabled, createModuleUsageMetric, incrementLoggingLinesMetrics } = require('../util/application-logging')
7+
const { isApplicationLoggingEnabled,
8+
isLocalDecoratingEnabled,
9+
isLogForwardingEnabled,
10+
isMetricsEnabled, createModuleUsageMetric,
11+
incrementLoggingLinesMetrics } = require('../util/application-logging')
812

913
class ApplicationLogsSubscriber extends Subscriber {
10-
constructor({ agent, logger, packageName, channelName, }) {
11-
super({ agent, logger, packageName, channelName, })
14+
constructor({ agent, logger, packageName, channelName }) {
15+
super({ agent, logger, packageName, channelName })
1216
this.requireActiveTx = false
1317
this.libMetricCreated = false
1418
}

lib/subscribers/base.js

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,31 @@ const ArrayPrototypeSplice = makeCall(Array.prototype.splice)
3838
* register handlers for. For any name in the set, a corresponding method
3939
* must exist on the subscriber instance. The method will be passed the
4040
* event object. Possible event names are `start`, `end`, `asyncStart`,
41-
* `asyncEnd`, and `error`. @link https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel
42-
* @property {boolean} [opaque=false] If true, any children segments will not be created
43-
* @property {boolean} [internal=false] If true, any children segments from the same library will not be created
41+
* `asyncEnd`, and `error`.
42+
*
43+
* See {@link https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel}
44+
* @property {boolean} [opaque=false] If true, any children segments will not be created.
45+
* @property {boolean} [internal=false] If true, any children segments from the same library
46+
* will not be created.
4447
* @property {string} [prefix='orchestrion:'] String to prepend to diagnostics
4548
* channel event names. This provides a namespace for the events we are
4649
* injecting into a module.
47-
* @property {boolean} [requireActiveTx=true] If true, the subscriber will only handle events when there is an active transaction.
48-
* @property {boolean} [propagateContext=false] If true, it will bind `asyncStart` to the store and re-propagate the active context. It will also attach the `transaction` to the event in `start.bindStore`. This is used for functions that queue async code and context is lost.
49-
* @property {string} id A unique identifier for the subscriber, combining the prefix, package name, and channel name.
50+
* @property {boolean} [requireActiveTx=true] If true, the subscriber will only handle events
51+
* when there is an active transaction.
52+
* @property {boolean} [propagateContext=false] If true, it will bind `asyncStart` to the store
53+
* and re-propagate the active context. It will also attach the `transaction` to the event in
54+
* `start.bindStore`. This is used for functions that queue async code and context is lost.
55+
* @property {string} id A unique identifier for the subscriber, combining the prefix, package
56+
* name, and channel name.
5057
* @property {TracingChannel} channel The tracing channel instance this subscriber will be monitoring.
5158
* @property {AsyncLocalStorage} store The async local storage instance used for context management.
52-
* @property {number} callback position of callback if it needs to be wrapped for instrumentation. -1 means last argument
59+
* @property {number} [callback=null] Position of callback if it needs to be wrapped for instrumentation.
60+
* -1 means last argument.
5361
*/
5462
class Subscriber {
63+
/**
64+
* @param {SubscriberParams} params the subscriber constructor params
65+
*/
5566
constructor({ agent, logger, packageName, channelName }) {
5667
this.agent = agent
5768
this.logger = logger.child({ component: `${packageName}-subscriber` })
@@ -121,15 +132,15 @@ class Subscriber {
121132

122133
/**
123134
* Wraps an event emitter and runs the wrap in the new context
124-
* If the event is `end` or `error` it'll touch the active segment
135+
* If the event is `end` or `error`, it'll touch the active segment.
125136
*
126137
* @param {object} params to function
127138
* @param {Array} params.args arguments to function
128139
* @param {number} params.index index of argument to wrap
129-
* @param {string} params.name name of emit function
140+
* @param {string} [params.name] name of emit function, defaults to 'emit'
130141
* @param {Context} params.ctx context to bind wrapped emit to
131142
*/
132-
wrapEventEmitter({ args, index, name, ctx }) {
143+
wrapEventEmitter({ args, index, name = 'emit', ctx }) {
133144
const orig = args[index][name]
134145
const self = this
135146
function wrapEmit(...emitArgs) {
@@ -148,7 +159,7 @@ class Subscriber {
148159
* If the segment is successfully created, it will be started and added to the context.
149160
* @param {object} params - Parameters for creating the segment
150161
* @param {string} params.name - The name of the segment
151-
* @param {object} params.recorder - Optional recorder for the segment
162+
* @param {object} [params.recorder] - Optional recorder for the segment
152163
* @param {Context} params.ctx - The context containing the parent segment and transaction
153164
* @returns {Context} - The updated context with the new segment or existing context if segment creation fails
154165
*/
@@ -191,8 +202,8 @@ class Subscriber {
191202
}
192203

193204
/**
194-
* Not all subscribers need change the context on `start`.
195-
* This is defined on base to fulfill those use cases
205+
* Not all subscribers need to change the context on an event.
206+
* This is defined on base to fulfill those use cases.
196207
* @param {object} data event passed to handler
197208
* @param {Context} ctx context passed to handler
198209
* @returns {Context} either new context or existing

lib/subscribers/db-operation.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ const { DB } = require('../metrics/names')
99

1010
/**
1111
* Subscriber for database operation events e.g. `connect`.
12+
*
13+
* @property {string} operation The name of the database operation.
14+
* Used to name the segment created for this operation.
1215
*/
1316
class DbOperationSubscriber extends DbSubscriber {
1417
/**

lib/subscribers/db-query.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,24 @@ const { DB } = require('../metrics/names')
99
const ParsedStatement = require('../db/parsed-statement')
1010
const parseSql = require('../db/query-parsers/sql')
1111

12+
/**
13+
* Defines a subscriber for database queries.
14+
*
15+
* @property {string} queryString Must be set by any class that extends this
16+
* one prior to this class's `.handler` method being invoked. It represents
17+
* the statement being sent to the database.
18+
* @property {boolean} [isBatch] Set to true if this subscriber is for batch queries.
19+
*/
1220
class DbQuerySubscriber extends DbSubscriber {
21+
/**
22+
* On an event, this handler will create a segment with the name
23+
* `{DB.STATEMENT}/{DB_SYSTEM}/{COLLECTION}/{OPERATION}`.
24+
* Other than `DB.STATEMENT` which is a constant, these values
25+
* are extracted from `this.queryString`.
26+
* @param {object} data event data
27+
* @param {Context} ctx the context
28+
* @returns {Context} the updated context with the new segment
29+
*/
1330
handler(data, ctx) {
1431
const queryString = this.queryString
1532
const parsed = this.parseQueryString(queryString)

lib/subscribers/db.js

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,25 @@ const Subscriber = require('./base')
77
const { ALL, DB } = require('../metrics/names')
88
const urltils = require('../util/urltils')
99

10+
/**
11+
* @property {object} parameters Must be set by subclasses prior to invoking
12+
* the `addAttributes` method. Should contain the following keys:
13+
* - `host`: The database host.
14+
* - `database_name`: The name of the database.
15+
* - `port_path_or_id`: The database port, path, or ID.
16+
* @property {string} system The database system being used (e.g., MySQL, MongoDB).
17+
*/
1018
class DbSubscriber extends Subscriber {
19+
/**
20+
* @param {object} params constructor params object
21+
* @param {object} params.agent A New Relic Node.js agent instance.
22+
* @param {object} params.logger An agent logger instance.
23+
* @param {string} params.packageName The package name being instrumented.
24+
* This is what a developer would provide to the `require` function.
25+
* @param {string} params.channelName A unique name for the diagnostics channel
26+
* that will be created and monitored.
27+
* @param {string} params.system The database system being used (e.g., MySQL, MongoDB).
28+
*/
1129
constructor({ agent, logger, packageName, channelName, system }) {
1230
super({ agent, logger, packageName, channelName })
1331
this.system = system
@@ -29,8 +47,12 @@ class DbSubscriber extends Subscriber {
2947
return this.config.datastore_tracer.database_name_reporting.enabled
3048
}
3149

50+
/**
51+
* Adds `this.parameters` to the active segment.
52+
* @param {object} segment the current segment
53+
*/
3254
addAttributes(segment) {
33-
for (let [key, value] of Object.entries(this.parameters)) {
55+
for (let [key, value] of Object.entries(this.parameters ?? {})) {
3456
if (this.instanceKeys.includes(key) && !this.instanceReporting) {
3557
continue
3658
}

lib/subscribers/message-consumer.js

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@ const messageTransactionRecorder = require('#agentlib/metrics/recorders/message-
1010
const isString = require('#agentlib/util/is-string.js')
1111

1212
/**
13-
* A message consumer does the following:
14-
* 1. Calling consume creates a segment if in an active transaction
15-
* 2. For every consumption, typically registered as a callback, it will create a transaction of type `message`, create a baseSegment, add both segment and trace attributes, and assign the `message-transaction` timeslice metrics
16-
*
1713
* @typedef {object} MessageConsumerParams
1814
* @property {object} agent A New Relic Node.js agent instance.
1915
* @property {object} logger An agent logger instance.
@@ -26,7 +22,16 @@ const isString = require('#agentlib/util/is-string.js')
2622
* @property {number} callback if consumer is callback based, indicates index of callback
2723
* @property {string} transport identifier of the transport(see Transaction.TRANSPORT_TYPES)
2824
*/
29-
class MessageConsumer extends Subscriber {
25+
26+
/**
27+
* A message consumer does the following:
28+
* 1. Calling consume creates a segment if in an active transaction
29+
* 2. For every consumption, typically registered as a callback, it will create a transaction of type `message`, create a baseSegment, add both segment and trace attributes, and assign the `message-transaction` timeslice metrics
30+
*/
31+
class MessageConsumerSubscriber extends Subscriber {
32+
/**
33+
* @param {MessageConsumerParams} params constructor params
34+
*/
3035
constructor({ agent, logger, packageName, channelName, system, type, callback, transport }) {
3136
super({ agent, logger, packageName, channelName })
3237
this.system = system
@@ -143,4 +148,4 @@ class MessageConsumer extends Subscriber {
143148
}
144149
}
145150

146-
module.exports = MessageConsumer
151+
module.exports = MessageConsumerSubscriber

lib/subscribers/message-producer.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,7 @@ const Subscriber = require('./base')
88
const genericRecorder = require('#agentlib/metrics/recorders/generic.js')
99

1010
/**
11-
*
12-
* Creates the segment for a message producer call. Injects appropriate DT/CAT
13-
* headers if enabled.
14-
*
15-
* @typedef {object} MessageProducerParams
11+
* @typedef {object} MessageProducerParams
1612
* @property {object} agent A New Relic Node.js agent instance.
1713
* @property {object} logger An agent logger instance.
1814
* @property {string} packageName The package name being instrumented.
@@ -22,7 +18,15 @@ const genericRecorder = require('#agentlib/metrics/recorders/generic.js')
2218
* @property {string} system canonical mapping of system(i.e. - Kafka, RabbitMq, SNS, SQS)
2319
* @property {string} type destinationType: Exchange, Queue, Topic
2420
*/
21+
22+
/**
23+
* Creates the segment for a message producer call. Injects appropriate DT/CAT
24+
* headers if enabled.
25+
*/
2526
class MessageProducerSubscriber extends Subscriber {
27+
/**
28+
* @param {MessageProducerParams} params constructor params
29+
*/
2630
constructor({ agent, logger, packageName, channelName, system, type }) {
2731
super({ agent, logger, packageName, channelName })
2832
this.system = system

0 commit comments

Comments
 (0)