Skip to content

Commit 710a8c9

Browse files
pool: Improve transaction (Chia-Network#201)
1 parent 32907fb commit 710a8c9

File tree

3 files changed

+212
-44
lines changed

3 files changed

+212
-44
lines changed

pool/pool.py

+59-33
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ def __init__(
149149
# faster.
150150
self.max_additions_per_transaction = pool_config["max_additions_per_transaction"]
151151

152-
# This is the list of payments that we have not sent yet, to farmers
153-
self.pending_payments: Optional[asyncio.Queue] = None
154-
155152
# Keeps track of the latest state of our node
156153
self.blockchain_state = {"peak": None}
157154

@@ -201,8 +198,6 @@ async def start(self):
201198
self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop())
202199
self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop())
203200

204-
self.pending_payments = asyncio.Queue()
205-
206201
async def stop(self):
207202
if self.confirm_partials_loop_task is not None:
208203
self.confirm_partials_loop_task.cancel()
@@ -356,8 +351,8 @@ async def create_payment_loop(self):
356351
await asyncio.sleep(60)
357352
continue
358353

359-
if self.pending_payments.qsize() != 0:
360-
self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting")
354+
if (pending_payment_count := await self.store.get_pending_payment_count()) != 0:
355+
self.log.warning(f"Pending payments ({pending_payment_count}), waiting")
361356
await asyncio.sleep(60)
362357
continue
363358

@@ -387,34 +382,46 @@ async def create_payment_loop(self):
387382
self.log.info(f"Total amount to distribute: {amount_to_distribute / (10 ** 12)}")
388383

389384
async with self.store.lock:
390-
# Get the points of each farmer, as well as payout instructions. Here a chia address is used,
391-
# but other blockchain addresses can also be used.
392-
points_and_ph: List[
393-
Tuple[uint64, bytes]
394-
] = await self.store.get_farmer_points_and_payout_instructions()
395-
total_points = sum([pt for (pt, ph) in points_and_ph])
385+
# Get the launcher_id and points of each farmer, as well as payout instructions.
386+
# Here a chia address is used, but other blockchain addresses can also be used.
387+
launcher_id_and_points_and_ph: List[
388+
Tuple[bytes32, uint64, bytes32]
389+
] = await self.store.get_farmer_launcher_id_and_points_and_payout_instructions()
390+
total_points = sum([pt for (launcher_id, pt, ph) in launcher_id_and_points_and_ph])
396391
if total_points > 0:
397392
mojo_per_point = floor(amount_to_distribute / total_points)
398393
self.log.info(f"Paying out {mojo_per_point} mojo / point")
399394

395+
# Pool fee payment record launcher_id is equal to puzzle_hash, points is equal to 0.
400396
additions_sub_list: List[Dict] = [
401-
{"puzzle_hash": self.pool_fee_puzzle_hash, "amount": pool_coin_amount}
397+
{
398+
"launcher_id": self.pool_fee_puzzle_hash,
399+
"puzzle_hash": self.pool_fee_puzzle_hash,
400+
"amount": pool_coin_amount,
401+
"points": 0,
402+
}
402403
]
403-
for points, ph in points_and_ph:
404+
for launcher_id, points, ph in launcher_id_and_points_and_ph:
404405
if points > 0:
405-
additions_sub_list.append({"puzzle_hash": ph, "amount": points * mojo_per_point})
406-
407-
if len(additions_sub_list) == self.max_additions_per_transaction:
408-
await self.pending_payments.put(additions_sub_list.copy())
409-
self.log.info(f"Will make payments: {additions_sub_list}")
410-
additions_sub_list = []
406+
additions_sub_list.append({
407+
"launcher_id": launcher_id,
408+
"puzzle_hash": ph,
409+
"amount": points * mojo_per_point,
410+
"points": points,
411+
})
412+
413+
for payment in additions_sub_list:
414+
await self.store.add_payment(
415+
payment["launcher_id"],
416+
payment["puzzle_hash"],
417+
uint64(payment["amount"]),
418+
payment["points"],
419+
uint64(int(time.time())),
420+
False,
421+
)
411422

