Skip to content

Commit 254c450

Browse files
pool: Improve transaction (Chia-Network#201)
1 parent e627c92 commit 254c450

File tree

3 files changed

+221
-56
lines changed

3 files changed

+221
-56
lines changed

pool/pool.py

+68-45
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,6 @@ def __init__(
155155
# faster.
156156
self.max_additions_per_transaction = pool_config["max_additions_per_transaction"]
157157

158-
# This is the list of payments that we have not sent yet, to farmers
159-
self.pending_payments: Optional[asyncio.Queue] = None
160-
161158
# Keeps track of the latest state of our node
162159
self.blockchain_state = {"peak": None}
163160

@@ -207,8 +204,6 @@ async def start(self):
207204
self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop())
208205
self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop())
209206

210-
self.pending_payments = asyncio.Queue()
211-
212207
async def stop(self):
213208
if self.confirm_partials_loop_task is not None:
214209
self.confirm_partials_loop_task.cancel()
@@ -362,8 +357,8 @@ async def create_payment_loop(self):
362357
await asyncio.sleep(60)
363358
continue
364359

365-
if self.pending_payments.qsize() != 0:
366-
self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting")
360+
if (pending_payment_count := await self.store.get_pending_payment_count()) != 0:
361+
self.log.warning(f"Pending payments ({pending_payment_count}), waiting")
367362
await asyncio.sleep(60)
368363
continue
369364

@@ -391,34 +386,46 @@ async def create_payment_loop(self):
391386
self.log.info(f"Total amount to distribute: {amount_to_distribute / (10 ** 12)}")
392387

393388
async with self.store.lock:
394-
# Get the points of each farmer, as well as payout instructions. Here a chia address is used,
395-
# but other blockchain addresses can also be used.
396-
points_and_ph: List[
397-
Tuple[uint64, bytes]
398-
] = await self.store.get_farmer_points_and_payout_instructions()
399-
total_points = sum([pt for (pt, ph) in points_and_ph])
389+
# Get the launcher_id and points of each farmer, as well as payout instructions.
390+
# Here a chia address is used, but other blockchain addresses can also be used.
391+
launcher_id_and_points_and_ph: List[
392+
Tuple[bytes32, uint64, bytes32]
393+
] = await self.store.get_farmer_launcher_id_and_points_and_payout_instructions()
394+
total_points = sum([pt for (launcher_id, pt, ph) in launcher_id_and_points_and_ph])
400395
if total_points > 0:
401396
mojo_per_point = floor(amount_to_distribute / total_points)
402397
self.log.info(f"Paying out {mojo_per_point} mojo / point")
403398

399+
# Pool fee payment record launcher_id is equal to puzzle_hash, points is equal to 0.
404400
additions_sub_list: List[Dict] = [
405-
{"puzzle_hash": self.pool_fee_puzzle_hash, "amount": pool_coin_amount}
401+
{
402+
"launcher_id": self.pool_fee_puzzle_hash,
403+
"puzzle_hash": self.pool_fee_puzzle_hash,
404+
"amount": pool_coin_amount,
405+
"points": 0,
406+
}
406407
]
407-
for points, ph in points_and_ph:
408+
for launcher_id, points, ph in launcher_id_and_points_and_ph:
408409
if points > 0:
409-
additions_sub_list.append({"puzzle_hash": ph, "amount": points * mojo_per_point})
410-
411-
if len(additions_sub_list) == self.max_additions_per_transaction:
412-
await self.pending_payments.put(additions_sub_list.copy())
413-
self.log.info(f"Will make payments: {additions_sub_list}")
414-
additions_sub_list = []
410+
additions_sub_list.append({
411+
"launcher_id": launcher_id,
412+
"puzzle_hash": ph,
413+
"amount": points * mojo_per_point,
414+
"points": points,
415+
})
416+
417+
for payment in additions_sub_list:
418+
await self.store.add_payment(
419+
payment["launcher_id"],
420+
payment["puzzle_hash"],
421+
uint64(payment["amount"]),
422+
payment["points"],
423+
uint64(int(time.time())),
424+
False,
425+
)
415426

