Skip to content

Commit 04b42d8

Browse files
committed
add otel redoer queue count gauge
1 parent dcd2b73 commit 04b42d8

File tree

6 files changed

+40
-35
lines changed

6 files changed

+40
-35
lines changed

Dockerfile.consumer

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ RUN pip3 install --break-system-packages opentelemetry-distro
1111
# Above, `--break-system-packages` flag overrides the
1212
# "This environment is externally managed" error that calling pip
1313
# would otherwise incur here.
14-
# TODO Research alternative approach.
1514

1615
WORKDIR /app
1716
COPY middleware/* .

Dockerfile.redoer

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ RUN apt-get update \
77
&& apt-get -y autoremove \
88
&& apt-get -y clean
99

10+
RUN pip3 install --break-system-packages opentelemetry-distro
11+
# Above, `--break-system-packages` flag overrides the
12+
# "This environment is externally managed" error that calling pip
13+
# would otherwise incur here.
14+
1015
WORKDIR /app
1116
COPY middleware/* .
1217

docker-compose.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ services:
1919
init-db:
2020
image: senzing/init-database:latest
2121
depends_on:
22-
- db
22+
db:
23+
condition: service_healthy
2324
environment:
2425
SENZING_TOOLS_DATASOURCES: PEOPLE
2526
SENZING_TOOLS_ENGINE_CONFIGURATION_JSON: >-

middleware/consumer.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,8 @@ def clean_up(signum, frm):
172172

173173
# OTel setup #
174174
meter = otel.init('consumer')
175-
otel_msgs_counter = meter.create_counter(
176-
'consumer.messages.count',
177-
description='Counter incremented with each message processed by the consumer.')
178-
otel_durations = meter.create_histogram(
179-
'consumer.messages.duration',
180-
'Message processing duration for the consumer.')
175+
otel_msgs_counter = meter.create_counter('consumer.messages.count')
176+
otel_durations = meter.create_histogram('consumer.messages.duration')
181177
# end OTel setup #
182178

183179
while 1:

middleware/otel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
def init(service_name):
1414
'''Perform general OTel setup and return meter obj.'''
1515
resource = Resource.create(attributes={SERVICE_NAME: service_name})
16-
metric_reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
16+
metric_reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=5000)
1717
meter_provider = MeterProvider(resource=resource,
1818
metric_readers=[metric_reader])
1919

middleware/redoer.py

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from timeout_handling import *
1212

1313
import otel
14+
from opentelemetry import metrics
1415

1516
try:
1617
log.info('Importing senzing_core library . . .')
@@ -51,29 +52,31 @@ def go():
5152
log.error(fmterr(e))
5253

5354
# OTel setup #
55+
log.info('Starting OTel setup.')
5456
meter = otel.init('redoer')
55-
56-
otel_msgs_counter = meter.create_counter(
57-
'redoer.messages.count',
58-
description='Counter incremented with each message processed by the redoer.')
59-
60-
otel_durations = meter.create_histogram(
61-
'redoer.messages.duration',
62-
'Message processing duration for the redoer.')
57+
otel_msgs_counter = meter.create_counter('redoer.messages.count')
58+
otel_durations = meter.create_histogram('redoer.messages.duration')
6359

6460
def _queue_count_steward(tally):
65-
'''Coroutine function; emits like a generator but values can also be
66-
sent into it too. Its generator-like behavior let's us pass it into the Gauge
67-
constructor.'''
61+
'''Coroutine function; this lets us both:
62+
- 1) easily pass in updated tally values via `send`
63+
- 2) accommodate OTel's spec of a "a generator that yields
64+
iterables of Observation"
65+
Ref: https://opentelemetry-python.readthedocs.io/en/latest/api/metrics.html#opentelemetry.metrics.Meter.create_observable_gauge
66+
'''
6867
while 1:
69-
newtally = yield tally
70-
tally = newtally if newtally
71-
queue_count_steward = _queue_count_steward(0)
68+
newtally = yield [metrics.Observation(tally)]
69+
# We check the type b/c OTel internals will send in a
70+
# CallbackOptions object that we'll want to ignore;
71+
# meanwhile type `int` means we sent in an updated tally value.
72+
# ourselves.
73+
if newtally and type(newtally) is int: tally = newtally
74+
queue_count_steward = _queue_count_steward(-1)
7275
next(queue_count_steward) # prime it.
7376
otel_queue_gauge = meter.create_observable_gauge(
74-
'redoer.queue.count',
75-
description='Current number of items in the redo queue.',
76-
[queue_count_steward])
77+
'redoer.queue.count', [queue_count_steward])
78+
79+
log.info('Finished OTel setup.')
7780
# end OTel setup #
7881

7982
log.info('Starting primary loop.')
@@ -126,20 +129,21 @@ def _queue_count_steward(tally):
126129
except sz.SzError as sz_err:
127130
log.error(SZ_TAG + fmterr(sz_err))
128131

129-
finish = time.perf_counter()
130-
otel_msgs_counter.add(1,
131-
{'status': success_status,
132-
'service': 'redoer',
133-
'environment': RUNTIME_ENV})
134-
otel_durations.record(finish - start,
135-
{'status': success_status,
136-
'service': 'redoer',
137-
'environment': RUNTIME_ENV})
132+
finish = time.perf_counter()
133+
otel_msgs_counter.add(1,
134+
{'status': success_status,
135+
'service': 'redoer',
136+
'environment': RUNTIME_ENV})
137+
otel_durations.record(finish - start,
138+
{'status': success_status,
139+
'service': 'redoer',
140+
'environment': RUNTIME_ENV})
138141

139142
else:
140143
try:
141144
tally = sz_eng.count_redo_records()
142145
queue_count_steward.send(tally)
146+
#otel_queue_gauge.record(tally)
143147
log.debug(SZ_TAG + 'Current redo count: ' + str(tally))
144148
except sz.SzRetryableError as sz_ret_err:
145149
log.error(SZ_TAG + fmterr(sz_ret_err))

0 commit comments

Comments
 (0)