@@ -155,9 +155,6 @@ def __init__(
155
155
# faster.
156
156
self .max_additions_per_transaction = pool_config ["max_additions_per_transaction" ]
157
157
158
- # This is the list of payments that we have not sent yet, to farmers
159
- self .pending_payments : Optional [asyncio .Queue ] = None
160
-
161
158
# Keeps track of the latest state of our node
162
159
self .blockchain_state = {"peak" : None }
163
160
@@ -207,8 +204,6 @@ async def start(self):
207
204
self .submit_payment_loop_task = asyncio .create_task (self .submit_payment_loop ())
208
205
self .get_peak_loop_task = asyncio .create_task (self .get_peak_loop ())
209
206
210
- self .pending_payments = asyncio .Queue ()
211
-
212
207
async def stop (self ):
213
208
if self .confirm_partials_loop_task is not None :
214
209
self .confirm_partials_loop_task .cancel ()
@@ -362,8 +357,8 @@ async def create_payment_loop(self):
362
357
await asyncio .sleep (60 )
363
358
continue
364
359
365
- if self .pending_payments . qsize ( ) != 0 :
366
- self .log .warning (f"Pending payments ({ self . pending_payments . qsize () } ), waiting" )
360
+ if ( pending_payment_count := await self .store . get_payment_count_by_is_payment ( False ) ) != 0 :
361
+ self .log .warning (f"Pending payments ({ pending_payment_count } ), waiting" )
367
362
await asyncio .sleep (60 )
368
363
continue
369
364
@@ -408,14 +403,11 @@ async def create_payment_loop(self):
408
403
if points > 0 :
409
404
additions_sub_list .append ({"puzzle_hash" : ph , "amount" : points * mojo_per_point })
410
405
411
- if len (additions_sub_list ) == self .max_additions_per_transaction :
412
- await self .pending_payments .put (additions_sub_list .copy ())
413
- self .log .info (f"Will make payments: { additions_sub_list } " )
414
- additions_sub_list = []
406
+ for ph , amount in additions_sub_list :
407
+ await self .store .add_payment (ph , uint64 (amount ), uint64 (int (time .time ())), False )
415
408
416
409
if len (additions_sub_list ) > 0 :
417
410
self .log .info (f"Will make payments: { additions_sub_list } " )
418
- await self .pending_payments .put (additions_sub_list .copy ())
419
411
420
412
# Subtract the points from each farmer
421
413
await self .store .clear_farmer_points ()
@@ -441,9 +433,24 @@ async def submit_payment_loop(self):
441
433
await asyncio .sleep (60 )
442
434
continue
443
435
444
- payment_targets = await self .pending_payments .get ()
445
- assert len (payment_targets ) > 0
436
+ payment_records = await self .store .get_payment_records_by_is_payment (
437
+ False , self .max_additions_per_transaction
438
+ )
439
+ if len (payment_records ) == 0 :
440
+ self .log .info ("No funds to payment record" )
441
+ await asyncio .sleep (60 )
442
+ continue
446
443
444
+ payment_targets : List [Dict ] = [
445
+ {
446
+ "puzzle_hash" : puzzle_hash ,
447
+ "amount" : amount ,
448
+ }
449
+ for puzzle_hash , amount , _ , _ in payment_records
450
+ ]
451
+ await self .store .update_payments_is_payment (
452
+ [(puzzle_hash , timestamp ) for puzzle_hash , _ , timestamp , _ in payment_records ], True
453
+ )
447
454
self .log .info (f"Submitting a payment: { payment_targets } " )
448
455
449
456
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
@@ -457,7 +464,9 @@ async def submit_payment_loop(self):
457
464
except ValueError as e :
458
465
self .log .error (f"Error making payment: { e } " )
459
466
await asyncio .sleep (10 )
460
- await self .pending_payments .put (payment_targets )
467
+ await self .store .update_payments_is_payment (
468
+ [(puzzle_hash , timestamp ) for puzzle_hash , _ , timestamp , _ in payment_records ], False
469
+ )
461
470
continue
462
471
463
472
self .log .info (f"Transaction: { transaction } " )
0 commit comments