
Commit b64caf1

feat: Exporter middleware (#13)
1 parent 693ee0d commit b64caf1

File tree

7 files changed: +249 -2 lines changed


.trivyignore.yaml

Lines changed: 1 addition & 0 deletions
@@ -3,4 +3,5 @@ misconfigurations:
   - id: AVD-DS-0026
     paths:
       - Dockerfile.consumer # Jira -67
+      - Dockerfile.exporter # ephemeral container, healthcheck not necessary
       - Dockerfile.tools

Dockerfile.exporter

Lines changed: 22 additions & 0 deletions
FROM senzing/senzingsdk-runtime:4.0.0

USER root

RUN apt-get update \
    && apt-get -y install --no-install-recommends curl python3 python3-pip python3-boto3 \
    && apt-get -y autoremove \
    && apt-get -y clean

WORKDIR /app
COPY middleware/* .

# Add a new user and switch to it.
RUN useradd -m -u 1001 senzing
USER senzing

ENV PYTHONPATH=/opt/senzing/er/sdk/python:/app

# Flush buffer - helps with print statements.
ENV PYTHONUNBUFFERED=1

CMD ["python3", "exporter.py"]

README.md

Lines changed: 20 additions & 2 deletions
@@ -57,14 +57,32 @@ and run the consumer service on our local machine. This setup includes:
    docker compose run tools /bin/bash
    ```
 
-1. Spinning up a consumer service:
+### Consumer
+
+Spinning up a consumer service (intended to be a continually-running process; in
+a production scenario, multiple instances could be running simultaneously as
+needed):
 
 ```bash
-docker compose run --env AWS_PROFILE_NAME="some-profile-name" --env \
+docker compose run --env AWS_PROFILE=some-profile-name --env \
   Q_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-senzing-local-ingest" \
   consumer
 ```
 
+### Exporter
+
+Spinning up the exporter middleware (this is intended to be an ephemeral
+container):
+
+```bash
+docker compose run --env AWS_PROFILE=localstack --env S3_BUCKET_NAME=sqs-senzing-local-export exporter
+```
+
+You can view information about files in the Localstack S3 bucket by visiting
+this URL:
+
+http://localhost:4566/sqs-senzing-local-export
+
 ### Using the services
 
 The `tools` container should be configured with the necessary environment
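An aside on the Exporter section added above: besides the browser URL, the exported objects can also be listed programmatically. A minimal sketch using boto3 against the Localstack endpoint (the endpoint URL and bucket name mirror the compose example and are assumptions; dummy credentials are typically sufficient for Localstack):

```python
# Sketch: list the exported objects in the Localstack bucket.
# Assumes Localstack is reachable on localhost:4566 and the bucket
# name matches the S3_BUCKET_NAME used in the README example.
import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
resp = s3.list_objects_v2(Bucket="sqs-senzing-local-export")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
```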

docker-compose.yaml

Lines changed: 25 additions & 0 deletions
@@ -100,6 +100,31 @@ services:
       # Note: `.aws` mount might not be needed later.
       - ~/.aws:/home/senzing/.aws
 
+  exporter:
+    build:
+      context: .
+      dockerfile: Dockerfile.exporter
+    depends_on:
+      - db
+    environment:
+      AWS_ENDPOINT_URL: http://localstack:4566
+      SENZING_ENGINE_CONFIGURATION_JSON: >-
+        {
+          "PIPELINE": {
+            "CONFIGPATH": "/etc/opt/senzing",
+            "LICENSESTRINGBASE64": "${SENZING_LICENSE_BASE64_ENCODED}",
+            "RESOURCEPATH": "/opt/senzing/er/resources",
+            "SUPPORTPATH": "/opt/senzing/data"
+          },
+          "SQL": {
+            "BACKEND": "SQL",
+            "CONNECTION": "postgresql://${POSTGRES_USERNAME:-senzing}:${POSTGRES_PASSWORD:-senzing}@db:5432:${POSTGRES_DB:-G2}/?sslmode=disable"
+          }
+        }
+    volumes:
+      - ~/.aws:/home/senzing/.aws
+      - ~/tmp:/tmp # Should you wish to write files to host.
+
 volumes:
   localstack:
   postgres:
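A hypothetical debugging aid for the service added above (not part of this commit): since SENZING_ENGINE_CONFIGURATION_JSON is a JSON blob embedded in the compose file, with the ${...} placeholders resolved by compose, a quick check run inside the exporter container can confirm the value still parses as JSON before the engine consumes it:

```python
# Hypothetical sanity check, run inside the exporter container
# (e.g. by overriding the container command with a short python3 invocation).
import json
import os

cfg = json.loads(os.environ["SENZING_ENGINE_CONFIGURATION_JSON"])
print(sorted(cfg.keys()))                       # expect: ['PIPELINE', 'SQL']
print(cfg["SQL"]["CONNECTION"].split("@")[-1])  # host:port:db part, no credentials printed
```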

middleware/exporter.py

Lines changed: 150 additions & 0 deletions
import json
import io
import os
import time
import sys
import boto3
import senzing as sz

from loglib import *
log = retrieve_logger()

try:
    log.info('Importing senzing_core library . . .')
    import senzing_core as sz_core
    log.info('Imported senzing_core successfully.')
except Exception as e:
    log.error('Importing senzing_core library failed.')
    log.error(e)
    sys.exit(1)

if 'SENZING_ENGINE_CONFIGURATION_JSON' not in os.environ:
    log.error('SENZING_ENGINE_CONFIGURATION_JSON environment variable required.')
    sys.exit(1)
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])

if 'S3_BUCKET_NAME' not in os.environ:
    log.error('S3_BUCKET_NAME environment variable required.')
    sys.exit(1)
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']

EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS

#-------------------------------------------------------------------------------

def ts():
    '''Return current timestamp in ms as a str'''
    return str(int(round(time.time() * 1000)))

def make_s3_client():
    try:
        sess = boto3.Session()
        if 'AWS_ENDPOINT_URL' in os.environ:
            return sess.client('s3', endpoint_url=os.environ['AWS_ENDPOINT_URL'])
        else:
            return sess.client('s3')
    except Exception as e:
        log.error(AWS_TAG + str(e))
        sys.exit(1)

def go():
    '''
    Exports Senzing JSON entity report data into a buffer, then
    uploads the buffer as a file into the output S3 bucket.

    References:
    - https://garage.senzing.com/sz-sdk-python/senzing.html#senzing.szengine.SzEngine.export_json_entity_report
    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj.html
    '''

    # Init S3 client
    s3 = make_s3_client()

    # Init senzing engine object.
    # Note that Senzing engine object cannot be passed around between functions,
    # else it will be eagerly cleaned up / destroyed and no longer usable.
    sz_eng = None
    try:
        sz_factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG)
        sz_eng = sz_factory.create_engine()
        log.info(SZ_TAG + 'Senzing engine object instantiated.')
    except sz.SzError as sz_err:
        log.error(SZ_TAG + str(sz_err))
        sys.exit(1)
    except Exception as e:
        log.error(str(e))
        sys.exit(1)

    # init buffer
    buff = io.BytesIO()

    # Retrieve output from sz into buff
    # sz will export JSONL lines; we add the chars necessary to make
    # the output as a whole be a single JSON blob.
    log.info(SZ_TAG + 'Starting export from Senzing.')
    try:
        export_handle = sz_eng.export_json_entity_report(EXPORT_FLAGS)
        log.info(SZ_TAG + 'Obtained export_json_entity_report handle.')
        buff.write('['.encode('utf-8'))
        while 1:
            log.debug(SZ_TAG + 'Fetching chunk...')
            chunk = sz_eng.fetch_next(export_handle)
            if not chunk:
                break
            buff.write(chunk.encode('utf-8'))
            log.debug('Wrote chunk to buffer.')
            buff.write(','.encode('utf-8'))
        sz_eng.close_export_report(export_handle)
        log.info(SZ_TAG + 'Closed export handle.')
        buff.seek(-1, os.SEEK_CUR) # toss out last comma
        buff.write(']'.encode('utf-8'))
        log.info('Total bytes exported/buffered: ' + str(buff.getbuffer().nbytes))
    except sz.SzError as err:
        log.error(SZ_TAG + str(err))
    except Exception as e:
        log.error(str(e))

    # rewind buffer
    buff.seek(0)
    buff.flush()

    # write buff to S3 using upload_fileobj
    fname = 'output-' + ts() + '.json'
    log.info(AWS_TAG + 'About to upload JSON file ' + fname + ' to S3 ...')
    try:
        s3.upload_fileobj(buff, S3_BUCKET_NAME, fname)
        log.info(AWS_TAG + 'Successfully uploaded file.')
    except Exception as e:
        log.error(AWS_TAG + str(e))

#-------------------------------------------------------------------------------

def main():
    log.info('====================')
    log.info('      EXPORTER')
    log.info('     *STARTED*')
    log.info('====================')
    go()

if __name__ == '__main__': main()

#-------------------------------------------------------------------------------
# ad-hoc test funcs - might move later

def _upload_test_file_to_s3():
    print("Starting test upload to S3 ...")
    s3 = make_s3_client()
    print(s3)
    fname = 'hemingway.txt'
    resp = s3.upload_file(fname, S3_BUCKET_NAME, fname)
    print(resp)
    print('Upload successful.')

def _get_file_from_s3(key):
    '''Get file from S3 and write to /tmp (use docker-compose to map this
    to desired directory on host machine).'''
    print('Grabbing file...')
    s3 = make_s3_client()
    resp = s3.download_file(S3_BUCKET_NAME, key, '/tmp/'+key)
    print(resp)
    print('Done.')
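A note on the buffer-assembly step in go(): per the in-code comment, fetch_next() returns newline-terminated JSON documents, and the exporter turns them into a single JSON array by writing '[', appending a comma after each chunk, then seeking back one byte so the final comma is overwritten by ']'. Below is a standalone sketch of that same trick, using hypothetical chunks in place of a live Senzing engine; like the exporter, it assumes at least one chunk is returned:

```python
import io

# Hypothetical stand-ins for what sz_eng.fetch_next() would return.
chunks = [
    '{"RESOLVED_ENTITY": {"ENTITY_ID": 1}}\n',
    '{"RESOLVED_ENTITY": {"ENTITY_ID": 2}}\n',
]

buff = io.BytesIO()
buff.write(b'[')
for chunk in chunks:
    buff.write(chunk.encode('utf-8'))
    buff.write(b',')            # trailing comma after every document
buff.seek(-1, io.SEEK_CUR)      # step back onto the final comma...
buff.write(b']')                # ...and overwrite it with the closing bracket

print(buff.getvalue().decode('utf-8'))
# [{"RESOLVED_ENTITY": {"ENTITY_ID": 1}}
# ,{"RESOLVED_ENTITY": {"ENTITY_ID": 2}}
# ]
```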

middleware/loglib.py

Lines changed: 24 additions & 0 deletions
import logging
import sys

AWS_TAG = '[AWS] '
SZ_TAG = '[SZ] '
DLQ_TAG = '[DLQ] '

_instantiated_loggers = {}

def retrieve_logger(tag='default'):
    global _instantiated_loggers
    if tag in _instantiated_loggers:
        return _instantiated_loggers[tag]
    else:
        x = logging.getLogger(tag)
        x.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        fmt = logging.Formatter(
            '[%(asctime)s] [%(levelname)s] ' \
            '[%(filename)s:%(lineno)s] %(message)s')
        handler.setFormatter(fmt)
        x.addHandler(handler)
        _instantiated_loggers[tag] = x
        return x
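A small usage sketch for loglib (a hypothetical call site; the real callers live in exporter.py): retrieve_logger() caches one configured logger per tag, so repeated calls return the same object and, more importantly, avoid attaching a second StreamHandler to it, while the *_TAG constants are plain prefixes concatenated into messages:

```python
from loglib import retrieve_logger, SZ_TAG

log = retrieve_logger()          # 'default' logger at INFO level
assert retrieve_logger() is log  # cached: no duplicate handlers attached

log.info(SZ_TAG + 'engine ready')
# Example output (timestamp and filename will vary):
# [2025-01-01 12:00:00,000] [INFO] [myscript.py:6] [SZ] engine ready
```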
middleware/hemingway.txt

Lines changed: 7 additions & 0 deletions

Test fixture referenced by _upload_test_file_to_s3() in middleware/exporter.py:

It was very late and everyone had left the cafe except an old man who sat in the
shadow the leaves of the tree made against the electric light. In the day time the
street was dusty, but at night the dew settled the dust and the old man liked to sit
late because he was deaf and now at night it was quiet and he felt the difference.
The two waiters inside the cafe knew that the old man was a little drunk, and while
he was a good client they knew that if he became too drunk he would leave without
paying, so they kept watch on him.
