Skip to content

Commit c931361

Browse files
committed
add signal alarm timeout logic
1 parent a780eb3 commit c931361

File tree

3 files changed

+31
-5
lines changed

3 files changed

+31
-5
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ Q_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-sen
187187
Environment variables:
188188

189189
- `LOG_LEVEL` is optional; defaults to `INFO`.
190-
- `SQS_VISIBILITY_TIMEOUT_SECONDS`
190+
- `SZ_CALL_TIMEOUT_SECONDS`
191191
- Optional; defaults to 420 seconds (7 min.)
192192
- This does two things: sets the (in)visiblity of a message when it's
193193
initially retrieved from SQS

middleware/consumer.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from loglib import *
99
log = retrieve_logger()
1010

11+
from timeout_handling import *
12+
1113
try:
1214
log.info('Importing senzing_core library . . .')
1315
import senzing_core as sz_core
@@ -18,7 +20,7 @@
1820
sys.exit(1)
1921

2022
Q_URL = os.environ['Q_URL']
21-
SQS_VISIBILITY_TIMEOUT_SECONDS = int(os.environ.get('SQS_VISIBILITY_TIMEOUT_SECONDS', 420))
23+
SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420))
2224
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
2325

2426
POLL_SECONDS = 20 # 20 seconds is SQS max
@@ -69,7 +71,7 @@ def get_msgs(sqs, q_url):
6971
log.debug(AWS_TAG + 'Polling SQS for the next message')
7072
resp = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1,
7173
WaitTimeSeconds=POLL_SECONDS,
72-
VisibilityTimeout=SQS_VISIBILITY_TIMEOUT_SECONDS)
74+
VisibilityTimeout=SZ_CALL_TIMEOUT_SECONDS)
7375
if 'Messages' in resp and len(resp['Messages']) == 1:
7476
yield resp['Messages'][0]
7577
except Exception as e:
@@ -161,7 +163,9 @@ def go():
161163

162164
try:
163165
# Process and send to Senzing.
166+
start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS)
164167
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body)
168+
cancel_alarm_timer()
165169
log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
166170
+ receipt_handle)
167171
except sz.SzUnknownDataSourceError as sz_uds_err:
@@ -171,11 +175,17 @@ def go():
171175
register_data_source(rcd['DATA_SOURCE'])
172176

173177
# Then try again: process and send to Senzing.
178+
start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS)
174179
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body)
180+
cancel_alarm_timer()
175181
log.info(SZ_TAG + 'Successful add_record having ReceiptHandle: '
176182
+ receipt_handle)
177-
except sz.SzError as sz_err:
178-
raise sz_err
183+
except Exception as ex:
184+
raise ex
185+
except LongRunningCallTimeoutEx as lrex:
186+
log.error(f'{SZ_TAG} {type(lrex).__module__}.{type(lrex).__qualname__} :: '
187+
+ f'Long-running Senzing add_record call exceeded {SZ_CALL_TIMEOUT_SECONDS} sec.; '
188+
+ f'abandoning and moving on; receipt_handle was: {receipt_handle}')
179189
except sz.SzError as sz_err:
180190
log.error(SZ_TAG + DLQ_TAG + str(sz_err))
181191
# "Toss back" this message to be re-consumed; we rely on AWS

middleware/timeout_handling.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import signal
2+
3+
class LongRunningCallTimeoutEx(Exception):
4+
pass
5+
6+
def alarm_handler(signum, _):
7+
print('FFFOOOOOOOOOOOOOOOOOOOOOOOOOO')
8+
raise LongRunningCallTimeoutEx()
9+
10+
def start_alarm_timer(num_seconds):
11+
signal.alarm(num_seconds)
12+
13+
def cancel_alarm_timer():
14+
signal.alarm(0)
15+
16+
signal.signal(signal.SIGALRM, alarm_handler)

0 commit comments

Comments
 (0)