Skip to content

Commit d833301

Browse files
authored
Merge pull request #2895 from devigned/wasi-kv
WASI Key Value 0.2.0-draft2 support
2 parents c021968 + 50f22a1 commit d833301

File tree

23 files changed

+1436
-17
lines changed

23 files changed

+1436
-17
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-key-value/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ spin-world = { path = "../world" }
1616
tokio = { workspace = true, features = ["macros", "sync", "rt"] }
1717
toml = { workspace = true }
1818
tracing = { workspace = true }
19+
thiserror = { workspace = true }
1920

2021
[dev-dependencies]
2122
spin-factors-test = { path = "../factors-test" }

crates/factor-key-value/src/host.rs

+264-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use super::{Cas, SwapError};
12
use anyhow::{Context, Result};
23
use spin_core::{async_trait, wasmtime::component::Resource};
34
use spin_resource_table::Table;
45
use spin_world::v2::key_value;
6+
use spin_world::wasi::keyvalue as wasi_keyvalue;
57
use std::{collections::HashSet, sync::Arc};
68
use tracing::{instrument, Level};
79

@@ -30,12 +32,19 @@ pub trait Store: Sync + Send {
3032
async fn delete(&self, key: &str) -> Result<(), Error>;
3133
async fn exists(&self, key: &str) -> Result<bool, Error>;
3234
async fn get_keys(&self) -> Result<Vec<String>, Error>;
35+
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
36+
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
37+
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
38+
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
39+
async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
40+
-> Result<Arc<dyn Cas>, Error>;
3341
}
3442

3543
pub struct KeyValueDispatch {
3644
allowed_stores: HashSet<String>,
3745
manager: Arc<dyn StoreManager>,
3846
stores: Table<Arc<dyn Store>>,
47+
compare_and_swaps: Table<Arc<dyn Cas>>,
3948
}
4049

4150
impl KeyValueDispatch {
@@ -52,16 +61,43 @@ impl KeyValueDispatch {
5261
allowed_stores,
5362
manager,
5463
stores: Table::new(capacity),
64+
compare_and_swaps: Table::new(capacity),
5565
}
5666
}
5767

58-
pub fn get_store(&self, store: Resource<key_value::Store>) -> anyhow::Result<&Arc<dyn Store>> {
68+
pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
5969
self.stores.get(store.rep()).context("invalid store")
6070
}
6171

72+
pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
73+
self.compare_and_swaps
74+
.get(cas.rep())
75+
.context("invalid compare and swap")
76+
}
77+
6278
pub fn allowed_stores(&self) -> &HashSet<String> {
6379
&self.allowed_stores
6480
}
81+
82+
pub fn get_store_wasi<T: 'static>(
83+
&self,
84+
store: Resource<T>,
85+
) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
86+
self.stores
87+
.get(store.rep())
88+
.ok_or(wasi_keyvalue::store::Error::NoSuchStore)
89+
}
90+
91+
pub fn get_cas_wasi<T: 'static>(
92+
&self,
93+
cas: Resource<T>,
94+
) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
95+
self.compare_and_swaps
96+
.get(cas.rep())
97+
.ok_or(wasi_keyvalue::atomics::Error::Other(
98+
"compare and swap not found".to_string(),
99+
))
100+
}
65101
}
66102

67103
#[async_trait]
@@ -141,12 +177,239 @@ impl key_value::HostStore for KeyValueDispatch {
141177
}
142178
}
143179

