Skip to content

Commit 0ceaa9b

Browse files
pool: Improve transaction (Chia-Network#201)
1 parent 32907fb commit 0ceaa9b

File tree

3 files changed

+315
-69
lines changed

3 files changed

+315
-69
lines changed

pool/pool.py

+100-54
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ def __init__(
149149
# faster.
150150
self.max_additions_per_transaction = pool_config["max_additions_per_transaction"]
151151

152-
# This is the list of payments that we have not sent yet, to farmers
153-
self.pending_payments: Optional[asyncio.Queue] = None
154-
155152
# Keeps track of the latest state of our node
156153
self.blockchain_state = {"peak": None}
157154

@@ -167,6 +164,7 @@ def __init__(
167164
self.collect_pool_rewards_loop_task: Optional[asyncio.Task] = None
168165
self.create_payment_loop_task: Optional[asyncio.Task] = None
169166
self.submit_payment_loop_task: Optional[asyncio.Task] = None
167+
self.confirm_payment_loop_task: Optional[asyncio.Task] = None
170168
self.get_peak_loop_task: Optional[asyncio.Task] = None
171169

172170
self.node_rpc_client: Optional[FullNodeRpcClient] = None
@@ -199,10 +197,9 @@ async def start(self):
199197
self.collect_pool_rewards_loop_task = asyncio.create_task(self.collect_pool_rewards_loop())
200198
self.create_payment_loop_task = asyncio.create_task(self.create_payment_loop())
201199
self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop())
200+
self.confirm_payment_loop_task = asyncio.create_task(self.confirm_payment_loop())
202201
self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop())
203202

204-
self.pending_payments = asyncio.Queue()
205-
206203
async def stop(self):
207204
if self.confirm_partials_loop_task is not None:
208205
self.confirm_partials_loop_task.cancel()
@@ -212,6 +209,8 @@ async def stop(self):
212209
self.create_payment_loop_task.cancel()
213210
if self.submit_payment_loop_task is not None:
214211
self.submit_payment_loop_task.cancel()
212+
if self.confirm_payment_loop_task is not None:
213+
self.confirm_payment_loop_task.cancel()
215214
if self.get_peak_loop_task is not None:
216215
self.get_peak_loop_task.cancel()
217216

@@ -356,8 +355,8 @@ async def create_payment_loop(self):
356355
await asyncio.sleep(60)
357356
continue
358357

359-
if self.pending_payments.qsize() != 0:
360-
self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting")
358+
if (pending_payment_count := await self.store.get_pending_payment_count()) != 0:
359+
self.log.warning(f"Pending payments ({pending_payment_count}), waiting")
361360
await asyncio.sleep(60)
362361
continue
363362

@@ -387,34 +386,50 @@ async def create_payment_loop(self):
387386
self.log.info(f"Total amount to distribute: {amount_to_distribute / (10 ** 12)}")
388387

389388
async with self.store.lock:
390-
# Get the points of each farmer, as well as payout instructions. Here a chia address is used,
391-
# but other blockchain addresses can also be used.
392-
points_and_ph: List[
393-
Tuple[uint64, bytes]
394-
] = await self.store.get_farmer_points_and_payout_instructions()
395-
total_points = sum([pt for (pt, ph) in points_and_ph])
389+
# Get the launcher_id and points of each farmer, as well as payout instructions.
390+
# Here a chia address is used, but other blockchain addresses can also be used.
391+
launcher_id_and_points_and_ph: List[
392+
Tuple[bytes32, uint64, bytes32]
393+
] = await self.store.get_farmer_launcher_id_and_points_and_payout_instructions()
394+
total_points = sum([pt for (launcher_id, pt, ph) in launcher_id_and_points_and_ph])
396395
if total_points > 0:
397396
mojo_per_point = floor(amount_to_distribute / total_points)
398397
self.log.info(f"Paying out {mojo_per_point} mojo / point")
399398

399+
# Pool fee payment record launcher_id is equal to puzzle_hash, points is 0.
400400
additions_sub_list: List[Dict] = [
401-
{"puzzle_hash": self.pool_fee_puzzle_hash, "amount": pool_coin_amount}
401+
{
402+
"launcher_id": self.pool_fee_puzzle_hash,
403+
"puzzle_hash": self.pool_fee_puzzle_hash,
404+
"amount": pool_coin_amount,
405+
"points": 0,
406+
}
402407
]
403-
for points, ph in points_and_ph:
408+
for launcher_id, points, ph in launcher_id_and_points_and_ph:
404409
if points > 0:
405-
additions_sub_list.append({"puzzle_hash": ph, "amount": points * mojo_per_point})
406-
407-
if len(additions_sub_list) == self.max_additions_per_transaction:
408-
await self.pending_payments.put(additions_sub_list.copy())
409-
self.log.info(f"Will make payments: {additions_sub_list}")
410-
additions_sub_list = []
411-
412-
if len(additions_sub_list) > 0:
413-
self.log.info(f"Will make payments: {additions_sub_list}")
414-
await self.pending_payments.put(additions_sub_list.copy())
415-
416-
# Subtract the points from each farmer
417-
await self.store.clear_farmer_points()
410+
additions_sub_list.append({
411+
"launcher_id": launcher_id,
412+
"puzzle_hash": ph,
413+
"amount": points * mojo_per_point,
414+
"points": points,
415+
})
416+
417+
async with self.store.tx():
418+
for payment in additions_sub_list:
419+
await self.store.add_payment(
420+
payment["launcher_id"],
421+
payment["puzzle_hash"],
422+
uint64(payment["amount"]),
423+
payment["points"],
424+
uint64(int(time.time())),
425+
False,
426+
auto_commit=False,
427+
)
428+
429+
# Subtract the points from each farmer
430+
await self.store.clear_farmer_points(auto_commit=False)
431+
432+
self.log.info(f"Will make payments: {additions_sub_list}")
418433
else:
419434
self.log.info(f"No points for any farmer. Waiting {self.payment_interval}")
420435

@@ -430,58 +445,89 @@ async def create_payment_loop(self):
430445
async def submit_payment_loop(self):
431446
while True:
432447
try:
433-
peak_height = self.blockchain_state["peak"].height
434448
await self.wallet_rpc_client.log_in_and_skip(fingerprint=self.wallet_fingerprint)
435449
if not self.blockchain_state["sync"]["synced"] or not self.wallet_synced:
436450
self.log.warning("Waiting for wallet sync")
437451
await asyncio.sleep(60)
438452
continue
439453

440-
payment_targets = await self.pending_payments.get()
441-
assert len(payment_targets) > 0
454+
pending_payments = await self.store.get_pending_payment_records(self.max_additions_per_transaction)
455+
if len(pending_payments) == 0:
456+
self.log.info("No funds to pending payment records")
457+
await asyncio.sleep(60)
458+
continue
459+
self.log.info(f"Submitting a payment: {pending_payments}")
460+
461+
payment_targets: List[Dict] = []
462+
payment_records: List[Tuple[bytes32, uint64]] = []
442463

443-
self.log.info(f"Submitting a payment: {payment_targets}")
464+
for launcher_id, puzzle_hash, amount, _, timestamp, _, _, _ in pending_payments:
465+
payment_targets.append({"puzzle_hash": puzzle_hash, "amount": amount})
466+
payment_records.append((launcher_id, timestamp))
444467

445468
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
446469
# fee itself. Alternatively you can set it to 0 and wait longer
447470
# blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets)
448471
blockchain_fee: uint64 = uint64(0)
449472
try:
450-
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
451-
self.wallet_id, payment_targets, fee=blockchain_fee
452-
)
473+
async with self.store.tx():
474+
await self.store.update_is_payment(payment_records, auto_commit=False)
475+
476+
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
477+
self.wallet_id, payment_targets, fee=blockchain_fee
478+
)
453479
except ValueError as e:
454480
self.log.error(f"Error making payment: {e}")
455481
await asyncio.sleep(10)
456-
await self.pending_payments.put(payment_targets)
457482
continue
458483

