Skip to content

Commit 346b6cf

Browse files
pool: Improve transaction (Chia-Network#201)
1 parent e627c92 commit 346b6cf

File tree

3 files changed

+99
-17
lines changed

3 files changed

+99
-17
lines changed

pool/pool.py

+29-17
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,6 @@ def __init__(
155155
# faster.
156156
self.max_additions_per_transaction = pool_config["max_additions_per_transaction"]
157157

158-
# This is the list of payments that we have not sent yet, to farmers
159-
self.pending_payments: Optional[asyncio.Queue] = None
160-
161158
# Keeps track of the latest state of our node
162159
self.blockchain_state = {"peak": None}
163160

@@ -207,8 +204,6 @@ async def start(self):
207204
self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop())
208205
self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop())
209206

210-
self.pending_payments = asyncio.Queue()
211-
212207
async def stop(self):
213208
if self.confirm_partials_loop_task is not None:
214209
self.confirm_partials_loop_task.cancel()
@@ -362,8 +357,8 @@ async def create_payment_loop(self):
362357
await asyncio.sleep(60)
363358
continue
364359

365-
if self.pending_payments.qsize() != 0:
366-
self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting")
360+
if (pending_payments := await self.store.get_pending_payment_count()) != 0:
361+
self.log.warning(f"Pending payments ({pending_payments}), waiting")
367362
await asyncio.sleep(60)
368363
continue
369364

@@ -408,14 +403,16 @@ async def create_payment_loop(self):
408403
if points > 0:
409404
additions_sub_list.append({"puzzle_hash": ph, "amount": points * mojo_per_point})
410405

411-
if len(additions_sub_list) == self.max_additions_per_transaction:
412-
await self.pending_payments.put(additions_sub_list.copy())
413-
self.log.info(f"Will make payments: {additions_sub_list}")
414-
additions_sub_list = []
406+
for payment in additions_sub_list:
407+
await self.store.add_payment(
408+
payment["puzzle_hash"],
409+
uint64(payment["amount"]),
410+
uint64(int(time.time())),
411+
False,
412+
)
415413

416414
if len(additions_sub_list) > 0:
417415
self.log.info(f"Will make payments: {additions_sub_list}")
418-
await self.pending_payments.put(additions_sub_list.copy())
419416

420417
# Subtract the points from each farmer
421418
await self.store.clear_farmer_points()
@@ -441,23 +438,38 @@ async def submit_payment_loop(self):
441438
await asyncio.sleep(60)
442439
continue
443440

444-
payment_targets = await self.pending_payments.get()
445-
assert len(payment_targets) > 0
441+
pending_payments = await self.store.get_pending_payment_records(self.max_additions_per_transaction)
442+
if len(pending_payments) == 0:
443+
self.log.info("No funds to pending payments record")
444+
await asyncio.sleep(60)
445+
continue
446+
self.log.info(f"Submitting a payment: {pending_payments}")
446447

447-
self.log.info(f"Submitting a payment: {payment_targets}")
448+
payment_targets: List[Dict] = [
449+
{
450+
"puzzle_hash": puzzle_hash,
451+
"amount": amount,
452+
}
453+
for puzzle_hash, amount, _, _ in pending_payments
454+
]
448455

449456
# TODO(pool): make sure you have enough to pay the blockchain fee, this will be taken out of the pool
450457
# fee itself. Alternatively you can set it to 0 and wait longer
451458
# blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets)
452459
blockchain_fee = 0
453460
try:
461+
await self.store.update_is_payment(
462+
[(puzzle_hash, timestamp) for puzzle_hash, _, timestamp, _ in pending_payments], True
463+
)
454464
transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi(
455465
self.wallet_id, payment_targets, fee=blockchain_fee
456466
)
457-
except ValueError as e:
467+
except Exception as e:
458468
self.log.error(f"Error making payment: {e}")
469+
await self.store.update_is_payment(
470+
[(puzzle_hash, timestamp) for puzzle_hash, _, timestamp, _ in pending_payments], False
471+
)
459472
await asyncio.sleep(10)
460-
await self.pending_payments.put(payment_targets)
461473
continue
462474

463475
self.log.info(f"Transaction: {transaction}")

pool/store/abstract.py

