-
Notifications
You must be signed in to change notification settings - Fork 113
Implement lazy deletes for VssStore
#689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; | |
| use std::sync::{Arc, Mutex}; | ||
| use std::time::Duration; | ||
|
|
||
| use bdk_chain::Merge; | ||
| use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine}; | ||
| use lightning::io::{self, Error, ErrorKind}; | ||
| use lightning::util::persist::{KVStore, KVStoreSync}; | ||
|
|
@@ -181,7 +182,7 @@ impl KVStoreSync for VssStore { | |
| } | ||
|
|
||
| fn remove( | ||
| &self, primary_namespace: &str, secondary_namespace: &str, key: &str, | ||
| &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, | ||
| ) -> io::Result<()> { | ||
| let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { | ||
| debug_assert!(false, "Failed to access internal runtime"); | ||
|
|
@@ -203,6 +204,7 @@ impl KVStoreSync for VssStore { | |
| primary_namespace, | ||
| secondary_namespace, | ||
| key, | ||
| lazy, | ||
| ) | ||
| .await | ||
| }; | ||
|
|
@@ -275,7 +277,7 @@ impl KVStore for VssStore { | |
| }) | ||
| } | ||
| fn remove( | ||
| &self, primary_namespace: &str, secondary_namespace: &str, key: &str, | ||
| &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, | ||
| ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> { | ||
| let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); | ||
| let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); | ||
|
|
@@ -292,6 +294,7 @@ impl KVStore for VssStore { | |
| primary_namespace, | ||
| secondary_namespace, | ||
| key, | ||
| lazy, | ||
| ) | ||
| .await | ||
| }) | ||
|
|
@@ -321,6 +324,7 @@ struct VssStoreInner { | |
| // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key. | ||
| // The lock also encapsulates the latest written version per key. | ||
| locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<u64>>>>, | ||
| pending_lazy_deletes: Mutex<Vec<KeyValue>>, | ||
| } | ||
|
|
||
| impl VssStoreInner { | ||
|
|
@@ -347,7 +351,8 @@ impl VssStoreInner { | |
|
|
||
| let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); | ||
| let locks = Mutex::new(HashMap::new()); | ||
| Self { client, store_id, storable_builder, key_obfuscator, locks } | ||
| let pending_lazy_deletes = Mutex::new(Vec::new()); | ||
| Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes } | ||
| } | ||
|
|
||
| fn get_inner_lock_ref(&self, locking_key: String) -> Arc<tokio::sync::Mutex<u64>> { | ||
|
|
@@ -451,6 +456,12 @@ impl VssStoreInner { | |
| "write", | ||
| )?; | ||
|
|
||
| let delete_items = self | ||
| .pending_lazy_deletes | ||
| .try_lock() | ||
| .ok() | ||
| .and_then(|mut guard| guard.take()) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, we might lose some @joostjager Any opinion? Similiarly, I do wonder if we should
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question how loose we can get away with. Those keys are then never deleted anymore, which isn't great? I am not sure.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you both think about re-adding the |
||
| .unwrap_or_default(); | ||
| self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { | ||
| let obfuscated_key = | ||
| self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); | ||
|
|
@@ -464,7 +475,7 @@ impl VssStoreInner { | |
| version: vss_version, | ||
| value: storable.encode_to_vec(), | ||
| }], | ||
| delete_items: vec![], | ||
| delete_items, | ||
| }; | ||
|
|
||
| self.client.put_object(&request).await.map_err(|e| { | ||
|
|
@@ -482,7 +493,7 @@ impl VssStoreInner { | |
|
|
||
| async fn remove_internal( | ||
| &self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64, | ||
| primary_namespace: String, secondary_namespace: String, key: String, | ||
| primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, | ||
| ) -> io::Result<()> { | ||
| check_namespace_key_validity( | ||
| &primary_namespace, | ||
|
|
@@ -491,13 +502,19 @@ impl VssStoreInner { | |
| "remove", | ||
| )?; | ||
|
|
||
| let obfuscated_key = | ||
| self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); | ||
|
|
||
| let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] }; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we store just keys here? |
||
| if lazy { | ||
| let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap(); | ||
| pending_lazy_deletes.push(key_value); | ||
| return Ok(()); | ||
| } | ||
|
|
||
| self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { | ||
| let obfuscated_key = | ||
| self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); | ||
| let request = DeleteObjectRequest { | ||
| store_id: self.store_id.clone(), | ||
| key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }), | ||
| }; | ||
| let request = | ||
| DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) }; | ||
|
|
||
| self.client.delete_object(&request).await.map_err(|e| { | ||
| let msg = format!( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I briefly thought "oh why can't we do this inside the lock that we already obtain", but that doesn't work because here we are also processing deletes on other keys.