484+
await self.store.update_transaction_id(payment_records, transaction_id=transaction.name)
459485
self.log.info(f"Transaction: {transaction}")
460486

461-
while (
462-
not transaction.confirmed
463-
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
464-
):
465-
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
487+
except asyncio.CancelledError:
488+
self.log.info("Cancelled submit_payment_loop, closing")
489+
return
490+
except Exception as e:
491+
self.log.error(f"Unexpected error in submit_payment_loop: {e}")
492+
await asyncio.sleep(60)
493+
494+
async def confirm_payment_loop(self):
495+
while True:
496+
try:
497+
confirming_payments = await self.store.get_confirming_payment_records()
498+
if len(confirming_payments) == 0:
499+
self.log.info("No funds to confirming payment records")
500+
await asyncio.sleep(60)
501+
continue
502+
self.log.info(f"Confirming a payment: {confirming_payments}")
503+
504+
for transaction_id in confirming_payments:
505+
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction_id)
466506
peak_height = self.blockchain_state["peak"].height
467-
self.log.info(
468-
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
469-
)
470-
if not transaction.confirmed:
471-
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
472-
else:
473-
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
474-
await asyncio.sleep(10)
475507

476-
# TODO(pool): persist in DB
477-
self.log.info(f"Successfully confirmed payments {payment_targets}")
508+
while (
509+
not transaction.confirmed
510+
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
511+
):
512+
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
513+
peak_height = self.blockchain_state["peak"].height
514+
self.log.info(
515+
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
516+
)
517+
if not transaction.confirmed:
518+
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
519+
else:
520+
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
521+
await asyncio.sleep(10)
522+
523+
await self.store.update_is_confirmed(transaction_id)
524+
self.log.info(f"Successfully confirmed payment {transaction_id}")
478525

