Skip to content

Commit ae49c50

Browse files
authored
feat: Initial implementation of the consumer (#9)
1 parent 61b7e2f commit ae49c50

File tree

11 files changed

+589
-2
lines changed

11 files changed

+589
-2
lines changed

.gitignore

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,135 @@ terraform.rc
3737
/**/*/package-lock.json
3838
/**/*/node_modules
3939
/**/*/LICENSE
40+
41+
#######################
42+
# general #
43+
#######################
44+
45+
# logs
46+
log/
47+
logs/
48+
*.log
49+
50+
# vim
51+
*~
52+
*.swp
53+
*.swo
54+
55+
# system files
56+
.DS_Store
57+
Thumbs.db
58+
thumbs.db
59+
Desktop.ini
60+
desktop.ini
61+
__MACOSX/
62+
63+
#######################
64+
# python #
65+
#######################
66+
#
67+
# see: https://github.com/github/gitignore/blob/master/Python.gitignore
68+
69+
# Byte-compiled / optimized / DLL files
70+
__pycache__/
71+
*.py[cod]
72+
*$py.class
73+
74+
# C extensions
75+
*.so
76+
77+
# Distribution / packaging
78+
.Python
79+
build/
80+
develop-eggs/
81+
dist/
82+
downloads/
83+
eggs/
84+
.eggs/
85+
lib/
86+
lib64/
87+
parts/
88+
sdist/
89+
var/
90+
wheels/
91+
*.egg-info/
92+
.installed.cfg
93+
*.egg
94+
MANIFEST
95+
96+
# PyInstaller
97+
# Usually these files are written by a python script from a template
98+
# before PyInstaller builds the exe, so as to inject date/other infos into it.
99+
*.manifest
100+
*.spec
101+
102+
# Installer logs
103+
pip-log.txt
104+
pip-delete-this-directory.txt
105+
106+
# Unit test / coverage reports
107+
htmlcov/
108+
.tox/
109+
.coverage
110+
.coverage.*
111+
.cache
112+
nosetests.xml
113+
coverage.xml
114+
*.cover
115+
.hypothesis/
116+
.pytest_cache/
117+
118+
# Translations
119+
*.mo
120+
*.pot
121+
122+
# Django stuff:
123+
*.log
124+
local_settings.py
125+
db.sqlite3
126+
127+
# Flask stuff:
128+
instance/
129+
.webassets-cache
130+
131+
# Scrapy stuff:
132+
.scrapy
133+
134+
# Sphinx documentation
135+
docs/_build/
136+
137+
# PyBuilder
138+
target/
139+
140+
# Jupyter Notebook
141+
.ipynb_checkpoints
142+
143+
# pyenv
144+
.python-version
145+
146+
# celery beat schedule file
147+
celerybeat-schedule
148+
149+
# SageMath parsed files
150+
*.sage.py
151+
152+
# Environments
153+
.venv
154+
env/
155+
venv/
156+
ENV/
157+
env.bak/
158+
venv.bak/
159+
160+
# Spyder project settings
161+
.spyderproject
162+
.spyproject
163+
164+
# Rope project settings
165+
.ropeproject
166+
167+
# mkdocs documentation
168+
/site
169+
170+
# mypy
171+
.mypy_cache/

.trivyignore.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ misconfigurations:
22
# The tools Dockerfile doesn't need a healthcheck.
33
- id: AVD-DS-0026
44
paths:
5+
- Dockerfile.consumer # Jira -67
56
- Dockerfile.tools

Dockerfile.consumer

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
FROM senzing/senzingsdk-runtime:4.0.0
2+
3+
USER root
4+
5+
RUN apt-get update \
6+
&& apt-get -y install --no-install-recommends curl python3 python3-pip python3-boto3 \
7+
&& apt-get -y autoremove \
8+
&& apt-get -y clean
9+
10+
WORKDIR /app
11+
COPY middleware/* .
12+
13+
# Add a new user and switch to it.
14+
RUN useradd -m -u 1001 senzing
15+
USER senzing
16+
17+
ENV PYTHONPATH=/opt/senzing/er/sdk/python:/app
18+
19+
# Flush buffer - helps with print statements.
20+
ENV PYTHONUNBUFFERED=1
21+
22+
CMD ["python3", "consumer.py"]

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ and run the consumer service on our local machine. This setup includes:
3333
- An SQS queue named `sqs-senzing-local-redo`
3434
- A local PostgreSQL database
3535
- A database initialization container to set up the Senzing schema
36-
- ~~The Senzing consumer service~~ (in development)
36+
- The Senzing consumer service
3737
- A `tools` container with the [Senzing v4 SDK][senzing-sdk] and
3838
[`awslocal`][awslocal] wrapper for interacting with LocalStack services
3939

@@ -57,6 +57,14 @@ and run the consumer service on our local machine. This setup includes:
5757
docker compose run tools /bin/bash
5858
```
5959

60+
1. Spinning up a consumer service:
61+
62+
```bash
63+
docker compose run --env AWS_PROFILE="some-profile-name" --env \
64+
Q_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-senzing-local-ingest" \
65+
consumer
66+
```
67+
6068
### Using the services
6169

6270
The `tools` container should be configured with the necessary environment

docker-compose.yaml

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ services:
2424
depends_on:
2525
- db
2626
environment:
27-
SENZING_TOOLS_DATASOURCES: PEOPLE
27+
SENZING_TOOLS_DATASOURCES: PEOPLE, CUSTOMERS
2828
SENZING_TOOLS_ENGINE_CONFIGURATION_JSON: >-
2929
{
3030
"PIPELINE": {
@@ -75,6 +75,31 @@ services:
7575
}
7676
}
7777
78+
consumer:
79+
build:
80+
context: .
81+
dockerfile: Dockerfile.consumer
82+
depends_on:
83+
- db
84+
environment:
85+
AWS_ENDPOINT_URL: http://localstack:4566
86+
SENZING_ENGINE_CONFIGURATION_JSON: >-
87+
{
88+
"PIPELINE": {
89+
"CONFIGPATH": "/etc/opt/senzing",
90+
"LICENSESTRINGBASE64": "${SENZING_LICENSE_BASE64_ENCODED}",
91+
"RESOURCEPATH": "/opt/senzing/er/resources",
92+
"SUPPORTPATH": "/opt/senzing/data"
93+
},
94+
"SQL": {
95+
"BACKEND": "SQL",
96+
"CONNECTION": "postgresql://${POSTGRES_USERNAME:-senzing}:${POSTGRES_PASSWORD:-senzing}@db:5432:${POSTGRES_DB:-G2}/?sslmode=disable"
97+
}
98+
}
99+
volumes:
100+
# Note: `.aws` mount might not be needed later.
101+
- ~/.aws:/home/senzing/.aws
102+
78103
volumes:
79104
localstack:
80105
postgres:

middleware/consumer.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import json
2+
import os
3+
import time
4+
import sys
5+
import boto3
6+
import senzing as sz
7+
8+
from loglib import *
9+
# Module-level logger. `retrieve_logger` is not defined in this file;
# presumably it is provided by the `from loglib import *` star-import.
log = retrieve_logger()

# Import senzing_core inside a try block so that a failed import is logged
# before the process exits with status 1.
try:
    log.info('Importing senzing_core library . . .')
    import senzing_core as sz_core
    log.info('Imported senzing_core successfully.')
except Exception as e:
    log.error('Importing senzing_core library failed.')
    log.error(e)
    sys.exit(1)

# TODO add DLQ logic (see DLQ_TAG logging)

# Both environment variables are required: a missing one raises KeyError
# here, at import time. SENZING_ENGINE_CONFIGURATION_JSON must hold valid JSON.
Q_URL = os.environ['Q_URL']
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])

POLL_SECONDS = 20  # 20 seconds is SQS max
# NOTE(review): not referenced anywhere else in the visible part of this module.
HIDE_MESSAGE_SECONDS = 600  # SQS visibility timeout
28+
#-------------------------------------------------------------------------------
29+
30+
def _make_boto_session(fpath=None):
    '''Build a boto3 Session, optionally from a JSON credentials file.

    If `AWS_PROFILE` environment variable is set, then `Session()` can be
    called with no arguments; that is what happens when `fpath` is falsy.

    fpath is path to json file with keys:
    - aws_access_key_id
    - aws_secret_access_key
    - aws_session_token
    - region (forwarded to boto3 as `region_name`; a file that already
      uses `region_name` is accepted unchanged)
    (Same keys should typically be present in .aws/credentials file
    if using profile.)

    Returns the boto3 Session object. Errors from reading or parsing the
    file, or from boto3 itself, propagate to the caller.
    '''
    if not fpath:
        return boto3.Session()

    # Use a context manager so the file handle is closed deterministically.
    with open(fpath, encoding='utf-8') as fh:
        settings = json.load(fh)

    # boto3.Session() has no `region` keyword, so translate the documented
    # key; an explicit `region_name` in the file takes precedence.
    if 'region' in settings:
        region = settings.pop('region')
        settings.setdefault('region_name', region)

    return boto3.Session(**settings)
46+
47+
def _make_sqs_client(boto_session):
    '''Return the SQS client produced by `boto_session.client('sqs')`.'''
    sqs_client = boto_session.client('sqs')
    return sqs_client
49+
50+
def init():
    '''Create and return the SQS client object.

    A default boto session is built and an SQS client is requested from it.
    If any step raises, the error is logged with the AWS tag and the
    process exits with status 1.
    '''
    try:
        sqs_client = _make_sqs_client(_make_boto_session())
        log.info(AWS_TAG + 'SQS client object instantiated.')
        return sqs_client
    except Exception as err:
        log.error(AWS_TAG + str(err))
        sys.exit(1)
60+
61+
def get_msgs(sqs, q_url, error_backoff_seconds=5):
    '''Generator function; yields a single SQS msg at a time, forever.

    Each iteration long-polls `q_url` for at most one message, waiting up
    to POLL_SECONDS. An empty poll simply polls again.

    Pertinent keys in an SQS message include:
    - MessageId
    - ReceiptHandle -- you'll need this to delete the msg later
    - Body -- here, should be the JSONL record as a string

    Parameters:
    - sqs: SQS client exposing `receive_message`
    - q_url: URL of the queue to poll
    - error_backoff_seconds: pause after a failed poll before retrying, so
      a persistent failure (bad credentials, unreachable endpoint) does
      not become a tight loop that floods the log
    '''
    while True:
        print('waiting for msg')
        try:
            log.info(AWS_TAG + 'Polling SQS for the next message')
            resp = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1,
                                       WaitTimeSeconds=POLL_SECONDS)
        except Exception as e:
            log.error(AWS_TAG + str(e))
            time.sleep(error_backoff_seconds)
            continue

        # The 'Messages' key is absent when the poll returned nothing.
        for msg in resp.get('Messages') or ():
            yield msg
78+
79+
def del_msg(sqs, q_url, receipt_handle):
    '''Delete one message from the queue.

    Parameters:
    - sqs: SQS client exposing `delete_message`
    - q_url: URL of the queue holding the message
    - receipt_handle: the ReceiptHandle of the message to delete

    Returns the `delete_message` response on success. On failure the error
    is logged with the AWS and DLQ tags and None is returned; the
    exception is deliberately not re-raised.
    '''
    try:
        return sqs.delete_message(QueueUrl=q_url, ReceiptHandle=receipt_handle)
    except Exception as e:
        # Use the parameter name here: the previous `ReceiptHandle` was an
        # undefined name and turned every delete failure into a NameError.
        log.error(AWS_TAG + DLQ_TAG + 'SQS delete failure for ReceiptHandle: ' +
                  str(receipt_handle) + ' Additional info: ' + str(e))
85+
86+
#-------------------------------------------------------------------------------
87+
88+
def go():
    '''Starts the Consumer process; runs indefinitely.

    Sets up the SQS client, the message generator and the Senzing engine,
    then loops: fetch a message, add it to Senzing as a record, delete it
    from the queue.

    Failure handling:
    - any error while creating the Senzing engine is logged and the
      process exits with status 1
    - a Senzing error while handling a message is logged with the DLQ tag
      and the loop moves on
    - any other error while handling a message is logged and the process
      exits with status 1
    '''
    # SQS client
    sqs_client = init()

    # Spin up msgs generator
    log.info('Spinning up messages generator')
    msg_stream = get_msgs(sqs_client, Q_URL)

    # Senzing init tasks.
    engine = None
    try:
        factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG)

        # Init senzing engine object.
        # Senzing engine object cannot be passed around between functions,
        # else it will be eagerly cleaned up / destroyed and no longer usable.
        engine = factory.create_engine()
        log.info(SZ_TAG + 'Senzing engine object instantiated.')
    except sz.SzError as sz_err:
        log.error(SZ_TAG + str(sz_err))
        sys.exit(1)
    except Exception as err:
        log.error(str(err))
        sys.exit(1)

    while True:
        try:
            # Get next message.
            message = next(msg_stream)
            receipt_handle = message['ReceiptHandle']
            body = message['Body']
            log.info('SQS message retrieved, having ReceiptHandle: '
                     + receipt_handle)
            record = json.loads(body)

            # Process and send to Senzing.
            # The returned info is captured but not used yet.
            info = engine.add_record(record['DATA_SOURCE'],
                                     record['RECORD_ID'],
                                     body,
                                     sz.SzEngineFlags.SZ_WITH_INFO)
            log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
                     + receipt_handle)

            # Delete msg from queue.
            del_msg(sqs_client, Q_URL, receipt_handle)
        except sz.SzError as sz_err:
            log.error(SZ_TAG + DLQ_TAG + str(sz_err))
        except Exception as err:
            log.error(str(err))
            sys.exit(1)
137+
138+
#-------------------------------------------------------------------------------
139+
140+
def main():
    '''Log the startup banner, then hand control to go().'''
    banner = (
        '====================',
        ' CONSUMER',
        ' STARTED',
        '====================',
    )
    for banner_line in banner:
        log.info(banner_line)
    go()


# Run the consumer only when this file is executed as a script.
if __name__ == '__main__':
    main()

middleware/redoer.py

Whitespace-only changes.

0 commit comments

Comments
 (0)