Skip to content

Commit 650caa0

Browse files
authored
Merge pull request #2946 from tnull/2024-03-txsync-readd-reorged-output-spends
Tx-Sync: Track spent `WatchedOutput`s and re-add if unconfirmed
2 parents 9ca2280 + b71c6e2 commit 650caa0

File tree

3 files changed

+68
-11
lines changed

3 files changed

+68
-11
lines changed

lightning-transaction-sync/src/common.rs

+26-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use lightning::chain::{Confirm, WatchedOutput};
2+
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
23
use bitcoin::{Txid, BlockHash, Transaction, OutPoint};
34
use bitcoin::block::Header;
45

@@ -13,6 +14,9 @@ pub(crate) struct SyncState {
1314
// Outputs that were previously processed, but must not be forgotten yet as
1415
// as we still need to monitor any spends on-chain.
1516
pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
17+
// Outputs for which we previously saw a spend on-chain but kept around until the spends reach
18+
// sufficient depth.
19+
pub outputs_spends_pending_threshold_conf: Vec<(Txid, u32, OutPoint, WatchedOutput)>,
1620
// The tip hash observed during our last sync.
1721
pub last_sync_hash: Option<BlockHash>,
1822
// Indicates whether we need to resync, e.g., after encountering an error.
@@ -24,6 +28,7 @@ impl SyncState {
2428
Self {
2529
watched_transactions: HashSet::new(),
2630
watched_outputs: HashMap::new(),
31+
outputs_spends_pending_threshold_conf: Vec::new(),
2732
last_sync_hash: None,
2833
pending_sync: false,
2934
}
@@ -38,6 +43,17 @@ impl SyncState {
3843
}
3944

4045
self.watched_transactions.insert(txid);
46+
47+
// If a previously-confirmed output spend is unconfirmed, re-add the watched output to
48+
// the tracking map.
49+
self.outputs_spends_pending_threshold_conf.retain(|(conf_txid, _, prev_outpoint, output)| {
50+
if txid == *conf_txid {
51+
self.watched_outputs.insert(*prev_outpoint, output.clone());
52+
false
53+
} else {
54+
true
55+
}
56+
})
4157
}
4258
}
4359

@@ -57,10 +73,18 @@ impl SyncState {
5773
self.watched_transactions.remove(&ctx.tx.txid());
5874

5975
for input in &ctx.tx.input {
60-
self.watched_outputs.remove(&input.previous_output);
76+
if let Some(output) = self.watched_outputs.remove(&input.previous_output) {
77+
self.outputs_spends_pending_threshold_conf.push((ctx.tx.txid(), ctx.block_height, input.previous_output, output));
78+
}
6179
}
6280
}
6381
}
82+
83+
pub fn prune_output_spends(&mut self, cur_height: u32) {
84+
self.outputs_spends_pending_threshold_conf.retain(|(_, conf_height, _, _)| {
85+
cur_height < conf_height + ANTI_REORG_DELAY - 1
86+
});
87+
}
6488
}
6589

6690

@@ -104,6 +128,7 @@ impl FilterQueue {
104128
#[derive(Debug)]
105129
pub(crate) struct ConfirmedTx {
106130
pub tx: Transaction,
131+
pub txid: Txid,
107132
pub block_header: Header,
108133
pub block_height: u32,
109134
pub pos: usize,

lightning-transaction-sync/src/electrum.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ where
157157
for c in &confirmables {
158158
c.best_block_updated(&tip_header, tip_height);
159159
}
160+
161+
// Prune any sufficiently confirmed output spends
162+
sync_state.prune_output_spends(tip_height);
160163
}
161164

162165
match self.get_confirmed_transactions(&sync_state) {
@@ -254,7 +257,7 @@ where
254257

255258
// First, check the confirmation status of registered transactions as well as the
256259
// status of dependent transactions of registered outputs.
257-
let mut confirmed_txs = Vec::new();
260+
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
258261
let mut watched_script_pubkeys = Vec::with_capacity(
259262
sync_state.watched_transactions.len() + sync_state.watched_outputs.len());
260263
let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
@@ -302,6 +305,9 @@ where
302305

303306
for (i, script_history) in tx_results.iter().enumerate() {
304307
let (txid, tx) = &watched_txs[i];
308+
if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
309+
continue;
310+
}
305311
let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid);
306312
if let Some(history) = filtered_history.next()
307313
{
@@ -321,6 +327,10 @@ where
321327
}
322328

323329
let txid = possible_output_spend.tx_hash;
330+
if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
331+
continue;
332+
}
333+
324334
match self.client.transaction_get(&txid) {
325335
Ok(tx) => {
326336
let mut is_spend = false;
@@ -416,6 +426,7 @@ where
416426
}
417427
let confirmed_tx = ConfirmedTx {
418428
tx: tx.clone(),
429+
txid,
419430
block_header, block_height: prob_conf_height,
420431
pos,
421432
};

lightning-transaction-sync/src/esplora.rs

+30-9
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ where
153153
}
154154
}
155155

