Skip to content

Commit cbee373

Browse files
committed
feat: Exporter middleware
1 parent ae49c50 commit cbee373

File tree

5 files changed

+202
-0
lines changed

5 files changed

+202
-0
lines changed

Dockerfile.exporter

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Image for the exporter middleware: Senzing SDK runtime plus Python 3 and boto3.
FROM senzing/senzingsdk-runtime:4.0.0

# Package installation and user creation below need root.
USER root

# Install Python tooling, then drop the apt package index in the same layer so
# it is not baked into the image.
RUN apt-get update \
 && apt-get -y install --no-install-recommends curl python3 python3-pip python3-boto3 \
 && apt-get -y autoremove \
 && apt-get -y clean \
 && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY middleware/* .

# Add a new user and switch to it.
RUN useradd -m -u 1001 senzing
USER senzing

# Make the Senzing Python SDK and the copied middleware modules importable.
ENV PYTHONPATH=/opt/senzing/er/sdk/python:/app

# Flush buffer - helps with print statements.
ENV PYTHONUNBUFFERED=1

CMD ["python3", "exporter.py"]

docker-compose.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,31 @@ services:
100100
# Note: `.aws` mount might not be needed later.
101101
- ~/.aws:/home/senzing/.aws
102102

103+
exporter:
104+
build:
105+
context: .
106+
dockerfile: Dockerfile.exporter
107+
depends_on:
108+
- db
109+
environment:
110+
AWS_ENDPOINT_URL: http://localstack:4566
111+
SENZING_ENGINE_CONFIGURATION_JSON: >-
112+
{
113+
"PIPELINE": {
114+
"CONFIGPATH": "/etc/opt/senzing",
115+
"LICENSESTRINGBASE64": "${SENZING_LICENSE_BASE64_ENCODED}",
116+
"RESOURCEPATH": "/opt/senzing/er/resources",
117+
"SUPPORTPATH": "/opt/senzing/data"
118+
},
119+
"SQL": {
120+
"BACKEND": "SQL",
121+
"CONNECTION": "postgresql://${POSTGRES_USERNAME:-senzing}:${POSTGRES_PASSWORD:-senzing}@db:5432:${POSTGRES_DB:-G2}/?sslmode=disable"
122+
}
123+
}
124+
volumes:
125+
- ~/.aws:/home/senzing/.aws
126+
- ~/tmp:/tmp # Should you wish to write files to host.
127+
103128
volumes:
104129
localstack:
105130
postgres:

middleware/exporter.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import json
2+
import io
3+
import os
4+
import time
5+
import sys
6+
import boto3
7+
import senzing as sz
8+
9+
from loglib import *
10+
log = retrieve_logger()

# The senzing_core import is guarded: if it fails for any reason, the error is
# logged and the process exits with status 1.
try:
    log.info('Importing senzing_core library . . .')
    import senzing_core as sz_core
    log.info('Imported senzing_core successfully.')
except Exception as e:
    log.error('Importing senzing_core library failed.')
    log.error(e)
    sys.exit(1)

# Engine settings are read from the environment as a JSON document. A missing
# or malformed value is logged and ends the process, the same way an import
# failure does, instead of surfacing as an uncaught traceback.
try:
    SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
except KeyError:
    log.error('SENZING_ENGINE_CONFIGURATION_JSON environment variable is not set.')
    sys.exit(1)
except ValueError as e:
    # json.JSONDecodeError is a subclass of ValueError.
    log.error('SENZING_ENGINE_CONFIGURATION_JSON is not valid JSON: ' + str(e))
    sys.exit(1)

# Bucket that export files are uploaded to.
S3_BUCKET = 'sqs-senzing-local-export'

# TODO which flags do we need?
EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS
26+
27+
#-------------------------------------------------------------------------------
28+
29+
def ts():
    '''Return the current Unix time, rounded to whole milliseconds, as a str.'''
    millis = round(time.time() * 1000)
    return str(int(millis))
32+
33+
def make_s3_client():
    '''
    Build a boto3 S3 client from a fresh session.

    When the AWS_ENDPOINT_URL environment variable is present, its value is
    passed to the client as `endpoint_url`; otherwise the client is created
    with boto3's defaults.
    '''
    client_kwargs = {}
    if 'AWS_ENDPOINT_URL' in os.environ:
        client_kwargs['endpoint_url'] = os.environ['AWS_ENDPOINT_URL']
    return boto3.Session().client('s3', **client_kwargs)
39+
40+
def go():
    '''
    Export the Senzing JSON entity report and upload it to S3.

    The report is fetched from the engine chunk by chunk into an in-memory
    buffer, which is then uploaded to S3_BUCKET under a timestamped key of
    the form 'output-<ms>.json'.

    The process exits with status 1 if the engine cannot be created or if
    the export fails, so a truncated report is never uploaded.

    Returns:
        Whatever the S3 client's upload_fileobj call returns.

    References:
    - https://garage.senzing.com/sz-sdk-python/senzing.html#senzing.szengine.SzEngine.export_json_entity_report
    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj.html
    '''

    # Init S3 client
    s3 = make_s3_client()

    # Init senzing engine object.
    # Note that Senzing engine object cannot be passed around between functions,
    # else it will be eagerly cleaned up / destroyed and no longer usable.
    try:
        sz_factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG)
        sz_eng = sz_factory.create_engine()
        log.info(SZ_TAG + 'Senzing engine object instantiated.')
    except sz.SzError as sz_err:
        log.error(SZ_TAG + str(sz_err))
        sys.exit(1)
    except Exception as e:
        log.error(str(e))
        sys.exit(1)

    # The whole report is held in memory before upload.
    buff = io.BytesIO()

    # Retrieve output from sz into buff.
    try:
        export_handle = sz_eng.export_json_entity_report(EXPORT_FLAGS)
        try:
            while True:
                chunk = sz_eng.fetch_next(export_handle)
                if not chunk:
                    break
                buff.write(chunk.encode('utf-8'))
        finally:
            # Close the export handle even when a fetch fails part-way.
            sz_eng.close_export_report(export_handle)
    except sz.SzError as err:
        # Abort rather than upload an incomplete report.
        log.error(SZ_TAG + str(err))
        sys.exit(1)

    # Rewind so the upload reads from the start of the buffer.
    buff.seek(0)

    # Write buff to S3 using upload_fileobj.
    fname = 'output-' + ts() + '.json'
    resp = s3.upload_fileobj(buff, S3_BUCKET, fname)
    log.info(AWS_TAG + 'Uploaded export to s3://' + S3_BUCKET + '/' + fname)

    return resp
92+
93+
#-------------------------------------------------------------------------------
94+
95+
def main():
    '''Log the start-up banner, then run the export via go().'''
    banner = (
        '====================',
        ' EXPORTER',
        ' *STARTED*',
        '====================',
    )
    for banner_line in banner:
        log.info(banner_line)
    go()
101+
102+
# Run the exporter only when this file is executed as a script.
if __name__ == '__main__':
    main()
103+
104+
#-------------------------------------------------------------------------------
105+
# test funcs (to maybe relocate)
106+
107+
def upload_test_file():
    '''
    Manual smoke test: upload the local file 'hemingway.txt' to S3_BUCKET,
    using the file name as the object key, printing progress along the way.
    '''
    sample = 'hemingway.txt'
    print("Start test upload to S3 ...")
    client = make_s3_client()
    print(client)
    outcome = client.upload_file(sample, S3_BUCKET, sample)
    print(outcome)
    print("SUCCESSFUL")
115+
116+
def get_file(key='output-1757986924786.json', dest_dir='/tmp'):
    '''
    Manual test helper: download one object from S3_BUCKET to a local file.

    Args:
        key: Object key to fetch. Defaults to the sample key this helper
            was originally hard-coded with.
        dest_dir: Local directory to save into; the file is named after
            the key.
    '''
    print('Grabbing file...')
    s3 = make_s3_client()
    local_path = os.path.join(dest_dir, key)
    resp = s3.download_file(S3_BUCKET, key, local_path)
    print(resp)
    print('done grabbing file.')

middleware/loglib.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import logging
2+
import sys
3+
4+
# Prefixes that callers prepend to log messages to mark their origin.
AWS_TAG = '[AWS] '
SZ_TAG = '[SZ] '
DLQ_TAG = '[DLQ] '

# Loggers already configured by retrieve_logger, keyed by tag.
_instantiated_loggers = {}


def retrieve_logger(tag='default'):
    '''
    Return the logger registered under `tag`, creating it on first use.

    A newly created logger is set to INFO level and given one stream
    handler whose format is timestamp, level, file:line, then the message.
    Later calls with the same tag return the same logger object.
    '''
    known = _instantiated_loggers.get(tag)
    if known is not None:
        return known

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(
        '[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)s] %(message)s'))

    new_logger = logging.getLogger(tag)
    new_logger.setLevel(logging.INFO)
    new_logger.addHandler(stream_handler)

    _instantiated_loggers[tag] = new_logger
    return new_logger
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
It was very late and everyone had left the cafe except an old man who sat in the
2+
shadow the leaves of the tree made against the electric light. In the day time the
3+
street was dusty, but at night the dew settled the dust and the old man liked to sit
4+
late because he was deaf and now at night it was quiet and he felt the difference.
5+
The two waiters inside the cafe knew that the old man was a little drunk, and while
6+
he was a good client they knew that if he became too drunk he would leave without
7+
paying, so they kept watch on him.

0 commit comments

Comments
 (0)