24 changes: 12 additions & 12 deletions Cargo.toml
@@ -29,17 +29,17 @@ panic = 'abort' # Abort on panic
default = []

[dependencies]
lightning = { version = "0.2.0-beta1", features = ["std"] }
lightning-types = { version = "0.3.0-beta1" }
lightning-invoice = { version = "0.34.0-beta1", features = ["std"] }
lightning-net-tokio = { version = "0.2.0-beta1" }
lightning-persister = { version = "0.2.0-beta1", features = ["tokio"] }
lightning-background-processor = { version = "0.2.0-beta1" }
lightning-rapid-gossip-sync = { version = "0.2.0-beta1" }
lightning-block-sync = { version = "0.2.0-beta1", features = ["rest-client", "rpc-client", "tokio"] }
lightning-transaction-sync = { version = "0.2.0-beta1", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
lightning-liquidity = { version = "0.2.0-beta1", features = ["std"] }
lightning-macros = { version = "0.2.0-beta1" }
lightning = { version = "0.2.0-rc1", features = ["std"] }
lightning-types = { version = "0.3.0-rc1" }
lightning-invoice = { version = "0.34.0-rc1", features = ["std"] }
lightning-net-tokio = { version = "0.2.0-rc1" }
lightning-persister = { version = "0.2.0-rc1", features = ["tokio"] }
lightning-background-processor = { version = "0.2.0-rc1" }
lightning-rapid-gossip-sync = { version = "0.2.0-rc1" }
lightning-block-sync = { version = "0.2.0-rc1", features = ["rest-client", "rpc-client", "tokio"] }
lightning-transaction-sync = { version = "0.2.0-rc1", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
lightning-liquidity = { version = "0.2.0-rc1", features = ["std"] }
lightning-macros = { version = "0.2.0-rc1" }

#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] }
#lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
@@ -108,7 +108,7 @@ prost = { version = "0.11.6", default-features = false}
winapi = { version = "0.3", features = ["winbase"] }

[dev-dependencies]
lightning = { version = "0.2.0-beta1", features = ["std", "_test_utils"] }
lightning = { version = "0.2.0-rc1", features = ["std", "_test_utils"] }
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] }
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03", features = ["std", "_test_utils"] }
#lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] }
1 change: 1 addition & 0 deletions src/data_store.rs
@@ -103,6 +103,7 @@ where
&self.primary_namespace,
&self.secondary_namespace,
&store_key,
false,
)
.map_err(|e| {
log_error!(
4 changes: 2 additions & 2 deletions src/io/sqlite_store/mod.rs
@@ -137,7 +137,7 @@ impl KVStore for SqliteStore {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
@@ -205,7 +205,7 @@ impl KVStoreSync for SqliteStore {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
) -> io::Result<()> {
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
4 changes: 2 additions & 2 deletions src/io/test_utils.rs
@@ -68,7 +68,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap();
assert_eq!(data, &*read_data);

kv_store.remove(primary_namespace, secondary_namespace, key).unwrap();
kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap();

let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
assert_eq!(listed_keys.len(), 0);
@@ -84,7 +84,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
assert_eq!(data, &*read_data);

kv_store.remove(&max_chars, &max_chars, &max_chars).unwrap();
kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();

let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
assert_eq!(listed_keys.len(), 0);
122 changes: 111 additions & 11 deletions src/io/vss_store.rs
@@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use bdk_chain::Merge;
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
use lightning::io::{self, Error, ErrorKind};
use lightning::util::persist::{KVStore, KVStoreSync};
@@ -181,7 +182,7 @@ impl KVStoreSync for VssStore {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> io::Result<()> {
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
@@ -203,6 +204,7 @@ impl KVStoreSync for VssStore {
primary_namespace,
secondary_namespace,
key,
lazy,
)
.await
};
@@ -275,7 +277,7 @@ impl KVStore for VssStore {
})
}
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
@@ -292,6 +294,7 @@ impl KVStore for VssStore {
primary_namespace,
secondary_namespace,
key,
lazy,
)
.await
})
@@ -321,6 +324,7 @@ struct VssStoreInner {
// Per-key locks that ensure that we don't have concurrent writes to the same namespace/key.
// The lock also encapsulates the latest written version per key.
locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<u64>>>>,
pending_lazy_deletes: Mutex<Vec<KeyValue>>,
}

impl VssStoreInner {
@@ -347,7 +351,8 @@ impl VssStoreInner {

let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
let locks = Mutex::new(HashMap::new());
Self { client, store_id, storable_builder, key_obfuscator, locks }
let pending_lazy_deletes = Mutex::new(Vec::new());
Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes }
}

fn get_inner_lock_ref(&self, locking_key: String) -> Arc<tokio::sync::Mutex<u64>> {
@@ -451,6 +456,12 @@ impl VssStoreInner {
"write",
)?;

let delete_items = self
.pending_lazy_deletes
.try_lock()
Contributor: I briefly thought "oh why can't we do this inside the lock that we already obtain", but that doesn't work because here we are also processing deletes on other keys.

.ok()
.and_then(|mut guard| guard.take())
Collaborator (author): Hmm, we might lose some lazy deletes if the write below would fail. I do wonder if we should go out of our way to restore the pending items in such a case, or if we're fine just leaning into the 'may or may not succeed' API contract here.

@joostjager Any opinion?

Similarly, I do wonder if we should spawn-and-forget some tasks on Drop to attempt cleaning up the pending deletes on shutdown?
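A rough sketch of the spawn-and-forget-on-Drop idea floated above, purely illustrative and not part of this PR: `LazyDeleteQueue` and `delete_all` are made-up stand-ins, while the real store would drain its `pending_lazy_deletes` field and issue VSS delete requests on a runtime it already holds.

```rust
use std::sync::{Arc, Mutex};

/// Illustrative only: drain whatever lazy deletes are still pending on Drop
/// and hand them to a detached task, ignoring failures.
struct LazyDeleteQueue {
	pending: Arc<Mutex<Vec<String>>>,
}

impl Drop for LazyDeleteQueue {
	fn drop(&mut self) {
		let drained = std::mem::take(&mut *self.pending.lock().unwrap());
		if drained.is_empty() {
			return;
		}
		// Only spawn if a Tokio runtime is available; otherwise skip and lean
		// on the "may or may not succeed" contract of lazy removal.
		if let Ok(handle) = tokio::runtime::Handle::try_current() {
			handle.spawn(async move {
				let _ = delete_all(drained).await; // best effort, errors ignored
			});
		}
	}
}

/// Placeholder for issuing the actual delete requests.
async fn delete_all(_keys: Vec<String>) -> Result<(), ()> {
	Ok(())
}
```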

Contributor: Good question how loose we can get away with. Those keys are then never deleted anymore, which isn't great? I am not sure.

Contributor: What do you both think about re-adding the delete_items back to pending_lazy_deletes if write fails? We get to retry deleting them on a subsequent write attempt.
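A minimal sketch of the retry idea suggested in this thread (illustrative, not the merged code): if the batched put fails, push the drained lazy deletes back so a subsequent write attempt picks them up again. The helper is generic; in `write_internal` it could wrap the `put_object` result before that result is mapped to an `io::Error`.

```rust
use std::sync::Mutex;

/// If the batched operation failed, re-queue the items that were drained for
/// it so that a later attempt can retry them.
fn requeue_on_failure<T, E>(
	pending: &Mutex<Vec<T>>, drained: Vec<T>, res: Result<(), E>,
) -> Result<(), E> {
	if res.is_err() && !drained.is_empty() {
		pending.lock().unwrap().extend(drained);
	}
	res
}
```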

.unwrap_or_default();
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
@@ -464,7 +475,7 @@
version: vss_version,
value: storable.encode_to_vec(),
}],
delete_items: vec![],
delete_items,
};

self.client.put_object(&request).await.map_err(|e| {
@@ -482,7 +493,7 @@

async fn remove_internal(
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
primary_namespace: String, secondary_namespace: String, key: String,
primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
) -> io::Result<()> {
check_namespace_key_validity(
&primary_namespace,
@@ -491,13 +502,19 @@
"remove",
)?;

let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);

let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] };
Contributor: Can't we store just keys here?
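An illustrative sketch of the "store just keys" suggestion, not the code in this PR: keep only the obfuscated key strings in pending_lazy_deletes and build the delete tombstones (version -1, empty value) at flush time. The `vss_client::types::KeyValue` import path is assumed from this module's existing use of the type.

```rust
use vss_client::types::KeyValue;

/// Sketch only: turn pending obfuscated keys into VSS delete tombstones when
/// they are attached to the next write.
fn keys_to_delete_items(pending_keys: Vec<String>) -> Vec<KeyValue> {
	pending_keys
		.into_iter()
		.map(|key| KeyValue { key, version: -1, value: vec![] })
		.collect()
}
```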

if lazy {
let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap();
pending_lazy_deletes.push(key_value);
return Ok(());
}

self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
let request = DeleteObjectRequest {
store_id: self.store_id.clone(),
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
};
let request =
DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) };

self.client.delete_object(&request).await.map_err(|e| {
let msg = format!(
@@ -644,4 +661,87 @@ mod tests {
do_read_write_remove_list_persist(&vss_store);
drop(vss_store)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn vss_lazy_delete() {
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let mut rng = thread_rng();
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let mut vss_seed = [0u8; 32];
rng.fill_bytes(&mut vss_seed);
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
let logger = Arc::new(Logger::new_log_facade());
let runtime = Arc::new(Runtime::new(logger).unwrap());
let vss_store =
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);

let primary_namespace = "test_namespace";
let secondary_namespace = "";
let key_to_delete = "key_to_delete";
let key_for_trigger = "key_for_trigger";
let data_to_delete = b"data_to_delete".to_vec();
let trigger_data = b"trigger_data".to_vec();

// Write the key that we'll later lazily delete
KVStore::write(
&vss_store,
primary_namespace,
secondary_namespace,
key_to_delete,
data_to_delete.clone(),
)
.await
.unwrap();

// Verify the key exists
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
.await
.unwrap();
assert_eq!(read_data, data_to_delete);

// Perform a lazy delete
KVStore::remove(&vss_store, primary_namespace, secondary_namespace, key_to_delete, true)
.await
.unwrap();

// Verify the key still exists (lazy delete doesn't immediately remove it)
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
.await
.unwrap();
assert_eq!(read_data, data_to_delete);

// Verify the key is still in the list
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
assert!(keys.contains(&key_to_delete.to_string()));

// Trigger the actual deletion by performing a write operation
KVStore::write(
&vss_store,
primary_namespace,
secondary_namespace,
key_for_trigger,
trigger_data.clone(),
)
.await
.unwrap();

// Now verify the key is actually deleted
let read_result =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete).await;
assert!(read_result.is_err());
assert_eq!(read_result.unwrap_err().kind(), ErrorKind::NotFound);

// Verify the key is no longer in the list
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
assert!(!keys.contains(&key_to_delete.to_string()));

// Verify the trigger key still exists
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_for_trigger)
.await
.unwrap();
assert_eq!(read_data, trigger_data);
}
}
30 changes: 20 additions & 10 deletions tests/common/mod.rs
@@ -1244,14 +1244,14 @@ impl KVStore for TestSyncStore {
})
}
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let key = key.to_string();
let inner = Arc::clone(&self.inner);
let fut = tokio::task::spawn_blocking(move || {
inner.remove_internal(&primary_namespace, &secondary_namespace, &key)
inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy)
});
Box::pin(async move {
fut.await.unwrap_or_else(|e| {
@@ -1292,9 +1292,9 @@ impl KVStoreSync for TestSyncStore {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> lightning::io::Result<()> {
self.inner.remove_internal(primary_namespace, secondary_namespace, key)
self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy)
}

fn list(
@@ -1432,15 +1432,25 @@ impl TestSyncStoreInner {
}

fn remove_internal(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> lightning::io::Result<()> {
let _guard = self.serializer.write().unwrap();
let fs_res =
KVStoreSync::remove(&self.fs_store, primary_namespace, secondary_namespace, key);
let sqlite_res =
KVStoreSync::remove(&self.sqlite_store, primary_namespace, secondary_namespace, key);
let test_res =
KVStoreSync::remove(&self.test_store, primary_namespace, secondary_namespace, key);
KVStoreSync::remove(&self.fs_store, primary_namespace, secondary_namespace, key, lazy);
let sqlite_res = KVStoreSync::remove(
&self.sqlite_store,
primary_namespace,
secondary_namespace,
key,
lazy,
);
let test_res = KVStoreSync::remove(
&self.test_store,
primary_namespace,
secondary_namespace,
key,
lazy,
);

assert!(!self
.do_list(primary_namespace, secondary_namespace)