Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .trivyignore.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ misconfigurations:
- id: AVD-DS-0026
paths:
- Dockerfile.consumer # Jira -67
- Dockerfile.exporter # ephemeral container, healthcheck not necessary
- Dockerfile.tools
22 changes: 22 additions & 0 deletions Dockerfile.exporter
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM senzing/senzingsdk-runtime:4.0.0

USER root

RUN apt-get update \
&& apt-get -y install --no-install-recommends curl python3 python3-pip python3-boto3 \
&& apt-get -y autoremove \
&& apt-get -y clean

WORKDIR /app
COPY middleware/* .

# Add a new user and switch to it.
RUN useradd -m -u 1001 senzing
USER senzing

ENV PYTHONPATH=/opt/senzing/er/sdk/python:/app

# Flush buffer - helps with print statements.
ENV PYTHONUNBUFFERED=1

CMD ["python3", "exporter.py"]
22 changes: 20 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,32 @@ and run the consumer service on our local machine. This setup includes:
docker compose run tools /bin/bash
```

1. Spinning up a consumer service:
### Consumer

Spinning up a consumer service (intended to be a continually-running process; in
a production scenarion, multiple instances could be running simultaneously as
needed):

```bash
docker compose run --env AWS_PROFILE_NAME="some-profile-name" --env \
docker compose run --env AWS_PROFILE=some-profile-name --env \
Q_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-senzing-local-ingest" \
consumer
```

#### Exporter

Spinning up the exporter middleware (this is intended to be an ephemeral
container):

```bash
docker compose run --env AWS_PROFILE=localstack --env S3_BUCKET_NAME=sqs-senzing-local-export exporter
```

You can view information about files in the Localstack S3 bucket by visiting
this URL:

http://localhost:4566/sqs-senzing-local-export

### Using the services

The `tools` container should be configured with the necessary environment
Expand Down
25 changes: 25 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,31 @@ services:
# Note: `.aws` mount might not be needed later.
- ~/.aws:/home/senzing/.aws

exporter:
build:
context: .
dockerfile: Dockerfile.exporter
depends_on:
- db
environment:
AWS_ENDPOINT_URL: http://localstack:4566
SENZING_ENGINE_CONFIGURATION_JSON: >-
{
"PIPELINE": {
"CONFIGPATH": "/etc/opt/senzing",
"LICENSESTRINGBASE64": "${SENZING_LICENSE_BASE64_ENCODED}",
"RESOURCEPATH": "/opt/senzing/er/resources",
"SUPPORTPATH": "/opt/senzing/data"
},
"SQL": {
"BACKEND": "SQL",
"CONNECTION": "postgresql://${POSTGRES_USERNAME:-senzing}:${POSTGRES_PASSWORD:-senzing}@db:5432:${POSTGRES_DB:-G2}/?sslmode=disable"
}
}
volumes:
- ~/.aws:/home/senzing/.aws
- ~/tmp:/tmp # Should you wish to write files to host.

volumes:
localstack:
postgres:
150 changes: 150 additions & 0 deletions middleware/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import json
import io
import os
import time
import sys
import boto3
import senzing as sz

from loglib import *
log = retrieve_logger()

try:
log.info('Importing senzing_core library . . .')
import senzing_core as sz_core
log.info('Imported senzing_core successfully.')
except Exception as e:
log.error('Importing senzing_core library failed.')
log.error(e)
sys.exit(1)

if 'SENZING_ENGINE_CONFIGURATION_JSON' not in os.environ:
log.error('SENZING_ENGINE_CONFIGURATION_JSON environment variable required.')
sys.exit(1)
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])

if 'S3_BUCKET_NAME' not in os.environ:
log.error('S3_BUCKET_NAME environment variable required.')
sys.exit(1)
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']

EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS

#-------------------------------------------------------------------------------

def ts():
'''Return current timestamp in ms as a str'''
return str(int(round(time.time() * 1000)))

def make_s3_client():
try:
sess = boto3.Session()
if 'AWS_ENDPOINT_URL' in os.environ:
return sess.client('s3', endpoint_url=os.environ['AWS_ENDPOINT_URL'])
else:
return sess.client('s3')
except Exception as e:
log.error(AWS_TAG + str(e))
sys.exit(1)

def go():
'''
Exports Senzing JSON entity report data into a buffer, then
uploads the buffer as a file into the output S3 bucket.

References:
- https://garage.senzing.com/sz-sdk-python/senzing.html#senzing.szengine.SzEngine.export_json_entity_report
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj.html
'''

# Init S3 client
s3 = make_s3_client()

# Init senzing engine object.
# Note that Senzing engine object cannot be passed around between functions,
# else it will be eagerly cleaned up / destroyed and no longer usable.
sz_eng = None
try:
sz_factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG)
sz_eng = sz_factory.create_engine()
log.info(SZ_TAG + 'Senzing engine object instantiated.')
except sz.SzError as sz_err:
log.error(SZ_TAG + str(sz_err))
sys.exit(1)
except Exception as e:
log.error(str(e))
sys.exit(1)

# init buffer
buff = io.BytesIO()

# Retrieve output from sz into buff
# sz will export JSONL lines; we add the chars necessary to make
# the output as a whole be a single JSON blob.
log.info(SZ_TAG + 'Starting export from Senzing.')
try:
export_handle = sz_eng.export_json_entity_report(EXPORT_FLAGS)
log.info(SZ_TAG + 'Obtained export_json_entity_report handle.')
buff.write('['.encode('utf-8'))
while 1:
log.debug(SZ_TAG + 'Fetching chunk...')
chunk = sz_eng.fetch_next(export_handle)
if not chunk:
break
buff.write(chunk.encode('utf-8'))
log.debug('Wrote chunk to buffer.')
buff.write(','.encode('utf-8'))
sz_eng.close_export_report(export_handle)
log.info(SZ_TAG + 'Closed export handle.')
buff.seek(-1, os.SEEK_CUR) # toss out last comma
buff.write(']'.encode('utf-8'))
log.info('Total bytes exported/buffered: ' + str(buff.getbuffer().nbytes))
except sz.SzError as err:
log.error(SZ_TAG + str(err))
except Exception as e:
log.error(str(e))

# rewind buffer
buff.seek(0)
buff.flush()

# write buff to S3 using upload_fileobj
fname = 'output-' + ts() + '.json'
log.info(AWS_TAG + 'About to upload JSON file ' + fname + ' to S3 ...')
try:
s3.upload_fileobj(buff, S3_BUCKET_NAME, fname)
log.info(AWS_TAG + 'Successfully uploaded file.')
except Exception as e:
log.error(AWS_TAG + str(e))

#-------------------------------------------------------------------------------

def main():
log.info('====================')
log.info(' EXPORTER')
log.info(' *STARTED*')
log.info('====================')
go()

if __name__ == '__main__': main()

#-------------------------------------------------------------------------------
# ad-hoc test funcs - might move later

def _upload_test_file_to_s3():
print("Starting test upload to S3 ...")
s3 = make_s3_client()
print(s3)
fname = 'hemingway.txt'
resp = s3.upload_file(fname, S3_BUCKET_NAME, fname)
print(resp)
print('Upload successful.')

def _get_file_from_s3(key):
'''Get file from S3 and write to /tmp (use docker-compose to map this
to desired directory on host machine).'''
print('Grabbing file...')
s3 = make_s3_client()
resp = s3.download_file(S3_BUCKET_NAME, key, '/tmp/'+key)
print(resp)
print('Done.')
24 changes: 24 additions & 0 deletions middleware/loglib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging
import sys

AWS_TAG = '[AWS] '
SZ_TAG = '[SZ] '
DLQ_TAG = '[DLQ] '

_instantiated_loggers = {}

def retrieve_logger(tag='default'):
global _instantiated_loggers
if tag in _instantiated_loggers:
return _instantiated_loggers[tag]
else:
x = logging.getLogger(tag)
x.setLevel(logging.INFO)
handler = logging.StreamHandler()
fmt = logging.Formatter(
'[%(asctime)s] [%(levelname)s] ' \
'[%(filename)s:%(lineno)s] %(message)s')
handler.setFormatter(fmt)
x.addHandler(handler)
_instantiated_loggers[tag] = x
return x
7 changes: 7 additions & 0 deletions middleware/sample-data/hemingway.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
It was very late and everyone had left the cafe except an old man who sat in the
shadow the leaves of the tree made against the electric light. In the day time the
street was dusty, but at night the dew settled the dust and the old man liked to sit
late because he was deaf and now at night it was quiet and he felt the difference.
The two waiters inside the cafe knew that the old man was a little drunk, and while
he was a good client they knew that if he became too drunk he would leave without
paying, so they kept watch on him.