@@ -149,9 +149,6 @@ def __init__(
149
149
# faster.
150
150
self .max_additions_per_transaction = pool_config ["max_additions_per_transaction" ]
151
151
152
- # This is the list of payments that we have not sent yet, to farmers
153
- self .pending_payments : Optional [asyncio .Queue ] = None
154
-
155
152
# Keeps track of the latest state of our node
156
153
self .blockchain_state = {"peak" : None }
157
154
@@ -167,6 +164,7 @@ def __init__(
167
164
self .collect_pool_rewards_loop_task : Optional [asyncio .Task ] = None
168
165
self .create_payment_loop_task : Optional [asyncio .Task ] = None
169
166
self .submit_payment_loop_task : Optional [asyncio .Task ] = None
167
+ self .confirm_payment_loop_task : Optional [asyncio .Task ] = None
170
168
self .get_peak_loop_task : Optional [asyncio .Task ] = None
171
169
172
170
self .node_rpc_client : Optional [FullNodeRpcClient ] = None
@@ -199,10 +197,9 @@ async def start(self):
199
197
self .collect_pool_rewards_loop_task = asyncio .create_task (self .collect_pool_rewards_loop ())
200
198
self .create_payment_loop_task = asyncio .create_task (self .create_payment_loop ())
201
199
self .submit_payment_loop_task = asyncio .create_task (self .submit_payment_loop ())
200
+ self .confirm_payment_loop_task = asyncio .create_task (self .confirm_payment_loop ())
202
201
self .get_peak_loop_task = asyncio .create_task (self .get_peak_loop ())
203
202
204
- self .pending_payments = asyncio .Queue ()
205
-
206
203
async def stop (self ):
207
204
if self .confirm_partials_loop_task is not None :
208
205
self .confirm_partials_loop_task .cancel ()
@@ -212,6 +209,8 @@ async def stop(self):
212
209
self .create_payment_loop_task .cancel ()
213
210
if self .submit_payment_loop_task is not None :
214
211
self .submit_payment_loop_task .cancel ()
212
+ if self .confirm_payment_loop_task is not None :
213
+ self .confirm_payment_loop_task .cancel ()
215
214
if self .get_peak_loop_task is not None :
216
215
self .get_peak_loop_task .cancel ()
217
216
@@ -356,8 +355,8 @@ async def create_payment_loop(self):
356
355
await asyncio .sleep (60 )
357
356
continue
358
357
359
- if self .pending_payments . qsize ( ) != 0 :
360
- self .log .warning (f"Pending payments ({ self . pending_payments . qsize () } ), waiting" )
358
+ if ( pending_payment_count := await self .store . get_pending_payment_count () ) != 0 :
359
+ self .log .warning (f"Pending payments ({ pending_payment_count } ), waiting" )
361
360
await asyncio .sleep (60 )
362
361
continue
363
362
@@ -387,34 +386,50 @@ async def create_payment_loop(self):
387
386
self .log .info (f"Total amount to distribute: { amount_to_distribute / (10 ** 12 )} " )
388
387
389
388
async with self .store .lock :
390
- # Get the points of each farmer, as well as payout instructions. Here a chia address is used,
391
- # but other blockchain addresses can also be used.
392
- points_and_ph : List [
393
- Tuple [uint64 , bytes ]
394
- ] = await self .store .get_farmer_points_and_payout_instructions ()
395
- total_points = sum ([pt for (pt , ph ) in points_and_ph ])
389
+ # Get the launcher_id and points of each farmer, as well as payout instructions.
390
+ # Here a chia address is used, but other blockchain addresses can also be used.
391
+ launcher_id_and_points_and_ph : List [
392
+ Tuple [bytes32 , uint64 , bytes32 ]
393
+ ] = await self .store .get_farmer_launcher_id_and_points_and_payout_instructions ()
394
+ total_points = sum ([pt for (launcher_id , pt , ph ) in launcher_id_and_points_and_ph ])
396
395
if total_points > 0 :
397
396
mojo_per_point = floor (amount_to_distribute / total_points )
398
397
self .log .info (f"Paying out { mojo_per_point } mojo / point" )
399
398
399
+ # Pool fee payment record launcher_id is equal to puzzle_hash, points is 0.
400
400
additions_sub_list : List [Dict ] = [
401
- {"puzzle_hash" : self .pool_fee_puzzle_hash , "amount" : pool_coin_amount }
401
+ {
402
+ "launcher_id" : self .pool_fee_puzzle_hash ,
403
+ "puzzle_hash" : self .pool_fee_puzzle_hash ,
404
+ "amount" : pool_coin_amount ,
405
+ "points" : 0 ,
406
+ }
402
407
]
403
- for points , ph in points_and_ph :
408
+ for launcher_id , points , ph in launcher_id_and_points_and_ph :
404
409
if points > 0 :
405
- additions_sub_list .append ({"puzzle_hash" : ph , "amount" : points * mojo_per_point })
406
-
407
- if len (additions_sub_list ) == self .max_additions_per_transaction :
408
- await self .pending_payments .put (additions_sub_list .copy ())
409
- self .log .info (f"Will make payments: { additions_sub_list } " )
410
- additions_sub_list = []
411
-
412
- if len (additions_sub_list ) > 0 :
413
- self .log .info (f"Will make payments: { additions_sub_list } " )
414
- await self .pending_payments .put (additions_sub_list .copy ())
415
-
416
- # Subtract the points from each farmer
417
- await self .store .clear_farmer_points ()
410
+ additions_sub_list .append ({
411
+ "launcher_id" : launcher_id ,
412
+ "puzzle_hash" : ph ,
413
+ "amount" : points * mojo_per_point ,
414
+ "points" : points ,
415
+ })
416
+
417
+ async with self .store .tx ():
418
+ for payment in additions_sub_list :
419
+ await self .store .add_payment (
420
+ payment ["launcher_id" ],
421
+ payment ["puzzle_hash" ],
422
+ uint64 (payment ["amount" ]),
423
+ payment ["points" ],
424
+ uint64 (int (time .time ())),
425
+ False ,
426
+ auto_commit = False ,
427
+ )
428
+
429
+ # Subtract the points from each farmer
430
+ await self .store .clear_farmer_points (auto_commit = False )
431
+
432
+ self .log .info (f"Will make payments: { additions_sub_list } " )
418
433
else :
419
434
self .log .info (f"No points for any farmer. Waiting { self .payment_interval } " )
420
435
@@ -430,58 +445,89 @@ async def create_payment_loop(self):
430
445
async def submit_payment_loop (self ):
431
446
while True :
432
447
try :
433
- peak_height = self .blockchain_state ["peak" ].height
434
448
await self .wallet_rpc_client .log_in_and_skip (fingerprint = self .wallet_fingerprint )
435
449
if not self .blockchain_state ["sync" ]["synced" ] or not self .wallet_synced :
436
450
self .log .warning ("Waiting for wallet sync" )
437
451
await asyncio .sleep (60 )
438
452
continue
439
453
440
- payment_targets = await self .pending_payments .get ()
441
- assert len (payment_targets ) > 0
454
+ pending_payments = await self .store .get_pending_payment_records (self .max_additions_per_transaction )
455
+ if len (pending_payments ) == 0 :
456
+ self .log .info ("No funds to pending payment records" )
457
+ await asyncio .sleep (60 )
458
+ continue
459
+ self .log .info (f"Submitting a payment: { pending_payments } " )
460
+
461
+ payment_targets : List [Dict ] = []
462
+ payment_records : List [Tuple [bytes32 , uint64 ]] = []
442
463
443
- self .log .info (f"Submitting a payment: { payment_targets } " )
464
+ for launcher_id , puzzle_hash , amount , _ , timestamp , _ , _ , _ in pending_payments :
465
+ payment_targets .append ({"puzzle_hash" : puzzle_hash , "amount" : amount })
466
+ payment_records .append ((launcher_id , timestamp ))
444
467
445
468
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
446
469
# fee itself. Alternatively you can set it to 0 and wait longer
447
470
# blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets)
448
471
blockchain_fee : uint64 = uint64 (0 )
449
472
try :
450
- transaction : TransactionRecord = await self .wallet_rpc_client .send_transaction_multi (
451
- self .wallet_id , payment_targets , fee = blockchain_fee
452
- )
473
+ async with self .store .tx ():
474
+ await self .store .update_is_payment (payment_records , auto_commit = False )
475
+
476
+ transaction : TransactionRecord = await self .wallet_rpc_client .send_transaction_multi (
477
+ self .wallet_id , payment_targets , fee = blockchain_fee
478
+ )
453
479
except ValueError as e :
454
480
self .log .error (f"Error making payment: { e } " )
455
481
await asyncio .sleep (10 )
456
- await self .pending_payments .put (payment_targets )
457
482
continue
458
483
484
+ await self .store .update_transaction_id (payment_records , transaction_id = transaction .name )
459
485
self .log .info (f"Transaction: { transaction } " )
460
486
461
- while (
462
- not transaction .confirmed
463
- or not (peak_height - transaction .confirmed_at_height ) > self .confirmation_security_threshold
464
- ):
465
- transaction = await self .wallet_rpc_client .get_transaction (self .wallet_id , transaction .name )
487
+ except asyncio .CancelledError :
488
+ self .log .info ("Cancelled submit_payment_loop, closing" )
489
+ return
490
+ except Exception as e :
491
+ self .log .error (f"Unexpected error in submit_payment_loop: { e } " )
492
+ await asyncio .sleep (60 )
493
+
494
+ async def confirm_payment_loop (self ):
495
+ while True :
496
+ try :
497
+ confirming_payments = await self .store .get_confirming_payment_records ()
498
+ if len (confirming_payments ) == 0 :
499
+ self .log .info ("No funds to confirming payment records" )
500
+ await asyncio .sleep (60 )
501
+ continue
502
+ self .log .info (f"Confirming a payment: { confirming_payments } " )
503
+
504
+ for transaction_id in confirming_payments :
505
+ transaction = await self .wallet_rpc_client .get_transaction (self .wallet_id , transaction_id )
466
506
peak_height = self .blockchain_state ["peak" ].height
467
- self .log .info (
468
- f"Waiting for transaction to obtain { self .confirmation_security_threshold } confirmations"
469
- )
470
- if not transaction .confirmed :
471
- self .log .info (f"Not confirmed. In mempool? { transaction .is_in_mempool ()} " )
472
- else :
473
- self .log .info (f"Confirmations: { peak_height - transaction .confirmed_at_height } " )
474
- await asyncio .sleep (10 )
475
507
476
- # TODO(pool): persist in DB
477
- self .log .info (f"Successfully confirmed payments { payment_targets } " )
508
+ while (
509
+ not transaction .confirmed
510
+ or not (peak_height - transaction .confirmed_at_height ) > self .confirmation_security_threshold
511
+ ):
512
+ transaction = await self .wallet_rpc_client .get_transaction (self .wallet_id , transaction .name )
513
+ peak_height = self .blockchain_state ["peak" ].height
514
+ self .log .info (
515
+ f"Waiting for transaction to obtain { self .confirmation_security_threshold } confirmations"
516
+ )
517
+ if not transaction .confirmed :
518
+ self .log .info (f"Not confirmed. In mempool? { transaction .is_in_mempool ()} " )
519
+ else :
520
+ self .log .info (f"Confirmations: { peak_height - transaction .confirmed_at_height } " )
521
+ await asyncio .sleep (10 )
522
+
523
+ await self .store .update_is_confirmed (transaction_id )
524
+ self .log .info (f"Successfully confirmed payment { transaction_id } " )
478
525
479
526
except asyncio .CancelledError :
480
- self .log .info ("Cancelled submit_payment_loop , closing" )
527
+ self .log .info ("Cancelled confirm_payment_loop , closing" )
481
528
return
482
529
except Exception as e :
483
- # TODO(pool): retry transaction if failed
484
- self .log .error (f"Unexpected error in submit_payment_loop: { e } " )
530
+ self .log .error (f"Unexpected error in confirm_payment_loop: { e } " )
485
531
await asyncio .sleep (60 )
486
532
487
533
async def confirm_partials_loop (self ):
0 commit comments