Skip to content

Commit 5db628e

Browse files
authored
Fetch mempool transactions for full nodes (#12)
* add storage backends for mempool. This allows unconfirmed transactions to be cached in an SQLite, MySQL, or PostgreSQL database. The most recent transactions are cached in memory, so that memory usage remains low. * clean up cache class * add SQL transaction caching module * fix (sqlite) cache database connectivity * store temporary and finished mempool data in the database
1 parent 52e6af8 commit 5db628e

File tree

16 files changed

+669
-97
lines changed

16 files changed

+669
-97
lines changed

Diff for: .vscode/settings.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
"editor.formatOnSave": true
55
},
66
"editor.wordWrap": "off",
7-
"editor.rulers": [80, 120]
7+
"editor.rulers": [
8+
80,
9+
120
10+
]
811
}

Diff for: requirements.txt

+4
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,7 @@ aiohttp<4.0
1717
# Web3 6.14.0 fixes a bug that would make ZPyWallet unit tests fail to set up the testing environment.
1818
web3>=6.14.0
1919
websocket-client # automatically pulled by web3, but we also use this package directly.
20+
21+
# For mempool transaction caching inside full nodes.
22+
psycopg2
23+
mysql-connector-python

Diff for: zpywallet/address/btc/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
109109
self.rpc_url = kwargs.get("url")
110110
self.rpc_user = kwargs.get("user")
111111
self.rpc_password = kwargs.get("password")
112-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
113112
self.fast_mode = kwargs.get("fast_mode") or True
114113
self.min_height = kwargs.get("min_height") or 0
115114
self.transactions = []

Diff for: zpywallet/address/btctest/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
110110
self.rpc_url = kwargs.get("url")
111111
self.rpc_user = kwargs.get("user")
112112
self.rpc_password = kwargs.get("password")
113-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
114113
self.min_height = kwargs.get("min_height") or 0
115114
self.fast_mode = kwargs.get("fast_mode") or True
116115
self.transactions = []

Diff for: zpywallet/address/dash/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
102102
self.rpc_url = kwargs.get("url")
103103
self.rpc_user = kwargs.get("user")
104104
self.rpc_password = kwargs.get("password")
105-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
106105
self.min_height = kwargs.get("min_height") or 0
107106
self.fast_mode = kwargs.get("fast_mode") or True
108107
self.transactions = []

Diff for: zpywallet/address/dashtest/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
102102
self.rpc_url = kwargs.get("url")
103103
self.rpc_user = kwargs.get("user")
104104
self.rpc_password = kwargs.get("password")
105-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
106105
self.min_height = kwargs.get("min_height") or 0
107106
self.fast_mode = kwargs.get("fast_mode") or True
108107
self.transactions = []

Diff for: zpywallet/address/doge/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
102102
self.rpc_url = kwargs.get("url")
103103
self.rpc_user = kwargs.get("user")
104104
self.rpc_password = kwargs.get("password")
105-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
106105
self.min_height = kwargs.get("min_height") or 0
107106
self.fast_mode = kwargs.get("fast_mode") or True
108107
self.transactions = []

Diff for: zpywallet/address/dogetest/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
102102
self.rpc_url = kwargs.get("url")
103103
self.rpc_user = kwargs.get("user")
104104
self.rpc_password = kwargs.get("password")
105-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
106105
self.min_height = kwargs.get("min_height") or 0
107106
self.fast_mode = kwargs.get("fast_mode") or True
108107
self.transactions = []

Diff for: zpywallet/address/ltc/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
102102
self.rpc_url = kwargs.get("url")
103103
self.rpc_user = kwargs.get("user")
104104
self.rpc_password = kwargs.get("password")
105-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
106105
self.min_height = kwargs.get("min_height") or 0
107106
self.fast_mode = kwargs.get("fast_mode") or True
108107
self.transactions = []

Diff for: zpywallet/address/ltctest/fullnode.py

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def __init__(self, addresses, transactions=None, **kwargs):
102102
self.rpc_url = kwargs.get("url")
103103
self.rpc_user = kwargs.get("user")
104104
self.rpc_password = kwargs.get("password")
105-
self.max_tx_at_once = kwargs.get("max_tx_at_once") or 1000
106105
self.min_height = kwargs.get("min_height") or 0
107106
self.fast_mode = kwargs.get("fast_mode") or True
108107
self.transactions = []