412423
if len(additions_sub_list) > 0:
413424
self.log.info(f"Will make payments: {additions_sub_list}")
414-
await self.pending_payments.put(additions_sub_list.copy())
415-
416-
# Subtract the points from each farmer
417-
await self.store.clear_farmer_points()
418425
else:
419426
self.log.info(f"No points for any farmer. Waiting {self.payment_interval}")
420427

@@ -437,23 +444,42 @@ async def submit_payment_loop(self):
437444
await asyncio.sleep(60)
438445
continue
439446

440-
payment_targets = await self.pending_payments.get()
441-
assert len(payment_targets) > 0
447+
pending_payments = await self.store.get_pending_payment_records(self.max_additions_per_transaction)
448+
if len(pending_payments) == 0:
449+
self.log.info("No funds to pending payment records")
450+
await asyncio.sleep(60)
451+
continue
452+
self.log.info(f"Submitting a payment: {pending_payments}")
442453

443-
self.log.info(f"Submitting a payment: {payment_targets}")
454+
payment_targets: List[Dict] = [
455+
{
456+
"puzzle_hash": puzzle_hash,
457+
"amount": amount,
458+
}
459+
for _, puzzle_hash, amount, _, _, _ in pending_payments
460+
]
444461

445462
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
446463
# fee itself. Alternatively you can set it to 0 and wait longer
447464
# blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets)
448465
blockchain_fee: uint64 = uint64(0)
449466
try:
450-
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
451-
self.wallet_id, payment_targets, fee=blockchain_fee
452-
)
467+
async with self.store.tx():
468+
await self.store.update_is_payment(
469+
[
470+
(launcher_id, points, timestamp)
471+
for launcher_id, _, _, points, timestamp, _ in pending_payments
472+
],
473+
is_payment=True,
474+
auto_commit=False,
475+
)
476+
477+
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
478+
self.wallet_id, payment_targets, fee=blockchain_fee
479+
)
453480
except ValueError as e:
454481
self.log.error(f"Error making payment: {e}")
455482
await asyncio.sleep(10)
456-
await self.pending_payments.put(payment_targets)
457483
continue
458484

459485
self.log.info(f"Transaction: {transaction}")
@@ -474,7 +500,7 @@ async def submit_payment_loop(self):
474500
await asyncio.sleep(10)
475501

476502
# TODO(pool): persist in DB
477-
self.log.info(f"Successfully confirmed payments {payment_targets}")
503+
self.log.info(f"Successfully confirmed payments {pending_payments}")
478504

479505
except asyncio.CancelledError:
480506
self.log.info("Cancelled submit_payment_loop, closing")

pool/store/abstract.py

+40-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def __init__(self):
2424
async def connect(self):
2525
"""Perform IO-related initialization"""
2626

27+
@abstractmethod
28+
async def tx(self):
29+
"""Performing Transactions for async with statement"""
30+
2731
@abstractmethod
2832
async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata):
2933
"""Persist a new Farmer in the store"""
@@ -55,7 +59,7 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes
5559
"""Fetch Farmers matching given puzzle hashes"""
5660

5761
@abstractmethod
58-
async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]:
62+
async def get_farmer_launcher_id_and_points_and_payout_instructions(self) -> List[Tuple[bytes32, uint64, bytes32]]:
5963
"""Fetch all farmers and their respective payout instructions"""
6064

