Skip to content

Commit 0e59c97

Browse files
pool: Improve transaction (Chia-Network#201)
1 parent e627c92 commit 0e59c97

File tree

3 files changed

+108
-17
lines changed

3 files changed

+108
-17
lines changed

pool/pool.py

+31-17
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,6 @@ def __init__(
155155
# faster.
156156
self.max_additions_per_transaction = pool_config["max_additions_per_transaction"]
157157

158-
# This is the list of payments that we have not sent yet, to farmers
159-
self.pending_payments: Optional[asyncio.Queue] = None
160-
161158
# Keeps track of the latest state of our node
162159
self.blockchain_state = {"peak": None}
163160

@@ -207,8 +204,6 @@ async def start(self):
207204
self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop())
208205
self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop())
209206

210-
self.pending_payments = asyncio.Queue()
211-
212207
async def stop(self):
213208
if self.confirm_partials_loop_task is not None:
214209
self.confirm_partials_loop_task.cancel()
@@ -362,8 +357,8 @@ async def create_payment_loop(self):
362357
await asyncio.sleep(60)
363358
continue
364359

365-
if self.pending_payments.qsize() != 0:
366-
self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting")
360+
if (pending_payment_count := await self.store.get_payment_count_by_is_payment(False)) != 0:
361+
self.log.warning(f"Pending payments ({pending_payment_count}), waiting")
367362
await asyncio.sleep(60)
368363
continue
369364

@@ -408,14 +403,16 @@ async def create_payment_loop(self):
408403
if points > 0:
409404
additions_sub_list.append({"puzzle_hash": ph, "amount": points * mojo_per_point})
410405

411-
if len(additions_sub_list) == self.max_additions_per_transaction:
412-
await self.pending_payments.put(additions_sub_list.copy())
413-
self.log.info(f"Will make payments: {additions_sub_list}")
414-
additions_sub_list = []
406+
for payment in additions_sub_list:
407+
await self.store.add_payment(
408+
payment["puzzle_hash"],
409+
uint64(payment["amount"]),
410+
uint64(int(time.time())),
411+
False,
412+
)
415413

416414
if len(additions_sub_list) > 0:
417415
self.log.info(f"Will make payments: {additions_sub_list}")
418-
await self.pending_payments.put(additions_sub_list.copy())
419416

420417
# Subtract the points from each farmer
421418
await self.store.clear_farmer_points()
@@ -441,23 +438,40 @@ async def submit_payment_loop(self):
441438
await asyncio.sleep(60)
442439
continue
443440

444-
payment_targets = await self.pending_payments.get()
445-
assert len(payment_targets) > 0
441+
payment_records = await self.store.get_payment_records_by_is_payment(
442+
False, self.max_additions_per_transaction
443+
)
444+
if len(payment_records) == 0:
445+
self.log.info("No funds to payment record")
446+
await asyncio.sleep(60)
447+
continue
446448

447-
self.log.info(f"Submitting a payment: {payment_targets}")
449+
payment_targets: List[Dict] = [
450+
{
451+
"puzzle_hash": puzzle_hash,
452+
"amount": amount,
453+
}
454+
for puzzle_hash, amount, _, _ in payment_records
455+
]
456+
self.log.info(f"Submitting a payment: {payment_records}")
448457

449458
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
450459
# fee itself. Alternatively you can set it to 0 and wait longer
451460
# blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets)
452461
blockchain_fee = 0
453462
try:
463+
await self.store.update_payments_is_payment(
464+
[(puzzle_hash, timestamp) for puzzle_hash, _, timestamp, _ in payment_records], True
465+
)
454466
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
455467
self.wallet_id, payment_targets, fee=blockchain_fee
456468
)
457-
except ValueError as e:
469+
except Exception as e:
458470
self.log.error(f"Error making payment: {e}")
471+
await self.store.update_payments_is_payment(
472+
[(puzzle_hash, timestamp) for puzzle_hash, _, timestamp, _ in payment_records], False
473+
)
459474
await asyncio.sleep(10)
460-
await self.pending_payments.put(payment_targets)
461475
continue
462476

463477
self.log.info(f"Transaction: {transaction}")

pool/store/abstract.py

