Skip to content

Commit 3a0d08f

Browse files
authored
Polish payload handling + logging (#46)
Incoming records require these two keys: DATA_SOURCE and RECORD_ID; ensure misformed records are handled gracefully. Also includes other minor updates.
1 parent 44c3137 commit 3a0d08f

File tree

5 files changed

+41
-46
lines changed

5 files changed

+41
-46
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,13 @@ _Mounts in docker-compose.yaml:_
208208
volumes:
209209
- ~/.aws:/home/senzing/.aws
210210

211+
_Required record keys:_
212+
213+
- The record to be sent into Senzing requires, at the very least, these two
214+
keys:
215+
- `DATA_SOURCE`
216+
- `RECORD_ID`
217+
211218
### Redoer
212219

213220
Similar to the consumer, the redoer is also a continually-running process.

middleware/consumer.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
log.info('Imported senzing_core successfully.')
1818
except Exception as e:
1919
log.error('Importing senzing_core library failed.')
20-
log.error(e)
21-
sys.exit(1)
20+
log.error(fmterr(e))
2221

2322
Q_URL = os.environ['Q_URL']
2423
SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420))
@@ -57,8 +56,7 @@ def init():
5756
else:
5857
return sess.client('sqs')
5958
except Exception as e:
60-
log.error(AWS_TAG + str(e))
61-
sys.exit(1)
59+
log.error(AWS_TAG + fmterr(e))
6260

6361
def get_msgs(sqs, q_url):
6462
'''Generator function; emits a single SQS msg at a time.
@@ -76,16 +74,15 @@ def get_msgs(sqs, q_url):
7674
if 'Messages' in resp and len(resp['Messages']) == 1:
7775
yield resp['Messages'][0]
7876
except Exception as e:
79-
log.error(f'{AWS_TAG} {type(e).__module__}.{type(e).__qualname__} :: {e}')
80-
sys.exit(1)
77+
log.error(f'{AWS_TAG} {type(e).__module__}.{type(e).__qualname__} :: {fmterr(e)}')
8178

8279
def del_msg(sqs, q_url, receipt_handle):
8380
try:
8481
log.debug(AWS_TAG + 'Deleting message having ReceiptHandle: ' + receipt_handle)
8582
return sqs.delete_message(QueueUrl=q_url, ReceiptHandle=receipt_handle)
8683
except Exception as e:
8784
log.error(AWS_TAG + DLQ_TAG + 'SQS delete failure for ReceiptHandle: ' +
88-
ReceiptHandle + ' Additional info: ' + str(e))
85+
ReceiptHandle + ' Additional info: ' + fmterr(e))
8986

9087
def make_msg_visible(sqs, q_url, receipt_handle):
9188
'''Setting visibility timeout to 0 on an SQS message makes it visible again,
@@ -97,7 +94,7 @@ def make_msg_visible(sqs, q_url, receipt_handle):
9794
ReceiptHandle=receipt_handle,
9895
VisibilityTimeout=0)
9996
except Exception as e:
100-
log.error(AWS_TAG + str(e))
97+
log.error(AWS_TAG + fmterr(e))
10198

10299
#-------------------------------------------------------------------------------
103100

@@ -121,8 +118,7 @@ def f():
121118
f()
122119
log.info(SZ_TAG + 'Successfully registered data_source: ' + data_source_name)
123120
except sz.SzError as err:
124-
log.error(SZ_TAG + str(err))
125-
sys.exit(1)
121+
log.error(SZ_TAG + fmterr(err))
126122

127123
#-------------------------------------------------------------------------------
128124