+16
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,19 @@ async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty:
6868
@abstractmethod
6969
async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]:
7070
"""Fetch last ``count`` partials for Farmer identified by ``launcher_id``"""
71+
72+
@abstractmethod
73+
async def add_payment(self, puzzle_hash: bytes32, amount: uint64, timestamp: uint64, is_payment: bool):
74+
"""Persist a new payment record in the store"""
75+
76+
@abstractmethod
77+
async def update_is_payment(self, puzzle_hash_timestamp: List[Tuple[bytes32, uint64]], is_payment: bool):
78+
"""Update is_payment for payment records identified by ``puzzle_hash`` and ``timestamp``"""
79+
80+
@abstractmethod
81+
async def get_pending_payment_records(self, count: int) -> List[Tuple[bytes32, uint64, uint64, bool]]:
82+
"""Fetch ``count`` pending payment records"""
83+
84+
@abstractmethod
85+
async def get_pending_payment_count(self) -> int:
86+
"""Fetch pending payment records count"""

pool/store/sqlite_store.py

+54
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,21 @@ async def connect(self):
4848
"CREATE TABLE IF NOT EXISTS partial(launcher_id text, timestamp bigint, difficulty bigint)"
4949
)
5050

51+
await self.connection.execute(
52+
"""
53+
CREATE TABLE IF NOT EXISTS payment(
54+
puzzle_hash text,
55+
amount bigint,
56+
timestamp bigint,
57+
is_payment tinyint
58+
)
59+
"""
60+
)
61+
5162
await self.connection.execute("CREATE INDEX IF NOT EXISTS scan_ph on farmer(p2_singleton_puzzle_hash)")
5263
await self.connection.execute("CREATE INDEX IF NOT EXISTS timestamp_index on partial(timestamp)")
5364
await self.connection.execute("CREATE INDEX IF NOT EXISTS launcher_id_index on partial(launcher_id)")
65+
await self.connection.execute("CREATE INDEX IF NOT EXISTS puzzle_hash_index on payment(puzzle_hash)")
5466

5567
await self.connection.commit()
5668

@@ -192,3 +204,45 @@ async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tu
192204
rows = await cursor.fetchall()
193205
ret: List[Tuple[uint64, uint64]] = [(uint64(timestamp), uint64(difficulty)) for timestamp, difficulty in rows]
194206
return ret
207+
208+
async def add_payment(self, puzzle_hash: bytes32, amount: uint64, timestamp: uint64, is_payment: bool):
209+
cursor = await self.connection.execute(
210+
"INSERT INTO payment VALUES(?, ?, ?, ?)",
211+
(puzzle_hash.hex(), amount, timestamp, int(is_payment)),
212+
)
213+
await cursor.close()
214+
await self.connection.commit()
215+
216+
async def update_is_payment(self, puzzle_hash_timestamp: List[Tuple[bytes32, uint64]], is_payment: bool):
217+
cursor = await self.connection.executemany(
218+
"UPDATE payment SET is_payment=? WHERE puzzle_hash=? AND timestamp=?",
219+
tuple((int(is_payment), ph.hex(), timestamp) for ph, timestamp in puzzle_hash_timestamp),
220+
)
221+
await cursor.close()
222+
await self.connection.commit()
223+
224+
async def get_pending_payment_records(self, count: int) -> List[Tuple[bytes32, uint64, uint64, bool]]:
225+
cursor = await self.connection.execute(
226+
"""
227+
SELECT
228+
puzzle_hash, amount, timestamp, is_payment
229+
FROM payment
230+
WHERE is_payment=0 ORDER BY timestamp ASC LIMIT ?
231+
""",
232+
(count,),
233+
)
234+
rows = await cursor.fetchall()
235+
await cursor.close()
236+
ret: List[
237+
Tuple[bytes32, uint64, uint64, bool]
238+
] = [
239+
(bytes32(bytes.fromhex(puzzle_hash)), uint64(amount), uint64(timestamp), bool(is_payment))
240+
for puzzle_hash, amount, timestamp, is_payment in rows
241+
]
242+
return ret
243+
244+
async def get_pending_payment_count(self) -> int:
245+
cursor = await self.connection.execute("SELECT COUNT(*) FROM payment WHERE is_payment=0")
246+
count: int = (await cursor.fetchone())[0]
247+
await cursor.close()
248+
return count

0 commit comments

Comments
 (0)