Skip to content

Commit 11046d5

Browse files
pool: Improve transaction (Chia-Network#201)
1 parent 32907fb commit 11046d5

File tree

3 files changed

+290
-61
lines changed

3 files changed

+290
-61
lines changed

pool/pool.py

+94-50
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ def __init__(
149149
# faster.
150150
self.max_additions_per_transaction = pool_config["max_additions_per_transaction"]
151151

152-
# This is the list of payments that we have not sent yet, to farmers
153-
self.pending_payments: Optional[asyncio.Queue] = None
154-
155152
# Keeps track of the latest state of our node
156153
self.blockchain_state = {"peak": None}
157154

@@ -167,6 +164,7 @@ def __init__(
167164
self.collect_pool_rewards_loop_task: Optional[asyncio.Task] = None
168165
self.create_payment_loop_task: Optional[asyncio.Task] = None
169166
self.submit_payment_loop_task: Optional[asyncio.Task] = None
167+
self.confirm_payment_loop_task: Optional[asyncio.Task] = None
170168
self.get_peak_loop_task: Optional[asyncio.Task] = None
171169

172170
self.node_rpc_client: Optional[FullNodeRpcClient] = None
@@ -199,10 +197,9 @@ async def start(self):
199197
self.collect_pool_rewards_loop_task = asyncio.create_task(self.collect_pool_rewards_loop())
200198
self.create_payment_loop_task = asyncio.create_task(self.create_payment_loop())
201199
self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop())
200+
self.confirm_payment_loop_task = asyncio.create_task(self.confirm_payment_loop())
202201
self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop())
203202

204-
self.pending_payments = asyncio.Queue()
205-
206203
async def stop(self):
207204
if self.confirm_partials_loop_task is not None:
208205
self.confirm_partials_loop_task.cancel()
@@ -212,6 +209,8 @@ async def stop(self):
212209
self.create_payment_loop_task.cancel()
213210
if self.submit_payment_loop_task is not None:
214211
self.submit_payment_loop_task.cancel()
212+
if self.confirm_payment_loop_task is not None:
213+
self.confirm_payment_loop_task.cancel()
215214
if self.get_peak_loop_task is not None:
216215
self.get_peak_loop_task.cancel()
217216

@@ -356,8 +355,8 @@ async def create_payment_loop(self):
356355
await asyncio.sleep(60)
357356
continue
358357

359-
if self.pending_payments.qsize() != 0:
360-
self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting")
358+
if (pending_payment_count := await self.store.get_pending_payment_count()) != 0:
359+
self.log.warning(f"Pending payments ({pending_payment_count}), waiting")
361360
await asyncio.sleep(60)
362361
continue
363362

@@ -387,31 +386,45 @@ async def create_payment_loop(self):
387386
self.log.info(f"Total amount to distribute: {amount_to_distribute / (10 ** 12)}")
388387

389388
async with self.store.lock:
390-
# Get the points of each farmer, as well as payout instructions. Here a chia address is used,
391-
# but other blockchain addresses can also be used.
392-
points_and_ph: List[
393-
Tuple[uint64, bytes]
394-
] = await self.store.get_farmer_points_and_payout_instructions()
395-
total_points = sum([pt for (pt, ph) in points_and_ph])
389+
# Get the launcher_id and points of each farmer, as well as payout instructions.
390+
# Here a chia address is used, but other blockchain addresses can also be used.
391+
launcher_id_and_points_and_ph: List[
392+
Tuple[bytes32, uint64, bytes32]
393+
] = await self.store.get_farmer_launcher_id_and_points_and_payout_instructions()
394+
total_points = sum([pt for (launcher_id, pt, ph) in launcher_id_and_points_and_ph])
396395
if total_points > 0:
397396
mojo_per_point = floor(amount_to_distribute / total_points)
398397
self.log.info(f"Paying out {mojo_per_point} mojo / point")
399398

399+
# Pool fee payment record launcher_id is equal to puzzle_hash, points is 0.
400400
additions_sub_list: List[Dict] = [
401-
{"puzzle_hash": self.pool_fee_puzzle_hash, "amount": pool_coin_amount}
401+
{
402+
"launcher_id": self.pool_fee_puzzle_hash,
403+
"puzzle_hash": self.pool_fee_puzzle_hash,
404+
"amount": pool_coin_amount,
405+
"points": 0,
406+
}
402407
]
403-
for points, ph in points_and_ph:
408+
for launcher_id, points, ph in launcher_id_and_points_and_ph:
404409
if points > 0:
405-
additions_sub_list.append({"puzzle_hash": ph, "amount": points * mojo_per_point})
406-
407-
if len(additions_sub_list) == self.max_additions_per_transaction:
408-
await self.pending_payments.put(additions_sub_list.copy())
409-
self.log.info(f"Will make payments: {additions_sub_list}")
410-
additions_sub_list = []
410+
additions_sub_list.append({
411+
"launcher_id": launcher_id,
412+
"puzzle_hash": ph,
413+
"amount": points * mojo_per_point,
414+
"points": points,
415+
})
416+
417+
for payment in additions_sub_list:
418+
await self.store.add_payment(
419+
payment["launcher_id"],
420+
payment["puzzle_hash"],
421+
uint64(payment["amount"]),
422+
payment["points"],
423+
uint64(int(time.time())),
424+
False,
425+
)
411426

412-
if len(additions_sub_list) > 0:
413-
self.log.info(f"Will make payments: {additions_sub_list}")
414-
await self.pending_payments.put(additions_sub_list.copy())
427+
self.log.info(f"Will make payments: {additions_sub_list}")
415428

416429
# Subtract the points from each farmer
417430
await self.store.clear_farmer_points()
@@ -430,58 +443,89 @@ async def create_payment_loop(self):
430443
async def submit_payment_loop(self):
431444
while True:
432445
try:
433-
peak_height = self.blockchain_state["peak"].height
434446
await self.wallet_rpc_client.log_in_and_skip(fingerprint=self.wallet_fingerprint)
435447
if not self.blockchain_state["sync"]["synced"] or not self.wallet_synced:
436448
self.log.warning("Waiting for wallet sync")
437449
await asyncio.sleep(60)
438450
continue
439451

440-
payment_targets = await self.pending_payments.get()
441-
assert len(payment_targets) > 0
452+
pending_payments = await self.store.get_pending_payment_records(self.max_additions_per_transaction)
453+
if len(pending_payments) == 0:
454+
self.log.info("No funds to pending payment records")
455+
await asyncio.sleep(60)
456+
continue
457+
self.log.info(f"Submitting a payment: {pending_payments}")
458+
459+
payment_targets: List[Dict] = []
460+
payment_records: List[Tuple[bytes32, uint64]] = []
442461

443-
self.log.info(f"Submitting a payment: {payment_targets}")
462+
for launcher_id, puzzle_hash, amount, _, timestamp, _, _, _ in pending_payments:
463+
payment_targets.append({"puzzle_hash": puzzle_hash, "amount": amount})
464+
payment_records.append((launcher_id, timestamp))
444465

445466
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
446467
# fee itself. Alternatively you can set it to 0 and wait longer
447468
# blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets)
448469
blockchain_fee: uint64 = uint64(0)
449470
try:
450-
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
451-
self.wallet_id, payment_targets, fee=blockchain_fee
452-
)
471+
async with self.store.tx():
472+
await self.store.update_is_payment(payment_records, auto_commit=False)
473+
474+
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
475+
self.wallet_id, payment_targets, fee=blockchain_fee
476+
)
453477
except ValueError as e:
454478
self.log.error(f"Error making payment: {e}")
455479
await asyncio.sleep(10)
456-
await self.pending_payments.put(payment_targets)
457480
continue
458481

