Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ and run the consumer service on our local machine. This setup includes:
docker compose up -d
```

1. Access the `tools` container to interact with the services:

```bash
docker compose run tools /bin/bash
```

### Consumer

Spinning up a consumer service (intended to be a continually-running process; in
Expand All @@ -69,7 +63,7 @@ needed):
consumer
```

#### Exporter
### Exporter

Spinning up the exporter middleware (this is intended to be an ephemeral
container):
Expand All @@ -83,7 +77,13 @@ this URL:

http://localhost:4566/sqs-senzing-local-export

### Using the services
### Using the services (Tools container)

1. Access the `tools` container to interact with the services:

```bash
docker compose run tools /bin/bash
```

The `tools` container should be configured with the necessary environment
variables to interact with the SQS and S3 services in LocalStack, as well as the
Expand All @@ -98,6 +98,16 @@ awslocal sqs send-message \
--message-body '{"NAME_FULL":"Robert Smith", "DATE_OF_BIRTH":"7/4/1976", "PHONE_NUMBER":"555-555-2088"}'
```

View queues:

awslocal sqs list-queues

View queue message count, etc.:

awslocal sqs get-queue-attributes --queue-url \
http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-senzing-local-ingest \
--attribute-names All

You can use the Senzing SDK's `sz_*` commands to interact with the Senzing
database. For example, to add a new entity:

Expand All @@ -106,7 +116,6 @@ sz_command -C add_record \
PEOPLE 1 '{"NAME_FULL":"Robert Smith", "DATE_OF_BIRTH":"7/4/1976", "PHONE_NUMBER":"555-555-2088"}'
```


[awslocal]: https://docs.localstack.cloud/aws/integrations/aws-native-tools/aws-cli/#localstack-aws-cli-awslocal
[localstack]: https://www.localstack.cloud/
[senzing]: https://senzing.com
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
db:
image: bitnami/postgresql:17.6.0
image: bitnamisecure/postgresql:latest
environment:
# See https://github.com/bitnami/bitnami-docker-postgresql#configuration
POSTGRESQL_DATABASE: ${POSTGRES_DB:-G2}
Expand All @@ -24,7 +24,7 @@ services:
depends_on:
- db
environment:
SENZING_TOOLS_DATASOURCES: PEOPLE, CUSTOMERS
SENZING_TOOLS_DATASOURCES: PEOPLE
SENZING_TOOLS_ENGINE_CONFIGURATION_JSON: >-
{
"PIPELINE": {
Expand Down
75 changes: 65 additions & 10 deletions middleware/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def init():
sys.exit(1)

def get_msgs(sqs, q_url):
'''Generator function; returns a single SQS msg at a time.
'''Generator function; emits a single SQS msg at a time.
Pertinent keys in an SQS message include:
- MessageId
- ReceiptHandle -- you'll need this to delete the msg later
Expand All @@ -78,11 +78,49 @@ def get_msgs(sqs, q_url):

def del_msg(sqs, q_url, receipt_handle):
try:
log.debug(AWS_TAG + 'Deleting message having ReceiptHandle: ' + receipt_handle)
return sqs.delete_message(QueueUrl=q_url, ReceiptHandle=receipt_handle)
except Exception as e:
log.error(AWS_TAG + DLQ_TAG + 'SQS delete failure for ReceiptHandle: ' +
ReceiptHandle + ' Additional info: ' + str(e))

def make_msg_visible(sqs, q_url, receipt_handle):
'''Setting visibility timeout to 0 on an SQS message makes it visible again,
making it available (again) for consuming.'''
try:
log.debug(AWS_TAG + 'Restoring message visibility for ReceiptHandle: ' + receipt_handle)
sqs.change_message_visibility(
QueueUrl=q_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=0)
except Exception as e:
log.error(AWS_TAG + str(e))

#-------------------------------------------------------------------------------

def register_data_source(data_source_name):
'''References:
- https://github.com/senzing-garage/knowledge-base/blob/main/lists/environment-variables.md#senzing_tools_datasources
- https://github.com/senzing-garage/knowledge-base/blob/4c397efacdb0d2feecd89fa0f00ec10f99320d0c/proposals/working-with-config/mjd.md?plain=1#L98
'''
def f():
sz_factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG)
sz_config_mgr = sz_factory.create_configmanager()
default_config_id = sz_config_mgr.get_default_config_id()
sz_config = sz_config_mgr.create_config_from_config_id(default_config_id)
sz_config.register_data_source(data_source_name)
sz_config_mgr.set_default_config(sz_config.export(), 'default')
sz_factory.reinitialize(default_config_id)
try:
log.info(SZ_TAG + 'Registering new data_source: ' + data_source_name)
# For reasons unknown to me, this has to be done 2x before it sticks.
f()
f()
log.info(SZ_TAG + 'Successfully registered data_source: ' + data_source_name)
except sz.SzError as err:
log.error(SZ_TAG + str(err))
sys.exit(1)

#-------------------------------------------------------------------------------

def go():
Expand Down Expand Up @@ -121,16 +159,33 @@ def go():
+ receipt_handle)
rcd = json.loads(body)

# Process and send to Senzing.
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body,
sz.SzEngineFlags.SZ_WITH_INFO)
log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
+ receipt_handle)
try:
# Process and send to Senzing.
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body)
log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
+ receipt_handle)
except sz.SzUnknownDataSourceError as sz_uds_err:
try:
log.info(SZ_TAG + str(sz_uds_err))
# Encountered a new data source name; register it.
register_data_source(rcd['DATA_SOURCE'])

# Then try again: process and send to Senzing.
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body)
log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
+ receipt_handle)
except sz.SzError as sz_err:
raise sz_err
except sz.SzError as sz_err:
log.error(SZ_TAG + DLQ_TAG + str(sz_err))
# "Toss back" this message to be re-consumed; we rely on AWS
# config to move out-of-order messages into the DLQ at some point.
make_msg_visible(sqs, Q_URL, receipt_handle)

# Lastly, delete msg if no errors.
else:
del_msg(sqs, Q_URL, receipt_handle)

# Delete msg from queue.
del_msg(sqs, Q_URL, receipt_handle)
except sz.SzError as sz_err:
log.error(SZ_TAG + DLQ_TAG + str(sz_err))
except Exception as e:
log.error(str(e))
sys.exit(1)
Expand Down