Diff for: zpywallet/destination.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ def __init__(self, address, amount, network, fee_policy=FeePolicy.NONE):
2727
address (str): The destination address.
2828
amount (int): The amount of the destination in the lowest denomination.
2929
network: The network associated with the destination.
30-
fee_policy (FeePolicy, optional): The fee policy associated with the destination. Defaults to FeePolicy.NONE.
30+
fee_policy (FeePolicy, optional): The fee policy associated with the destination.
31+
Defaults to FeePolicy.NONE.
3132
"""
3233
self._network = network
3334
self._address = address
@@ -46,7 +47,8 @@ def amount(self, in_standard_units=True):
4647
Returns the amount of the destination.
4748
4849
Args:
49-
in_standard_units (bool, optional): If True, returns the amount in standard units. If False, returns the amount in the lowest denomination. Defaults to True.
50+
in_standard_units (bool, optional): If True, returns the amount in standard units.
51+
If False, returns the amount in the lowest denomination. Defaults to True.
5052
"""
5153
if not in_standard_units:
5254
if self._network.SUPPORTS_EVM:

Diff for: zpywallet/mempool/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__all__ = ["btc"]
1+
__all__ = ["btclike", "cache"]

Diff for: zpywallet/mempool/btc.py renamed to zpywallet/mempool/btclike.py

+101-81
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import json
2+
import multiprocessing
23
import random
34
import requests
45
from concurrent.futures import ThreadPoolExecutor
56

6-
from ..errors import NetworkException
7-
from ..generated import wallet_pb2
7+
8+
from zpywallet.errors import NetworkException
9+
from zpywallet.generated import wallet_pb2
10+
11+
from zpywallet.mempool.cache import SQLTransactionStorage, DatabaseError
812

913

1014
def transform_and_sort_transactions(data):
@@ -17,11 +21,12 @@ def transform_and_sort_transactions(data):
1721
return sorted_data
1822

1923

20-
class BitcoinMempool:
21-
"""Holds the transactions inside the mempool. There should only be one
22-
mempool class for each coin running in the entire application.
24+
class BTCLikeMempool:
25+
"""Holds the transactions of Bitcoin-like blockchains inside the mempool.
26+
There should only be one mempool class for each coin running in the entire
27+
application.
2328
24-
Note: The performance of this class heavily depends on the network speed and CPU
29+
The performance of this class heavily depends on the network speed and CPU
2530
speed of the node as well as the number of threads available, the size of the RPC
2631
batch work queue specified in the constructor, and the amount of transactions in
2732
megabytes you are trying to fetch at once.
@@ -59,8 +64,11 @@ def _clean_tx(self, element):
5964
# Of course this is an unconfirmed transaction.
6065
# Hence it is also not a coinbase transaction.
6166
new_element.confirmed = False
67+
new_element.txid = element["txid"]
6268

6369
for vin in element["vin"]:
70+
if "txid" not in vin.keys():
71+
continue
6472
txinput = new_element.btclike_transaction.inputs.add()
6573
txinput.txid = vin["txid"]
6674
txinput.index = vin["vout"]
@@ -82,23 +90,17 @@ def _clean_tx(self, element):
8290
new_element.btclike_transaction.fee = element["vsize"]
8391
return new_element
8492

85-
def _post_clean_tx(self, new_element):
86-
93+
def _post_clean_tx(self, new_element, sql_transaction_storage):
8794
for txinput in new_element.btclike_transaction.inputs:
88-
fine_rawtx = self.raw_txos.get(txinput.txid)
95+
fine_rawtx = sql_transaction_storage.get_rawtx(txinput.txid)
8996
if fine_rawtx is None:
9097
return None # maybe processing error.
91-
txinput.amount = int(fine_rawtx["vout"][txinput.index]["value"] * 1e8)
92-
if "address" in fine_rawtx["vout"][txinput.index]["scriptPubKey"].keys():
93-
txinput.address = fine_rawtx["vout"][txinput.index]["scriptPubKey"][
94-
"address"
95-
]
96-
elif (
97-
"addresses" in fine_rawtx["vout"][txinput.index]["scriptPubKey"].keys()
98-
):
99-
txinput.address = fine_rawtx["vout"][txinput.index]["scriptPubKey"][
100-
"addresses"
101-
][0]
98+
txinput.amount = fine_rawtx.btclike_transaction.outputs[
99+
txinput.index
100+
].amount
101+
txinput.address = fine_rawtx.btclike_transaction.outputs[
102+
txinput.index
103+
].address
102104