@@ -151,7 +147,7 @@ def clean_up(signum, frm):
151147
try:
152148
make_msg_visible(sqs, Q_URL, receipt_handle)
153149
except Exception as ex:
154-
log.error(ex)
150+
log.error(fmterr(ex))
155151
sys.exit(0)
156152
signal.signal(signal.SIGINT, clean_up)
157153
signal.signal(signal.SIGTERM, clean_up)
@@ -167,11 +163,9 @@ def clean_up(signum, frm):
167163
sz_eng = sz_factory.create_engine()
168164
log.info(SZ_TAG + 'Senzing engine object instantiated.')
169165
except sz.SzError as sz_err:
170-
log.error(SZ_TAG + str(sz_err))
171-
sys.exit(1)
166+
log.error(SZ_TAG + fmterr(sz_err))
172167
except Exception as e:
173-
log.error(str(e))
174-
sys.exit(1)
168+
log.error(fmterr(e))
175169

176170
while 1:
177171
try:
@@ -189,6 +183,9 @@ def clean_up(signum, frm):
189183
cancel_alarm_timer()
190184
log.debug(SZ_TAG + 'Successful add_record having ReceiptHandle: '
191185
+ receipt_handle)
186+
except KeyError as ke:
187+
log.error(fmterr(ke))
188+
make_msg_visible(sqs, Q_URL, receipt_handle)
192189
except sz.SzUnknownDataSourceError as sz_uds_err:
193190
log.info(SZ_TAG + str(sz_uds_err))
194191
# Encountered a new data source name; register it.
@@ -202,7 +199,7 @@ def clean_up(signum, frm):
202199
SZ_CALL_TIMEOUT_SECONDS,
203200
receipt_handle))
204201
except sz.SzError as sz_err:
205-
log.error(SZ_TAG + DLQ_TAG + str(sz_err))
202+
log.error(SZ_TAG + DLQ_TAG + fmterr(sz_err))
206203
# "Toss back" this message to be re-consumed; we rely on AWS
207204
# config to move out-of-order messages into the DLQ at some point.
208205
make_msg_visible(sqs, Q_URL, receipt_handle)
@@ -212,8 +209,7 @@ def clean_up(signum, frm):
212209
del_msg(sqs, Q_URL, receipt_handle)
213210

214211
except Exception as e:
215-
log.error(str(e))
216-
sys.exit(1)
212+
log.error(fmterr(e))
217213

218214
#-------------------------------------------------------------------------------
219215

middleware/exporter.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
log.info('Imported senzing_core successfully.')
1717
except Exception as e:
1818
log.error('Importing senzing_core library failed.')
19-
log.error(e)
20-
sys.exit(1)
19+
log.error(fmterr(e))
2120

2221
if 'SENZING_ENGINE_CONFIGURATION_JSON' not in os.environ:
2322
log.error('SENZING_ENGINE_CONFIGURATION_JSON environment variable required.')
@@ -46,8 +45,7 @@ def make_s3_client():
4645
else:
4746
return sess.client('s3')
4847
except Exception as e:
49-
log.error(AWS_TAG + str(e))
50-
sys.exit(1)
48+
log.error(AWS_TAG + fmterr(e))
5149

