Skip to content

Commit 6d63286

Browse files
pool: Improve transaction (Chia-Network#201)
1 parent 32907fb commit 6d63286

File tree

3 files changed

+323
-71
lines changed

3 files changed

+323
-71
lines changed

pool/pool.py

+105-56
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ def __init__(
149149
# faster.
150150
self.max_additions_per_transaction = pool_config["max_additions_per_transaction"]
151151

152-
# This is the list of payments that we have not sent yet, to farmers
153-
self.pending_payments: Optional[asyncio.Queue] = None
154-
155152
# Keeps track of the latest state of our node
156153
self.blockchain_state = {"peak": None}
157154

@@ -167,6 +164,7 @@ def __init__(
167164
self.collect_pool_rewards_loop_task: Optional[asyncio.Task] = None
168165
self.create_payment_loop_task: Optional[asyncio.Task] = None
169166
self.submit_payment_loop_task: Optional[asyncio.Task] = None
167+
self.confirm_payment_loop_task: Optional[asyncio.Task] = None
170168
self.get_peak_loop_task: Optional[asyncio.Task] = None
171169

172170
self.node_rpc_client: Optional[FullNodeRpcClient] = None
@@ -199,10 +197,9 @@ async def start(self):
199197
self.collect_pool_rewards_loop_task = asyncio.create_task(self.collect_pool_rewards_loop())
200198
self.create_payment_loop_task = asyncio.create_task(self.create_payment_loop())
201199
self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop())
200+
self.confirm_payment_loop_task = asyncio.create_task(self.confirm_payment_loop())
202201
self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop())
203202

204-
self.pending_payments = asyncio.Queue()
205-
206203
async def stop(self):
207204
if self.confirm_partials_loop_task is not None:
208205
self.confirm_partials_loop_task.cancel()
@@ -212,6 +209,8 @@ async def stop(self):
212209
self.create_payment_loop_task.cancel()
213210
if self.submit_payment_loop_task is not None:
214211
self.submit_payment_loop_task.cancel()
212+
if self.confirm_payment_loop_task is not None:
213+
self.confirm_payment_loop_task.cancel()
215214
if self.get_peak_loop_task is not None:
216215
self.get_peak_loop_task.cancel()
217216

@@ -221,6 +220,23 @@ async def stop(self):
221220
await self.node_rpc_client.await_closed()
222221
await self.store.connection.close()
223222

223+
async def confirm_transaction(self, transaction):
224+
peak_height = self.blockchain_state["peak"].height
225+
while (
226+
not transaction.confirmed
227+
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
228+
):
229+
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
230+
peak_height = self.blockchain_state["peak"].height
231+
self.log.info(
232+
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
233+
)
234+
if not transaction.confirmed:
235+
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
236+
else:
237+
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
238+
await asyncio.sleep(10)
239+
224240
async def get_peak_loop(self):
225241
"""
226242
Periodically contacts the full node to get the latest state of the blockchain
@@ -356,8 +372,8 @@ async def create_payment_loop(self):
356372
await asyncio.sleep(60)
357373
continue
358374

359-
if self.pending_payments.qsize() != 0:
360-
self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting")
375+
if (pending_payment_count := await self.store.get_pending_payment_count()) != 0:
376+
self.log.warning(f"Pending payments ({pending_payment_count}), waiting")
361377
await asyncio.sleep(60)
362378
continue
363379

@@ -374,7 +390,7 @@ async def create_payment_loop(self):
374390
await asyncio.sleep(120)
375391
continue
376392

377-
total_amount_claimed = sum([c.coin.amount for c in coin_records])
393+
total_amount_claimed = sum(c.coin.amount for c in coin_records)
378394
pool_coin_amount = int(total_amount_claimed * self.pool_fee)
379395
amount_to_distribute = total_amount_claimed - pool_coin_amount
380396

@@ -387,34 +403,50 @@ async def create_payment_loop(self):
387403
self.log.info(f"Total amount to distribute: {amount_to_distribute / (10 ** 12)}")
388404

389405
async with self.store.lock:
390-
# Get the points of each farmer, as well as payout instructions. Here a chia address is used,
391-
# but other blockchain addresses can also be used.
392-
points_and_ph: List[
393-
Tuple[uint64, bytes]
394-
] = await self.store.get_farmer_points_and_payout_instructions()
395-
total_points = sum([pt for (pt, ph) in points_and_ph])
406+
# Get the launcher_id and points of each farmer, as well as payout instructions.
407+
# Here a chia address is used, but other blockchain addresses can also be used.
408+
launcher_id_and_points_and_ph: List[
409+
Tuple[bytes32, uint64, bytes32]
410+
] = await self.store.get_farmer_launcher_id_and_points_and_payout_instructions()
411+
total_points = sum([pt for (launcher_id, pt, ph) in launcher_id_and_points_and_ph])
396412
if total_points > 0:
397413
mojo_per_point = floor(amount_to_distribute / total_points)
398414
self.log.info(f"Paying out {mojo_per_point} mojo / point")
399415

416+
# Pool fee payment record launcher_id is equal to puzzle_hash, points is 0.
400417
additions_sub_list: List[Dict] = [
401-
{"puzzle_hash": self.pool_fee_puzzle_hash, "amount": pool_coin_amount}
418+
{
419+
"launcher_id": self.pool_fee_puzzle_hash,
420+
"puzzle_hash": self.pool_fee_puzzle_hash,
421+
"amount": pool_coin_amount,
422+
"points": 0,
423+
}
402424
]
403-
for points, ph in points_and_ph:
425+
for launcher_id, points, ph in launcher_id_and_points_and_ph:
404426
if points > 0:
405-
additions_sub_list.append({"puzzle_hash": ph, "amount": points * mojo_per_point})
406-
407-
if len(additions_sub_list) == self.max_additions_per_transaction:
408-
await self.pending_payments.put(additions_sub_list.copy())
409-
self.log.info(f"Will make payments: {additions_sub_list}")
410-
additions_sub_list = []
411-
412-
if len(additions_sub_list) > 0:
413-
self.log.info(f"Will make payments: {additions_sub_list}")
414-
await self.pending_payments.put(additions_sub_list.copy())
415-
416-
# Subtract the points from each farmer
417-
await self.store.clear_farmer_points()
427+
additions_sub_list.append({
428+
"launcher_id": launcher_id,
429+
"puzzle_hash": ph,
430+
"amount": points * mojo_per_point,
431+
"points": points,
432+
})
433+
434+
async with self.store.tx():
435+
for payment in additions_sub_list:
436+
await self.store.add_payment(
437+
payment["launcher_id"],
438+
payment["puzzle_hash"],
439+
uint64(payment["amount"]),
440+
payment["points"],
441+
uint64(int(time.time())),
442+
False,
443+
auto_commit=False,
444+
)
445+
446+
# Subtract the points from each farmer
447+
await self.store.clear_farmer_points(auto_commit=False)
448+
449+
self.log.info(f"Will make payments: {additions_sub_list}")
418450
else:
419451
self.log.info(f"No points for any farmer. Waiting {self.payment_interval}")
420452

@@ -430,60 +462,77 @@ async def create_payment_loop(self):
430462
async def submit_payment_loop(self):
431463
while True:
432464
try:
433-
peak_height = self.blockchain_state["peak"].height
434465
await self.wallet_rpc_client.log_in_and_skip(fingerprint=self.wallet_fingerprint)
435466
if not self.blockchain_state["sync"]["synced"] or not self.wallet_synced:
436467
self.log.warning("Waiting for wallet sync")
437468
await asyncio.sleep(60)
438469
continue
439470

440-
payment_targets = await self.pending_payments.get()
441-
assert len(payment_targets) > 0
471+
pending_payments = await self.store.get_pending_payment_records(self.max_additions_per_transaction)
472+
if len(pending_payments) == 0:
473+
self.log.info("No funds to pending payment records")
474+
await asyncio.sleep(60)
475+
continue
476+
self.log.info(f"Submitting a payment: {pending_payments}")
442477

443-
self.log.info(f"Submitting a payment: {payment_targets}")
478+
payment_targets: List[Dict] = []
479+
payment_records: List[Tuple[bytes32, uint64]] = []
480+
481+
for launcher_id, puzzle_hash, amount, _, timestamp, _, _ in pending_payments:
482+
payment_targets.append({"puzzle_hash": puzzle_hash, "amount": amount})
483+
payment_records.append((launcher_id, timestamp))
444484

445485
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
446486
# fee itself. Alternatively you can set it to 0 and wait longer
447487
# blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets)
448488
blockchain_fee: uint64 = uint64(0)
449489
try:
450-
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
451-
self.wallet_id, payment_targets, fee=blockchain_fee
452-
)
490+
async with self.store.tx():
491+
await self.store.update_is_payment(payment_records, auto_commit=False)
492+
493+
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
494+
self.wallet_id, payment_targets, fee=blockchain_fee
495+
)
453496
except ValueError as e:
454497
self.log.error(f"Error making payment: {e}")
455498
await asyncio.sleep(10)
456-
await self.pending_payments.put(payment_targets)
457499
continue
458500

501+
await self.store.update_transaction_id(payment_records, transaction_id=transaction.name)
459502
self.log.info(f"Transaction: {transaction}")
460503

461-
while (
462-
not transaction.confirmed
463-
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
464-
):
465-
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
466-
peak_height = self.blockchain_state["peak"].height
467-
self.log.info(
468-
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
469-
)
470-
if not transaction.confirmed:
471-
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
472-
else:
473-
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
474-
await asyncio.sleep(10)
475-
476-
# TODO(pool): persist in DB
477-
self.log.info(f"Successfully confirmed payments {payment_targets}")
478-
504+
await self.confirm_transaction(transaction)
479505
except asyncio.CancelledError:
480506
self.log.info("Cancelled submit_payment_loop, closing")
481507
return
482508
except Exception as e:
483-
# TODO(pool): retry transaction if failed
484509
self.log.error(f"Unexpected error in submit_payment_loop: {e}")
485510
await asyncio.sleep(60)
486511

512+
async def confirm_payment_loop(self):
513+
while True:
514+
try:
515+
confirming_payments = await self.store.get_confirming_payment_records()
516+
if len(confirming_payments) == 0:
517+
self.log.info("No funds to confirming payment records")
518+
await asyncio.sleep(60)
519+
continue
520+
self.log.info(f"Confirming a payment: {confirming_payments}")
521+
522+
for transaction_id in confirming_payments:
523+
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction_id)
524+
await self.confirm_transaction(transaction)
525+
526+
await self.store.update_is_confirmed(transaction_id)
527+
self.log.info(f"Successfully confirmed payment {transaction_id}")
528+
529+
except asyncio.CancelledError:
530+
self.log.info("Cancelled confirm_payment_loop, closing")
531+
return
532+
except Exception as e:
533+
self.log.error(f"Unexpected error in confirm_payment_loop: {e}")
534+
await asyncio.sleep(60)
535+
487536
async def confirm_partials_loop(self):
488537
"""
489538
Pulls things from the queue of partials one at a time, and adjusts balances.

pool/store/abstract.py

+63-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def __init__(self):
2424
async def connect(self):
2525
"""Perform IO-related initialization"""
2626

27+
@abstractmethod
28+
async def tx(self):
29+
"""Performing Transactions for async with statement"""
30+
2731
@abstractmethod
2832
async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata):
2933
"""Persist a new Farmer in the store"""
@@ -55,12 +59,15 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes
5559
"""Fetch Farmers matching given puzzle hashes"""
5660