6165
@abstractmethod
@@ -69,3 +73,38 @@ async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty:
6973
@abstractmethod
7074
async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]:
7175
"""Fetch last ``count`` partials for Farmer identified by ``launcher_id``"""
76+
77+
@abstractmethod
78+
async def add_payment(
79+
self,
80+
launcher_id: bytes32,
81+
puzzle_hash: bytes32,
82+
amount: uint64,
83+
points: int,
84+
timestamp: uint64,
85+
is_payment: bool,
86+
):
87+
"""Persist a new payment record in the store"""
88+
89+
@abstractmethod
90+
async def update_is_payment(
91+
self,
92+
launcher_id_and_points_and_timestamp: List[Tuple[bytes32, uint64, uint64]],
93+
is_payment: bool,
94+
auto_commit: bool = True,
95+
):
96+
"""
97+
Update is_payment for payment records identified by ``launcher_id`` and ``timestamp``.
98+
When is_payment is True, the payouts is completed, subtract the points from farmer.
99+
auto_commit decides whether to commit the transaction.
100+
"""
101+
102+
@abstractmethod
103+
async def get_pending_payment_records(self, count: int) -> List[
104+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool]
105+
]:
106+
"""Fetch ``count`` pending payment records"""
107+
108+
@abstractmethod
109+
async def get_pending_payment_count(self) -> int:
110+
"""Fetch pending payment records count"""

pool/store/sqlite_store.py

+113-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from contextlib import asynccontextmanager
12
from pathlib import Path
23
from typing import Optional, Set, List, Tuple, Dict
34

@@ -48,9 +49,23 @@ async def connect(self):
4849
"CREATE TABLE IF NOT EXISTS partial(launcher_id text, timestamp bigint, difficulty bigint)"
4950
)
5051

52+
await self.connection.execute(
53+
"""
54+
CREATE TABLE IF NOT EXISTS payment(
55+
launcher_id text,
56+
puzzle_hash text,
57+
amount bigint,
58+
points bigint,
59+
timestamp bigint,
60+
is_payment tinyint
61+
)
62+
"""
63+
)
64+
5165
await self.connection.execute("CREATE INDEX IF NOT EXISTS scan_ph on farmer(p2_singleton_puzzle_hash)")
5266
await self.connection.execute("CREATE INDEX IF NOT EXISTS timestamp_index on partial(timestamp)")
5367
await self.connection.execute("CREATE INDEX IF NOT EXISTS launcher_id_index on partial(launcher_id)")
68+
await self.connection.execute("CREATE INDEX IF NOT EXISTS launcher_id_index on payment(launcher_id)")
5469

5570
await self.connection.commit()
5671

@@ -70,6 +85,15 @@ def _row_to_farmer_record(row) -> FarmerRecord:
7085
True if row[10] == 1 else False,
7186
)
7287

88+
@asynccontextmanager
89+
async def tx(self):
90+
try:
91+
yield
92+
await self.connection.commit()
93+
except Exception:
94+
await self.connection.rollback()
95+
raise
96+
7397
async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata):
7498
cursor = await self.connection.execute(
7599
f"INSERT OR REPLACE INTO farmer VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
@@ -146,21 +170,22 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes
146170
rows = await cursor.fetchall()
147171
return [self._row_to_farmer_record(row) for row in rows]
148172

149-
async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]:
150-
cursor = await self.connection.execute(f"SELECT points, payout_instructions from farmer")
173+
async def get_farmer_launcher_id_and_points_and_payout_instructions(self) -> List[Tuple[bytes32, uint64, bytes32]]:
174+
cursor = await self.connection.execute(f"SELECT launcher_id, points, payout_instructions from farmer")
151175
rows = await cursor.fetchall()
152-
accumulated: Dict[bytes32, uint64] = {}
176+
accumulated: Dict[Tuple[bytes32, bytes32], uint64] = {}
153177
for row in rows:
154-
points: uint64 = uint64(row[0])
155-
ph: bytes32 = bytes32(bytes.fromhex(row[1]))
178+
launcher_id: bytes32 = bytes32(bytes.fromhex(row[0]))
179+
points: uint64 = uint64(row[1])
180+
ph: bytes32 = bytes32(bytes.fromhex(row[2]))
156181
if ph in accumulated:
157-
accumulated[ph] += points
182+
accumulated[(launcher_id, ph)] += points
158183
else:
159-
accumulated[ph] = points
184+
accumulated[(launcher_id, ph)] = points
160185