416427
if len(additions_sub_list) > 0:
417428
self.log.info(f"Will make payments: {additions_sub_list}")
418-
await self.pending_payments.put(additions_sub_list.copy())
419-
420-
# Subtract the points from each farmer
421-
await self.store.clear_farmer_points()
422429
else:
423430
self.log.info(f"No points for any farmer. Waiting {self.payment_interval}")
424431

@@ -441,10 +448,20 @@ async def submit_payment_loop(self):
441448
await asyncio.sleep(60)
442449
continue
443450

444-
payment_targets = await self.pending_payments.get()
445-
assert len(payment_targets) > 0
451+
pending_payments = await self.store.get_pending_payment_records(self.max_additions_per_transaction)
452+
if len(pending_payments) == 0:
453+
self.log.info("No funds to pending payment records")
454+
await asyncio.sleep(60)
455+
continue
456+
self.log.info(f"Submitting a payment: {pending_payments}")
446457

447-
self.log.info(f"Submitting a payment: {payment_targets}")
458+
payment_targets: List[Dict] = [
459+
{
460+
"puzzle_hash": puzzle_hash,
461+
"amount": amount,
462+
}
463+
for _, puzzle_hash, amount, _, _, _ in pending_payments
464+
]
448465

449466
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
450467
# fee itself. Alternatively you can set it to 0 and wait longer
@@ -457,34 +474,40 @@ async def submit_payment_loop(self):
457474
except ValueError as e:
458475
self.log.error(f"Error making payment: {e}")
459476
await asyncio.sleep(10)
460-
await self.pending_payments.put(payment_targets)
461477
continue
462478

463479
self.log.info(f"Transaction: {transaction}")
464480

465-
while (
466-
not transaction.confirmed
467-
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
468-
):
469-
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
470-
peak_height = self.blockchain_state["peak"].height
471-
self.log.info(
472-
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
481+
async with self.store.tx():
482+
await self.store.update_is_payment(
483+
[
484+
(launcher_id, points, timestamp)
485+
for launcher_id, _, _, points, timestamp, _ in pending_payments
486+
],
487+
is_payment=True,
488+
auto_commit=False,
473489
)
474-
if not transaction.confirmed:
475-
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
476-
else:
477-
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
478-
await asyncio.sleep(10)
479490

480-
# TODO(pool): persist in DB
481-
self.log.info(f"Successfully confirmed payments {payment_targets}")
491+
while (
492+
not transaction.confirmed
493+
or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold
494+
):
495+
transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name)
496+
peak_height = self.blockchain_state["peak"].height
497+
self.log.info(
498+
f"Waiting for transaction to obtain {self.confirmation_security_threshold} confirmations"
499+
)
500+
if not transaction.confirmed:
501+
self.log.info(f"Not confirmed. In mempool? {transaction.is_in_mempool()}")
502+
else:
503+
self.log.info(f"Confirmations: {peak_height - transaction.confirmed_at_height}")
504+
await asyncio.sleep(10)
482505

506+
self.log.info(f"Successfully confirmed payments {pending_payments}")
483507
except asyncio.CancelledError:
484508
self.log.info("Cancelled submit_payment_loop, closing")
485509
return
486510
except Exception as e:
487-
# TODO(pool): retry transaction if failed
488511
self.log.error(f"Unexpected error in submit_payment_loop: {e}")
489512
await asyncio.sleep(60)
490513

pool/store/abstract.py

+40-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ def __init__(self):
2323
async def connect(self):
2424
"""Perform IO-related initialization"""
2525

26+
@abstractmethod
27+
async def tx(self):
28+
"""Performing Transactions for async with statement"""
29+
2630
@abstractmethod
2731
async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata):
2832
"""Persist a new Farmer in the store"""
@@ -54,7 +58,7 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes
5458
"""Fetch Farmers matching given puzzle hashes"""
5559

5660
@abstractmethod
57-
async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]:
61+
async def get_farmer_launcher_id_and_points_and_payout_instructions(self) -> List[Tuple[bytes32, uint64, bytes32]]:
5862
"""Fetch all farmers and their respective payout instructions"""
5963

