Skip to content

Commit 50f22a1

Browse files
committed
implement wasi kv 0.2.0-draft2 for redis and cosmosdb
Signed-off-by: David Justice <[email protected]>
1 parent a0a132f commit 50f22a1

File tree

13 files changed

+460
-90
lines changed

13 files changed

+460
-90
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-key-value/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ spin-world = { path = "../world" }
1616
tokio = { workspace = true, features = ["macros", "sync", "rt"] }
1717
toml = { workspace = true }
1818
tracing = { workspace = true }
19+
thiserror = { workspace = true }
1920

2021
[dev-dependencies]
2122
spin-factors-test = { path = "../factors-test" }

crates/factor-key-value/src/host.rs

+13-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::Cas;
1+
use super::{Cas, SwapError};
22
use anyhow::{Context, Result};
33
use spin_core::{async_trait, wasmtime::component::Resource};
44
use spin_resource_table::Table;
@@ -378,22 +378,21 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
378378
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
379379

380380
match cas.swap(value).await {
381-
Ok(cas) => Ok(Ok(())),
382-
Err(err) => {
383-
if err.to_string().contains("CAS_ERROR") {
381+
Ok(_) => Ok(Ok(())),
382+
Err(err) => match err {
383+
SwapError::CasFailed(_) => {
384384
let bucket = Resource::new_own(cas.bucket_rep().await);
385385
let new_cas = self.new(bucket, cas.key().await).await?;
386386
let new_cas_rep = new_cas.rep();
387387
self.current(Resource::new_own(new_cas_rep)).await?;
388388
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
389389
new_cas_rep,
390390
))))
391-
} else {
392-
Err(anyhow::Error::new(CasError::StoreError(
393-
atomics::Error::Other(err.to_string()),
394-
)))
395391
}
396-
}
392+
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
393+
atomics::Error::Other(msg),
394+
))),
395+
},
397396
}
398397
}
399398
}
@@ -403,6 +402,11 @@ pub fn log_error(err: impl std::fmt::Debug) -> Error {
403402
Error::Other(format!("{err:?}"))
404403
}
405404

405+
pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
406+
tracing::warn!("key-value error: {err:?}");
407+
SwapError::Other(format!("{err:?}"))
408+
}
409+
406410
use spin_world::v1::key_value::Error as LegacyError;
407411
use spin_world::wasi::keyvalue::atomics;
408412
use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};

crates/factor-key-value/src/lib.rs

+27-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use spin_locked_app::MetadataKey;
1515

1616
/// Metadata key for key-value stores.
1717
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
18-
pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager};
18+
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
1919
pub use runtime_config::RuntimeConfig;
2020
use spin_core::async_trait;
2121
pub use util::{CachingStoreManager, DelegatingStoreManager};
@@ -42,6 +42,8 @@ impl Factor for KeyValueFactor {
4242
ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
4343
ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
4444
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
45+
ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
46+
ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
4547
Ok(())
4648
}
4749

@@ -133,10 +135,33 @@ impl AppState {
133135
}
134136
}
135137

138+
/// `SwapError` are errors that occur during compare and swap operations
139+
#[derive(Debug, thiserror::Error)]
140+
pub enum SwapError {
141+
#[error("{0}")]
142+
CasFailed(String),
143+
144+
#[error("{0}")]
145+
Other(String),
146+
}
147+
148+
/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
149+
///
150+
/// `current` is expected to get the current value for the key associated with the CAS operation
151+
/// while also starting what is needed to ensure the value to be replaced will not have mutated
152+
/// between the time of calling `current` and `swap`. For example, a get from a backend store
153+
/// may provide the caller with an etag (a version stamp), which can be used with an if-match
154+
/// header to ensure the version updated is the version that was read (optimistic concurrency).
155+
/// Rather than an etag, one could start a transaction, if supported by the backing store, which
156+
/// would provide atomicity.
157+
///
158+
/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
159+
/// operation. If there was no key / value with the given key in the store, the `swap` operation
160+
/// should **insert** the key and value, disallowing an update.
136161
#[async_trait]
137162
pub trait Cas: Sync + Send {
138163
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
139-
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), Error>;
164+
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
140165
async fn bucket_rep(&self) -> u32;
141166
async fn key(&self) -> String;
142167
}

