Skip to content

Commit 45ceda7

Browse files
committed
Fixing dates timestamps
1 parent 824a699 commit 45ceda7

2 files changed

Lines changed: 84 additions & 100 deletions

File tree

src/snovault/elasticsearch/indexer.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -378,6 +378,7 @@ def _load_indexing(request, session, connection, indexer_state):
378378
first_txn = None
379379
snapshot_id = None
380380
(xmin, invalidated, restart) = indexer_state.priority_cycle(request)
381+
indexer_store.set_state(indexer_store.state_load_indexing)
381382
indexer_state.log_reindex_init_state()
382383
# OPTIONAL: restart support
383384
if restart: # Currently not bothering with restart!!!

src/snovault/elasticsearch/local_indexer_store.py

Lines changed: 83 additions & 100 deletions
Original file line number | Diff line number | Diff line change
@@ -29,44 +29,6 @@ def includeme(config):
2929
config.registry[INDEXER_STORE] = IndexerStore(config.registry.settings)
3030

3131

32-
def _convert_dt_str_to_ts(dt_str):
33-
'''Expecting dt_str to be in the format 2020-10-23 17:09:36.442405'''
34-
try:
35-
return datetime.datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f').timestamp()
36-
except ValueError:
37-
return None
38-
39-
40-
def _duration_with_unis_str(ts_start, ts_end=None):
41-
'''
42-
Return duration in seconds, minutes, or hours
43-
- From now by default
44-
- from provided end timestamp
45-
'''
46-
try:
47-
ts_start = int(ts_start)
48-
except ValueError:
49-
ts_start = _convert_dt_str_to_ts(ts_start)
50-
if not ts_start:
51-
return f"Could not determine duration with ts_start={ts_start}"
52-
try:
53-
if not ts_end:
54-
ts_end = time.time()
55-
ts_end = int(ts_end)
56-
except ValueError:
57-
ts_end = _convert_dt_str_to_ts(ts_end)
58-
if not ts_end:
59-
return f"Could not determine duration with ts_end={ts_end}"
60-
seconds = float(int(ts_end) - int(ts_start))
61-
div = 1.0
62-
unit = 'seconds'
63-
if seconds > 60:
64-
div = 60.0
65-
unit = 'minutes'
66-
if seconds > 3600:
67-
div = 3600.0
68-
unit = 'hours'
69-
return f"{seconds/div:0.2f} {unit}"
7032

7133

7234
@view_config(route_name='indexer_store', request_method='GET', request_param='events')
@@ -103,7 +65,7 @@ def indexer_store_events(request):
10365
event_tags = indexer_store.list_get(INDEXER_EVENTS_LIST, start, stop)
10466
result['events'] = []
10567
for event_tag in event_tags:
106-
result['events'].append(indexer_store.get_event(event_tag))
68+
result['events'].append(indexer_store.get_event_msg(event_tag))
10769
else:
10870
# Events arg is bad format?
10971
result['error'] = f"Bad events value '{request_argument}'"
@@ -148,63 +110,64 @@ def indexer_store_state_split(request):
148110
result['dynamic'][key] = state_obj[key]
149111
# Determine if run events and run state
150112
event_key = None
151-
formatted_duration = None
113+
duration = None
152114
if current_state in [
153115
IndexerStore.state_endpoint_start[0],
154116
IndexerStore.state_waiting[0],
155117
IndexerStore.state_load_indexing[0],
156118
]:
157119
event_key = 'previous_event'
158-
formatted_duration = _duration_with_unis_str(state_obj['start_ts'], ts_end=state_obj['end_time'])
120+
duration = indexer_store._duration_with_unis_str(state_obj['start_dt'], ts_end=state_obj['end_dt'])
159121
elif current_state == IndexerStore.state_run_indexing[0]:
160-
if state_obj['end_time'] == 'tbd':
122+
if state_obj['end_dt'] == 'tbd':
161123
event_key = 'current_event'
162-
formatted_duration = _duration_with_unis_str(state_obj['start_ts'])
124+
duration = indexer_store._duration_with_unis_str(state_obj['start_dt'])
163125
else:
164126
event_key = 'previous_event'
165-
formatted_duration = _duration_with_unis_str(state_obj['start_ts'], ts_end=state_obj['end_time'])
127+
duration = indexer_store._duration_with_unis_str(state_obj['start_dt'], ts_end=state_obj['end_dt'])
166128
# Add current or previous run event keys
167129
if event_key:
168130
result[event_key] = {}
169131
for key in indexer_store.event_keys:
170132
result[event_key][key] = state_obj[key]
171-
result[event_key]['formated_duration'] = formatted_duration
133+
result[event_key]['duration'] = duration
172134
request.query_string = "format=json"
173135
return result
174136

