17
17
from typing import Callable , List , Tuple
18
18
import struct
19
19
import signal
20
- import os
21
20
22
21
import numpy as np
23
22
@@ -93,15 +92,10 @@ class Task(ABC):
93
92
the staging mechanism.
94
93
"""
95
94
96
- def __init__ (self ):
97
- self .statistics = {"datasize" : 0 , "duration" : 0 }
98
-
99
95
@abstractmethod
100
96
def __call__ (self ):
101
97
pass
102
98
103
- def stats (self ):
104
- return self .statistics
105
99
106
100
class ForwardTask (Task ):
107
101
"""
@@ -118,7 +112,7 @@ def __init__(self, i_queue, o_queue, callback):
118
112
"""
119
113
initializes a ForwardTask class with the queues and the callback.
120
114
"""
121
- super (). __init__ ()
115
+
122
116
if not isinstance (callback , Callable ):
123
117
raise TypeError (f"{ callback } argument is not Callable" )
124
118
@@ -148,7 +142,6 @@ def __call__(self):
148
142
the output to the output queue. On receiving a 'termination' message, it informs
149
143
the tasks waiting on the output queue about the termination and returns from the function.
150
144
"""
151
- start = time .time ()
152
145
153
146
while True :
154
147
# This is a blocking call
@@ -159,14 +152,9 @@ def __call__(self):
159
152
elif item .is_process ():
160
153
inputs , outputs = self ._action (item .data ())
161
154
self .o_queue .put (QueueMessage (MessageType .Process , DataBlob (inputs , outputs )))
162
- self .statistics ["datasize" ] += (inputs .nbytes + outputs .nbytes )
163
155
elif item .is_new_model ():
164
156
# This is not handled yet
165
157
continue
166
-
167
- end = time .time ()
168
- self .statistics ["duration" ] = end - start
169
- print (f"Spend { end - start } at { self .__class__ .__name__ } ({ self .statistics } )" )
170
158
return
171
159
172
160
@@ -183,7 +171,6 @@ class FSLoaderTask(Task):
183
171
"""
184
172
185
173
def __init__ (self , o_queue , loader , pattern ):
186
- super ().__init__ ()
187
174
self .o_queue = o_queue
188
175
self .pattern = pattern
189
176
self .loader = loader
@@ -206,13 +193,10 @@ def __call__(self):
206
193
output_batches = np .array_split (output_data , num_batches )
207
194
for j , (i , o ) in enumerate (zip (input_batches , output_batches )):
208
195
self .o_queue .put (QueueMessage (MessageType .Process , DataBlob (i , o )))
209
- self .statistics ["datasize" ] += (input_data .nbytes + output_data .nbytes )
210
-
211
196
self .o_queue .put (QueueMessage (MessageType .Terminate , None ))
212
197
213
198
end = time .time ()
214
- self .statistics ["duration" ] += (end - start )
215
- print (f"Spend { end - start } at { self .__class__ .__name__ } ({ self .statistics } )" )
199
+ print (f"Spend { end - start } at { self .__class__ .__name__ } " )
216
200
217
201
218
202
class RMQMessage (object ):
@@ -359,8 +343,7 @@ class RMQLoaderTask(Task):
359
343
prefetch_count: Number of messages prefetched by RMQ (impacts performance)
360
344
"""
361
345
362
- def __init__ (self , o_queue , credentials , cacert , rmq_queue , prefetch_count = 1 ):
363
- super ().__init__ ()
346
+ def __init__ (self , o_queue , credentials , cacert , rmq_queue , prefetch_count = 1 ):
364
347
self .o_queue = o_queue
365
348
self .credentials = credentials
366
349
self .cacert = cacert
@@ -404,8 +387,6 @@ def callback_message(self, ch, basic_deliver, properties, body):
404
387
input_batches = np .array_split (input_data , num_batches )
405
388
output_batches = np .array_split (output_data , num_batches )
406
389
407
- self .statistics ["datasize" ] += (input_data .nbytes + output_data .nbytes )
408
-
409
390
for j , (i , o ) in enumerate (zip (input_batches , output_batches )):
410
391
self .o_queue .put (QueueMessage (MessageType .Process , DataBlob (i , o )))
411
392
@@ -416,14 +397,15 @@ def handler(signum, frame):
416
397
print (f"Received SIGNUM={ signum } for { name } [pid={ pid } ]: stopping process" )
417
398
self .rmq_consumer .stop ()
418
399
self .o_queue .put (QueueMessage (MessageType .Terminate , None ))
419
- self .statistics ["duration" ] += (end - start )
420
- print (f"Spend { self .total_time } at { self .__class__ .__name__ } ({ self .statistics } )" )
400
+ print (f"Spend { self .total_time } at { self .__class__ .__name__ } " )
421
401
422
402
return handler
423
403
424
404
def __call__ (self ):
425
405
"""
426
- Busy loop of consuming messages from RMQ queue
406
+ Busy loop of consuming messages from the RMQ queue by running the RMQ
407
+ consumer. When the signal handler fires, it stops the consumer and
408
+ pushes a 'Terminate' message to the queue.
427
409
"""
428
410
self .rmq_consumer .run ()
429
411
@@ -444,7 +426,6 @@ def __init__(self, i_queue, o_queue, writer_cls, out_dir):
444
426
initializes the writer task to read data from the i_queue, write them using
445
427
the writer_cls and store the data in the out_dir.
446
428
"""
447
- super ().__init__ ()
448
429
self .data_writer_cls = writer_cls
449
430
self .out_dir = out_dir
450
431
self .i_queue = i_queue
@@ -489,9 +470,7 @@ def __call__(self):
489
470
break
490
471
491
472
end = time .time ()
492
- self .statistics ["datasize" ] = total_bytes_written
493
- self .statistics ["duration" ] += (end - start )
494
- print (f"Spend { end - start } { total_bytes_written } at { self .__class__ .__name__ } ({ self .statistics } )" )
473
+ print (f"Spend { end - start } { total_bytes_written } at { self .__class__ .__name__ } " )
495
474
496
475
497
476
class PushToStore (Task ):
@@ -512,7 +491,7 @@ def __init__(self, i_queue, ams_config, db_path, store):
512
491
is not under db_path, it copies the file to this location and, if store is defined,
513
492
it makes the kosh-store aware of the existence of the file.
514
493
"""
515
- super (). __init__ ()
494
+
516
495
self .ams_config = ams_config
517
496
self .i_queue = i_queue
518
497
self .dir = Path (db_path ).absolute ()
@@ -542,12 +521,9 @@ def __call__(self):
542
521
543
522
if self ._store :
544
523
db_store .add_candidates ([str (dest_file )])
545
-
546
- self .statistics ["datasize" ] += os .stat (src_fn ).st_size
547
524
548
525
end = time .time ()
549
- self .statistics ["duration" ] += (end - start )
550
- print (f"Spend { end - start } at { self .__class__ .__name__ } ({ self .statistics } )" )
526
+ print (f"Spend { end - start } at { self .__class__ .__name__ } " )
551
527
552
528
553
529
class Pipeline (ABC ):
0 commit comments