180+
fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
181+
match e {
182+
Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
183+
Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
184+
Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
185+
Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
186+
}
187+
}
188+
189+
#[async_trait]
190+
impl wasi_keyvalue::store::Host for KeyValueDispatch {
191+
async fn open(
192+
&mut self,
193+
identifier: String,
194+
) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
195+
if self.allowed_stores.contains(&identifier) {
196+
let store = self
197+
.stores
198+
.push(self.manager.get(&identifier).await.map_err(to_wasi_err)?)
199+
.map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
200+
Ok(Resource::new_own(store))
201+
} else {
202+
Err(wasi_keyvalue::store::Error::AccessDenied)
203+
}
204+
}
205+
206+
fn convert_error(
207+
&mut self,
208+
error: spin_world::wasi::keyvalue::store::Error,
209+
) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
210+
Ok(error)
211+
}
212+
}
213+
214+
use wasi_keyvalue::store::Bucket;
215+
#[async_trait]
216+
impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
217+
async fn get(
218+
&mut self,
219+
self_: Resource<Bucket>,
220+
key: String,
221+
) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
222+
let store = self.get_store_wasi(self_)?;
223+
store.get(&key).await.map_err(to_wasi_err)
224+
}
225+
226+
async fn set(
227+
&mut self,
228+
self_: Resource<Bucket>,
229+
key: String,
230+
value: Vec<u8>,
231+
) -> Result<(), wasi_keyvalue::store::Error> {
232+
let store = self.get_store_wasi(self_)?;
233+
store.set(&key, &value).await.map_err(to_wasi_err)
234+
}
235+
236+
async fn delete(
237+
&mut self,
238+
self_: Resource<Bucket>,
239+
key: String,
240+
) -> Result<(), wasi_keyvalue::store::Error> {
241+
let store = self.get_store_wasi(self_)?;
242+
store.delete(&key).await.map_err(to_wasi_err)
243+
}
244+
245+
async fn exists(
246+
&mut self,
247+
self_: Resource<Bucket>,
248+
key: String,
249+
) -> Result<bool, wasi_keyvalue::store::Error> {
250+
let store = self.get_store_wasi(self_)?;
251+
store.exists(&key).await.map_err(to_wasi_err)
252+
}
253+
254+
async fn list_keys(
255+
&mut self,
256+
self_: Resource<Bucket>,
257+
cursor: Option<String>,
258+
) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
259+
match cursor {
260+
Some(_) => Err(wasi_keyvalue::store::Error::Other(
261+
"list_keys: cursor not supported".to_owned(),
262+
)),
263+
None => {
264+
let store = self.get_store_wasi(self_)?;
265+
let keys = store.get_keys().await.map_err(to_wasi_err)?;
266+
Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
267+
}
268+
}
269+
}
270+
271+
async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
272+
self.stores.remove(rep.rep());
273+
Ok(())
274+
}
275+
}
276+
277+
#[async_trait]
278+
impl wasi_keyvalue::batch::Host for KeyValueDispatch {
279+
#[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
280+
async fn get_many(
281+
&mut self,
282+
bucket: Resource<wasi_keyvalue::batch::Bucket>,
283+
keys: Vec<String>,
284+
) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
285+
let store = self.get_store_wasi(bucket)?;
286+
store
287+
.get_many(keys.iter().map(|k| k.to_string()).collect())
288+
.await
289+
.map_err(to_wasi_err)
290+
}
291+
292+
#[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
293+
async fn set_many(
294+
&mut self,
295+
bucket: Resource<wasi_keyvalue::batch::Bucket>,
296+
key_values: Vec<(String, Vec<u8>)>,
297+
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
298+
let store = self.get_store_wasi(bucket)?;
299+
store.set_many(key_values).await.map_err(to_wasi_err)
300+
}
301+
302+
#[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
303+
async fn delete_many(
304+
&mut self,
305+
bucket: Resource<wasi_keyvalue::batch::Bucket>,
306+
keys: Vec<String>,
307+
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
308+
let store = self.get_store_wasi(bucket)?;
309+
store
310+
.delete_many(keys.iter().map(|k| k.to_string()).collect())
311+
.await
312+
.map_err(to_wasi_err)
313+
}
314+
}
315+
316+
#[async_trait]
317+
impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
318+
async fn new(
319+
&mut self,
320+
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
321+
key: String,
322+
) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
323+
let bucket_rep = bucket.rep();
324+
let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
325+
let store = self.get_store_wasi(bucket)?;
326+
let cas = store
327+
.new_compare_and_swap(bucket_rep, &key)
328+
.await
329+
.map_err(to_wasi_err)?;
330+
self.compare_and_swaps
331+
.push(cas)
332+
.map_err(|()| {
333+
spin_world::wasi::keyvalue::store::Error::Other(
334+
"too many compare_and_swaps opened".to_string(),
335+
)
336+
})
337+
.map(Resource::new_own)
338+
}
339+
340+
async fn current(
341+
&mut self,
342+
cas: Resource<wasi_keyvalue::atomics::Cas>,
343+
) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
344+
let cas = self
345+
.get_cas(cas)
346+
.map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
347+
cas.current().await.map_err(to_wasi_err)
348+
}
349+
350+
async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
351+
self.compare_and_swaps.remove(rep.rep());
352+
Ok(())
353+
}
354+
}
355+
356+
#[async_trait]
357+
impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
358+
#[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
359+
async fn increment(
360+
&mut self,
361+
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
362+
key: String,
363+
delta: i64,
364+
) -> Result<i64, wasi_keyvalue::store::Error> {
365+
let store = self.get_store_wasi(bucket)?;
366+
store.increment(key, delta).await.map_err(to_wasi_err)
367+
}
368+
369+
#[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))]
370+
async fn swap(
371+
&mut self,
372+
cas_res: Resource<atomics::Cas>,
373+
value: Vec<u8>,
374+
) -> Result<std::result::Result<(), CasError>> {
375+
let cas_rep = cas_res.rep();
376+
let cas = self
377+
.get_cas(Resource::<Bucket>::new_own(cas_rep))
378+
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
379+
380+
match cas.swap(value).await {
381+
Ok(_) => Ok(Ok(())),
382+
Err(err) => match err {
383+
SwapError::CasFailed(_) => {
384+
let bucket = Resource::new_own(cas.bucket_rep().await);
385+
let new_cas = self.new(bucket, cas.key().await).await?;
386+
let new_cas_rep = new_cas.rep();
387+
self.current(Resource::new_own(new_cas_rep)).await?;
388+
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
389+
new_cas_rep,
390+
))))
391+
}
392+
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
393+
atomics::Error::Other(msg),
394+
))),
395+
},
396+
}
397+
}
398+
}
399+
144400
pub fn log_error(err: impl std::fmt::Debug) -> Error {
145401
tracing::warn!("key-value error: {err:?}");
146402
Error::Other(format!("{err:?}"))
147403
}
148404

