src/io/vss_store.rs: 127 changes (116 additions, 11 deletions)
@@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use bdk_chain::Merge;
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
use lightning::io::{self, Error, ErrorKind};
use lightning::util::persist::{KVStore, KVStoreSync};
@@ -181,7 +182,7 @@ impl KVStoreSync for VssStore {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> io::Result<()> {
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
@@ -203,6 +204,7 @@
primary_namespace,
secondary_namespace,
key,
lazy,
)
.await
};
@@ -275,7 +277,7 @@ impl KVStore for VssStore {
})
}
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
@@ -292,6 +294,7 @@
primary_namespace,
secondary_namespace,
key,
lazy,
)
.await
})
@@ -321,6 +324,7 @@ struct VssStoreInner {
// Per-key locks that ensure that we don't have concurrent writes to the same namespace/key.
// The lock also encapsulates the latest written version per key.
locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<u64>>>>,
pending_lazy_deletes: Mutex<Vec<KeyValue>>,
}

impl VssStoreInner {
@@ -347,7 +351,8 @@ impl VssStoreInner {

let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
let locks = Mutex::new(HashMap::new());
Self { client, store_id, storable_builder, key_obfuscator, locks }
let pending_lazy_deletes = Mutex::new(Vec::new());
Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes }
}

fn get_inner_lock_ref(&self, locking_key: String) -> Arc<tokio::sync::Mutex<u64>> {
@@ -451,6 +456,12 @@ impl VssStoreInner {
"write",
)?;

let delete_items = self
.pending_lazy_deletes
.try_lock()
.ok()
.and_then(|mut guard| guard.take())
Collaborator Author:

Hmm, we might lose some lazy deletes if the write below were to fail. I do wonder if we should go out of our way to restore the pending items in such a case, or if we're fine just leaning into the 'may or may not succeed' API contract here.

@joostjager Any opinion?

Similarly, I do wonder if we should spawn-and-forget some tasks on Drop to attempt cleaning up the pending deletes on shutdown?

Contributor:

Good question how loose we can get away with being. Those keys would then never be deleted, which isn't great? I am not sure.

Contributor:

What do you both think about re-adding the delete_items back to pending_lazy_deletes if the write fails? We'd get to retry deleting them on a subsequent write attempt.

Collaborator Author:

> What do you both think about re-adding the delete_items back to pending_lazy_deletes if the write fails? We'd get to retry deleting them on a subsequent write attempt.

Yeah, that's the straightforward approach. Technically, we wouldn't even need to do this according to the API contract, but it's probably best to keep it best-effort. Blocking on re-acquiring the pending_lazy_deletes mutex in write isn't great, as it suddenly creates yet another interdependency between remove and write, but it should be safe here.

Went ahead and did that now.
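Not part of this PR, but to make the spawn-and-forget-on-Drop idea above concrete, here is a minimal sketch. It assumes `VssClient` implements `Clone` and that a Tokio runtime handle is still current when the store is dropped; everything else mirrors the types used in the surrounding diff.

```rust
// Hypothetical sketch only: best-effort flush of queued lazy deletes on shutdown.
impl Drop for VssStoreInner {
	fn drop(&mut self) {
		// Grab whatever lazy deletes are still pending; bail if the lock is poisoned.
		let pending = match self.pending_lazy_deletes.lock() {
			Ok(mut guard) => std::mem::take(&mut *guard),
			Err(_) => return,
		};
		if pending.is_empty() {
			return;
		}
		// Spawn-and-forget: results are ignored, keeping deletion best-effort.
		if let Ok(handle) = tokio::runtime::Handle::try_current() {
			let client = self.client.clone();
			let store_id = self.store_id.clone();
			handle.spawn(async move {
				for key_value in pending {
					let request = DeleteObjectRequest {
						store_id: store_id.clone(),
						key_value: Some(key_value),
					};
					let _ = client.delete_object(&request).await;
				}
			});
		}
	}
}
```

Whether such cleanup is worth it remains open in the thread above; the merged change only retries pending deletes on subsequent writes.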

.unwrap_or_default();
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
@@ -464,10 +475,15 @@ impl VssStoreInner {
version: vss_version,
value: storable.encode_to_vec(),
}],
delete_items: vec![],
delete_items: delete_items.clone(),
};

self.client.put_object(&request).await.map_err(|e| {
// Restore delete items so they'll be retried on next write.
if !delete_items.is_empty() {
self.pending_lazy_deletes.lock().unwrap().extend(delete_items);
}

let msg = format!(
"Failed to write to key {}/{}/{}: {}",
primary_namespace, secondary_namespace, key, e
@@ -482,7 +498,7 @@

async fn remove_internal(
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
primary_namespace: String, secondary_namespace: String, key: String,
primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
) -> io::Result<()> {
check_namespace_key_validity(
&primary_namespace,
@@ -491,13 +507,19 @@
"remove",
)?;

let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);

let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] };
Contributor:

Can't we store just keys here?

Collaborator Author:

Currently we might get away with it, but going forward we might utilize the version field there, so I'd prefer to do the future-safer thing here.

Contributor:

You say 'might'. Do you mean that just using keys can already be a problem without the version field?

Even if we use the version field in the future, value still isn't needed?

In general, I would avoid dead code (or fields, in this case). It just raises questions with devs; at least it did with me. Or otherwise clearly document it.

Final question: is a version needed for lazy deletes?

Collaborator Author:

> You say 'might'. Do you mean that just using keys can already be a problem without the version field?

No, but it's something we could easily overlook when making use of the version going forward.

> Even if we use the version field in the future, value still isn't needed?

Well, whether the API design of KeyValue is good or not is debatable (I didn't invent it), but it is the type that we use to reference entries in the store, i.e., it is the thing used in remove/DeleteObjectRequest.

> Final question: is a version needed for lazy deletes?

Well, if we issue a delete on a certain version, the service could in the future ignore a delete if it already has a higher version, no? I.e., it would thereby detect that the value has been updated since the delete was issued.

Contributor:

We could map the key to a KeyValue at a later point, and not use it internally? Not a blocker.

> Well, if we issue a delete on a certain version, the service could in the future ignore a delete if it already has a higher version, no? I.e., it would thereby detect that the value has been updated since the delete was issued.

I thought we didn't want rewrites of the same key after it has been deleted lazily, but I'm not 100% sure anymore. There's no version currently anyway, so a worry for later.
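To illustrate the keys-only alternative raised above (not what the PR implements), a minimal sketch under the assumption that only the obfuscated keys are queued and the `KeyValue` wrapping happens when the triggering write is built; the helper name is made up for illustration.

```rust
// Hypothetical alternative: the field would become
//     pending_lazy_deletes: Mutex<Vec<String>>,
// and the write path would wrap the drained keys just before building the request.
fn take_pending_delete_items(
	pending_lazy_deletes: &std::sync::Mutex<Vec<String>>,
) -> Vec<KeyValue> {
	let keys = std::mem::take(&mut *pending_lazy_deletes.lock().unwrap());
	// Only now materialize the wire type, so no placeholder version/value is stored internally.
	keys.into_iter().map(|key| KeyValue { key, version: -1, value: vec![] }).collect()
}
```

The PR keeps KeyValue in the queue instead, reserving the version field for potential future use as discussed above.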

if lazy {
let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap();
pending_lazy_deletes.push(key_value);
return Ok(());
}

self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
let request = DeleteObjectRequest {
store_id: self.store_id.clone(),
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
};
let request =
DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) };

self.client.delete_object(&request).await.map_err(|e| {
let msg = format!(
@@ -644,4 +666,87 @@ mod tests {
do_read_write_remove_list_persist(&vss_store);
drop(vss_store)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn vss_lazy_delete() {
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let mut rng = rng();
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let mut vss_seed = [0u8; 32];
rng.fill_bytes(&mut vss_seed);
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
let logger = Arc::new(Logger::new_log_facade());
let runtime = Arc::new(Runtime::new(logger).unwrap());
let vss_store =
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);

let primary_namespace = "test_namespace";
let secondary_namespace = "";
let key_to_delete = "key_to_delete";
let key_for_trigger = "key_for_trigger";
let data_to_delete = b"data_to_delete".to_vec();
let trigger_data = b"trigger_data".to_vec();

// Write the key that we'll later lazily delete
KVStore::write(
&vss_store,
primary_namespace,
secondary_namespace,
key_to_delete,
data_to_delete.clone(),
)
.await
.unwrap();

// Verify the key exists
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
.await
.unwrap();
assert_eq!(read_data, data_to_delete);

// Perform a lazy delete
KVStore::remove(&vss_store, primary_namespace, secondary_namespace, key_to_delete, true)
.await
.unwrap();

// Verify the key still exists (lazy delete doesn't immediately remove it)
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
.await
.unwrap();
assert_eq!(read_data, data_to_delete);

// Verify the key is still in the list
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
assert!(keys.contains(&key_to_delete.to_string()));

// Trigger the actual deletion by performing a write operation
KVStore::write(
&vss_store,
primary_namespace,
secondary_namespace,
key_for_trigger,
trigger_data.clone(),
)
.await
.unwrap();

// Now verify the key is actually deleted
let read_result =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete).await;
assert!(read_result.is_err());
assert_eq!(read_result.unwrap_err().kind(), ErrorKind::NotFound);

// Verify the key is no longer in the list
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
assert!(!keys.contains(&key_to_delete.to_string()));

// Verify the trigger key still exists
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_for_trigger)
.await
.unwrap();
assert_eq!(read_data, trigger_data);
}
}