From 3f9f203ad523f5ce05cd3033aca8c66665a1ee77 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 08:13:35 +0200 Subject: [PATCH 1/9] add flake.nix --- .envrc | 10 +++++++ .gitignore | 2 ++ flake.lock | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ flake.nix | 42 ++++++++++++++++++++++++++++ 4 files changed, 136 insertions(+) create mode 100644 .envrc create mode 100644 flake.lock create mode 100644 flake.nix diff --git a/.envrc b/.envrc new file mode 100644 index 000000000..553530849 --- /dev/null +++ b/.envrc @@ -0,0 +1,10 @@ +#!/bin/bash + +# taken from https://fasterthanli.me/series/building-a-rust-service-with-nix/part-10 and updated the version + +if ! has nix_direnv_version || ! nix_direnv_version 3.0.4; then + source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.4/direnvrc" "sha256-DzlYZ33mWF/Gs8DDeyjr8mnVmQGx7ASYqA5WlxwvBG4=" +fi + +watch_file rust-toolchain.toml +use flake diff --git a/.gitignore b/.gitignore index 2d2112421..6782250ae 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ Cargo.lock *.sqlite* crates/electrum/target + +.direnv/ diff --git a/flake.lock b/flake.lock new file mode 100644 index 000000000..0f6e58db2 --- /dev/null +++ b/flake.lock @@ -0,0 +1,82 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1755615617, + "narHash": "sha256-HMwfAJBdrr8wXAkbGhtcby1zGFvs+StOp19xNsbqdOg=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "20075955deac2583bb12f07151c2df830ef346b4", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs", + "rust-overlay": "rust-overlay" + } + }, + "rust-overlay": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1755743804, + "narHash": "sha256-M6qT02voARH5e9eTXQBzpYIE/hAp6jPgBCyxLmw5uBM=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "80322e975e27d834451d6b66e63f8abae9d74bf2", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 000000000..995f11a32 --- /dev/null +++ b/flake.nix @@ -0,0 +1,42 @@ +{ + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + flake-utils.url = "github:numtide/flake-utils"; + rust-overlay = { + url = "github:oxalica/rust-overlay"; + inputs = { + nixpkgs.follows = "nixpkgs"; + }; + }; + }; + + outputs = { self, nixpkgs, flake-utils, rust-overlay }: + flake-utils.lib.eachDefaultSystem + (system: + let + overlays = [ (import rust-overlay) ]; + pkgs = import nixpkgs { + inherit system overlays; + }; + inherit (pkgs) lib; + + # Read rust version from rust-version file + rustVersion = lib.strings.removeSuffix "\n" (builtins.readFile ./rust-version); + + rustToolchain = pkgs.rust-bin.stable.${rustVersion}.default.override { + extensions = [ "rust-src" "clippy" ]; + }; + + nativeBuildInputs = with pkgs; [ rustToolchain pkg-config ]; + buildInputs = with pkgs; [ openssl ]; + + in + { + devShells.default = pkgs.mkShell { + inherit buildInputs nativeBuildInputs; + + RUST_VERSION = rustVersion; + }; + } + ); +} \ No newline at end of file From 200a0aed90cfa51b0c07d1e3d6be7acec71d709e Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 08:36:47 +0200 Subject: [PATCH 2/9] init bdk_waterfalls 0.1.0 --- Cargo.toml | 5 ++++- crates/waterfalls/Cargo.toml | 39 +++++++++++++++++++++++++++++++++++ crates/waterfalls/src/main.rs | 3 +++ 3 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 crates/waterfalls/Cargo.toml create mode 100644 crates/waterfalls/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index d505c1a0a..c186a8515 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/esplora", "crates/bitcoind_rpc", "crates/testenv", + "crates/waterfalls", "examples/example_cli", "examples/example_electrum", "examples/example_esplora", @@ -22,4 +23,6 @@ print_stdout = "deny" print_stderr = "deny" [workspace.lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)'] } +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(coverage,coverage_nightly)', +] } diff --git a/crates/waterfalls/Cargo.toml b/crates/waterfalls/Cargo.toml new file mode 100644 index 000000000..b464349e0 --- /dev/null +++ b/crates/waterfalls/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "bdk_waterfalls" +version = "0.1.0" +edition = "2021" +authors = ["Riccardo Casatta "] +homepage = "https://bitcoindevkit.org" +repository = "https://github.com/bitcoindevkit/bdk" +documentation = "https://docs.rs/bdk_waterfalls" +description = "Fetch data from waterfalls in the form that bdk accepts" +license = "MIT OR Apache-2.0" +readme = "README.md" + +[lints] +workspace = true + +[dependencies] +bdk_core = { path = "../core", version = "0.6.1", default-features = false } +waterfalls-client = { version = "0.1.0", default-features = false } +async-trait = { version = "0.1.66", optional = true } +futures = { version = "0.3.26", optional = true } + +[dev-dependencies] +waterfalls-client = { version = "0.1.0" } +bdk_chain = { path = "../chain" } +bdk_testenv = { path = "../testenv" } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } + +[features] +default = ["std", "async-https", "blocking-https"] +std = ["bdk_core/std"] +tokio = ["waterfalls-client/tokio"] +async = ["async-trait", "futures", "waterfalls-client/async"] +async-https = ["async", "waterfalls-client/async-https"] +async-https-rustls = ["async", "waterfalls-client/async-https-rustls"] +async-https-native = ["async", "waterfalls-client/async-https-native"] +blocking = ["waterfalls-client/blocking"] +blocking-https = ["blocking", "waterfalls-client/blocking-https"] +blocking-https-rustls = ["blocking", "waterfalls-client/blocking-https-rustls"] +blocking-https-native = ["blocking", "waterfalls-client/blocking-https-native"] diff --git a/crates/waterfalls/src/main.rs b/crates/waterfalls/src/main.rs new file mode 100644 index 000000000..e7a11a969 --- /dev/null +++ b/crates/waterfalls/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} From be88807e8ebcf832c0474f2eff0012fa661ce7ee Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 11:12:39 +0200 Subject: [PATCH 3/9] add waterfalls test env --- crates/waterfalls/Cargo.toml | 3 + crates/waterfalls/src/lib.rs | 133 ++++++++++++++++++++++++++++++++++ crates/waterfalls/src/main.rs | 3 - flake.nix | 5 +- 4 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 crates/waterfalls/src/lib.rs delete mode 100644 crates/waterfalls/src/main.rs diff --git a/crates/waterfalls/Cargo.toml b/crates/waterfalls/Cargo.toml index b464349e0..d53cb3cdb 100644 --- a/crates/waterfalls/Cargo.toml +++ b/crates/waterfalls/Cargo.toml @@ -21,6 +21,9 @@ futures = { version = "0.3.26", optional = true } [dev-dependencies] waterfalls-client = { version = "0.1.0" } +waterfalls = { version = "0.9", default-features = false, features = [ + "test_env", +] } bdk_chain = { path = "../chain" } bdk_testenv = { path = "../testenv" } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/crates/waterfalls/src/lib.rs b/crates/waterfalls/src/lib.rs new file mode 100644 index 000000000..5cb356d2c --- /dev/null +++ b/crates/waterfalls/src/lib.rs @@ -0,0 +1,133 @@ +use std::collections::BTreeMap; + +use bdk_core::{ + bitcoin::{address::FromScriptError, Address, Network}, + spk_client::{FullScanRequest, FullScanResponse}, + TxUpdate, +}; +use waterfalls_client::{BlockingClient, Builder}; + +pub type Error = Box; + +struct Client { + client: BlockingClient, + network: Network, +} + +impl Client { + pub fn new(network: Network, base_url: &str) -> Result { + Ok(Self { + client: Builder::new(base_url).build_blocking(), + network, + }) + } + + fn full_scan>>( + &self, + request: R, + _stop_gap: usize, + _parallel_requests: usize, + ) -> Result, Error> { + let mut request: FullScanRequest = request.into(); + + let keychain_addresses = + get_addesses_from_request(&mut request, self.network).map_err(|e| Box::new(e))?; + + let chain_update = None; + let tx_update = TxUpdate::default(); + let last_active_indices = BTreeMap::new(); + + for (_keychain, addresses) in keychain_addresses { + // TODO: we can do a single call for multiple keychains + let _result = self + .client + .waterfalls_addresses(&addresses) + .map_err(|e| Box::new(e))?; + } + + Ok(FullScanResponse { + chain_update, + tx_update, + last_active_indices, + }) + } +} + +fn get_addesses_from_request( + request: &mut FullScanRequest, + network: Network, +) -> Result)>, FromScriptError> { + let mut result = Vec::new(); + for (i, keychain) in request.keychains().iter().enumerate() { + let mut addresses = Vec::new(); + let keychain_spks = request.iter_spks(keychain.clone()); + for (_, spk) in keychain_spks { + addresses.push(Address::from_script(&spk, network)?); + } + println!("keychain_{}: addresses: {:?}", i, addresses); + result.push((keychain.clone(), addresses)); + } + Ok(result) +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use bdk_core::{ + bitcoin::{Address, Network}, + spk_client::FullScanRequest, + }; + use bdk_testenv::anyhow; + + use crate::Client; + + #[test] + pub fn test_full_scan() -> anyhow::Result<()> { + // Initialize a waterfalls TestEnv + let rt = tokio::runtime::Runtime::new().unwrap(); + let test_env = rt.block_on(async { + let exe = std::env::var("BITCOIND_EXEC").expect("BITCOIND_EXEC must be set"); + waterfalls::test_env::launch(exe, waterfalls::be::Family::Bitcoin).await + }); + let url = test_env.base_url(); + + let client = Client::new(Network::Regtest, &url).map_err(|e| anyhow::anyhow!("{}", e))?; + + // Now let's test the gap limit. First of all get a chain of 10 addresses. + let addresses = [ + "bcrt1qj9f7r8r3p2y0sqf4r3r62qysmkuh0fzep473d2ar7rcz64wqvhssjgf0z4", + "bcrt1qmm5t0ch7vh2hryx9ctq3mswexcugqe4atkpkl2tetm8merqkthas3w7q30", + "bcrt1qut9p7ej7l7lhyvekj28xknn8gnugtym4d5qvnp5shrsr4nksmfqsmyn87g", + "bcrt1qqz0xtn3m235p2k96f5wa2dqukg6shxn9n3txe8arlrhjh5p744hsd957ww", + "bcrt1q9c0t62a8l6wfytmf2t9lfj35avadk3mm8g4p3l84tp6rl66m48sqrme7wu", + "bcrt1qkmh8yrk2v47cklt8dytk8f3ammcwa4q7dzattedzfhqzvfwwgyzsg59zrh", + "bcrt1qvgrsrzy07gjkkfr5luplt0azxtfwmwq5t62gum5jr7zwcvep2acs8hhnp2", + "bcrt1qw57edarcg50ansq8mk3guyrk78rk0fwvrds5xvqeupteu848zayq549av8", + "bcrt1qvtve5ekf6e5kzs68knvnt2phfw6a0yjqrlgat392m6zt9jsvyxhqfx67ef", + "bcrt1qw03ddumfs9z0kcu76ln7jrjfdwam20qtffmkcral3qtza90sp9kqm787uk", + ]; + let addresses: Vec<_> = addresses + .into_iter() + .map(|s| Address::from_str(s).unwrap().assume_checked()) + .collect(); + let spks: Vec<_> = addresses + .iter() + .enumerate() + .map(|(i, addr)| (i as u32, addr.script_pubkey())) + .collect(); + + // Then send coins on one of the addresses. + + let _full_scan_update = { + let request = FullScanRequest::builder().spks_for_keychain(0, spks.clone()); + client + .full_scan(request, 0, 0) + .map_err(|e| anyhow::anyhow!("{}", e))? + }; + + rt.block_on(test_env.shutdown()); + + Ok(()) + } +} diff --git a/crates/waterfalls/src/main.rs b/crates/waterfalls/src/main.rs deleted file mode 100644 index e7a11a969..000000000 --- a/crates/waterfalls/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/flake.nix b/flake.nix index 995f11a32..cb5d954e2 100644 --- a/flake.nix +++ b/flake.nix @@ -28,7 +28,7 @@ }; nativeBuildInputs = with pkgs; [ rustToolchain pkg-config ]; - buildInputs = with pkgs; [ openssl ]; + buildInputs = with pkgs; [ openssl bitcoind ]; in { @@ -36,6 +36,9 @@ inherit buildInputs nativeBuildInputs; RUST_VERSION = rustVersion; + + # Environment variables for integration tests + BITCOIND_EXEC = "${pkgs.bitcoind}/bin/bitcoind"; }; } ); From fbe0abf9cd80b3daf2f6935d740775f00fd49e05 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 11:33:01 +0200 Subject: [PATCH 4/9] compute last_active_indices --- crates/waterfalls/Cargo.toml | 1 + crates/waterfalls/src/lib.rs | 41 ++++++++++++++++++++++++++++-------- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/crates/waterfalls/Cargo.toml b/crates/waterfalls/Cargo.toml index d53cb3cdb..46752e20e 100644 --- a/crates/waterfalls/Cargo.toml +++ b/crates/waterfalls/Cargo.toml @@ -27,6 +27,7 @@ waterfalls = { version = "0.9", default-features = false, features = [ bdk_chain = { path = "../chain" } bdk_testenv = { path = "../testenv" } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } +env_logger = "0.11.8" [features] default = ["std", "async-https", "blocking-https"] diff --git a/crates/waterfalls/src/lib.rs b/crates/waterfalls/src/lib.rs index 5cb356d2c..23fde6e76 100644 --- a/crates/waterfalls/src/lib.rs +++ b/crates/waterfalls/src/lib.rs @@ -5,11 +5,11 @@ use bdk_core::{ spk_client::{FullScanRequest, FullScanResponse}, TxUpdate, }; -use waterfalls_client::{BlockingClient, Builder}; +use waterfalls_client::{api::WaterfallResponse, BlockingClient, Builder}; pub type Error = Box; -struct Client { +pub struct Client { client: BlockingClient, network: Network, } @@ -22,7 +22,7 @@ impl Client { }) } - fn full_scan>>( + pub fn full_scan>>( &self, request: R, _stop_gap: usize, @@ -33,16 +33,18 @@ impl Client { let keychain_addresses = get_addesses_from_request(&mut request, self.network).map_err(|e| Box::new(e))?; - let chain_update = None; + let chain_update = None; // TODO handle chain update let tx_update = TxUpdate::default(); - let last_active_indices = BTreeMap::new(); + let mut last_active_indices = BTreeMap::new(); - for (_keychain, addresses) in keychain_addresses { + for (keychain, addresses) in keychain_addresses { // TODO: we can do a single call for multiple keychains - let _result = self + let result = self .client .waterfalls_addresses(&addresses) .map_err(|e| Box::new(e))?; + let index = get_last_active_index(&result); + last_active_indices.insert(keychain, index); } Ok(FullScanResponse { @@ -70,6 +72,20 @@ fn get_addesses_from_request( Ok(result) } +/// Find the index of the last non-empty element in the addresses array from waterfalls response +fn get_last_active_index(response: &WaterfallResponse) -> u32 { + if let Some(addresses) = response.txs_seen.get("addresses") { + // Find the last index that has non-empty transactions + for (index, txs) in addresses.iter().enumerate().rev() { + if !txs.is_empty() { + return index as u32; + } + } + } + // If no addresses with transactions found, return 0 + 0 +} + #[cfg(test)] mod test { use std::str::FromStr; @@ -84,6 +100,7 @@ mod test { #[test] pub fn test_full_scan() -> anyhow::Result<()> { + let _ = env_logger::try_init(); // Initialize a waterfalls TestEnv let rt = tokio::runtime::Runtime::new().unwrap(); let test_env = rt.block_on(async { @@ -117,15 +134,21 @@ mod test { .map(|(i, addr)| (i as u32, addr.script_pubkey())) .collect(); - // Then send coins on one of the addresses. + // Send coins to one of the addresses from the bitcoin node. + // Convert the first address to waterfalls format and send coins to it + let waterfalls_address = waterfalls::be::Address::Bitcoin(addresses[3].clone()); + let _txid = test_env.send_to(&waterfalls_address, 10000); + rt.block_on(test_env.node_generate(1)); - let _full_scan_update = { + let full_scan_update = { let request = FullScanRequest::builder().spks_for_keychain(0, spks.clone()); client .full_scan(request, 0, 0) .map_err(|e| anyhow::anyhow!("{}", e))? }; + assert_eq!(full_scan_update.last_active_indices.get(&0).unwrap(), &3u32); + rt.block_on(test_env.shutdown()); Ok(()) From ecb8c5064c05b86cd902c874fca963ffa5028dd4 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 12:55:02 +0200 Subject: [PATCH 5/9] download txs --- crates/waterfalls/Cargo.toml | 1 + crates/waterfalls/src/lib.rs | 60 +++++++++++++++++++++++++++++++----- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/crates/waterfalls/Cargo.toml b/crates/waterfalls/Cargo.toml index 46752e20e..08cb79a91 100644 --- a/crates/waterfalls/Cargo.toml +++ b/crates/waterfalls/Cargo.toml @@ -18,6 +18,7 @@ bdk_core = { path = "../core", version = "0.6.1", default-features = false } waterfalls-client = { version = "0.1.0", default-features = false } async-trait = { version = "0.1.66", optional = true } futures = { version = "0.3.26", optional = true } +log = "0.4.20" [dev-dependencies] waterfalls-client = { version = "0.1.0" } diff --git a/crates/waterfalls/src/lib.rs b/crates/waterfalls/src/lib.rs index 23fde6e76..158abbd3e 100644 --- a/crates/waterfalls/src/lib.rs +++ b/crates/waterfalls/src/lib.rs @@ -1,11 +1,11 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use bdk_core::{ - bitcoin::{address::FromScriptError, Address, Network}, + bitcoin::{self, address::FromScriptError, Address, Network}, spk_client::{FullScanRequest, FullScanResponse}, - TxUpdate, + BlockId, ConfirmationBlockTime, TxUpdate, }; -use waterfalls_client::{api::WaterfallResponse, BlockingClient, Builder}; +use waterfalls_client::{api::WaterfallResponse, BlockingClient, Builder, TxStatus}; pub type Error = Box; @@ -26,7 +26,7 @@ impl Client { &self, request: R, _stop_gap: usize, - _parallel_requests: usize, + parallel_requests: usize, ) -> Result, Error> { let mut request: FullScanRequest = request.into(); @@ -34,9 +34,10 @@ impl Client { get_addesses_from_request(&mut request, self.network).map_err(|e| Box::new(e))?; let chain_update = None; // TODO handle chain update - let tx_update = TxUpdate::default(); + let mut tx_update = TxUpdate::default(); let mut last_active_indices = BTreeMap::new(); + let mut waterfalls_responses = Vec::new(); for (keychain, addresses) in keychain_addresses { // TODO: we can do a single call for multiple keychains let result = self @@ -45,6 +46,49 @@ impl Client { .map_err(|e| Box::new(e))?; let index = get_last_active_index(&result); last_active_indices.insert(keychain, index); + waterfalls_responses.push(result); + } + let txids: HashSet<_> = waterfalls_responses + .iter() + .flat_map(|response| { + response + .txs_seen + .get("addresses") + .unwrap() + .iter() + .flat_map(|txs| txs.iter().map(|tx| tx.txid)) + }) + .collect(); + + log::info!("txids: {:?}", txids); + + // Fetch transactions in parallel using parallel_requests parameter + let mut txids_iter = txids.into_iter(); + loop { + let handles = txids_iter + .by_ref() + .take(parallel_requests.max(1)) // Ensure at least 1 request + .map(|txid| { + let client = self.client.clone(); + std::thread::spawn(move || { + client + .get_tx(&txid) + .map_err(|e| Box::new(e) as Error) + .map(|tx_opt| (txid, tx_opt)) + }) + }) + .collect::>(); + + if handles.is_empty() { + break; + } + + for handle in handles { + let (_txid, tx_opt) = handle.join().expect("thread must not panic")?; + if let Some(tx) = tx_opt { + tx_update.txs.push(std::sync::Arc::new(tx)); + } + } } Ok(FullScanResponse { @@ -66,7 +110,7 @@ fn get_addesses_from_request( for (_, spk) in keychain_spks { addresses.push(Address::from_script(&spk, network)?); } - println!("keychain_{}: addresses: {:?}", i, addresses); + log::info!("keychain_{}: addresses: {:?}", i, addresses); result.push((keychain.clone(), addresses)); } Ok(result) @@ -143,7 +187,7 @@ mod test { let full_scan_update = { let request = FullScanRequest::builder().spks_for_keychain(0, spks.clone()); client - .full_scan(request, 0, 0) + .full_scan(request, 0, 4) .map_err(|e| anyhow::anyhow!("{}", e))? }; From 2af3f169836fa51a210fbf8f142136bdd2de8b9f Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 18:05:47 +0200 Subject: [PATCH 6/9] bdk_waterfalls: updates --- crates/waterfalls/src/lib.rs | 106 +++++++++++++++++++++++++++++++++-- 1 file changed, 102 insertions(+), 4 deletions(-) diff --git a/crates/waterfalls/src/lib.rs b/crates/waterfalls/src/lib.rs index 158abbd3e..a3602acd4 100644 --- a/crates/waterfalls/src/lib.rs +++ b/crates/waterfalls/src/lib.rs @@ -3,10 +3,12 @@ use std::collections::{BTreeMap, HashSet}; use bdk_core::{ bitcoin::{self, address::FromScriptError, Address, Network}, spk_client::{FullScanRequest, FullScanResponse}, - BlockId, ConfirmationBlockTime, TxUpdate, + BlockId, CheckPoint, ConfirmationBlockTime, TxUpdate, }; use waterfalls_client::{api::WaterfallResponse, BlockingClient, Builder, TxStatus}; +pub use waterfalls_client; + pub type Error = Box; pub struct Client { @@ -14,6 +16,11 @@ pub struct Client { network: Network, } +// Open points: +// How to handle unbounded keychains? +// How to handle evicted txs? +// How to handle custom gap limit? + impl Client { pub fn new(network: Network, base_url: &str) -> Result { Ok(Self { @@ -22,6 +29,11 @@ impl Client { }) } + pub fn broadcast(&self, tx: &bdk_core::bitcoin::Transaction) -> Result<(), Error> { + self.client.broadcast(tx).map_err(|e| Box::new(e))?; + Ok(()) + } + pub fn full_scan>>( &self, request: R, @@ -33,7 +45,7 @@ impl Client { let keychain_addresses = get_addesses_from_request(&mut request, self.network).map_err(|e| Box::new(e))?; - let chain_update = None; // TODO handle chain update + let mut chain_update = None; let mut tx_update = TxUpdate::default(); let mut last_active_indices = BTreeMap::new(); @@ -44,8 +56,29 @@ impl Client { .client .waterfalls_addresses(&addresses) .map_err(|e| Box::new(e))?; + log::info!("result: {:?}", result); let index = get_last_active_index(&result); + log::info!("last_active_index: {:?}", index); last_active_indices.insert(keychain, index); + + for tx_seens in result.txs_seen.get("addresses").unwrap().iter() { + for tx_seen in tx_seens.iter() { + insert_anchor_or_seen_at_from_status( + &mut tx_update, + request.start_time(), + tx_seen.txid, + tx_status(tx_seen), + ); + } + } + + if let Some(block_hash) = &result.tip { + chain_update = Some(CheckPoint::new(BlockId { + height: 1000 as u32, // TODO: get height + hash: *block_hash, + })); + } + waterfalls_responses.push(result); } let txids: HashSet<_> = waterfalls_responses @@ -107,8 +140,13 @@ fn get_addesses_from_request( for (i, keychain) in request.keychains().iter().enumerate() { let mut addresses = Vec::new(); let keychain_spks = request.iter_spks(keychain.clone()); - for (_, spk) in keychain_spks { + for (j, spk) in keychain_spks { addresses.push(Address::from_script(&spk, network)?); + + if addresses.len() == 50 { + // TODO hanlde unbounded keychains + break; + } } log::info!("keychain_{}: addresses: {:?}", i, addresses); result.push((keychain.clone(), addresses)); @@ -130,12 +168,47 @@ fn get_last_active_index(response: &WaterfallResponse) -> u32 { 0 } +fn tx_status(tx_seen: &waterfalls_client::api::TxSeen) -> TxStatus { + TxStatus { + block_height: (tx_seen.height != 0).then_some(tx_seen.height), + block_hash: tx_seen.block_hash, + block_time: tx_seen.block_timestamp.map(|t| t as u64), + confirmed: tx_seen.height != 0, + } +} + +// copied from esplora/src/lib.rs +#[allow(dead_code)] +fn insert_anchor_or_seen_at_from_status( + update: &mut TxUpdate, + start_time: u64, + txid: bitcoin::Txid, + status: TxStatus, +) { + if let TxStatus { + block_height: Some(height), + block_hash: Some(hash), + block_time: Some(time), + .. + } = status + { + let anchor = ConfirmationBlockTime { + block_id: BlockId { height, hash }, + confirmation_time: time, + }; + update.anchors.insert((anchor, txid)); + } else { + update.seen_ats.insert((txid, start_time)); + } +} + #[cfg(test)] mod test { use std::str::FromStr; + use bdk_chain::{miniscript::Descriptor, SpkIterator}; use bdk_core::{ - bitcoin::{Address, Network}, + bitcoin::{key::Secp256k1, Address, Network}, spk_client::FullScanRequest, }; use bdk_testenv::anyhow; @@ -197,4 +270,29 @@ mod test { Ok(()) } + + #[test] + #[ignore = "requires prod server and internet"] + pub fn test_signet() { + let _ = env_logger::try_init(); + let base_url = "https://waterfalls.liquidwebwallet.org/bitcoinsignet/api"; + let client = Client::new(Network::Signet, base_url).unwrap(); + + let waterfalls_client = waterfalls_client::Builder::new(base_url).build_blocking(); + let external = "tr(tpubDDh1wUM29wsoJnHomNYrEwhGainWHUSzErfNrsZKiCjQWWUjFLwhtAqWvGUKc4oESXqcGKdbPDv7fBDsPHPYitNuGNrJ9BKrW1GPxUyiUUb/0/*)"; + let internal = "tr(tpubDDh1wUM29wsoJnHomNYrEwhGainWHUSzErfNrsZKiCjQWWUjFLwhtAqWvGUKc4oESXqcGKdbPDv7fBDsPHPYitNuGNrJ9BKrW1GPxUyiUUb/1/*)"; + + let resp = waterfalls_client.waterfalls(&external).unwrap(); + log::info!("resp: {:?}", resp); + let resp2 = waterfalls_client.waterfalls(&internal).unwrap(); + log::info!("resp2: {:?}", resp2); + + let secp = Secp256k1::new(); + let (external_descriptor, _) = Descriptor::parse_descriptor(&secp, external).unwrap(); + let (internal_descriptor, _) = Descriptor::parse_descriptor(&secp, internal).unwrap(); + let request = FullScanRequest::builder() + .spks_for_keychain(0, SpkIterator::new_with_range(external_descriptor, 0..100)) + .spks_for_keychain(1, SpkIterator::new_with_range(internal_descriptor, 0..100)); + let full_scan_update = client.full_scan(request, 0, 4).unwrap(); + } } From ae33f13b9b7fff79f764eb2d2a81b8d3d949f5ce Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 18:06:06 +0200 Subject: [PATCH 7/9] bootstrap example waterfalls --- Cargo.toml | 1 + examples/example_waterfalls/Cargo.toml | 12 ++ examples/example_waterfalls/src/main.rs | 179 ++++++++++++++++++++++++ 3 files changed, 192 insertions(+) create mode 100644 examples/example_waterfalls/Cargo.toml create mode 100644 examples/example_waterfalls/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index c186a8515..e05a3ac7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "examples/example_cli", "examples/example_electrum", "examples/example_esplora", + "examples/example_waterfalls", "examples/example_bitcoind_rpc_polling", ] diff --git a/examples/example_waterfalls/Cargo.toml b/examples/example_waterfalls/Cargo.toml new file mode 100644 index 000000000..9d3d5c63d --- /dev/null +++ b/examples/example_waterfalls/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "example_waterfalls" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../../crates/chain", features = ["serde"] } +bdk_waterfalls = { path = "../../crates/waterfalls", features = ["blocking"] } +example_cli = { path = "../example_cli" } +env_logger = "0.11" diff --git a/examples/example_waterfalls/src/main.rs b/examples/example_waterfalls/src/main.rs new file mode 100644 index 000000000..d5349348f --- /dev/null +++ b/examples/example_waterfalls/src/main.rs @@ -0,0 +1,179 @@ +use core::f32; +use std::{ + collections::BTreeSet, + io::{self, Write}, +}; + +use bdk_chain::{ + bitcoin::Network, + keychain_txout::FullScanRequestBuilderExt, + spk_client::{FullScanRequest, SyncRequest}, + CanonicalizationParams, Merge, +}; +use bdk_waterfalls::waterfalls_client; +use example_cli::{ + anyhow::{self, Context}, + clap::{self, Parser, Subcommand}, + ChangeSet, Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_waterfalls"; +const DB_PATH: &str = ".bdk_example_waterfalls.db"; + +#[derive(Subcommand, Debug, Clone)] +enum WaterfallsCommands { + /// Scans the addresses in the wallet using the waterfalls API. + Scan { + #[clap(flatten)] + waterfalls_args: WaterfallsArgs, + + #[clap(flatten)] + scan_options: ScanOptions, + }, +} + +impl WaterfallsCommands { + fn waterfalls_args(&self) -> WaterfallsArgs { + match self { + WaterfallsCommands::Scan { + waterfalls_args, .. + } => waterfalls_args.clone(), + } + } +} + +#[derive(clap::Args, Debug, Clone)] +pub struct WaterfallsArgs { + /// The esplora url endpoint to connect to. + #[clap(long, short = 'u', env = "WATERFALLS_SERVER")] + waterfalls_url: Option, +} + +#[derive(Parser, Debug, Clone, PartialEq)] +pub struct ScanOptions { + /// Max number of concurrent esplora server requests. + #[clap(long, default_value = "2")] + pub parallel_requests: usize, +} + +impl WaterfallsArgs { + pub fn client(&self, network: Network) -> anyhow::Result { + let waterfalls_url = self.waterfalls_url.as_deref().unwrap_or(match network { + Network::Bitcoin => "https://waterfalls.liquidwebwallet.org/bitcoin/api", + Network::Regtest => "http://localhost:3002", + Network::Signet => "https://waterfalls.liquidwebwallet.org/bitcoinsignet/api", + _ => panic!("unsupported network"), + }); + + let client = bdk_waterfalls::Client::new(network, waterfalls_url).unwrap(); // TODO: handle error + Ok(client) + } +} + +fn main() -> anyhow::Result<()> { + env_logger::init(); + let example_cli::Init { + args, + graph, + chain, + db, + network, + } = match example_cli::init_or_load::(DB_MAGIC, DB_PATH)? { + Some(init) => init, + None => return Ok(()), + }; + + let waterfalls_cmd = match &args.command { + // These are commands that are handled by this example (sync, scan). + example_cli::Commands::ChainSpecific(esplora_cmd) => esplora_cmd, + // These are general commands handled by example_cli. Execute the cmd and return. + general_cmd => { + return example_cli::handle_commands( + &graph, + &chain, + &db, + network, + |esplora_args, tx| { + let client = esplora_args.client(network)?; + client.broadcast(tx).unwrap(); // TODO: handle error + Ok(()) + }, + general_cmd.clone(), + ); + } + }; + + let client = waterfalls_cmd.waterfalls_args().client(network)?; + // Prepare the `IndexedTxGraph` and `LocalChain` updates based on whether we are scanning or + // syncing. + // + // Scanning: We are iterating through spks of all keychains and scanning for transactions for + // each spk. We start with the lowest derivation index spk and stop scanning after `stop_gap` + // number of consecutive spks have no transaction history. A Scan is done in situations of + // wallet restoration. It is a special case. Applications should use "sync" style updates + // after an initial scan. + // + // Syncing: We only check for specified spks, utxos and txids to update their confirmation + // status or fetch missing transactions. + let (local_chain_changeset, indexed_tx_graph_changeset) = match &waterfalls_cmd { + WaterfallsCommands::Scan { scan_options, .. } => { + let request = { + let chain_tip = chain.lock().expect("mutex must not be poisoned").tip(); + let indexed_graph = &*graph.lock().expect("mutex must not be poisoned"); + FullScanRequest::builder() + .chain_tip(chain_tip) + .spks_from_indexer(&indexed_graph.index) + .inspect({ + let mut once = BTreeSet::::new(); + move |keychain, spk_i, _| { + if once.insert(keychain) { + eprint!("\nscanning {keychain}: "); + } + eprint!("{spk_i} "); + // Flush early to ensure we print at every iteration. + let _ = io::stderr().flush(); + } + }) + .build() + }; + + // The client scans keychain spks for transaction histories, stopping after `stop_gap` + // is reached. It returns a `TxGraph` update (`tx_update`) and a structure that + // represents the last active spk derivation indices of keychains + // (`keychain_indices_update`). + let update = client + .full_scan(request, 10, scan_options.parallel_requests) + .unwrap(); // TODO: handle error + + let mut graph = graph.lock().expect("mutex must not be poisoned"); + let mut chain = chain.lock().expect("mutex must not be poisoned"); + // Because we did a stop gap based scan we are likely to have some updates to our + // deriviation indices. Usually before a scan you are on a fresh wallet with no + // addresses derived so we need to derive up to last active addresses the scan found + // before adding the transactions. + ( + chain.apply_update(update.chain_update.expect("request included chain tip"))?, + { + let index_changeset = graph + .index + .reveal_to_target_multi(&update.last_active_indices); + let mut indexed_tx_graph_changeset = graph.apply_update(update.tx_update); + indexed_tx_graph_changeset.merge(index_changeset.into()); + indexed_tx_graph_changeset + }, + ) + } + }; + + println!(); + + // We persist the changes + let mut db = db.lock().unwrap(); + db.append(&ChangeSet { + local_chain: local_chain_changeset, + tx_graph: indexed_tx_graph_changeset.tx_graph, + indexer: indexed_tx_graph_changeset.indexer, + ..Default::default() + })?; + Ok(()) +} From 6bac55a4c22a63ed3d1aa95e9f46735e08b7acfe Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 2 Sep 2025 18:07:01 +0200 Subject: [PATCH 8/9] fix warnings --- crates/waterfalls/src/lib.rs | 2 +- examples/example_waterfalls/src/main.rs | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/crates/waterfalls/src/lib.rs b/crates/waterfalls/src/lib.rs index a3602acd4..f23d9bb8b 100644 --- a/crates/waterfalls/src/lib.rs +++ b/crates/waterfalls/src/lib.rs @@ -140,7 +140,7 @@ fn get_addesses_from_request( for (i, keychain) in request.keychains().iter().enumerate() { let mut addresses = Vec::new(); let keychain_spks = request.iter_spks(keychain.clone()); - for (j, spk) in keychain_spks { + for (_, spk) in keychain_spks { addresses.push(Address::from_script(&spk, network)?); if addresses.len() == 50 { diff --git a/examples/example_waterfalls/src/main.rs b/examples/example_waterfalls/src/main.rs index d5349348f..20d08a450 100644 --- a/examples/example_waterfalls/src/main.rs +++ b/examples/example_waterfalls/src/main.rs @@ -1,18 +1,13 @@ -use core::f32; use std::{ collections::BTreeSet, io::{self, Write}, }; use bdk_chain::{ - bitcoin::Network, - keychain_txout::FullScanRequestBuilderExt, - spk_client::{FullScanRequest, SyncRequest}, - CanonicalizationParams, Merge, + bitcoin::Network, keychain_txout::FullScanRequestBuilderExt, spk_client::FullScanRequest, Merge, }; -use bdk_waterfalls::waterfalls_client; use example_cli::{ - anyhow::{self, Context}, + anyhow::{self}, clap::{self, Parser, Subcommand}, ChangeSet, Keychain, }; From 5d83b39c0823eb827e2bee763e0dcbd8cb4731fa Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Wed, 3 Sep 2025 09:51:37 +0200 Subject: [PATCH 9/9] handle chain update --- crates/waterfalls/Cargo.toml | 6 +-- crates/waterfalls/src/lib.rs | 73 +++++++++++++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/crates/waterfalls/Cargo.toml b/crates/waterfalls/Cargo.toml index 08cb79a91..5ca45f6b3 100644 --- a/crates/waterfalls/Cargo.toml +++ b/crates/waterfalls/Cargo.toml @@ -15,14 +15,14 @@ workspace = true [dependencies] bdk_core = { path = "../core", version = "0.6.1", default-features = false } -waterfalls-client = { version = "0.1.0", default-features = false } +waterfalls-client = { version = "0.2.0", default-features = false } async-trait = { version = "0.1.66", optional = true } futures = { version = "0.3.26", optional = true } log = "0.4.20" [dev-dependencies] -waterfalls-client = { version = "0.1.0" } -waterfalls = { version = "0.9", default-features = false, features = [ +waterfalls-client = { version = "0.2.0" } +waterfalls = { version = "0.9.6", default-features = false, features = [ "test_env", ] } bdk_chain = { path = "../chain" } diff --git a/crates/waterfalls/src/lib.rs b/crates/waterfalls/src/lib.rs index f23d9bb8b..c09be5aaa 100644 --- a/crates/waterfalls/src/lib.rs +++ b/crates/waterfalls/src/lib.rs @@ -42,6 +42,9 @@ impl Client { ) -> Result, Error> { let mut request: FullScanRequest = request.into(); + let chain_tip = request.chain_tip(); + let mut latest_blocks = BTreeMap::new(); + let keychain_addresses = get_addesses_from_request(&mut request, self.network).map_err(|e| Box::new(e))?; @@ -50,6 +53,8 @@ impl Client { let mut last_active_indices = BTreeMap::new(); let mut waterfalls_responses = Vec::new(); + let mut tip_meta_from_server = None; + for (keychain, addresses) in keychain_addresses { // TODO: we can do a single call for multiple keychains let result = self @@ -69,18 +74,43 @@ impl Client { tx_seen.txid, tx_status(tx_seen), ); + if let Some(block_hash) = tx_seen.block_hash { + latest_blocks.insert(tx_seen.height, block_hash); + } } } - if let Some(block_hash) = &result.tip { - chain_update = Some(CheckPoint::new(BlockId { - height: 1000 as u32, // TODO: get height - hash: *block_hash, - })); + // Store the tip_meta from server for later use + if let Some(tip_meta) = &result.tip_meta { + tip_meta_from_server = Some(tip_meta.clone()); } waterfalls_responses.push(result); } + + // Handle chain update after the loop using the stored tip_meta + if let Some(tip_meta) = tip_meta_from_server { + if let Some(existing_tip) = chain_tip { + // Extend the existing chain tip to the new tip height if needed + let new_tip_block_id = BlockId { + height: tip_meta.h, + hash: tip_meta.b, + }; + + // If the new tip is higher than our existing tip, extend the chain + if tip_meta.h > existing_tip.height() { + chain_update = Some(existing_tip.extend([new_tip_block_id]).map_err(|e| { + Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Chain extension failed: {:?}", e), + )) as Error + })?); + } else { + // If new tip is not higher, keep the existing tip + chain_update = Some(existing_tip); + } + } + } let txids: HashSet<_> = waterfalls_responses .iter() .flat_map(|response| { @@ -123,6 +153,15 @@ impl Client { } } } + let chain_update = if let Some(chain_update) = chain_update { + Some(chain_update_with_anchors( + chain_update, + &latest_blocks, + tx_update.anchors.iter().cloned(), + )?) + } else { + None + }; Ok(FullScanResponse { chain_update, @@ -202,6 +241,30 @@ fn insert_anchor_or_seen_at_from_status( } } +// copied from electrum/src/lib.rs +// Add a corresponding checkpoint per anchor height if it does not yet exist. Checkpoints should not +// surpass `latest_blocks`. +fn chain_update_with_anchors( + mut tip: CheckPoint, + latest_blocks: &BTreeMap, + anchors: impl Iterator, +) -> Result { + for (anchor, _txid) in anchors { + let height = anchor.block_id.height; + + // Checkpoint uses the `BlockHash` from `latest_blocks` so that the hash will be consistent + // in case of a re-org. + if tip.get(height).is_none() && height <= tip.height() { + let hash = match latest_blocks.get(&height) { + Some(&hash) => hash, + None => anchor.block_id.hash, + }; + tip = tip.insert(BlockId { hash, height }); + } + } + Ok(tip) +} + #[cfg(test)] mod test { use std::str::FromStr;