17
17
from typing import Callable , List , Tuple
18
18
import struct
19
19
import signal
20
+ import os
20
21
21
22
import numpy as np
22
23
@@ -92,10 +93,15 @@ class Task(ABC):
92
93
the staging mechanism.
93
94
"""
94
95
96
+ def __init__ (self ):
97
+ self .statistics = {"datasize" : 0 , "duration" : 0 }
98
+
95
99
@abstractmethod
96
100
def __call__ (self ):
97
101
pass
98
102
103
+ def stats (self ):
104
+ return self .statistics
99
105
100
106
class ForwardTask (Task ):
101
107
"""
@@ -112,7 +118,7 @@ def __init__(self, i_queue, o_queue, callback):
112
118
"""
113
119
initializes a ForwardTask class with the queues and the callback.
114
120
"""
115
-
121
+ super (). __init__ ()
116
122
if not isinstance (callback , Callable ):
117
123
raise TypeError (f"{ callback } argument is not Callable" )
118
124
@@ -142,6 +148,7 @@ def __call__(self):
142
148
the output to the output queue. In the case of receiving a 'termination' message, it informs
143
149
the tasks waiting on the output queues about the termination and returns from the function.
144
150
"""
151
+ start = time .time ()
145
152
146
153
while True :
147
154
# This is a blocking call
@@ -152,9 +159,14 @@ def __call__(self):
152
159
elif item .is_process ():
153
160
inputs , outputs = self ._action (item .data ())
154
161
self .o_queue .put (QueueMessage (MessageType .Process , DataBlob (inputs , outputs )))
162
+ self .statistics ["datasize" ] += (inputs .nbytes + outputs .nbytes )
155
163
elif item .is_new_model ():
156
164
# This is not handled yet
157
165
continue
166
+
167
+ end = time .time ()
168
+ self .statistics ["duration" ] = end - start
169
+ print (f"Spend { end - start } at { self .__class__ .__name__ } ({ self .statistics } )" )
158
170
return
159
171
160
172
@@ -171,6 +183,7 @@ class FSLoaderTask(Task):
171
183
"""
172
184
173
185
def __init__ (self , o_queue , loader , pattern ):
186
+ super ().__init__ ()
174
187
self .o_queue = o_queue
175
188
self .pattern = pattern
176
189
self .loader = loader
@@ -193,10 +206,13 @@ def __call__(self):
193
206
output_batches = np .array_split (output_data , num_batches )
194
207
for j , (i , o ) in enumerate (zip (input_batches , output_batches )):
195
208
self .o_queue .put (QueueMessage (MessageType .Process , DataBlob (i , o )))
209
+ self .statistics ["datasize" ] += (input_data .nbytes + output_data .nbytes )
210
+
196
211
self .o_queue .put (QueueMessage (MessageType .Terminate , None ))
197
212
198
213
end = time .time ()
199
- print (f"Spend { end - start } at { self .__class__ .__name__ } " )
214
+ self .statistics ["duration" ] += (end - start )
215
+ print (f"Spend { end - start } at { self .__class__ .__name__ } ({ self .statistics } )" )
200
216
201
217
202
218
class RMQMessage (object ):
@@ -343,7 +359,8 @@ class RMQLoaderTask(Task):
343
359
prefetch_count: Number of messages prefetched by RMQ (impacts performance)
344
360
"""
345
361
346
- def __init__ (self , o_queue , credentials , cacert , rmq_queue , prefetch_count = 1 ):
362
+ def __init__ (self , o_queue , credentials , cacert , rmq_queue , prefetch_count = 1 ):
363
+ super ().__init__ ()
347
364
self .o_queue = o_queue
348
365
self .credentials = credentials
349
366
self .cacert = cacert
@@ -387,6 +404,8 @@ def callback_message(self, ch, basic_deliver, properties, body):
387
404
input_batches = np .array_split (input_data , num_batches )
388
405
output_batches = np .array_split (output_data , num_batches )
389
406
407
+ self .statistics ["datasize" ] += (input_data .nbytes + output_data .nbytes )
408
+
390
409
for j , (i , o ) in enumerate (zip (input_batches , output_batches )):
391
410
self .o_queue .put (QueueMessage (MessageType .Process , DataBlob (i , o )))
392
411
@@ -397,15 +416,14 @@ def handler(signum, frame):
397
416
print (f"Received SIGNUM={ signum } for { name } [pid={ pid } ]: stopping process" )
398
417
self .rmq_consumer .stop ()
399
418
self .o_queue .put (QueueMessage (MessageType .Terminate , None ))
400
- print (f"Spend { self .total_time } at { self .__class__ .__name__ } " )
419
+ self .statistics ["duration" ] += (end - start )
420
+ print (f"Spend { self .total_time } at { self .__class__ .__name__ } ({ self .statistics } )" )
401
421
402
422
return handler
403
423
404
424
def __call__ (self ):
405
425
"""
406
- Busy loop of reading all files matching the pattern and creating
407
- '100' batches which will be pushed on the queue. Upon reading all files
408
- the Task pushes a 'Terminate' message to the queue and returns.
426
+ Busy loop of consuming messages from RMQ queue
409
427
"""
410
428
self .rmq_consumer .run ()
411
429
@@ -426,6 +444,7 @@ def __init__(self, i_queue, o_queue, writer_cls, out_dir):
426
444
initializes the writer task to read data from the i_queue, write them using
427
445
the writer_cls and store the data in the out_dir.
428
446
"""
447
+ super ().__init__ ()
429
448
self .data_writer_cls = writer_cls
430
449
self .out_dir = out_dir
431
450
self .i_queue = i_queue
@@ -470,7 +489,9 @@ def __call__(self):
470
489
break
471
490
472
491
end = time .time ()
473
- print (f"Spend { end - start } { total_bytes_written } at { self .__class__ .__name__ } " )
492
+ self .statistics ["datasize" ] = total_bytes_written
493
+ self .statistics ["duration" ] += (end - start )
494
+ print (f"Spend { end - start } { total_bytes_written } at { self .__class__ .__name__ } ({ self .statistics } )" )
474
495
475
496
476
497
class PushToStore (Task ):
@@ -491,7 +512,7 @@ def __init__(self, i_queue, ams_config, db_path, store):
491
512
is not under db_path, it copies the file to this location and if store is defined
492
513
it makes the kosh-store aware of the existence of the file.
493
514
"""
494
-
515
+ super (). __init__ ()
495
516
self .ams_config = ams_config
496
517
self .i_queue = i_queue
497
518
self .dir = Path (db_path ).absolute ()
@@ -521,9 +542,12 @@ def __call__(self):
521
542
522
543
if self ._store :
523
544
db_store .add_candidates ([str (dest_file )])
545
+
546
+ self .statistics ["datasize" ] += os .stat (src_fn ).st_size
524
547
525
548
end = time .time ()
526
- print (f"Spend { end - start } at { self .__class__ .__name__ } " )
549
+ self .statistics ["duration" ] += (end - start )
550
+ print (f"Spend { end - start } at { self .__class__ .__name__ } ({ self .statistics } )" )
527
551
528
552
529
553
class Pipeline (ABC ):
0 commit comments