Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wasm futures #72

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,26 @@ description = "Low-level asynchronous TIFF reader."
readme = "README.md"

[dependencies]
async-trait = "0.1.88"
byteorder = "1"
bytes = "1.7.0"
flate2 = "1.0.20"
futures = "0.3.31"
jpeg = { package = "jpeg-decoder", version = "0.3.0", default-features = false }
num_enum = "0.7.3"
object_store = "0.12"
object_store = { version = "0.12", optional = true }
# In the future we could make this feature-flagged, but for now we depend on
# object_store which uses reqwest.
reqwest = { version = "0.12", default-features = false }
reqwest = { version = "0.12", default-features = false, optional = true }
thiserror = "1"
tokio = { version = "1.43.0", optional = true }
weezl = "0.1.0"

[dev-dependencies]
tiff = "0.9.1"
tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] }

[features]
default = ["object_store"]
reqwest = ["dep:reqwest"]
object_store = ["dep:object_store", "reqwest"]
1 change: 1 addition & 0 deletions src/cog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl TIFF {
}
}

#[cfg(feature = "object_store")]
#[cfg(test)]
mod test {
use std::io::BufReader;
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum AsyncTiffError {
JPEGDecodingError(#[from] jpeg::Error),

/// Error while fetching data using object store.
#[cfg(feature = "object_store")]
#[error(transparent)]
ObjectStore(#[from] object_store::Error),

Expand All @@ -32,6 +33,7 @@ pub enum AsyncTiffError {
InternalTIFFError(#[from] crate::tiff::TiffError),

/// Reqwest error
#[cfg(feature = "reqwest")] // see https://www.reddit.com/r/rust/comments/xyik51/comment/irhei39/
#[error(transparent)]
ReqwestError(#[from] reqwest::Error),

Expand Down
102 changes: 52 additions & 50 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use std::io::Read;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
use bytes::buf::Reader;
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use futures::future::TryFutureExt;
#[cfg(feature = "object_store")]
use object_store::ObjectStore;

use crate::error::{AsyncTiffError, AsyncTiffResult};
Expand All @@ -29,41 +31,36 @@ use crate::error::{AsyncTiffError, AsyncTiffResult};
/// [`ObjectStore`]: object_store::ObjectStore
///
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AsyncFileReader: Debug + Send + Sync {
/// Retrieve the bytes in `range`
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;
async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes>;

/// Retrieve multiple byte ranges. The default implementation will call `get_bytes`
/// sequentially
fn get_byte_ranges(
&self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
async move {
let mut result = Vec::with_capacity(ranges.len());

for range in ranges.into_iter() {
let data = self.get_bytes(range).await?;
result.push(data);
}
async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
let mut result = Vec::with_capacity(ranges.len());

Ok(result)
for range in ranges.into_iter() {
let data = self.get_bytes(range).await?;
result.push(data);
}
.boxed()

Ok(result)
}
}

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.as_ref().get_bytes(range)
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl AsyncFileReader for Box<dyn AsyncFileReader> {
async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
self.get_bytes(range).await
}

fn get_byte_ranges(
&self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
self.as_ref().get_byte_ranges(ranges)
async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
self.get_byte_ranges(ranges).await
}
}

Expand Down Expand Up @@ -91,12 +88,13 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
// }

/// An AsyncFileReader that reads from an [`ObjectStore`] instance.
#[cfg(feature = "object_store")]
#[derive(Clone, Debug)]
pub struct ObjectReader {
store: Arc<dyn ObjectStore>,
path: object_store::path::Path,
}

#[cfg(feature = "object_store")]
impl ObjectReader {
/// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path
///
Expand All @@ -105,60 +103,62 @@ impl ObjectReader {
Self { store, path }
}
}

#[cfg(feature = "object_store")]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl AsyncFileReader for ObjectReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
let range = range.start as _..range.end as _;
self.store
.get_range(&self.path, range)
.map_err(|e| e.into())
.boxed()
.await
}

fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>>
where
Self: Send,
{
let ranges = ranges
.into_iter()
.map(|r| r.start as _..r.end as _)
.collect::<Vec<_>>();
async move {
self.store
.get_ranges(&self.path, &ranges)
.await
.map_err(|e| e.into())
}
.boxed()
self.store
.get_ranges(&self.path, &ranges)
.await
.map_err(|e| e.into())
}
}

/// An AsyncFileReader that reads from a URL using reqwest.
#[cfg(feature = "reqwest")]
#[derive(Debug, Clone)]
pub struct ReqwestReader {
client: reqwest::Client,
url: reqwest::Url,
}

#[cfg(feature = "reqwest")]
impl ReqwestReader {
/// Construct a new ReqwestReader from a reqwest client and URL.
pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
Self { client, url }
}
}

#[cfg(feature = "reqwest")]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl AsyncFileReader for ReqwestReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
// }
// fn get_bytes<'async_trait>(&'async_trait self, range: Range<u64>) -> BoxFuture<'async_trait, AsyncTiffResult<Bytes>>
// {
let url = self.url.clone();
let client = self.client.clone();
// HTTP range is inclusive, so we need to subtract 1 from the end
let range = format!("bytes={}-{}", range.start, range.end - 1);
async move {
let response = client.get(url).header("Range", range).send().await?;
let bytes = response.bytes().await?;
Ok(bytes)
}
.boxed()
let response = client.get(url).header("Range", range).send().await?;
let bytes = response.bytes().await?;
Ok(bytes)
}
}

Expand All @@ -177,29 +177,31 @@ impl PrefetchReader {
}
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl AsyncFileReader for PrefetchReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
if range.start < self.buffer.len() as _ {
if range.end < self.buffer.len() as _ {
let usize_range = range.start as usize..range.end as usize;
let result = self.buffer.slice(usize_range);
async { Ok(result) }.boxed()
Ok(result)
} else {
// TODO: reuse partial internal buffer
self.reader.get_bytes(range)
self.reader.get_bytes(range).await
}
} else {
self.reader.get_bytes(range)
self.reader.get_bytes(range).await
}
}

fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>>
where
Self: Send,
{
// In practice, get_byte_ranges is only used for fetching tiles, which are unlikely to
// overlap a metadata prefetch.
self.reader.get_byte_ranges(ranges)
self.reader.get_byte_ranges(ranges).await
}
}

Expand Down