Skip to content

Commit 0ca2ee9

Browse files
committed
Initial commit of consumer service + utils
1 parent 7a68b91 commit 0ca2ee9

File tree

8 files changed

+432
-0
lines changed

8 files changed

+432
-0
lines changed

Dockerfile.consumer

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
FROM senzing/senzingsdk-runtime:latest
2+
3+
USER root
4+
5+
RUN apt-get update \
6+
&& apt-get -y install curl python3 python3-pip python3-boto3 \
7+
&& apt-get -y autoremove \
8+
&& apt-get -y clean
9+
10+
WORKDIR /app
11+
COPY middleware/* .
12+
13+
# Add a new user and switch to it.
14+
RUN useradd -m -u 1001 senzing
15+
USER senzing
16+
17+
ENV PYTHONPATH=/opt/senzing/er/sdk/python:/app
18+
19+
# Flush buffer - helps with print statements.
20+
ENV PYTHONUNBUFFERED=1
21+
22+
CMD ["python3", "consumer.py"]

docker-compose.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,31 @@ services:
7575
}
7676
}
7777
78+
consumer:
79+
build:
80+
context: .
81+
dockerfile: Dockerfile.consumer
82+
depends_on:
83+
- db
84+
environment:
85+
AWS_ENDPOINT_URL: http://localstack:4566
86+
SENZING_ENGINE_CONFIGURATION_JSON: >-
87+
{
88+
"PIPELINE": {
89+
"CONFIGPATH": "/etc/opt/senzing",
90+
"LICENSESTRINGBASE64": "${SENZING_LICENSE_BASE64_ENCODED}",
91+
"RESOURCEPATH": "/opt/senzing/er/resources",
92+
"SUPPORTPATH": "/opt/senzing/data"
93+
},
94+
"SQL": {
95+
"BACKEND": "SQL",
96+
"CONNECTION": "postgresql://${POSTGRES_USERNAME:-senzing}:${POSTGRES_PASSWORD:-senzing}@db:5432:${POSTGRES_DB:-G2}/?sslmode=disable"
97+
}
98+
}
99+
volumes:
100+
# Note: `.aws` mount might not be needed later.
101+
- ~/.aws:/home/senzing/.aws
102+
78103
volumes:
79104
localstack:
80105
postgres:

middleware/consumer.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import json
2+
import os
3+
import time
4+
import sys
5+
import boto3
6+
import senzing as sz
7+
try:
8+
print('Importing senzing_core library . . .')
9+
import senzing_core as sz_core
10+
print('Imported senzing_core successfully.')
11+
except Exception as e:
12+
print('Importing senzing_core library failed.')
13+
print(e)
14+
sys.exit(1)
15+
16+
# TODO add DLQ logic (needs jira ticket probably).
17+
18+
AWS_PROFILE_NAME = os.environ['AWS_PROFILE_NAME']
19+
Q_URL = os.environ['Q_URL']
20+
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
21+
22+
POLL_SECONDS = 20 # 20 seconds is SQS max
23+
HIDE_MESSAGE_SECONDS = 600 # SQS visibility timeout
24+
25+
#-------------------------------------------------------------------------------
26+
27+
def _make_boto_session(fpath=None):
28+
'''fpath is path to json file with keys:
29+
- aws_access_key_id
30+
- aws_secret_access_key
31+
- aws_session_token
32+
- region
33+
(Same keys should typically be present in .aws/credentials file
34+
if using profile.)'''
35+
if fpath:
36+
return boto3.Session(**json.load(open(fpath)))
37+
else:
38+
return boto3.Session(profile_name=AWS_PROFILE_NAME)
39+
40+
def _make_sqs_client(boto_session):
41+
return boto_session.client('sqs')
42+
43+
# TODO add try/except code
44+
# TODO add logging
45+
def init():
46+
'''Returns sqs client object'''
47+
sess = _make_boto_session()
48+
sqs = sess.client('sqs')
49+
return sqs
50+
51+
# TODO add try/except code
52+
# TODO add logging
53+
def get_msgs(sqs, q_url):
54+
'''Generator function; returns a single SQS msg at a time.
55+
Pertinent keys in an SQS message include:
56+
- MessageId
57+
- ReceiptHandle -- you'll need this to delete the msg later
58+
- Body -- here, should be the JSONL record as a string
59+
'''
60+
while 1:
61+
print('waiting for msg')
62+
resp = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1,
63+
WaitTimeSeconds=POLL_SECONDS)
64+
if 'Messages' in resp and len(resp['Messages']) == 1:
65+
yield resp['Messages'][0]
66+
67+
# TODO add try/except code
68+
# TODO add logging
69+
def del_msg(sqs, q_url, receipt_handle):
70+
return sqs.delete_message(QueueUrl=q_url, ReceiptHandle=receipt_handle)
71+
72+
#-------------------------------------------------------------------------------
73+
74+
# TODO add more try/except code as needed
75+
# TODO add logging
76+
def go():
77+
'''Starts the Consumer process; runs indefinitely.'''
78+
79+
# SQS client
80+
sqs = init()
81+
82+
# Spin up msgs generator
83+
msgs = get_msgs(sqs, Q_URL)
84+
85+
# Senzing init tasks.
86+
sz_eng = None
87+
try:
88+
sz_factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG)
89+
90+
# Init data source list.
91+
# TODO data source registry logic should be set up as a one-time task
92+
# outside of this app somewhere else.
93+
sz_config_mgr = sz_factory.create_configmanager()
94+
sz_config = sz_config_mgr.create_config_from_config_id(
95+
sz_config_mgr.get_default_config_id())
96+
sz_config.register_data_source("CUSTOMERS")
97+
sz_config_mgr.set_default_config(sz_config.export(), 'default')
98+
99+
# Init senzing engine object.
100+
# Senzing engine object cannot be passed around between functions,
101+
# else it will be eagerly cleaned up / destroyed and no longer usable.
102+
sz_eng = sz_factory.create_engine()
103+
except sz.SzError as err:
104+
# TODO log error
105+
print(err)
106+
sys.exit(1)
107+
108+
# TODO log ReceiptHandle, other *generic* debug-facing information as appropriate.
109+
while 1:
110+
print('Starting primary loop iteration . . .')
111+
msg = next(msgs)
112+
receipt_handle, body = msg['ReceiptHandle'], msg['Body']
113+
rcd = json.loads(body)
114+
try:
115+
# TODO add logging
116+
# TODO Use signal lib to handle stalled records (i.e., still
117+
# processing >5 minutes)
118+
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body,
119+
sz.SzEngineFlags.SZ_WITH_INFO)
120+
print(resp)
121+
except sz.SzError as err:
122+
# TODO log / handle
123+
print(err)
124+
del_msg(sqs, Q_URL, receipt_handle)
125+
126+
def main():
127+
print('====================')
128+
print(' CONSUMER')
129+
print(' STARTED')
130+
print('====================')
131+
go()
132+
133+
if __name__ == '__main__': main()

middleware/redoer.py

Whitespace-only changes.

0 commit comments

Comments
 (0)