5250
def build_output_filename(tag='exporter-output', kind='json'):
5351
'''Returns a str, e.g.,
@@ -80,11 +78,9 @@ def go():
8078
sz_eng = sz_factory.create_engine()
8179
log.info(SZ_TAG + 'Senzing engine object instantiated.')
8280
except sz.SzError as sz_err:
83-
log.error(SZ_TAG + str(sz_err))
84-
sys.exit(1)
81+
log.error(SZ_TAG + fmterr(sz_err))
8582
except Exception as e:
86-
log.error(str(e))
87-
sys.exit(1)
83+
log.error(fmterr(e))
8884

8985
# init buffer
9086
buff = io.BytesIO()
@@ -111,9 +107,9 @@ def go():
111107
buff.write(']'.encode('utf-8'))
112108
log.info('Total bytes exported/buffered: ' + str(buff.getbuffer().nbytes))
113109
except sz.SzError as err:
114-
log.error(SZ_TAG + str(err))
110+
log.error(SZ_TAG + fmterr(err))
115111
except Exception as e:
116-
log.error(str(e))
112+
log.error(fmterr(e))
117113

118114
# rewind buffer
119115
buff.seek(0)
@@ -126,7 +122,7 @@ def go():
126122
s3.upload_fileobj(buff, S3_BUCKET_NAME, full_path)
127123
log.info(AWS_TAG + 'Successfully uploaded file.')
128124
except Exception as e:
129-
log.error(AWS_TAG + str(e))
125+
log.error(AWS_TAG + fmterr(e))
130126

131127
#-------------------------------------------------------------------------------
132128

middleware/loglib.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@ def retrieve_logger(tag='default'):
2525
x.addHandler(handler)
2626
_instantiated_loggers[tag] = x
2727
return x
28+
29+
def fmterr(e):
30+
return ('%s :: %s' % (str(type(e)), str(e)))

middleware/redoer.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
log.info('Imported senzing_core successfully.')
1717
except Exception as e:
1818
log.error('Importing senzing_core library failed.')
19-
log.error(e)
20-
sys.exit(1)
19+
log.error(fmterr(e))
2120

2221
SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420))
2322
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
@@ -44,11 +43,9 @@ def go():
4443
sz_eng = sz_factory.create_engine()
4544
log.info(SZ_TAG + 'Senzing engine object instantiated.')
4645
except sz.SzError as sz_err:
47-
log.error(SZ_TAG + str(sz_err))
48-
sys.exit(1)
46+
log.error(SZ_TAG + fmterr(sz_err))
4947
except Exception as e:
50-
log.error(str(e))
51-
sys.exit(1)
48+
log.error(fmterr(e))
5249

5350
log.info('Starting primary loop.')
5451

@@ -78,7 +75,7 @@ def go():
7875
continue
7976
except sz.SzRetryableError as sz_ret_err:
8077
# We'll try to process this record again.
81-
log.error(SZ_TAG + str(sz_ret_err))
78+
log.error(SZ_TAG + fmterr(sz_ret_err))
8279
attempts_left -= 1
8380
log.debug(SZ_TAG + f'Remaining attempts for this record: {attempts_left}')
8481
if not attempts_left:
@@ -96,20 +93,18 @@ def go():
9693
SZ_CALL_TIMEOUT_SECONDS,
9794
receipt_handle))
9895
except sz.SzError as sz_err:
99-
log.error(SZ_TAG + str(sz_err))
100-
sys.exit(1)
96+
log.error(SZ_TAG + fmterr(sz_err))
10197

10298
else:
10399
try:
104100
tally = sz_eng.count_redo_records()
105101
log.debug(SZ_TAG + 'Current redo count: ' + str(tally))
106102
except sz.SzRetryableError as sz_ret_err:
107-
log.error(SZ_TAG + str(sz_ret_err))
103+
log.error(SZ_TAG + fmterr(sz_ret_err))
108104
time.sleep(WAIT_SECONDS)
109105
continue
110106
except sz.SzError as sz_err:
111-
log.error(SZ_TAG + str(sz_err))
112-
sys.exit(1)
107+
log.error(SZ_TAG + fmterr(sz_err))
113108

114109
if tally:
115110

@@ -126,18 +121,16 @@ def go():
126121
+ 'nothing from get_redo_record')
127122
except sz.SzRetryableError as sz_ret_err:
128123
# No additional action needed; we'll just try getting again.
129-
log.error(SZ_TAG + str(sz_ret_err))
124+
log.error(SZ_TAG + fmterr(sz_ret_err))
130125
except sz.SzError as sz_err:
131-
log.error(SZ_TAG + str(sz_err))
132-
sys.exit(1)
126+
log.error(SZ_TAG + fmterr(sz_err))
133127

134128
else:
135129
log.debug('No redo records. Will wait ' + str(WAIT_SECONDS) + ' seconds.')
136130
time.sleep(WAIT_SECONDS)
137131

138132
except Exception as e:
139-
log.error(str(e))
140-
sys.exit(1)
133+
log.error(fmterr(e))
141134

142135
#-------------------------------------------------------------------------------
143136

0 commit comments

Comments
 (0)