crates/factor-key-value/src/util.rs

+100-31
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Cas, Error, Store, StoreManager};
1+
use crate::{Cas, Error, Store, StoreManager, SwapError};
22
use lru::LruCache;
33
use spin_core::async_trait;
44
use std::{
@@ -92,10 +92,10 @@ impl<T: StoreManager> StoreManager for CachingStoreManager<T> {
9292
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
9393
Ok(Arc::new(CachingStore {
9494
inner: self.inner.get(name).await?,
95-
state: AsyncMutex::new(CachingStoreState {
95+
state: Arc::new(AsyncMutex::new(CachingStoreState {
9696
cache: LruCache::new(self.capacity),
9797
previous_task: None,
98-
}),
98+
})),
9999
}))
100100
}
101101

@@ -143,7 +143,7 @@ impl CachingStoreState {
143143

144144
struct CachingStore {
145145
inner: Arc<dyn Store>,
146-
state: AsyncMutex<CachingStoreState>,
146+
state: Arc<AsyncMutex<CachingStoreState>>,
147147
}
148148

149149
#[async_trait]
@@ -242,49 +242,118 @@ impl Store for CachingStore {
242242
&self,
243243
keys: Vec<String>,
244244
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
245-
// // Retrieve the specified value from the cache, lazily populating the cache as necessary.
246-
// let mut state = self.state.lock().await;
247-
//
248-
// let mut keys_and_values: Vec<Option<(String, Vec<u8>)>> = Vec::new();
249-
// let mut keys_not_found: Vec<String> = Vec::new();
250-
// for key in keys {
251-
// match state.cache.get(key.as_str()).cloned() {
252-
// Some(value) => keys_and_values.push(Some((key, value))),
253-
// None => keys_not_found.push(key),
254-
// }
255-
// }
256-
//
257-
// // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
258-
// // cache prior to their corresponding writes reaching the backing store.
259-
// state.flush().await?;
260-
//
261-
// let value = self.inner.get(key).await?;
262-
//
263-
// state.cache.put(key.to_owned(), value.clone());
264-
//
265-
// Ok(value)
266-
//
245+
let mut state = self.state.lock().await;
246+
let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
247+
let mut not_found: Vec<String> = Vec::new();
248+
for key in keys {
249+
match state.cache.get(key.as_str()) {
250+
Some(Some(value)) => found.push((key, Some(value.clone()))),
251+
_ => not_found.push(key),
252+
}
253+
}
254+
255+
let keys_and_values = self.inner.get_many(not_found).await?;
256+
for (key, value) in keys_and_values {
257+
found.push((key.clone(), value.clone()));
258+
state.cache.put(key, value);
259+
}
267260

268-
todo!()
261+
Ok(found)
269262
}
270263

271264
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
272-
todo!()
265+
let mut state = self.state.lock().await;
266+
267+
for (key, value) in key_values.clone() {
268+
state.cache.put(key, Some(value));
269+
}
270+
271+
self.inner.set_many(key_values).await
273272
}
274273

275274
async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
276-
todo!()
275+
let mut state = self.state.lock().await;
276+
277+
for key in keys.clone() {
278+
state.cache.put(key, None);
279+
}
280+
281+
self.inner.delete_many(keys).await
277282
}
278283

279284
async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
280-
todo!()
285+
let mut state = self.state.lock().await;
286+
let counter = self.inner.increment(key.clone(), delta).await?;
287+
state
288+
.cache
289+
.put(key, Some(i64::to_le_bytes(counter).to_vec()));
290+
Ok(counter)
281291
}
282292

