diff --git a/.trivyignore.yaml b/.trivyignore.yaml index c874c9c..c52a1b4 100644 --- a/.trivyignore.yaml +++ b/.trivyignore.yaml @@ -3,4 +3,5 @@ misconfigurations: - id: AVD-DS-0026 paths: - Dockerfile.consumer # Jira -67 + - Dockerfile.exporter # ephemeral container, healthcheck not necessary - Dockerfile.tools diff --git a/Dockerfile.exporter b/Dockerfile.exporter new file mode 100644 index 0000000..580fa71 --- /dev/null +++ b/Dockerfile.exporter @@ -0,0 +1,22 @@ +FROM senzing/senzingsdk-runtime:4.0.0 + +USER root + +RUN apt-get update \ + && apt-get -y install --no-install-recommends curl python3 python3-pip python3-boto3 \ + && apt-get -y autoremove \ + && apt-get -y clean + +WORKDIR /app +COPY middleware/* . + +# Add a new user and switch to it. +RUN useradd -m -u 1001 senzing +USER senzing + +ENV PYTHONPATH=/opt/senzing/er/sdk/python:/app + +# Flush buffer - helps with print statements. +ENV PYTHONUNBUFFERED=1 + +CMD ["python3", "exporter.py"] diff --git a/README.md b/README.md index dbeae2f..74e45e4 100644 --- a/README.md +++ b/README.md @@ -57,14 +57,32 @@ and run the consumer service on our local machine. This setup includes: docker compose run tools /bin/bash ``` -1. Spinning up a consumer service: +### Consumer + +Spinning up a consumer service (intended to be a continually-running process; in +a production scenarion, multiple instances could be running simultaneously as +needed): ```bash - docker compose run --env AWS_PROFILE_NAME="some-profile-name" --env \ + docker compose run --env AWS_PROFILE=some-profile-name --env \ Q_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-senzing-local-ingest" \ consumer ``` +#### Exporter + +Spinning up the exporter middleware (this is intended to be an ephemeral +container): + + ```bash + docker compose run --env AWS_PROFILE=localstack --env S3_BUCKET_NAME=sqs-senzing-local-export exporter + ``` + +You can view information about files in the Localstack S3 bucket by visiting +this URL: + + http://localhost:4566/sqs-senzing-local-export + ### Using the services The `tools` container should be configured with the necessary environment diff --git a/docker-compose.yaml b/docker-compose.yaml index ae5923f..98ca73b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -100,6 +100,31 @@ services: # Note: `.aws` mount might not be needed later. - ~/.aws:/home/senzing/.aws + exporter: + build: + context: . + dockerfile: Dockerfile.exporter + depends_on: + - db + environment: + AWS_ENDPOINT_URL: http://localstack:4566 + SENZING_ENGINE_CONFIGURATION_JSON: >- + { + "PIPELINE": { + "CONFIGPATH": "/etc/opt/senzing", + "LICENSESTRINGBASE64": "${SENZING_LICENSE_BASE64_ENCODED}", + "RESOURCEPATH": "/opt/senzing/er/resources", + "SUPPORTPATH": "/opt/senzing/data" + }, + "SQL": { + "BACKEND": "SQL", + "CONNECTION": "postgresql://${POSTGRES_USERNAME:-senzing}:${POSTGRES_PASSWORD:-senzing}@db:5432:${POSTGRES_DB:-G2}/?sslmode=disable" + } + } + volumes: + - ~/.aws:/home/senzing/.aws + - ~/tmp:/tmp # Should you wish to write files to host. + volumes: localstack: postgres: diff --git a/middleware/exporter.py b/middleware/exporter.py new file mode 100644 index 0000000..6b29fc4 --- /dev/null +++ b/middleware/exporter.py @@ -0,0 +1,150 @@ +import json +import io +import os +import time +import sys +import boto3 +import senzing as sz + +from loglib import * +log = retrieve_logger() + +try: + log.info('Importing senzing_core library . . .') + import senzing_core as sz_core + log.info('Imported senzing_core successfully.') +except Exception as e: + log.error('Importing senzing_core library failed.') + log.error(e) + sys.exit(1) + +if 'SENZING_ENGINE_CONFIGURATION_JSON' not in os.environ: + log.error('SENZING_ENGINE_CONFIGURATION_JSON environment variable required.') + sys.exit(1) +SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON']) + +if 'S3_BUCKET_NAME' not in os.environ: + log.error('S3_BUCKET_NAME environment variable required.') + sys.exit(1) +S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME'] + +EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS + +#------------------------------------------------------------------------------- + +def ts(): + '''Return current timestamp in ms as a str''' + return str(int(round(time.time() * 1000))) + +def make_s3_client(): + try: + sess = boto3.Session() + if 'AWS_ENDPOINT_URL' in os.environ: + return sess.client('s3', endpoint_url=os.environ['AWS_ENDPOINT_URL']) + else: + return sess.client('s3') + except Exception as e: + log.error(AWS_TAG + str(e)) + sys.exit(1) + +def go(): + ''' + Exports Senzing JSON entity report data into a buffer, then + uploads the buffer as a file into the output S3 bucket. + + References: + - https://garage.senzing.com/sz-sdk-python/senzing.html#senzing.szengine.SzEngine.export_json_entity_report + - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj.html + ''' + + # Init S3 client + s3 = make_s3_client() + + # Init senzing engine object. + # Note that Senzing engine object cannot be passed around between functions, + # else it will be eagerly cleaned up / destroyed and no longer usable. + sz_eng = None + try: + sz_factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG) + sz_eng = sz_factory.create_engine() + log.info(SZ_TAG + 'Senzing engine object instantiated.') + except sz.SzError as sz_err: + log.error(SZ_TAG + str(sz_err)) + sys.exit(1) + except Exception as e: + log.error(str(e)) + sys.exit(1) + + # init buffer + buff = io.BytesIO() + + # Retrieve output from sz into buff + # sz will export JSONL lines; we add the chars necessary to make + # the output as a whole be a single JSON blob. + log.info(SZ_TAG + 'Starting export from Senzing.') + try: + export_handle = sz_eng.export_json_entity_report(EXPORT_FLAGS) + log.info(SZ_TAG + 'Obtained export_json_entity_report handle.') + buff.write('['.encode('utf-8')) + while 1: + log.debug(SZ_TAG + 'Fetching chunk...') + chunk = sz_eng.fetch_next(export_handle) + if not chunk: + break + buff.write(chunk.encode('utf-8')) + log.debug('Wrote chunk to buffer.') + buff.write(','.encode('utf-8')) + sz_eng.close_export_report(export_handle) + log.info(SZ_TAG + 'Closed export handle.') + buff.seek(-1, os.SEEK_CUR) # toss out last comma + buff.write(']'.encode('utf-8')) + log.info('Total bytes exported/buffered: ' + str(buff.getbuffer().nbytes)) + except sz.SzError as err: + log.error(SZ_TAG + str(err)) + except Exception as e: + log.error(str(e)) + + # rewind buffer + buff.seek(0) + buff.flush() + + # write buff to S3 using upload_fileobj + fname = 'output-' + ts() + '.json' + log.info(AWS_TAG + 'About to upload JSON file ' + fname + ' to S3 ...') + try: + s3.upload_fileobj(buff, S3_BUCKET_NAME, fname) + log.info(AWS_TAG + 'Successfully uploaded file.') + except Exception as e: + log.error(AWS_TAG + str(e)) + +#------------------------------------------------------------------------------- + +def main(): + log.info('====================') + log.info(' EXPORTER') + log.info(' *STARTED*') + log.info('====================') + go() + +if __name__ == '__main__': main() + +#------------------------------------------------------------------------------- +# ad-hoc test funcs - might move later + +def _upload_test_file_to_s3(): + print("Starting test upload to S3 ...") + s3 = make_s3_client() + print(s3) + fname = 'hemingway.txt' + resp = s3.upload_file(fname, S3_BUCKET_NAME, fname) + print(resp) + print('Upload successful.') + +def _get_file_from_s3(key): + '''Get file from S3 and write to /tmp (use docker-compose to map this + to desired directory on host machine).''' + print('Grabbing file...') + s3 = make_s3_client() + resp = s3.download_file(S3_BUCKET_NAME, key, '/tmp/'+key) + print(resp) + print('Done.') diff --git a/middleware/loglib.py b/middleware/loglib.py new file mode 100644 index 0000000..c7381aa --- /dev/null +++ b/middleware/loglib.py @@ -0,0 +1,24 @@ +import logging +import sys + +AWS_TAG = '[AWS] ' +SZ_TAG = '[SZ] ' +DLQ_TAG = '[DLQ] ' + +_instantiated_loggers = {} + +def retrieve_logger(tag='default'): + global _instantiated_loggers + if tag in _instantiated_loggers: + return _instantiated_loggers[tag] + else: + x = logging.getLogger(tag) + x.setLevel(logging.INFO) + handler = logging.StreamHandler() + fmt = logging.Formatter( + '[%(asctime)s] [%(levelname)s] ' \ + '[%(filename)s:%(lineno)s] %(message)s') + handler.setFormatter(fmt) + x.addHandler(handler) + _instantiated_loggers[tag] = x + return x diff --git a/middleware/sample-data/hemingway.txt b/middleware/sample-data/hemingway.txt new file mode 100644 index 0000000..95a12f8 --- /dev/null +++ b/middleware/sample-data/hemingway.txt @@ -0,0 +1,7 @@ +It was very late and everyone had left the cafe except an old man who sat in the +shadow the leaves of the tree made against the electric light. In the day time the +street was dusty, but at night the dew settled the dust and the old man liked to sit +late because he was deaf and now at night it was quiet and he felt the difference. +The two waiters inside the cafe knew that the old man was a little drunk, and while +he was a good client they knew that if he became too drunk he would leave without +paying, so they kept watch on him.