6064
@abstractmethod
@@ -68,3 +72,38 @@ async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty:
6872
@abstractmethod
6973
async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]:
7074
"""Fetch last ``count`` partials for Farmer identified by ``launcher_id``"""
75+
76+
@abstractmethod
77+
async def add_payment(
78+
self,
79+
launcher_id: bytes32,
80+
puzzle_hash: bytes32,
81+
amount: uint64,
82+
points: int,
83+
timestamp: uint64,
84+
is_payment: bool,
85+
):
86+
"""Persist a new payment record in the store"""
87+
88+
@abstractmethod
89+
async def update_is_payment(
90+
self,
91+
launcher_id_and_points_and_timestamp: List[Tuple[bytes32, uint64, uint64]],
92+
is_payment: bool,
93+
auto_commit: bool = True,
94+
):
95+
"""
96+
Update is_payment for payment records identified by ``launcher_id`` and ``timestamp``.
97+
When is_payment is True, the payouts is completed, subtract the points from farmer.
98+
auto_commit decides whether to commit the transaction.
99+
"""
100+
101+
@abstractmethod
102+
async def get_pending_payment_records(self, count: int) -> List[
103+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool]
104+
]:
105+
"""Fetch ``count`` pending payment records"""
106+
107+
@abstractmethod
108+
async def get_pending_payment_count(self) -> int:
109+
"""Fetch pending payment records count"""

pool/store/sqlite_store.py

+113-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from contextlib import asynccontextmanager
12
from pathlib import Path
23
from typing import Optional, Set, List, Tuple, Dict
34

@@ -48,9 +49,23 @@ async def connect(self):
4849
"CREATE TABLE IF NOT EXISTS partial(launcher_id text, timestamp bigint, difficulty bigint)"
4950
)
5051

52+
await self.connection.execute(
53+
"""
54+
CREATE TABLE IF NOT EXISTS payment(
55+
launcher_id text,
56+
puzzle_hash text,
57+
amount bigint,
58+
points bigint,
59+
timestamp bigint,
60+
is_payment tinyint
61+
)
62+
"""
63+
)
64+
5165
await self.connection.execute("CREATE INDEX IF NOT EXISTS scan_ph on farmer(p2_singleton_puzzle_hash)")
5266
await self.connection.execute("CREATE INDEX IF NOT EXISTS timestamp_index on partial(timestamp)")
5367
await self.connection.execute("CREATE INDEX IF NOT EXISTS launcher_id_index on partial(launcher_id)")
68+
await self.connection.execute("CREATE INDEX IF NOT EXISTS launcher_id_index on payment(launcher_id)")
5469

5570
await self.connection.commit()
5671

@@ -70,6 +85,15 @@ def _row_to_farmer_record(row) -> FarmerRecord:
7085
True if row[10] == 1 else False,
7186
)
7287

88+
@asynccontextmanager
89+
async def tx(self):
90+
try:
91+
yield
92+
await self.connection.commit()
93+
except Exception:
94+
await self.connection.rollback()
95+
raise
96+
7397
async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata):
7498
cursor = await self.connection.execute(
7599
f"INSERT OR REPLACE INTO farmer VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
@@ -146,21 +170,22 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes
146170
rows = await cursor.fetchall()
147171
return [self._row_to_farmer_record(row) for row in rows]
148172

149-
async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]:
150-
cursor = await self.connection.execute(f"SELECT points, payout_instructions from farmer")
173+
async def get_farmer_launcher_id_and_points_and_payout_instructions(self) -> List[Tuple[bytes32, uint64, bytes32]]:
174+
cursor = await self.connection.execute(f"SELECT launcher_id, points, payout_instructions from farmer")
151175
rows = await cursor.fetchall()
152-
accumulated: Dict[bytes32, uint64] = {}
176+
accumulated: Dict[Tuple[bytes32, bytes32], uint64] = {}
153177
for row in rows:
154-
points: uint64 = uint64(row[0])
155-
ph: bytes32 = bytes32(bytes.fromhex(row[1]))
178+
launcher_id: bytes32 = bytes32(bytes.fromhex(row[0]))
179+
points: uint64 = uint64(row[1])
180+
ph: bytes32 = bytes32(bytes.fromhex(row[2]))
156181
if ph in accumulated:
157-
accumulated[ph] += points
182+
accumulated[(launcher_id, ph)] += points
158183
else:
159-
accumulated[ph] = points
184+
accumulated[(launcher_id, ph)] = points
160185