283293
async fn new_compare_and_swap(
284294
&self,
285295
bucket_rep: u32,
286296
key: &str,
287297
) -> anyhow::Result<Arc<dyn Cas>, Error> {
288-
todo!()
298+
let inner = self.inner.new_compare_and_swap(bucket_rep, key).await?;
299+
Ok(Arc::new(CompareAndSwap {
300+
bucket_rep,
301+
state: self.state.clone(),
302+
key: key.to_string(),
303+
inner_cas: inner,
304+
}))
305+
}
306+
}
307+
308+
struct CompareAndSwap {
309+
bucket_rep: u32,
310+
key: String,
311+
state: Arc<AsyncMutex<CachingStoreState>>,
312+
inner_cas: Arc<dyn Cas>,
313+
}
314+
315+
#[async_trait]
316+
impl Cas for CompareAndSwap {
317+
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error> {
318+
let mut state = self.state.lock().await;
319+
state.flush().await?;
320+
let res = self.inner_cas.current().await;
321+
match res.clone() {
322+
Ok(value) => {
323+
state.cache.put(self.key.clone(), value.clone());
324+
state.flush().await?;
325+
Ok(value)
326+
}
327+
Err(err) => Err(err),
328+
}?;
329+
res
330+
}
331+
332+
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError> {
333+
let mut state = self.state.lock().await;
334+
state
335+
.flush()
336+
.await
337+
.map_err(|_e| SwapError::Other("failed flushing".to_string()))?;
338+
let res = self.inner_cas.swap(value.clone()).await;
339+
match res {
340+
Ok(()) => {
341+
state.cache.put(self.key.clone(), Some(value));
342+
state
343+
.flush()
344+
.await
345+
.map_err(|_e| SwapError::Other("failed flushing".to_string()))?;
346+
Ok(())
347+
}
348+
Err(err) => Err(err),
349+
}
350+
}
351+
352+
async fn bucket_rep(&self) -> u32 {
353+
self.bucket_rep
354+
}
355+
356+
async fn key(&self) -> String {
357+
self.key.clone()
289358
}
290359
}

crates/factor-key-value/tests/factor_test.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::bail;
22
use spin_core::async_trait;
3-
use spin_factor_key_value::{KeyValueFactor, RuntimeConfig, Store, StoreManager};
3+
use spin_factor_key_value::{Cas, KeyValueFactor, RuntimeConfig, Store, StoreManager};
44
use spin_factors::RuntimeFactors;
55
use spin_factors_test::{toml, TestEnvironment};
66
use spin_world::v2::key_value::{Error, HostStore};
@@ -140,4 +140,36 @@ impl Store for MockStore {
140140
async fn get_keys(&self) -> Result<Vec<String>, Error> {
141141
todo!()
142142
}
143+
144+
async fn get_many(
145+
&self,
146+
keys: Vec<String>,
147+
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
148+
let _ = keys;
149+
todo!()
150+
}
151+
152+
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
153+
let _ = key_values;
154+
todo!()
155+
}
156+
157+
async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
158+
let _ = keys;
159+
todo!()
160+
}
161+
162+
async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
163+
let (_, _) = (key, delta);
164+
todo!()
165+
}
166+
167+
async fn new_compare_and_swap(
168+
&self,
169+
bucket_rep: u32,
170+
key: &str,
171+
) -> anyhow::Result<Arc<dyn Cas>, Error> {
172+
let (_, _) = (key, bucket_rep);
173+
todo!()
174+
}
143175
}

crates/key-value-azure/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ rust-version.workspace = true
1212
anyhow = { workspace = true }
1313
azure_data_cosmos = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
1414
azure_identity = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
15+
azure_core = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
1516
futures = { workspace = true }
1617
serde = { workspace = true }
1718
spin-core = { path = "../core" }

0 commit comments

Comments
 (0)