175137

176138
@view_config(route_name='indexer_store', request_method='GET')
177139
def indexer_store_state(request):
178140
'''Minimal view for current state and additional info if running'''
179-
state_obj = request.registry[INDEXER_STORE].get_state()
141+
indexer_store = request.registry[INDEXER_STORE]
142+
state_obj = indexer_store.get_state()
180143
current_state = state_obj.get('state', 'not initialized')
181144
result = {
182145
'state': current_state,
183-
'state_duration': 'could not calculate',
146+
'time_in_state': 'could not calculate',
184147
}
185-
if current_state == IndexerStore.state_initialized[0]:
186-
result['state_duration'] = _duration_with_unis_str(state_obj['init_dt'])
148+
if current_state == IndexerStore.state_waiting[0]:
149+
result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_end_dt'])
150+
result['description'] = f"Remains in state for {state_obj['loop_time']} seconds."
151+
elif current_state == IndexerStore.state_initialized[0]:
152+
result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['init_dt'])
187153
result['description'] = 'Very short duration. Happens once during deployment'
188154
elif current_state == IndexerStore.state_endpoint_start[0]:
189-
result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start'])
155+
result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_start'])
190156
result['description'] = 'Very short duration. Happens once a minute.'
191157
elif current_state == IndexerStore.state_load_indexing[0]:
192-
result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start'])
158+
result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_start'])
193159
result['description'] = 'Time depends on number of uuids to index. Could take minutes.'
194160
elif current_state == IndexerStore.state_run_indexing[0]:
195-
if state_obj['end_time'] == 'tbd':
196-
result['state_duration'] = _duration_with_unis_str(state_obj['start_ts'])
161+
if state_obj['end_dt'] == 'tbd':
162+
result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['start_dt'])
197163
result['description'] = 'Time depends on number of uuids to index. Could take hours.'
198164
result['current_event_tag'] = state_obj['event_tag']
199165
result['current_invalidated_cnt'] = state_obj['invalidated_cnt']
200166
else:
201-
result['state_duration'] = _duration_with_unis_str(state_obj['end_time'])
167+
result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['end_dt'])
202168
result['description'] = 'Short duration. Should go to waiting soon.'
203169
result['just_finished_event_tag'] = state_obj.get('event_tag', 'not initialized')
204170
result['just_finished_invalidated_cnt'] = state_obj['invalidated_cnt']
205-
elif current_state == IndexerStore.state_waiting[0]:
206-
result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_end'])
207-
result['description'] = f"Remains in state for {state_obj['loop_time']} seconds."
208171
request.query_string = "format=json"
209172
return result
210173

@@ -227,21 +190,19 @@ class IndexerStore(LocalStoreClient):
227190
'state_key',
228191
]
229192
dynamic_keys = [
230-
'endpoint_end',
231-
'endpoint_start',
232-
'endpoint_start_ts',
193+
'endpoint_end_dt',
194+
'endpoint_start_dt',
233195
'state',
234196
'state_desc',
235197
'state_error',
236-
'sub_state',
237198
]
238199
event_keys = [
239200
'duration',
240-
'end_time',
201+
'end_dt',
241202
'errors_cnt',
242203
'event_tag',
243204
'invalidated_cnt',
244-
'start_ts',
205+
'start_dt',
245206
]
246207

