diff --git a/Dockerfile.consumer b/Dockerfile.consumer index 4c8c281..f2c1461 100644 --- a/Dockerfile.consumer +++ b/Dockerfile.consumer @@ -7,6 +7,11 @@ RUN apt-get update \ && apt-get -y autoremove \ && apt-get -y clean +RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http +# Above, `--break-system-packages` flag overrides the +# "This environment is externally managed" error that calling pip +# would otherwise incur here. + WORKDIR /app COPY middleware/* . diff --git a/Dockerfile.exporter b/Dockerfile.exporter index 42bdad7..1c099bf 100644 --- a/Dockerfile.exporter +++ b/Dockerfile.exporter @@ -7,6 +7,11 @@ RUN apt-get update \ && apt-get -y autoremove \ && apt-get -y clean +RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http +# Above, `--break-system-packages` flag overrides the +# "This environment is externally managed" error that calling pip +# would otherwise incur here. + WORKDIR /app COPY middleware/* . diff --git a/Dockerfile.redoer b/Dockerfile.redoer index 2fac85d..315bb6c 100644 --- a/Dockerfile.redoer +++ b/Dockerfile.redoer @@ -7,6 +7,11 @@ RUN apt-get update \ && apt-get -y autoremove \ && apt-get -y clean +RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http +# Above, `--break-system-packages` flag overrides the +# "This environment is externally managed" error that calling pip +# would otherwise incur here. + WORKDIR /app COPY middleware/* . diff --git a/README.md b/README.md index 23d714e..a5f7391 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,10 @@ _Environment variables (see docker-compose.yaml):_ initially retrieved from SQS - Sets the maximum amount of time the Consumer will wait for a Senzing `add_record` to complete before bailing and moving on. +- `RUNTIME_ENV` -- the runtime environment (e.g., "Dev", "Prod", etc.). + - Optional; defaults to "unknown". +- `OTEL_USE_OTLP_EXPORTER` -- 'true' or 'false' (default is false) +- `OTEL_EXPORTER_OTLP_ENDPOINT` _Mounts in docker-compose.yaml:_ @@ -243,6 +247,10 @@ _Environment variables:_ - When either (a) Senzing's internal redo queue is empty or (b) a `SzRetryableError` is encountered, this sets how long to wait before attemping the next Senzing op. +- `RUNTIME_ENV` -- the runtime environment (e.g., "Dev", "Prod", etc.). + - Optional; defaults to "unknown". +- `OTEL_USE_OTLP_EXPORTER` -- 'true' or 'false' (default is false) +- `OTEL_EXPORTER_OTLP_ENDPOINT` ### Exporter @@ -261,6 +269,10 @@ docker compose run --env AWS_PROFILE=localstack --env S3_BUCKET_NAME=sqs-senzing - `FOLDER_NAME` -- optional (defaults to `exporter-outputs`); folder inside S3 where the file will be placed. - `LOG_LEVEL` -- optional; defaults to `INFO`. +- `RUNTIME_ENV` -- the runtime environment (e.g., "Dev", "Prod", etc.). + - Optional; defaults to "unknown". +- `OTEL_USE_OTLP_EXPORTER` -- 'true' or 'false' (default is false) +- `OTEL_EXPORTER_OTLP_ENDPOINT` _Mounts in docker-compose.yaml:_ diff --git a/docker-compose.yaml b/docker-compose.yaml index 6d9ba2c..47e62b6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -19,7 +19,8 @@ services: init-db: image: senzing/init-database:latest depends_on: - - db + db: + condition: service_healthy environment: SENZING_TOOLS_DATASOURCES: PEOPLE SENZING_TOOLS_ENGINE_CONFIGURATION_JSON: >- diff --git a/middleware/consumer.py b/middleware/consumer.py index b085415..8567279 100644 --- a/middleware/consumer.py +++ b/middleware/consumer.py @@ -11,6 +11,8 @@ from timeout_handling import * +import otel + try: log.info('Importing senzing_core library . . .') import senzing_core as sz_core @@ -22,6 +24,7 @@ Q_URL = os.environ['Q_URL'] SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420)) SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON']) +RUNTIME_ENV = os.environ.get('RUNTIME_ENV', 'unknown') # For OTel POLL_SECONDS = 20 # 20 seconds is SQS max @@ -167,6 +170,14 @@ def clean_up(signum, frm): except Exception as e: log.error(fmterr(e)) + # OTel setup # + log.info('Starting OTel setup.') + meter = otel.init('consumer') + otel_msgs_counter = meter.create_counter('consumer.messages.count') + otel_durations = meter.create_histogram('consumer.messages.duration') + log.info('Finished OTel setup.') + # end OTel setup # + while 1: try: # Get next message. @@ -176,11 +187,15 @@ def clean_up(signum, frm): + receipt_handle) rcd = json.loads(body) + start = time.perf_counter() + success_status = otel.FAILURE # initial default value + try: # Process and send to Senzing. start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS) resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body) cancel_alarm_timer() + success_status = otel.SUCCESS log.debug(SZ_TAG + 'Successful add_record having ReceiptHandle: ' + receipt_handle) except KeyError as ke: @@ -208,6 +223,16 @@ def clean_up(signum, frm): else: del_msg(sqs, Q_URL, receipt_handle) + finish = time.perf_counter() + otel_msgs_counter.add(1, + {'status': success_status, + 'service': 'consumer', + 'environment': RUNTIME_ENV}) + otel_durations.record(finish - start, + {'status': success_status, + 'service': 'consumer', + 'environment': RUNTIME_ENV}) + except Exception as e: log.error(fmterr(e)) diff --git a/middleware/exporter.py b/middleware/exporter.py index dc95b62..d506913 100644 --- a/middleware/exporter.py +++ b/middleware/exporter.py @@ -10,6 +10,8 @@ from loglib import * log = retrieve_logger() +import otel + try: log.info('Importing senzing_core library . . .') import senzing_core as sz_core @@ -28,6 +30,7 @@ sys.exit(1) S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME'] FOLDER_NAME = os.environ.get('FOLDER_NAME', 'exporter-outputs') +RUNTIME_ENV = os.environ.get('RUNTIME_ENV', 'unknown') # For OTel EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS @@ -82,6 +85,14 @@ def go(): except Exception as e: log.error(fmterr(e)) + # OTel setup # + log.info('Starting OTel setup.') + meter = otel.init('exporter') + otel_exp_counter = meter.create_counter('exporter.export.count') + otel_duration = meter.create_histogram('exporter.export.duration') + log.info('Finished OTel setup.') + # end OTel setup # + # init buffer buff = io.BytesIO() @@ -89,6 +100,10 @@ def go(): # sz will export JSONL lines; we add the chars necessary to make # the output as a whole be a single JSON blob. log.info(SZ_TAG + 'Starting export from Senzing.') + + start = time.perf_counter() + success_status = otel.FAILURE # initial default state + try: export_handle = sz_eng.export_json_entity_report(EXPORT_FLAGS) log.info(SZ_TAG + 'Obtained export_json_entity_report handle.') @@ -121,9 +136,20 @@ def go(): try: s3.upload_fileobj(buff, S3_BUCKET_NAME, full_path) log.info(AWS_TAG + 'Successfully uploaded file.') + success_status = otel.SUCCESS except Exception as e: log.error(AWS_TAG + fmterr(e)) + finish = time.perf_counter() + otel_exp_counter.add(1, + {'status': success_status, + 'service': 'exporter', + 'environment': RUNTIME_ENV}) + otel_duration.record(finish - start, + {'status': success_status, + 'service': 'exporter', + 'environment': RUNTIME_ENV}) + #------------------------------------------------------------------------------- def main(): @@ -134,4 +160,3 @@ def main(): go() if __name__ == '__main__': main() - diff --git a/middleware/otel.py b/middleware/otel.py new file mode 100644 index 0000000..b6c9eaa --- /dev/null +++ b/middleware/otel.py @@ -0,0 +1,35 @@ +# References: +# https://opentelemetry.io/docs/languages/python/instrumentation/#metrics +# https://opentelemetry.io/docs/languages/python/exporters/#console +# https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/ + +import os + +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader) +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter + +INTERVAL_MS = 5000 + +def init(service_name): + '''Perform general OTel setup and return meter obj.''' + resource = Resource.create(attributes={SERVICE_NAME: service_name}) + if os.getenv('OTEL_USE_OTLP_EXPORTER', 'false').lower() == 'true': + metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter(), export_interval_millis=INTERVAL_MS) + else: + metric_reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=INTERVAL_MS) + meter_provider = MeterProvider(resource=resource, + metric_readers=[metric_reader]) + + # Set the global default meter provider: + metrics.set_meter_provider(meter_provider) + + # Create a meter from the global meter provider: + return metrics.get_meter(service_name+'.meter') + +SUCCESS = 'success' +FAILURE = 'failure' diff --git a/middleware/redoer.py b/middleware/redoer.py index c8a6bd0..0c73b36 100644 --- a/middleware/redoer.py +++ b/middleware/redoer.py @@ -10,6 +10,9 @@ from timeout_handling import * +import otel +from opentelemetry import metrics + try: log.info('Importing senzing_core library . . .') import senzing_core as sz_core @@ -20,6 +23,7 @@ SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420)) SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON']) +RUNTIME_ENV = os.environ.get('RUNTIME_ENV', 'unknown') # For OTel # How long to wait before attempting next Senzing op. WAIT_SECONDS = int(os.environ.get('WAIT_SECONDS', 10)) @@ -47,6 +51,33 @@ def go(): except Exception as e: log.error(fmterr(e)) + # OTel setup # + log.info('Starting OTel setup.') + meter = otel.init('redoer') + otel_msgs_counter = meter.create_counter('redoer.messages.count') + otel_durations = meter.create_histogram('redoer.messages.duration') + + def _queue_count_steward(tally): + '''Coroutine function; this lets us both: + - 1) easily pass in updated tally values via `send` + - 2) accommodate OTel's spec of a "a generator that yields + iterables of Observation" + Ref: https://opentelemetry-python.readthedocs.io/en/latest/api/metrics.html#opentelemetry.metrics.Meter.create_observable_gauge + ''' + while 1: + newtally = yield [metrics.Observation(tally)] + # We check the type b/c OTel internals will send in a + # CallbackOptions object that we'll want to ignore; + # meanwhile type `int` means we sent in an updated tally value + # ourselves. + if newtally and type(newtally) is int: tally = newtally + queue_count_steward = _queue_count_steward(-1) + next(queue_count_steward) # prime it. + meter.create_observable_gauge('redoer.queue.count', [queue_count_steward]) + + log.info('Finished OTel setup.') + # end OTel setup # + log.info('Starting primary loop.') # Approach: @@ -64,12 +95,14 @@ def go(): attempts_left = MAX_REDO_ATTEMPTS while 1: try: - if have_rcd: + start = time.perf_counter() + success_status = otel.FAILURE # initial default value try: start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS) sz_eng.process_redo_record(rcd) cancel_alarm_timer() + success_status = otel.SUCCESS have_rcd = 0 log.debug(SZ_TAG + 'Successfully redid one record via process_redo_record().') continue @@ -95,9 +128,20 @@ def go(): except sz.SzError as sz_err: log.error(SZ_TAG + fmterr(sz_err)) + finish = time.perf_counter() + otel_msgs_counter.add(1, + {'status': success_status, + 'service': 'redoer', + 'environment': RUNTIME_ENV}) + otel_durations.record(finish - start, + {'status': success_status, + 'service': 'redoer', + 'environment': RUNTIME_ENV}) + else: try: tally = sz_eng.count_redo_records() + queue_count_steward.send(tally) log.debug(SZ_TAG + 'Current redo count: ' + str(tally)) except sz.SzRetryableError as sz_ret_err: log.error(SZ_TAG + fmterr(sz_ret_err))