479526
except asyncio.CancelledError:
480-
self.log.info("Cancelled submit_payment_loop, closing")
527+
self.log.info("Cancelled confirm_payment_loop, closing")
481528
return
482529
except Exception as e:
483-
# TODO(pool): retry transaction if failed
484-
self.log.error(f"Unexpected error in submit_payment_loop: {e}")
530+
self.log.error(f"Unexpected error in confirm_payment_loop: {e}")
485531
await asyncio.sleep(60)
486532

487533
async def confirm_partials_loop(self):

pool/store/abstract.py

+63-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def __init__(self):
2424
async def connect(self):
2525
"""Perform IO-related initialization"""
2626

27+
@abstractmethod
28+
async def tx(self):
29+
"""Performing Transactions for async with statement"""
30+
2731
@abstractmethod
2832
async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata):
2933
"""Persist a new Farmer in the store"""
@@ -55,12 +59,15 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes
5559
"""Fetch Farmers matching given puzzle hashes"""
5660

5761
@abstractmethod
58-
async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]:
62+
async def get_farmer_launcher_id_and_points_and_payout_instructions(self) -> List[Tuple[bytes32, uint64, bytes32]]:
5963
"""Fetch all farmers and their respective payout instructions"""
6064

6165
@abstractmethod
62-
async def clear_farmer_points(self) -> None:
63-
"""Rest all Farmers' points to 0"""
66+
async def clear_farmer_points(self, auto_commit: bool = True) -> None:
67+
"""
68+
Rest all Farmers' points to 0
69+
auto_commit decides whether to commit the transaction.
70+
"""
6471

6572
@abstractmethod
6673
async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty: uint64):
@@ -69,3 +76,56 @@ async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty:
6976
@abstractmethod
7077
async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]:
7178
"""Fetch last ``count`` partials for Farmer identified by ``launcher_id``"""
79+
80+
@abstractmethod
81+
async def add_payment(
82+
self,
83+
launcher_id: bytes32,
84+
puzzle_hash: bytes32,
85+
amount: uint64,
86+
points: int,
87+
timestamp: uint64,
88+
is_payment: bool,
89+
auto_commit: bool = True,
90+
):
91+
"""
92+
Persist a new payment record in the store
93+
auto_commit decides whether to commit the transaction.
94+
"""
95+
96+
@abstractmethod
97+
async def update_is_payment(
98+
self,
99+
launcher_id_and_timestamp: List[Tuple[bytes32, uint64]],
100+
auto_commit: bool = True,
101+
):
102+
"""
103+
Update is_payment is True for payment records identified by ``launcher_id`` and ``timestamp``.
104+
auto_commit decides whether to commit the transaction.
105+
"""
106+
107+
@abstractmethod
108+
async def get_pending_payment_records(self, count: int) -> List[
109+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool, bytes32, bool]
110+
]:
111+
"""Fetch ``count`` pending payment records"""
112+
113+
@abstractmethod
114+
async def get_pending_payment_count(self) -> int:
115+
"""Fetch pending payment records count"""
116+
117+
@abstractmethod
118+
async def get_confirming_payment_records(self) -> List[bytes32]:
119+
"""Fetch confirming payment records"""
120+
121+
@abstractmethod
122+
async def update_transaction_id(
123+
self,
124+
launcher_id_and_timestamp: List[Tuple[bytes32, uint64]],
125+
transaction_id: bytes32,
126+
):
127+
"""Update transaction_id for payment records identified by ``launcher_id`` and ``timestamp``."""
128+
129+
@abstractmethod
130+
async def update_is_confirmed(self, transaction_id: bytes32):
131+
"""Update is_confirmed is True for payment records identified by ``transaction_id``"""

0 commit comments

Comments
 (0)