482+
await self.store.update_transaction_id(payment_records, transaction_id=transaction.name)
459483
self.log.info(f"Transaction: {transaction}")
460484

461-
while (
462-
not transaction.confirmed
463-
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
464-
):
465-
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
485+
except asyncio.CancelledError:
486+
self.log.info("Cancelled submit_payment_loop, closing")
487+
return
488+
except Exception as e:
489+
self.log.error(f"Unexpected error in submit_payment_loop: {e}")
490+
await asyncio.sleep(60)
491+
492+
async def confirm_payment_loop(self):
493+
while True:
494+
try:
495+
confirming_payments = await self.store.get_confirming_payment_records()
496+
if len(confirming_payments) == 0:
497+
self.log.info("No funds to confirming payment records")
498+
await asyncio.sleep(60)
499+
continue
500+
self.log.info(f"Confirming a payment: {confirming_payments}")
501+
502+
for transaction_id in confirming_payments:
503+
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction_id)
466504
peak_height = self.blockchain_state["peak"].height
467-
self.log.info(
468-
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
469-
)
470-
if not transaction.confirmed:
471-
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
472-
else:
473-
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
474-
await asyncio.sleep(10)
475505

476-
# TODO(pool): persist in DB
477-
self.log.info(f"Successfully confirmed payments {payment_targets}")
506+
while (
507+
not transaction.confirmed
508+
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
509+
):
510+
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
511+
peak_height = self.blockchain_state["peak"].height
512+
self.log.info(
513+
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
514+
)
515+
if not transaction.confirmed:
516+
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
517+
else:
518+
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
519+
await asyncio.sleep(10)
520+
521+
await self.store.update_is_confirmed(transaction_id)
522+
self.log.info(f"Successfully confirmed payment {transaction_id}")
478523

