Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,19 @@ Purge the database:

##### S3

You might need to configure an AWS profile before using these S3-related
You might need to configure an AWS profile before using these S3-related
utilities. See further down below for how to do that.

Copy a file out of the LocalStack S3 bucket into `~/tmp` on your machine (be
sure this folder already exists -- on macOS, that would be
Copy a file out of the LocalStack S3 bucket into `~/tmp` on your machine (be
sure this folder already exists -- on macOS, that would be
`/Users/yourusername/tmp`):

> [!NOTE]
> You will need to manually create `/Users/yourusername/tmp` if it
> doesn't already exist.

# Here, `hemingway.txt` is the file you wish to retrieve from S3.
docker compose run tools python3 dev/s3_get.py hemingway.txt
docker compose run tools python3 dev/s3_get.py hemingway.txt

Purge the LocalStack S3 bucket:

Expand Down Expand Up @@ -174,8 +174,8 @@ instantiating client objects for use with particular LocalStack services, e.g.:

### Consumer

Spinning up the consumer middleware (intended to be a continually-running
process; in a production scenario, multiple instances could be running
Spinning up the consumer middleware (intended to be a continually-running
process; in a production scenario, multiple instances could be running
simultaneously as needed):

```bash
Expand All @@ -184,7 +184,15 @@ Q_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-sen
--env LOG_LEVEL=DEBUG consumer
```

`LOG_LEVEL` is optional; defaults to `INFO`.
Environment variables:

- `LOG_LEVEL` is optional; defaults to `INFO`.
- `SZ_CALL_TIMEOUT_SECONDS`
- Optional; defaults to 420 seconds (7 min.)
  - This does two things: it sets the (in)visibility of a message when it's
    initially retrieved from SQS, and it sets the maximum amount of time the
    Consumer will wait for a Senzing `add_record` call to complete before
    bailing and moving on.

### Redoer

Expand All @@ -194,7 +202,13 @@ Similar to the consumer, the redoer is also a continually-running process.
docker compose run --env AWS_PROFILE=localstack --env LOG_LEVEL=DEBUG redoer
```

`LOG_LEVEL` is optional; defaults to `INFO`.
Environment variables:

- `LOG_LEVEL` is optional; defaults to `INFO`.
- `SZ_CALL_TIMEOUT_SECONDS`
- Optional; defaults to 420 seconds (7 min.)
- Sets the maximum amount of time the Redoer will wait for a Senzing
`process_redo_record` to complete before bailing and moving on.

### Exporter

Expand Down
54 changes: 40 additions & 14 deletions middleware/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
import signal
import time
import sys
import boto3
Expand All @@ -8,6 +9,8 @@
from loglib import *
log = retrieve_logger()

from timeout_handling import *

try:
log.info('Importing senzing_core library . . .')
import senzing_core as sz_core
Expand All @@ -18,10 +21,10 @@
sys.exit(1)

Q_URL = os.environ['Q_URL']
SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420))
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])

POLL_SECONDS = 20 # 20 seconds is SQS max
HIDE_MESSAGE_SECONDS = 600 # SQS visibility timeout

#-------------------------------------------------------------------------------

Expand Down Expand Up @@ -68,7 +71,8 @@ def get_msgs(sqs, q_url):
try:
log.debug(AWS_TAG + 'Polling SQS for the next message')
resp = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1,
WaitTimeSeconds=POLL_SECONDS)
WaitTimeSeconds=POLL_SECONDS,
VisibilityTimeout=SZ_CALL_TIMEOUT_SECONDS)
if 'Messages' in resp and len(resp['Messages']) == 1:
yield resp['Messages'][0]
except Exception as e:
Expand Down Expand Up @@ -132,6 +136,26 @@ def go():
log.info('Spinning up messages generator')
msgs = get_msgs(sqs, Q_URL)

receipt_handle = None

# Orderly clean-up logic if process is suddenly shut down.
def clean_up(signum, frm):
'''Attempt to cleanly reset the visibility of the current in-flight
message before exiting. (It's possible we are exiting after
a message was deleted but the next has not yet been retrieved; that's ok'''
nonlocal sqs
nonlocal receipt_handle
log.info('***************************')
log.info('SIGINT or SIGTERM received.')
log.info('***************************')
try:
make_msg_visible(sqs, Q_URL, receipt_handle)
except Exception as ex:
log.error(ex)
sys.exit(0)
signal.signal(signal.SIGINT, clean_up)
signal.signal(signal.SIGTERM, clean_up)

# Senzing init tasks.
sz_eng = None
try:
Expand Down Expand Up @@ -160,21 +184,23 @@ def go():

try:
# Process and send to Senzing.
start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS)
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body)
log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
cancel_alarm_timer()
log.debug(SZ_TAG + 'Successful add_record having ReceiptHandle: '
+ receipt_handle)
except sz.SzUnknownDataSourceError as sz_uds_err:
try:
log.info(SZ_TAG + str(sz_uds_err))
# Encountered a new data source name; register it.
register_data_source(rcd['DATA_SOURCE'])

# Then try again: process and send to Senzing.
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body)
log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
+ receipt_handle)
except sz.SzError as sz_err:
raise sz_err
log.info(SZ_TAG + str(sz_uds_err))
# Encountered a new data source name; register it.
register_data_source(rcd['DATA_SOURCE'])
# Toss back message for now.
make_msg_visible(sqs, Q_URL, receipt_handle)
except LongRunningCallTimeoutEx as lrex:
log.error(build_sz_timeout_msg(
type(lrex).__module__,
type(lrex).__qualname__,
SZ_CALL_TIMEOUT_SECONDS,
receipt_handle))
except sz.SzError as sz_err:
log.error(SZ_TAG + DLQ_TAG + str(sz_err))
# "Toss back" this message to be re-consumed; we rely on AWS
Expand Down
13 changes: 13 additions & 0 deletions middleware/redoer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from loglib import *
log = retrieve_logger()

from timeout_handling import *

try:
log.info('Importing senzing_core library . . .')
import senzing_core as sz_core
Expand All @@ -17,6 +19,7 @@
log.error(e)
sys.exit(1)

SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420))
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])

# How long to wait before attempting next Senzing op.
Expand Down Expand Up @@ -62,7 +65,9 @@ def go():

if have_rcd:
try:
start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS)
sz_eng.process_redo_record(rcd)
cancel_alarm_timer()
have_rcd = 0
log.debug(SZ_TAG + 'Successfully redid one record via process_redo_record().')
continue
Expand All @@ -77,6 +82,14 @@ def go():
+ ' for this record; dropping on the floor and moving on.')
time.sleep(WAIT_SECONDS)
continue
except LongRunningCallTimeoutEx as lrex:
# Abandon and move on.
have_rcd = 0
log.error(build_sz_timeout_msg(
type(lrex).__module__,
type(lrex).__qualname__,
SZ_CALL_TIMEOUT_SECONDS,
receipt_handle))
except sz.SzError as sz_err:
log.error(SZ_TAG + str(sz_err))
sys.exit(1)
Expand Down
26 changes: 26 additions & 0 deletions middleware/timeout_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import signal

from loglib import *

class LongRunningCallTimeoutEx(Exception):
    '''Raised by the SIGALRM handler to abort a Senzing call that has run
    longer than its allotted time budget.'''

def alarm_handler(signum, _):
    '''SIGALRM handler: abort the current call by raising
    LongRunningCallTimeoutEx (caught by the middleware's try blocks).'''
    raise LongRunningCallTimeoutEx

def start_alarm_timer(num_seconds):
    '''Arm a one-shot SIGALRM to fire in num_seconds (whole seconds).

    Any previously scheduled alarm is replaced. Pair with
    cancel_alarm_timer() once the guarded call completes.
    '''
    signal.alarm(num_seconds)

def cancel_alarm_timer():
    '''Disarm any pending SIGALRM (harmless no-op if none is armed).'''
    signal.alarm(0)

signal.signal(signal.SIGALRM, alarm_handler)

def build_sz_timeout_msg(module_name,
                         class_name,
                         num_seconds,
                         receipt_handle,
                         call_name='add_record'):
    '''Build the log message emitted when a Senzing call times out.

    module_name    -- module of the timeout exception type (for context).
    class_name     -- qualified name of the timeout exception type.
    num_seconds    -- the timeout budget (seconds) that was exceeded.
    receipt_handle -- SQS ReceiptHandle of the in-flight message (may be
                      None when no message is in flight).
    call_name      -- name of the Senzing call that timed out. Defaults to
                      'add_record' for backward compatibility; callers
                      guarding other calls (e.g. the redoer's
                      process_redo_record) should pass the actual name so
                      the log message is accurate.
    '''
    return (
        f'{SZ_TAG} {module_name}.{class_name} :: '
        + f'Long-running Senzing {call_name} call exceeded {num_seconds} sec.; '
        + f'abandoning and moving on; receipt_handle was: {receipt_handle}')