161-
ret: List[Tuple[uint64, bytes32]] = []
162-
for ph, total_points in accumulated.items():
163-
ret.append((total_points, ph))
186+
ret: List[Tuple[bytes32, uint64, bytes32]] = []
187+
for (launcher_id, ph), total_points in accumulated.items():
188+
ret.append((launcher_id, total_points, ph))
164189
return ret
165190

166191
async def clear_farmer_points(self) -> None:
@@ -192,3 +217,81 @@ async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tu
192217
rows = await cursor.fetchall()
193218
ret: List[Tuple[uint64, uint64]] = [(uint64(timestamp), uint64(difficulty)) for timestamp, difficulty in rows]
194219
return ret
220+
221+
async def add_payment(
222+
self,
223+
launcher_id: bytes32,
224+
puzzle_hash: bytes32,
225+
amount: uint64,
226+
points: int,
227+
timestamp: uint64,
228+
is_payment: bool,
229+
):
230+
cursor = await self.connection.execute(
231+
"INSERT INTO payment VALUES(?, ?, ?, ?, ?, ?)",
232+
(launcher_id.hex(), puzzle_hash.hex(), amount, points, timestamp, int(is_payment)),
233+
)
234+
await cursor.close()
235+
await self.connection.commit()
236+
237+
async def update_is_payment(
238+
self,
239+
launcher_id_and_points_and_timestamp: List[Tuple[bytes32, uint64, uint64]],
240+
is_payment: bool,
241+
auto_commit: bool = True,
242+
):
243+
cursor = await self.connection.executemany(
244+
"UPDATE payment SET is_payment=? WHERE launcher_id=? AND timestamp=?",
245+
tuple(
246+
(int(is_payment), launcher_id.hex(), timestamp)
247+
for launcher_id, _, timestamp in launcher_id_and_points_and_timestamp
248+
),
249+
)
250+
await cursor.close()
251+
252+
if is_payment is True:
253+
cursor = await self.connection.executemany(
254+
"UPDATE farmer SET points=points-? WHERE launcher_id=?",
255+
tuple(
256+
(points, launcher_id.hex())
257+
for launcher_id, points, _ in launcher_id_and_points_and_timestamp
258+
),
259+
)
260+
await cursor.close()
261+
262+
if auto_commit is True:
263+
await self.connection.commit()
264+
265+
async def get_pending_payment_records(self, count: int) -> List[
266+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool]
267+
]:
268+
cursor = await self.connection.execute(
269+
"""
270+
SELECT
271+
launcher_id, puzzle_hash, amount, points, timestamp, is_payment
272+
FROM payment
273+
WHERE is_payment=0 ORDER BY timestamp ASC LIMIT ?
274+
""",
275+
(count,),
276+
)
277+
rows = await cursor.fetchall()
278+
await cursor.close()
279+
ret: List[
280+
Tuple[bytes32, bytes32, uint64, uint64, uint64, bool]
281+
] = [
282+
(
283+
bytes32(bytes.fromhex(launcher_id)),
284+
bytes32(bytes.fromhex(puzzle_hash)),
285+
uint64(amount),
286+
uint64(points),
287+
uint64(timestamp),
288+
bool(is_payment),
289+
) for launcher_id, puzzle_hash, amount, points, timestamp, is_payment in rows
290+
]
291+
return ret
292+
293+
async def get_pending_payment_count(self) -> int:
294+
cursor = await self.connection.execute("SELECT COUNT(*) FROM payment WHERE is_payment=0")
295+
count: int = (await cursor.fetchone())[0]
296+
await cursor.close()
297+
return count

0 commit comments

Comments
 (0)