479524
except asyncio.CancelledError:
480-
self.log.info("Cancelled submit_payment_loop, closing")
525+
self.log.info("Cancelled confirm_payment_loop, closing")
481526
return
482527
except Exception as e:
483-
# TODO(pool): retry transaction if failed
484-
self.log.error(f"Unexpected error in submit_payment_loop: {e}")
528+
self.log.error(f"Unexpected error in confirm_payment_loop: {e}")
485529
await asyncio.sleep(60)
486530

487531
async def confirm_partials_loop(self):

pool/store/abstract.py

+54-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def __init__(self):
2424
async def connect(self):
2525
"""Perform IO-related initialization"""
2626

27+
@abstractmethod
28+
async def tx(self):
29+
"""Performing Transactions for async with statement"""
30+
2731
@abstractmethod
2832
async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata):
2933
"""Persist a new Farmer in the store"""
@@ -55,7 +59,7 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes
5559
"""Fetch Farmers matching given puzzle hashes"""
5660

5761
@abstractmethod
58-
async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]:
62+
async def get_farmer_launcher_id_and_points_and_payout_instructions(self) -> List[Tuple[bytes32, uint64, bytes32]]:
5963
"""Fetch all farmers and their respective payout instructions"""
6064

6165
@abstractmethod
@@ -69,3 +73,52 @@ async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty:
6973
@abstractmethod
7074
async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]:
7175
"""Fetch last ``count`` partials for Farmer identified by ``launcher_id``"""
76+
77+
@abstractmethod
78+
async def add_payment(
79+
self,
80+
launcher_id: bytes32,
81+
puzzle_hash: bytes32,
82+
amount: uint64,
83+
points: int,
84+
timestamp: uint64,
85+
is_payment: bool,
86+
):
87+
"""Persist a new payment record in the store"""
88+
89+
@abstractmethod
90+
async def update_is_payment(
91+
self,
92+
launcher_id_and_timestamp: List[Tuple[bytes32, uint64]],
93+
auto_commit: bool = True,
94+
):
95+
"""
96+
Update is_payment is True for payment records identified by ``launcher_id`` and ``timestamp``.
97+
auto_commit decides whether to commit the transaction.
98+
"""
99+
100+
@abstractmethod
101+
async def get_pending_payment_records(self, count: int) -> List[
102+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool, bytes32, bool]
103+
]:
104+
"""Fetch ``count`` pending payment records"""
105+
106+
@abstractmethod
107+
async def get_pending_payment_count(self) -> int:
108+
"""Fetch pending payment records count"""
109+
110+
@abstractmethod
111+
async def get_confirming_payment_records(self) -> List[bytes32]:
112+
"""Fetch confirming payment records"""
113+
114+
@abstractmethod
115+
async def update_transaction_id(
116+
self,
117+
launcher_id_and_timestamp: List[Tuple[bytes32, uint64]],
118+
transaction_id: bytes32,
119+
):
120+
"""Update transaction_id for payment records identified by ``launcher_id`` and ``timestamp``."""
121+
122+
@abstractmethod
123+
async def update_is_confirmed(self, transaction_id: bytes32):
124+
"""Update is_confirmed is True for payment records identified by ``transaction_id``"""

0 commit comments

Comments
 (0)