@@ -86,7 +86,9 @@ async def wait_for(coros, name=""):
86
86
# wrap the coro in a task to work with python 3.10 and 3.11+ where asyncio.wait semantics
87
87
# changed to not accept any awaitable
88
88
start = time .time ()
89
- done , _ = await asyncio .wait ([asyncio .create_task (_ensure_coro (c )) for c in coros ])
89
+ done , _ = await asyncio .wait (
90
+ [asyncio .create_task (_ensure_coro (c )) for c in coros ]
91
+ )
90
92
end = time .time ()
91
93
log .info (f"waiting for { name } took { end - start } s" )
92
94
for d in done :
@@ -108,9 +110,9 @@ class DFRayProcessorPool:
108
110
#
109
111
# This is simple though and will suffice for now
110
112
111
- def __init__ (self , min_workers : int , max_workers : int ):
112
- self .min_workers = min_workers
113
- self .max_workers = max_workers
113
+ def __init__ (self , min_processors : int , max_processors : int ):
114
+ self .min_processors = min_processors
115
+ self .max_processors = max_processors
114
116
115
117
# a map of processor_key (a random identifier) to stage actor reference
116
118
self .pool = {}
@@ -137,11 +139,11 @@ def __init__(self, min_workers: int, max_workers: int):
137
139
# processors available
138
140
self .available = set ()
139
141
140
- for _ in range (min_workers ):
142
+ for _ in range (min_processors ):
141
143
self ._new_processor ()
142
144
143
145
log .info (
144
- f"created ray processor pool (min_workers : { min_workers } , max_workers : { max_workers } )"
146
+ f"created ray processor pool (min_processors : { min_processors } , max_processors : { max_processors } )"
145
147
)
146
148
147
149
async def start (self ):
@@ -159,12 +161,14 @@ async def acquire(self, need=1):
159
161
160
162
have = len (self .available )
161
163
total = len (self .available ) + len (self .acquired )
162
- can_make = self .max_workers - total
164
+ can_make = self .max_processors - total
163
165
164
166
need_to_make = need - have
165
167
166
168
if need_to_make > can_make :
167
- raise Exception (f"Cannot allocate workers above { self .max_workers } " )
169
+ raise Exception (
170
+ f"Cannot allocate processors above { self .max_processors } "
171
+ )
168
172
169
173
if need_to_make > 0 :
170
174
log .debug (f"creating { need_to_make } additional processors" )
@@ -193,9 +197,9 @@ def _new_processor(self):
193
197
self .processors_ready .clear ()
194
198
processor_key = new_friendly_name ()
195
199
log .debug (f"starting processor: { processor_key } " )
196
- processor = DFRayProcessor .options (name = f"Processor : { processor_key } " ). remote (
197
- processor_key
198
- )
200
+ processor = DFRayProcessor .options (
201
+ name = f"Processor : { processor_key } "
202
+ ). remote ( processor_key )
199
203
self .pool [processor_key ] = processor
200
204
self .processors_started .add (processor .start_up .remote ())
201
205
self .available .add (processor_key )
@@ -244,7 +248,9 @@ async def _wait_for_serve(self):
244
248
245
249
async def all_done (self ):
246
250
log .info ("calling processor all done" )
247
- refs = [processor .all_done .remote () for processor in self .pool .values ()]
251
+ refs = [
252
+ processor .all_done .remote () for processor in self .pool .values ()
253
+ ]
248
254
await wait_for (refs , "processors to be all done" )
249
255
log .info ("all processors shutdown" )
250
256
@@ -287,7 +293,9 @@ async def update_plan(
287
293
)
288
294
289
295
async def serve (self ):
290
- log .info (f"[{ self .processor_key } ] serving on { self .processor_service .addr ()} " )
296
+ log .info (
297
+ f"[{ self .processor_key } ] serving on { self .processor_service .addr ()} "
298
+ )
291
299
await self .processor_service .serve ()
292
300
log .info (f"[{ self .processor_key } ] done serving" )
293
301
@@ -321,11 +329,13 @@ def __str__(self):
321
329
class DFRayContextSupervisor :
322
330
def __init__ (
323
331
self ,
324
- worker_pool_min : int ,
325
- worker_pool_max : int ,
332
+ processor_pool_min : int ,
333
+ processor_pool_max : int ,
326
334
) -> None :
327
- log .info (f"Creating DFRayContextSupervisor worker_pool_min: { worker_pool_min } " )
328
- self .pool = DFRayProcessorPool (worker_pool_min , worker_pool_max )
335
+ log .info (
336
+ f"Creating DFRayContextSupervisor processor_pool_min: { processor_pool_min } "
337
+ )
338
+ self .pool = DFRayProcessorPool (processor_pool_min , processor_pool_max )
329
339
self .stages : dict [str , InternalStageData ] = {}
330
340
log .info ("Created DFRayContextSupervisor" )
331
341
@@ -337,7 +347,9 @@ async def wait_for_ready(self):
337
347
338
348
async def get_stage_addrs (self , stage_id : int ):
339
349
addrs = [
340
- sd .remote_addr for sd in self .stages .values () if sd .stage_id == stage_id
350
+ sd .remote_addr
351
+ for sd in self .stages .values ()
352
+ if sd .stage_id == stage_id
341
353
]
342
354
return addrs
343
355
@@ -387,7 +399,10 @@ async def new_query(
387
399
refs .append (
388
400
isd .remote_processor .update_plan .remote (
389
401
isd .stage_id ,
390
- {stage_id : val ["child_addrs" ] for (stage_id , val ) in kid .items ()},
402
+ {
403
+ stage_id : val ["child_addrs" ]
404
+ for (stage_id , val ) in kid .items ()
405
+ },
391
406
isd .partition_group ,
392
407
isd .plan_bytes ,
393
408
)
@@ -419,7 +434,9 @@ async def sort_out_addresses(self):
419
434
]
420
435
421
436
# sanity check
422
- assert all ([op == output_partitions [0 ] for op in output_partitions ])
437
+ assert all (
438
+ [op == output_partitions [0 ] for op in output_partitions ]
439
+ )
423
440
output_partitions = output_partitions [0 ]
424
441
425
442
for child_stage_isd in child_stage_datas :
@@ -452,15 +469,15 @@ def __init__(
452
469
internal_df : DFRayDataFrameInternal ,
453
470
supervisor , # ray.actor.ActorHandle[DFRayContextSupervisor],
454
471
batch_size = 8192 ,
455
- partitions_per_worker : int | None = None ,
472
+ partitions_per_processor : int | None = None ,
456
473
prefetch_buffer_size = 0 ,
457
474
):
458
475
self .df = internal_df
459
476
self .supervisor = supervisor
460
477
self ._stages = None
461
478
self ._batches = None
462
479
self .batch_size = batch_size
463
- self .partitions_per_worker = partitions_per_worker
480
+ self .partitions_per_processor = partitions_per_processor
464
481
self .prefetch_buffer_size = prefetch_buffer_size
465
482
466
483
def stages (self ):
@@ -469,7 +486,7 @@ def stages(self):
469
486
self ._stages = self .df .stages (
470
487
self .batch_size ,
471
488
self .prefetch_buffer_size ,
472
- self .partitions_per_worker ,
489
+ self .partitions_per_processor ,
473
490
)
474
491
475
492
return self ._stages
@@ -503,7 +520,9 @@ def collect(self) -> list[pa.RecordBatch]:
503
520
)
504
521
log .debug (f"last stage addrs { last_stage_addrs } " )
505
522
506
- reader = self .df .read_final_stage (last_stage_id , last_stage_addrs [0 ])
523
+ reader = self .df .read_final_stage (
524
+ last_stage_id , last_stage_addrs [0 ]
525
+ )
507
526
log .debug ("got reader" )
508
527
self ._batches = list (reader )
509
528
return self ._batches
@@ -541,20 +560,20 @@ def __init__(
541
560
self ,
542
561
batch_size : int = 8192 ,
543
562
prefetch_buffer_size : int = 0 ,
544
- partitions_per_worker : int | None = None ,
545
- worker_pool_min : int = 1 ,
546
- worker_pool_max : int = 100 ,
563
+ partitions_per_processor : int | None = None ,
564
+ processor_pool_min : int = 1 ,
565
+ processor_pool_max : int = 100 ,
547
566
) -> None :
548
567
self .ctx = DFRayContextInternal ()
549
568
self .batch_size = batch_size
550
- self .partitions_per_worker = partitions_per_worker
569
+ self .partitions_per_processor = partitions_per_processor
551
570
self .prefetch_buffer_size = prefetch_buffer_size
552
571
553
572
self .supervisor = DFRayContextSupervisor .options (
554
573
name = "RayContextSupersisor" ,
555
574
).remote (
556
- worker_pool_min ,
557
- worker_pool_max ,
575
+ processor_pool_min ,
576
+ processor_pool_max ,
558
577
)
559
578
560
579
# start up our super visor and don't check in on it until its
@@ -603,7 +622,9 @@ def register_csv(self, name: str, path: str):
603
622
"""
604
623
self .ctx .register_csv (name , path )
605
624
606
- def register_listing_table (self , name : str , path : str , file_extention = "parquet" ):
625
+ def register_listing_table (
626
+ self , name : str , path : str , file_extention = "parquet"
627
+ ):
607
628
"""
608
629
Register a directory of parquet files with the given name.
609
630
The path can be a local filesystem path, absolute filesystem path, or a url.
@@ -629,7 +650,7 @@ def sql(self, query: str) -> DFRayDataFrame:
629
650
df ,
630
651
self .supervisor ,
631
652
self .batch_size ,
632
- self .partitions_per_worker ,
653
+ self .partitions_per_processor ,
633
654
self .prefetch_buffer_size ,
634
655
)
635
656
0 commit comments