405+
pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
406+
tracing::warn!("key-value error: {err:?}");
407+
SwapError::Other(format!("{err:?}"))
408+
}
409+
149410
use spin_world::v1::key_value::Error as LegacyError;
411+
use spin_world::wasi::keyvalue::atomics;
412+
use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
150413

151414
fn to_legacy_error(value: key_value::Error) -> LegacyError {
152415
match value {

crates/factor-key-value/src/lib.rs

+36-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ use spin_locked_app::MetadataKey;
1515

1616
/// Metadata key for key-value stores.
1717
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
18-
pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager};
18+
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
1919
pub use runtime_config::RuntimeConfig;
20+
use spin_core::async_trait;
2021
pub use util::{CachingStoreManager, DelegatingStoreManager};
2122

2223
/// A factor that provides key-value storage.
@@ -40,6 +41,9 @@ impl Factor for KeyValueFactor {
4041
fn init<T: Send + 'static>(&mut self, mut ctx: InitContext<T, Self>) -> anyhow::Result<()> {
4142
ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
4243
ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
44+
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
45+
ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
46+
ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
4347
Ok(())
4448
}
4549

@@ -131,6 +135,37 @@ impl AppState {
131135
}
132136
}
133137

138+
/// `SwapError` are errors that occur during compare and swap operations
139+
#[derive(Debug, thiserror::Error)]
140+
pub enum SwapError {
141+
#[error("{0}")]
142+
CasFailed(String),
143+
144+
#[error("{0}")]
145+
Other(String),
146+
}
147+
148+
/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
149+
///
150+
/// `current` is expected to get the current value for the key associated with the CAS operation
151+
/// while also starting what is needed to ensure the value to be replaced will not have mutated
152+
/// between the time of calling `current` and `swap`. For example, a get from a backend store
153+
/// may provide the caller with an etag (a version stamp), which can be used with an if-match
154+
/// header to ensure the version updated is the version that was read (optimistic concurrency).
155+
/// Rather than an etag, one could start a transaction, if supported by the backing store, which
156+
/// would provide atomicity.
157+
///
158+
/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
159+
/// operation. If there was no key / value with the given key in the store, the `swap` operation
160+
/// should **insert** the key and value, disallowing an update.
161+
#[async_trait]
162+
pub trait Cas: Sync + Send {
163+
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
164+
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
165+
async fn bucket_rep(&self) -> u32;
166+
async fn key(&self) -> String;
167+
}
168+
134169
pub struct InstanceBuilder {
135170
/// The store manager for the app.
136171
///

0 commit comments

Comments
 (0)