44import sys
55import boto3
66import senzing as sz
7+
8+ from loglib import *
9+ log = retrieve_logger ()
10+
711try :
8- print ('Importing senzing_core library . . .' )
12+ log . info ('Importing senzing_core library . . .' )
913 import senzing_core as sz_core
10- print ('Imported senzing_core successfully.' )
14+ log . info ('Imported senzing_core successfully.' )
1115except Exception as e :
12- print ('Importing senzing_core library failed.' )
13- print (e )
16+ log . error ('Importing senzing_core library failed.' )
17+ log . error (e )
1418 sys .exit (1 )
15-
16- # TODO add DLQ logic (needs jira ticket probably).
19+
20+ # TODO add DLQ logic (see DLG_TAG logging)
1721
1822Q_URL = os .environ ['Q_URL' ]
1923SZ_CONFIG = json .loads (os .environ ['SENZING_ENGINE_CONFIGURATION_JSON' ])
@@ -43,16 +47,17 @@ def _make_boto_session(fpath=None):
4347def _make_sqs_client (boto_session ):
4448 return boto_session .client ('sqs' )
4549
46- # TODO add try/except code
47- # TODO add logging
4850def init ():
4951 '''Returns sqs client object'''
50- sess = _make_boto_session ()
51- sqs = sess .client ('sqs' )
52- return sqs
52+ try :
53+ sess = _make_boto_session ()
54+ sqs = sess .client ('sqs' )
55+ log .info (AWS_TAG + 'SQS client object instantiated.' )
56+ return sqs
57+ except Exception as e :
58+ log .error (AWS_TAG + str (e ))
59+ sys .exit (1 )
5360
54- # TODO add try/except code
55- # TODO add logging
5661def get_msgs (sqs , q_url ):
5762 '''Generator function; returns a single SQS msg at a time.
5863 Pertinent keys in an SQS message include:
@@ -62,27 +67,32 @@ def get_msgs(sqs, q_url):
6267 '''
6368 while 1 :
6469 print ('waiting for msg' )
65- resp = sqs .receive_message (QueueUrl = q_url , MaxNumberOfMessages = 1 ,
66- WaitTimeSeconds = POLL_SECONDS )
67- if 'Messages' in resp and len (resp ['Messages' ]) == 1 :
68- yield resp ['Messages' ][0 ]
70+ try :
71+ log .info (AWS_TAG + 'Polling SQS for the next message' )
72+ resp = sqs .receive_message (QueueUrl = q_url , MaxNumberOfMessages = 1 ,
73+ WaitTimeSeconds = POLL_SECONDS )
74+ if 'Messages' in resp and len (resp ['Messages' ]) == 1 :
75+ yield resp ['Messages' ][0 ]
76+ except Exception as e :
77+ log .error (AWS_TAG + str (e ))
6978
70- # TODO add try/except code
71- # TODO add logging
7279def del_msg (sqs , q_url , receipt_handle ):
73- return sqs .delete_message (QueueUrl = q_url , ReceiptHandle = receipt_handle )
80+ try :
81+ return sqs .delete_message (QueueUrl = q_url , ReceiptHandle = receipt_handle )
82+ except Exception as e :
83+ log .error (AWS_TAG + DLQ_TAG + 'SQS delete failure for ReceiptHandle: ' +
84+ ReceiptHandle + ' Additional info: ' + str (e ))
7485
7586#-------------------------------------------------------------------------------
7687
77- # TODO add more try/except code as needed
78- # TODO add logging
7988def go ():
8089 '''Starts the Consumer process; runs indefinitely.'''
8190
8291 # SQS client
8392 sqs = init ()
8493
8594 # Spin up msgs generator
95+ log .info ('Spinning up messages generator' )
8696 msgs = get_msgs (sqs , Q_URL )
8797
8898 # Senzing init tasks.
@@ -94,40 +104,44 @@ def go():
94104 # Senzing engine object cannot be passed around between functions,
95105 # else it will be eagerly cleaned up / destroyed and no longer usable.
96106 sz_eng = sz_factory .create_engine ()
97- except sz .SzError as err :
98- # TODO log error
99- print (err )
107+ log .info (SZ_TAG + 'Senzing engine object instantiated.' )
108+ except sz .SzError as sz_err :
109+ log .error (SZ_TAG + str (sz_err ))
110+ sys .exit (1 )
111+ except Exception as e :
112+ log .error (str (e ))
100113 sys .exit (1 )
101114
102- # TODO log ReceiptHandle, other *generic* debug-facing information as appropriate.
103115 while 1 :
104- print ('Starting primary loop iteration . . .' )
105-
106- # Get next message.
107- msg = next (msgs )
108-
109- # Process and send to Senzing.
110- receipt_handle , body = msg ['ReceiptHandle' ], msg ['Body' ]
111- rcd = json .loads (body )
112116 try :
113- # TODO add logging
114- # TODO Use signal lib to handle stalled records (i.e., still
115- # processing >5 minutes)
117+ # Get next message.
118+ msg = next (msgs )
119+ receipt_handle , body = msg ['ReceiptHandle' ], msg ['Body' ]
120+ log .info ('SQS message retrieved, having ReceiptHandle: '
121+ + receipt_handle )
122+ rcd = json .loads (body )
123+
124+ # Process and send to Senzing.
116125 resp = sz_eng .add_record (rcd ['DATA_SOURCE' ], rcd ['RECORD_ID' ], body ,
117126 sz .SzEngineFlags .SZ_WITH_INFO )
118- print (resp )
119- except sz .SzError as err :
120- # TODO log / handle
121- print (err )
127+ log .info (SZ_TAG + 'Successful add_record having ReceiptHandle: '
128+ + receipt_handle )
129+
130+ # Delete msg from queue.
131+ del_msg (sqs , Q_URL , receipt_handle )
132+ except sz .SzError as sz_err :
133+ log .error (SZ_TAG + DLQ_TAG + str (sz_err ))
134+ except Exception as e :
135+ log .error (str (e ))
136+ sys .exit (1 )
122137
123- # Delete msg from queue.
124- del_msg (sqs , Q_URL , receipt_handle )
138+ #-------------------------------------------------------------------------------
125139
126140def main ():
127- print ('====================' )
128- print (' CONSUMER' )
129- print (' STARTED' )
130- print ('====================' )
141+ log . info ('====================' )
142+ log . info (' CONSUMER' )
143+ log . info (' STARTED' )
144+ log . info ('====================' )
131145 go ()
132146
133147if __name__ == '__main__' : main ()
0 commit comments