156-
match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
156+
match maybe_await!(self.sync_best_block_updated(&confirmables, &mut sync_state, &tip_hash)) {
157157
Ok(()) => {}
158158
Err(InternalError::Inconsistency) => {
159159
// Immediately restart syncing when we encounter any inconsistencies.
@@ -238,7 +238,7 @@ where
238238

239239
#[maybe_async]
240240
fn sync_best_block_updated(
241-
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
241+
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, sync_state: &mut SyncState, tip_hash: &BlockHash,
242242
) -> Result<(), InternalError> {
243243

244244
// Inform the interface of the new block.
@@ -249,6 +249,9 @@ where
249249
for c in confirmables {
250250
c.best_block_updated(&tip_header, tip_height);
251251
}
252+
253+
// Prune any sufficiently confirmed output spends
254+
sync_state.prune_output_spends(tip_height);
252255
}
253256
} else {
254257
return Err(InternalError::Inconsistency);
@@ -264,10 +267,13 @@ where
264267
// First, check the confirmation status of registered transactions as well as the
265268
// status of dependent transactions of registered outputs.
266269

267-
let mut confirmed_txs = Vec::new();
270+
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
268271

269272
for txid in &sync_state.watched_transactions {
270-
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
273+
if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
274+
continue;
275+
}
276+
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
271277
confirmed_txs.push(confirmed_tx);
272278
}
273279
}
@@ -278,9 +284,19 @@ where
278284
{
279285
if let Some(spending_txid) = output_status.txid {
280286
if let Some(spending_tx_status) = output_status.status {
287+
if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
288+
if spending_tx_status.confirmed {
289+
// Skip inserting duplicate ConfirmedTx entry
290+
continue;
291+
} else {
292+
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
293+
return Err(InternalError::Inconsistency);
294+
}
295+
}
296+
281297
if let Some(confirmed_tx) = maybe_await!(self
282298
.get_confirmed_tx(
283-
&spending_txid,
299+
spending_txid,
284300
spending_tx_status.block_hash,
285301
spending_tx_status.block_height,
286302
))?
@@ -303,7 +319,7 @@ where
303319

304320
#[maybe_async]
305321
fn get_confirmed_tx(
306-
&self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
322+
&self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
307323
) -> Result<Option<ConfirmedTx>, InternalError> {
308324
if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
309325
let block_header = merkle_block.header;
@@ -318,22 +334,27 @@ where
318334
let mut matches = Vec::new();
319335
let mut indexes = Vec::new();
320336
let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
321-
if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
337+
if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid {
322338
log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
323339
return Err(InternalError::Failed);
324340
}
325341

326342
// unwrap() safety: len() > 0 is checked above
327343
let pos = *indexes.first().unwrap() as usize;
328344
if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
345+
if tx.txid() != txid {
346+
log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
347+
return Err(InternalError::Failed);
348+
}
349+
329350
if let Some(block_height) = known_block_height {
330351
// We can take a shortcut here if a previous call already gave us the height.
331-
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
352+
return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
332353
}
333354

334355
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
335356
if let Some(block_height) = block_status.height {
336-
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
357+
return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
337358
} else {
338359
// If any previously-confirmed block suddenly is no longer confirmed, we found
339360
// an inconsistency and should start over.

0 commit comments

Comments
 (0)