247208
def __init__(self, settings):
@@ -261,6 +222,43 @@ def __init__(self, settings):
261222
if not curr_state:
262223
self._init_state()
263224

225+
@staticmethod
226+
def _dt_now():
227+
return str(datetime.datetime.utcnow())
228+
229+
@staticmethod
230+
def _convert_dt_to_ts(dt_str):
231+
'''Expecting dt_str to be in the format 2020-10-23 17:09:36.442405'''
232+
try:
233+
return datetime.datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f').timestamp()
234+
except ValueError:
235+
return None
236+
237+
def _duration_with_unis_str(self, start_dt, end_dt=None):
238+
'''
239+
Return duration in seconds, minutes, or hours
240+
- Input expected to be in '2020-10-23 17:09:36.442405' format
241+
- From now if end_dt not provided
242+
'''
243+
start_ts = self._convert_dt_to_ts(start_dt)
244+
if not start_ts:
245+
return f"Could not determine duration with start_dt={start_dt}"
246+
if not end_dt:
247+
end_dt = self._dt_now()
248+
end_ts = self._convert_dt_to_ts(end_dt)
249+
if not end_ts:
250+
return f"Could not determine duration with end_dt={end_dt}"
251+
seconds = float(end_ts - start_ts)
252+
div = 1.0
253+
unit = 'seconds'
254+
if seconds > 60:
255+
div = 60.0
256+
unit = 'minutes'
257+
if seconds > 3600:
258+
div = 3600.0
259+
unit = 'hours'
260+
return f"{seconds/div:0.2f} {unit}"
261+
264262
def _init_state(self):
265263
'''Indexer state in redis is named after the process in the config'''
266264
init_state = {
@@ -285,7 +283,7 @@ def _init_state(self):
285283

286284
def _end_event(self, event_tag, state):
287285
'''Close event with only certain event keys in state. Also add human readable date time'''
288-
for event_key in ['duration', 'end_time', 'errors_cnt']:
286+
for event_key in ['end_dt', 'errors_cnt']:
289287
self.item_set(f"{event_tag}:{event_key}", state[event_key])
290288
self.item_set(f"{event_tag}:end", str(datetime.datetime.utcnow()))
291289

@@ -296,11 +294,16 @@ def _start_event(self, event_tag, state):
296294
for event_key in self.event_keys:
297295
self.item_set(f"{event_tag}:{event_key}", state[event_key])
298296

299-
def get_event(self, event_tag):
300-
end = str(self.item_get(event_tag + ':end'))
297+
def get_event_msg(self, event_tag):
298+
end_dt = str(self.item_get(event_tag + ':end_dt'))
299+
start_dt = str(self.item_get(event_tag + ':start_dt'))
300+
errors_cnt = str(self.item_get(event_tag + ':errors_cnt'))
301301
invalidated_cnt = str(self.item_get(event_tag + ':invalidated_cnt'))
302-
duration = str(self.item_get(event_tag + ':duration'))
303-
msg = f"Indexed '{invalidated_cnt}' uuids in '{duration}' seconds. Ended at '{end}'"
302+
duration = self._duration_with_unis_str(start_dt, end=end_dt)
303+
msg = (
304+
f"Indexed '{invalidated_cnt}' uuids in '{duration}'"
305+
f"with '{errors_cnt}' errors. Ended at '{end_dt}'."
306+
)
304307
return f"{event_tag}: {msg}"
305308

306309
def get_state(self):
@@ -312,72 +315,52 @@ def _set_state(self, state_tuple, new_state):
312315
new_state['state_desc'] = state_tuple[1]
313316
self.dict_set(INDEXER_STATE_TAG, new_state)
314317

315-
def _set_state_load_indexing(self, **kwargs):
316-
state = self.get_state()
317-
state['sub_state'] = kwargs.get('sub_state', 'where is your sub_state!?')
318-
# Set sub state keys
319-
allowed_keys = []
320-
if state['sub_state'] == 'priority_cycle':
321-
allowed_keys = ['priority_xmin', 'priority_invalidated', 'priority_did_restart']
322-
elif state['sub_state'] == 'current_cycle':
323-
allowed_keys = ['xmin', 'last_xmin']
324-
# Update state
325-
for key in allowed_keys:
326-
if key in kwargs:
327-
if isinstance(kwargs[key], list):
328-
kwargs[key] = f"[{', '.join(kwargs[key])}]"
329-
state[key] = str(kwargs[key])
330-
self._set_state(self.state_load_indexing, state)
331-
return self.get_state(), None
332-
333318
def set_state(self, state_tuple, **kwargs):
334319
state = self.get_state()
320+
state['state_error'] = ''
335321
if not isinstance(state_tuple, tuple):
336322
# Invalid State type
337323
state['state_error'] = f"state, '{str(state_tuple)}', is not a tuple"
338324
self.dict_set(INDEXER_STATE_TAG, state)
339325
return self.get_state(), None
326+
elif state_tuple[0] == self.state_waiting[0]:
327+
# Waiting in es_index_listener for timeout
328+
state['endpoint_start_dt'] = 'tbd'
329+
state['endpoint_end_dt'] = self._dt_now()
330+
self._set_state(state_tuple, state)
331+
return self.get_state(), None
340332
elif not len(state_tuple) == 2:
341333
# Invalid State len
342334
state['state_error'] = f"state, {state_tuple}, is wrong length"
343335
self.dict_set(INDEXER_STATE_TAG, state)
344336
return self.get_state(), None
345337
elif state_tuple[0] == self.state_endpoint_start[0]:
346338
# Reset
347-
state['endpoint_end'] = 'tbd'
348-
state['endpoint_start'] = str(datetime.datetime.utcnow())
349-
state['endpoint_start_ts'] = str(int(time.time()))
350-
state['state_error'] = 'tbd'
339+
state['endpoint_end_dt'] = 'tbd'
340+
state['endpoint_start_dt'] = self._dt_now()
351341
self._set_state(state_tuple, state)
352342
return self.get_state(), None
353343
elif state_tuple[0] == self.state_load_indexing[0]:
354344
# Indexer is checking for uuids to index
355-
return self._set_state_load_indexing(**kwargs)
345+
pass
356346
elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('invalidated_cnt'):
357347
# Reset event keys
358348
for event_key in self.event_keys:
359349
state[event_key] = 'tbd'
360350
# Start indexing
361351
state['event_tag'] = self.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN)
362352
state['invalidated_cnt'] = str(kwargs['invalidated_cnt'])
363-
state['start_ts'] = str(int(time.time()))
353+
state['start_dt'] = self._dt_now()
364354
self._start_event(state['event_tag'], state)
365355
self._set_state(state_tuple, state)
366356
return self.get_state(), state['event_tag']
367357
elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('event_tag'):
368358
# End indexing
369-
state['end_time'] = str(datetime.datetime.utcnow())
370-
state['end_ts'] = int(time.time())
359+
state['end_dt'] = self._dt_now()
371360
state['errors_cnt'] = str(kwargs.get('errors_cnt', 0))
372-
state['duration'] = _duration_with_unis_str(state['start_ts'], ts_end=state['end_ts'])
373361
self._end_event(kwargs['event_tag'], state)
374362
self._set_state(state_tuple, state)
375363
return self.get_state(), kwargs['event_tag']
376-
elif state_tuple[0] == self.state_waiting[0]:
377-
# Waiting in es_index_listener for timeout
378-
state['endpoint_end'] = str(datetime.datetime.utcnow())
379-
self._set_state(state_tuple, state)
380-
return self.get_state(), None
381364
elif state_tuple[0] == self.state_run_indexing[0]:
382365
# Invalid State
383366
state['state_error'] = f"{str(state_tuple)} requries additional arguments"

0 commit comments

Comments (0)