103105
# Now we must calculate the total fee
104106
total_inputs = sum([a.amount for a in new_element.btclike_transaction.inputs])
@@ -124,6 +126,7 @@ def __init__(self, **kwargs):
124126
self.future_blocks_min = kwargs.get("future_blocks_min") or 0
125127
self.future_blocks_max = kwargs.get("future_blocks_max") or 1
126128
self.full_transactions = kwargs.get("full_transactions") or True
129+
self.db_connection_parameters = kwargs.get("db_connection_parameters")
127130
self.transactions = []
128131
self.in_mempool = []
129132
self.txos = []
@@ -153,7 +156,7 @@ def _send_rpc_request(self, method, params=None):
153156
# Certain nodes which are placed behind web servers or Cloudflare will
154157
# configure rate limits and return some HTML error page if we go over that.
155158
# Zpywallet is not designed to handle such content so we check for it first.
156-
# If you are using the full node facilities, ou are recommended to connect
159+
# If you are using the full node facilities, you are recommended to connect
157160
# to your own node and not to a public one, for this reason.
158161
try:
159162
j = response.json()
@@ -179,7 +182,6 @@ def _send_batch_rpc_request(self, reqs):
179182
try:
180183
# Requests session is not needed for the full node but we can use it
181184
# for the other providers in the future.
182-
183185
response = requests.post(
184186
self.rpc_url,
185187
auth=(
@@ -223,56 +225,48 @@ def _get_block_height(self):
223225
except Exception as e:
224226
raise NetworkException(f"Failed to make RPC Call: {str(e)}")
225227

226-
def _get_raw_mempool(self):
227-
height = self._get_block_height()
228-
h1, h2, h3, h4, h5 = [
229-
a["result"]
230-
for a in self._send_batch_rpc_request(
231-
[
232-
("getblockhash", [height]),
233-
("getblockhash", [height - 1]),
234-
("getblockhash", [height - 2]),
235-
("getblockhash", [height - 3]),
236-
("getblockhash", [height - 4]),
237-
]
238-
)
239-
]
240-
res = self._send_batch_rpc_request(
241-
[
242-
("getblock", [h1, 1]),
243-
("getblock", [h2, 1]),
244-
("getblock", [h3, 1]),
245-
("getblock", [h4, 1]),
246-
("getblock", [h5, 1]),
247-
]
248-
)
249-
tx_counts = [len(r["result"]["tx"]) for r in res]
250-
avg_tx_count = sum(tx_counts) // len(tx_counts)
228+
# Internal methods are run in a separate process, which allows the OS
229+
# to properly reclaim the memory, as Python leaves a large footprint
230+
# behind.
231+
def _internal_mempool_fetch(self):
251232
res = self._send_rpc_request("getrawmempool", [True])
252233
sorted_transactions = transform_and_sort_transactions(res)
253-
sorted_transactions = sorted_transactions[
254-
avg_tx_count
255-
* self.future_blocks_min : avg_tx_count
256-
* self.future_blocks_max
257-
]
258-
259-
transaction_batches = [
260-
sorted_transactions[i : i + self.max_batch]
261-
for i in range(0, len(sorted_transactions), self.max_batch)
262-
]
263-
264-
for transaction_batch in transaction_batches:
234+
return [tx["txid"] for tx in sorted_transactions]
265235

266-
txids = [tx["txid"] for tx in transaction_batch]
236+
def _internal_pass_1(self, transaction_batch):
237+
sql_transaction_storage = SQLTransactionStorage(self.db_connection_parameters)
238+
sql_transaction_storage.connect()
239+
try:
240+
# The first pass will be to delete the confirmed transactions inside the DB
241+
# if applicable.
242+
txids = transaction_batch
243+
all_txids = sql_transaction_storage.all_txids()
244+
for saved_txid in all_txids:
245+
if saved_txid not in txids:
246+
sql_transaction_storage.delete_transaction(saved_txid)
247+
248+
sql_transaction_storage.wipeout_reftxos()
249+
sql_transaction_storage.commit()
250+
251+
# The second pass will be to create a copy of the mempool transactions
252+
# without the ones we already have stored inside the list or DB.
253+
new_txids = list(set(txids) - set(all_txids))
254+
255+
new_txid_batches = [
256+
[t for t in new_txids[i : i + self.max_batch]]
257+
for i in range(0, len(new_txids), self.max_batch)
258+
]
259+
return new_txid_batches
267260

268-
# We are going to replace the mempool transactions with the ones
269-
# yielded by this function.
270-
# First we are going to yield the ones that are already stored
271-
# to save time.
272-
for tx in self.transactions:
273-
if tx.txid in txids:
274-
yield tx
261+
except Exception as e:
262+
sql_transaction_storage.rollback()
263+
raise e
275264

265+
def _internal_pass_2(self, transaction_batch):
266+
sql_transaction_storage = SQLTransactionStorage(self.db_connection_parameters)
267+
sql_transaction_storage.connect()
268+
try:
269+
txids = transaction_batch
276270
# Next we are going to fetch and store new mempool transactions that we don't have
277271
# Confirmed mempool transactions are dropped by this method and the one above.
278272
txid_batches = [
@@ -296,8 +290,11 @@ def _get_raw_mempool(self):
296290
# resolving txins.
297291
if not self.full_transactions:
298292
for tx in temp_transactions:
299-
tx.total_fee = 0
300-
yield (tx)
293+
tx.total_fee = 0 # Because this is actually the (v)size
294+
sql_transaction_storage.store_transaction(tx)
295+
for i in range(len(tx.btclike_transaction.outputs)):
296+
sql_transaction_storage.store_txo0(tx, i)
297+
sql_transaction_storage.commit()
301298
return
302299

303300
# Otherwise we have to get all of the input txids in one swoop so we can
@@ -306,27 +303,48 @@ def _get_raw_mempool(self):
306303
self.txos[i : i + self.max_workers]
307304
for i in range(0, len(self.txos), self.max_workers)
308305
]
309-
310306
with ThreadPoolExecutor(max_workers=self.rps) as executor:
311307
futures = [
312308
executor.submit(self._postprocess_transaction, txes)
313309
for txes in txid_batches
314310
]
315-
self.raw_txos = {}
316311
for future in futures:
317-
self.raw_txos.update(future.result())
312+
results = future.result()
313+
for txid, result in results.items():
314+
try:
315+
sql_transaction_storage.store_txo1(txid, result)
316+
except DatabaseError:
317+
pass
318318

319319
self.txos = []
320-
new_transactions = []
321320
for i in range(len(temp_transactions) - 1, -1, -1):
322321
temp_transaction = temp_transactions[i]
323-
new_transactions.append(self._post_clean_tx(temp_transaction))
322+
new_transaction = self._post_clean_tx(
323+
temp_transaction, sql_transaction_storage
324+
)
325+
if new_transaction is not None:
326+
sql_transaction_storage.store_transaction(new_transaction)
327+
for j in range(len(new_transaction.btclike_transaction.inputs)):
328+
sql_transaction_storage.store_txo0(
329+
new_transaction, j, output=False
330+
)
331+
for j in range(len(new_transaction.btclike_transaction.outputs)):
332+
sql_transaction_storage.store_txo0(new_transaction, j)
324333
del temp_transactions[i]
334+
sql_transaction_storage.wipeout_reftxos()
335+
sql_transaction_storage.commit()
336+
except Exception as e:
337+
sql_transaction_storage.rollback()
338+
raise e
339+
340+
def _get_raw_mempool(self):
341+
with multiprocessing.Pool(1) as pool:
342+
transaction_batches = pool.apply(self._internal_mempool_fetch)
343+
344+
new_txid_batches = self._internal_pass_1(transaction_batches)
325345

326-
self.raw_txos = {}
327-
for tx in new_transactions:
328-
if tx is not None:
329-
yield tx
346+
for transaction_batch in new_txid_batches:
347+
self._internal_pass_2(transaction_batch)
330348

331349
def _postprocess_transaction(self, txes):
332350
res = self._send_batch_rpc_request(
@@ -336,7 +354,9 @@ def _postprocess_transaction(self, txes):
336354
input_transactions = {}
337355
for r in res:
338356
if type(r) is dict and "result" in r.keys():
339-
input_transactions[r["result"]["txid"]] = r["result"]
357+
input_transactions[r["result"]["txid"]] = self._clean_tx(
358+
r["result"]
359+
).SerializeToString()
340360

341361
return input_transactions
342362

@@ -364,5 +384,5 @@ def _process_transaction(self, txes, txids):
364384
return temp_transactions
365385

366386
def get_raw_mempool(self):
367-
self.transactions = [*self._get_raw_mempool()]
368-
return self.transactions
387+
self._get_raw_mempool()
388+
return []

0 commit comments

Comments
 (0)