161-
ret: List[Tuple[uint64, bytes32]] = []
162-
for ph, total_points in accumulated.items():
163-
ret.append((total_points, ph))
186+
ret: List[Tuple[bytes32, uint64, bytes32]] = []
187+
for (launcher_id, ph), total_points in accumulated.items():
188+
ret.append((launcher_id, total_points, ph))
164189
return ret
165190

166191
async def clear_farmer_points(self) -> None:
@@ -192,3 +217,81 @@ async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tu
192217
rows = await cursor.fetchall()
193218
ret: List[Tuple[uint64, uint64]] = [(uint64(timestamp), uint64(difficulty)) for timestamp, difficulty in rows]
194219
return ret
220+
221+
async def add_payment(
222+
self,
223+
launcher_id: bytes32,
224+
puzzle_hash: bytes32,
225+
amount: uint64,
226+
points: int,
227+
timestamp: uint64,
228+
is_payment: bool,
229+
):
230+
cursor = await self.connection.execute(
231+
"INSERT INTO payment VALUES(?, ?, ?, ?, ?, ?)",
232+
(launcher_id.hex(), puzzle_hash.hex(), amount, points, timestamp, int(is_payment)),
233+
)
234+
await cursor.close()
235+
await self.connection.commit()
236+
237+
async def update_is_payment(
238+
self,
239+
launcher_id_and_points_and_timestamp: List[Tuple[bytes32, uint64, uint64]],
240+
is_payment: bool,
241+
auto_commit: bool = True,
242+
):
243+
cursor = await self.connection.executemany(
244+
"UPDATE payment SET is_payment=? WHERE launcher_id=? AND timestamp=?",
245+
tuple(
246+
(int(is_payment), launcher_id.hex(), timestamp)
247+
for launcher_id, _, timestamp in launcher_id_and_points_and_timestamp
248+
),
249+
)
250+
await cursor.close()
251+
252+
if is_payment is True:
253+
cursor = await self.connection.executemany(
254+
"UPDATE farmer SET points=points-? WHERE launcher_id=?",
255+
tuple(
256+
(points, launcher_id.hex())
257+
for launcher_id, points, _ in launcher_id_and_points_and_timestamp
258+
),
259+
)
260+
await cursor.close()
261+
262+
if auto_commit is True:
263+
await self.connection.commit()
264+
265+
async def get_pending_payment_records(self, count: int) -> List[
266+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool]
267+
]:
268+
cursor = await self.connection.execute(
269+
"""
270+
SELECT
271+
launcher_id, puzzle_hash, amount, points, timestamp, is_payment
272+
FROM payment
273+
WHERE is_payment=0 ORDER BY timestamp ASC LIMIT ?
274+
""",
275+
(count,),
276+
)
277+
rows = await cursor.fetchall()
278+
await cursor.close()
279+
ret: List[
280+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool]
281+
] = [
282+
(
283+
bytes32(bytes.fromhex(launcher_id)),
284+
bytes32(bytes.fromhex(puzzle_hash)),
285+
uint64(amount),
286+
uint64(points),
287+
uint64(timestamp),
288+
bool(is_payment),
289+
) for launcher_id, puzzle_hash, amount, points, timestamp, is_payment in rows
290+
]
291+
return ret
292+
293+
async def get_pending_payment_count(self) -> int:
294+
cursor = await self.connection.execute("SELECT COUNT(*) FROM payment WHERE is_payment=0")
295+
count: int = (await cursor.fetchone())[0]
296+
await cursor.close()
297+
return count

0 commit comments

Comments
 (0)