5761
@abstractmethod
58-
async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]:
62+
async def get_farmer_launcher_id_and_points_and_payout_instructions(self) -> List[Tuple[bytes32, uint64, bytes32]]:
5963
"""Fetch all farmers and their respective payout instructions"""
6064

6165
@abstractmethod
62-
async def clear_farmer_points(self) -> None:
63-
"""Rest all Farmers' points to 0"""
66+
async def clear_farmer_points(self, auto_commit: bool = True) -> None:
67+
"""
68+
Rest all Farmers' points to 0
69+
auto_commit decides whether to commit the transaction.
70+
"""
6471

6572
@abstractmethod
6673
async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty: uint64):
@@ -69,3 +76,56 @@ async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty:
6976
@abstractmethod
7077
async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]:
7178
"""Fetch last ``count`` partials for Farmer identified by ``launcher_id``"""
79+
80+
@abstractmethod
81+
async def add_payment(
82+
self,
83+
launcher_id: bytes32,
84+
puzzle_hash: bytes32,
85+
amount: uint64,
86+
points: int,
87+
timestamp: uint64,
88+
is_payment: bool,
89+
auto_commit: bool = True,
90+
):
91+
"""
92+
Persist a new payment record in the store
93+
auto_commit decides whether to commit the transaction.
94+
"""
95+
96+
@abstractmethod
97+
async def update_is_payment(
98+
self,
99+
launcher_id_and_timestamp: List[Tuple[bytes32, uint64]],
100+
auto_commit: bool = True,
101+
):
102+
"""
103+
Update is_payment is True for payment records identified by ``launcher_id`` and ``timestamp``.
104+
auto_commit decides whether to commit the transaction.
105+
"""
106+
107+
@abstractmethod
108+
async def get_pending_payment_records(self, count: int) -> List[
109+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool, bool]
110+
]:
111+
"""Fetch ``count`` pending payment records"""
112+
113+
@abstractmethod
114+
async def get_pending_payment_count(self) -> int:
115+
"""Fetch pending payment records count"""
116+
117+
@abstractmethod
118+
async def get_confirming_payment_records(self) -> List[bytes32]:
119+
"""Fetch confirming payment records"""
120+
121+
@abstractmethod
122+
async def update_transaction_id(
123+
self,
124+
launcher_id_and_timestamp: List[Tuple[bytes32, uint64]],
125+
transaction_id: bytes32,
126+
):
127+
"""Update transaction_id for payment records identified by ``launcher_id`` and ``timestamp``."""
128+
129+
@abstractmethod
130+
async def update_is_confirmed(self, transaction_id: bytes32):
131+
"""Update is_confirmed is True for payment records identified by ``transaction_id``"""

0 commit comments

Comments
 (0)