+18
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,21 @@ async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty:
6868
@abstractmethod
6969
async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]:
7070
"""Fetch last ``count`` partials for Farmer identified by ``launcher_id``"""
71+
72+
@abstractmethod
73+
async def add_payment(self, puzzle_hash: bytes32, amount: uint64, timestamp: uint64, is_payment: bool):
74+
"""Persist a new payment record in the store"""
75+
76+
@abstractmethod
77+
async def update_payments_is_payment(self, puzzle_hash_timestamp: List[Tuple[bytes32, uint64]], is_payment: bool):
78+
"""Update payments is_payment"""
79+
80+
@abstractmethod
81+
async def get_payment_records_by_is_payment(self, is_payment: bool, count: int) -> List[
82+
Tuple[bytes32, uint64, uint64, bool]
83+
]:
84+
"""Fetch payments matching given is_payment"""
85+
86+
@abstractmethod
87+
async def get_payment_count_by_is_payment(self, is_payment: bool) -> int:
88+
"""Get payment count matching given is_payment"""

pool/store/sqlite_store.py

+59
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,21 @@ async def connect(self):
4848
"CREATE TABLE IF NOT EXISTS partial(launcher_id text, timestamp bigint, difficulty bigint)"
4949
)
5050

51+
await self.connection.execute(
52+
"""
53+
CREATE TABLE IF NOT EXISTS payment(
54+
puzzle_hash text,
55+
amount bigint,
56+
timestamp bigint,
57+
is_payment tinyint
58+
)
59+
"""
60+
)
61+
5162
await self.connection.execute("CREATE INDEX IF NOT EXISTS scan_ph on farmer(p2_singleton_puzzle_hash)")
5263
await self.connection.execute("CREATE INDEX IF NOT EXISTS timestamp_index on partial(timestamp)")
5364
await self.connection.execute("CREATE INDEX IF NOT EXISTS launcher_id_index on partial(launcher_id)")
65+
await self.connection.execute("CREATE INDEX IF NOT EXISTS puzzle_hash_index on payment(puzzle_hash)")
5466

5567
await self.connection.commit()
5668

@@ -192,3 +204,50 @@ async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tu
192204
rows = await cursor.fetchall()
193205
ret: List[Tuple[uint64, uint64]] = [(uint64(timestamp), uint64(difficulty)) for timestamp, difficulty in rows]
194206
return ret
207+
208+
async def add_payment(self, puzzle_hash: bytes32, amount: uint64, timestamp: uint64, is_payment: bool):
209+
cursor = await self.connection.execute(
210+
"INSERT INTO payment VALUES(?, ?, ?, ?)",
211+
(puzzle_hash.hex(), amount, timestamp, int(is_payment)),
212+
)
213+
await cursor.close()
214+
await self.connection.commit()
215+
216+
async def update_payments_is_payment(self, puzzle_hash_timestamp: List[Tuple[bytes32, uint64]], is_payment: bool):
217+
cursor = await self.connection.executemany(
218+
"UPDATE payment SET is_payment=? WHERE puzzle_hash=? AND timestamp=?",
219+
tuple((int(is_payment), ph.hex(), timestamp) for ph, timestamp in puzzle_hash_timestamp),
220+
)
221+
await cursor.close()
222+
await self.connection.commit()
223+
224+
async def get_payment_records_by_is_payment(self, is_payment: bool, count: int) -> List[
225+
Tuple[bytes32, uint64, uint64, bool]
226+
]:
227+
cursor = await self.connection.execute(
228+
"""
229+
SELECT
230+
puzzle_hash, amount, timestamp, is_payment
231+
FROM payment
232+
WHERE is_payment=? ORDER BY timestamp ASC LIMIT ?
233+
""",
234+
(int(is_payment), count),
235+
)
236+
rows = await cursor.fetchall()
237+
await cursor.close()
238+
ret: List[
239+
Tuple[bytes32, uint64, uint64, bool]
240+
] = [
241+
(bytes32(bytes.fromhex(puzzle_hash)), uint64(amount), uint64(timestamp), bool(is_payment))
242+
for puzzle_hash, amount, timestamp, is_payment in rows
243+
]
244+
return ret
245+
246+
async def get_payment_count_by_is_payment(self, is_payment: bool) -> int:
247+
cursor = await self.connection.execute(
248+
"SELECT COUNT(*) FROM payment WHERE is_payment=?",
249+
(int(is_payment),),
250+
)
251+
count: int = (await cursor.fetchone())[0]
252+
await cursor.close()
253+
return count

0 commit comments

Comments
 (0)