Skip to content

Commit 92d4f4f

Browse files
committed
feat:multithread made necessary modules public for use in multithreaded envs, implemented ChunkReader and provided example
1 parent da8f183 commit 92d4f4f

File tree

7 files changed

+321
-70
lines changed

7 files changed

+321
-70
lines changed

Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ exclude = ["tests/images/*", "tests/fuzz_images/*"]
1818

1919
[features]
2020
async_decoder = ["dep:futures", "dep:async-trait"]
21-
ehttp = ["dep:ehttp", "async_decoder"]
21+
multithread = ["async_decoder"]
22+
# only for async example reading a COG
23+
ehttp = ["async_decoder", "dep:ehttp"]
2224

2325
[dependencies]
2426
weezl = "0.1.0"
@@ -44,4 +46,9 @@ description = "Example showing use of async features using async http requests"
4446
[[example]]
4547
name = "async_http"
4648
path="examples/async_http.rs"
47-
required-features=["ehttp"]
49+
required-features=["ehttp"]
50+
51+
[package.metadata.example.multithread]
52+
name = "multithread_http"
53+
path="examples/multithread_http.rs"
54+
required-features=["ehttp", "async_decoder"]

examples/multithread_http.rs

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Special thanks to Alice for the help: https://users.rust-lang.org/t/63019/6
2+
use std::io::{Result, SeekFrom};
3+
use std::pin::Pin;
4+
use std::sync::Arc;
5+
use futures::{
6+
future::BoxFuture,
7+
io::{AsyncRead, AsyncSeek},
8+
Future,
9+
};
10+
use tiff::decoder::Decoder;
11+
12+
// extern crate ehttp;
13+
14+
// Arc for sharing, see https://users.rust-lang.org/t/how-to-clone-a-boxed-closure/31035/9
15+
// or https://stackoverflow.com/a/27883612/14681457
16+
pub type F = Arc<
17+
dyn Fn(u64, u64) -> BoxFuture<'static, std::io::Result<SeekOutput>> + Send + Sync,
18+
>;
19+
pub struct RangedStreamer {
20+
pos: u64,
21+
length: u64, // total size
22+
state: State,
23+
range_get: F,
24+
min_request_size: usize, // requests have at least this size
25+
}
26+
27+
/// This is a fake clone, that doesn't clone the currently pending task, but everything else
28+
impl Clone for RangedStreamer {
29+
fn clone(&self) -> Self {
30+
RangedStreamer {
31+
pos: self.pos,
32+
length: self.length,
33+
state: State::HasChunk(SeekOutput {
34+
start: 0,
35+
data: vec![],
36+
}),
37+
range_get: self.range_get.clone(),
38+
min_request_size: self.min_request_size,
39+
}
40+
}
41+
}
42+
43+
enum State {
44+
HasChunk(SeekOutput),
45+
Seeking(BoxFuture<'static, std::io::Result<SeekOutput>>),
46+
}
47+
48+
/// A contiguous run of bytes fetched from the underlying resource.
#[derive(Debug, Clone)]
pub struct SeekOutput {
    /// Offset of `data[0]` within the resource.
    pub start: u64,
    /// The fetched bytes.
    pub data: Vec<u8>,
}
53+
54+
55+
56+
impl RangedStreamer {
57+
pub fn new(length: usize, min_request_size: usize, range_get: F) -> Self {
58+
let length = length as u64;
59+
Self {
60+
pos: 0,
61+
length,
62+
state: State::HasChunk(SeekOutput {
63+
start: 0,
64+
data: vec![],
65+
}),
66+
range_get,
67+
min_request_size,
68+
}
69+
}
70+
}
71+
72+
/// Reports whether `test_interval` lies entirely inside `a`.
///
/// Both arguments are `(start, length)` pairs describing half-open ranges
/// `start..start + length`. The comparison never computes `start + length`,
/// so ranges that end at or beyond `usize::MAX` are handled without overflow.
fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
    let (a_start, a_len) = a;
    let (test_start, test_len) = test_interval;

    // `None` means the tested range starts before `a` does.
    let Some(offset) = test_start.checked_sub(a_start) else {
        return false;
    };

    // Equivalent to `test_start + test_len <= a_start + a_len`, rearranged so
    // that no intermediate value can overflow.
    offset <= a_len && test_len <= a_len - offset
}
84+
85+
impl AsyncRead for RangedStreamer {
86+
fn poll_read(
87+
mut self: std::pin::Pin<&mut Self>,
88+
cx: &mut std::task::Context<'_>,
89+
buf: &mut [u8],
90+
) -> std::task::Poll<Result<usize>> {
91+
let requested_range = (self.pos as usize, buf.len());
92+
let min_request_size = self.min_request_size;
93+
match &mut self.state {
94+
State::HasChunk(output) => {
95+
let existing_range = (output.start as usize, output.data.len());
96+
if range_includes(existing_range, requested_range) {
97+
let offset = requested_range.0 - existing_range.0;
98+
buf.copy_from_slice(&output.data[offset..offset + buf.len()]);
99+
self.pos += buf.len() as u64;
100+
std::task::Poll::Ready(Ok(buf.len()))
101+
} else {
102+
let start = requested_range.0 as u64;
103+
let length = std::cmp::max(min_request_size, requested_range.1);
104+
let future = (self.range_get)(start, length.try_into().unwrap());
105+
self.state = State::Seeking(Box::pin(future));
106+
self.poll_read(cx, buf)
107+
}
108+
}
109+
State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
110+
std::task::Poll::Ready(v) => {
111+
match v {
112+
Ok(output) => self.state = State::HasChunk(output),
113+
Err(e) => return std::task::Poll::Ready(Err(e)),
114+
};
115+
self.poll_read(cx, buf)
116+
}
117+
std::task::Poll::Pending => std::task::Poll::Pending,
118+
},
119+
}
120+
}
121+
}
122+
123+
impl AsyncSeek for RangedStreamer {
124+
fn poll_seek(
125+
mut self: std::pin::Pin<&mut Self>,
126+
_: &mut std::task::Context<'_>,
127+
pos: std::io::SeekFrom,
128+
) -> std::task::Poll<Result<u64>> {
129+
match pos {
130+
SeekFrom::Start(pos) => self.pos = pos,
131+
SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64,
132+
SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
133+
};
134+
std::task::Poll::Ready(Ok(self.pos))
135+
}
136+
}
137+
138+
139+
140+
#[tokio::main]
141+
async fn main() {
142+
let url = "https://isdasoil.s3.amazonaws.com/covariates/dem_30m/dem_30m.tif";
143+
let Ok(url_head) = ehttp::fetch_async(ehttp::Request::head(url)).await else {println!("EPIC FAIL!"); return;};
144+
let length = usize::from_str_radix(url_head.headers.get("content-length").unwrap(), 10).expect("Could not parse content length");
145+
println!("head: {:?}", url_head);
146+
let range_get = Arc::new(move |start: u64, length: u64| {
147+
// let bucket = bucket.clone();
148+
let url = url;
149+
Box::pin(async move {
150+
println!("requested: {} kb", length / 1024);
151+
let mut request = ehttp::Request::get(url);
152+
request.headers.insert("Range".to_string(), format!("bytes={:?}-{:?}",start,start+length));
153+
let resp = ehttp::fetch_async(request).await.map_err(|e| std::io::Error::other(e))?;
154+
if !resp.ok {
155+
Err(std::io::Error::other(format!("Received invalid response: {:?}", resp.status)))
156+
} else {
157+
println!("received: {} kb", resp.bytes.len() / 1024);
158+
Ok(SeekOutput {start, data: resp.bytes})
159+
}
160+
}) as BoxFuture<'static, std::io::Result<SeekOutput>>
161+
});
162+
let reader = RangedStreamer::new(length, 30*1024, range_get);
163+
164+
// this decoder will read all necessary tags
165+
let decoder = Decoder::new_overview_async(reader, 0).await.expect("oh noes!");
166+
println!("initialized decoder");
167+
let cloneable_decoder = tiff::decoder::ChunkDecoder::from_decoder(decoder);
168+
169+
let mut handles = Vec::new();
170+
for chunk in 42..69 {
171+
let mut cloned_decoder = cloneable_decoder.clone();
172+
173+
let handle = tokio::spawn(async move {
174+
175+
let result = cloned_decoder.read_chunk_async(chunk).await;
176+
match result {
177+
Ok(data) => {
178+
println!("Successfully read chunk {}", chunk);
179+
Ok(data) // Return the data for collection
180+
}
181+
Err(e) => {
182+
eprintln!("Error reading chunk {}: {:?}", chunk, e);
183+
Err(e) // Return the error for handling
184+
}
185+
}
186+
});
187+
handles.push(handle);
188+
}
189+
190+
let results = futures::future::join_all(handles).await;
191+
for r in results {
192+
println!("result: {:?}", r.expect("idk").expect("idk²").len())
193+
}
194+
}

src/decoder/async_decoder/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures::{
55
};
66
use std::collections::{HashMap, HashSet};
77

8-
use crate::{ColorType, TiffError, TiffFormatError, TiffResult, TiffUnsupportedError, UsageError};
8+
use crate::{TiffError, TiffFormatError, TiffResult, TiffUnsupportedError};
99

1010
// use self::ifd::Directory;
1111
// use self::image::Image;
@@ -18,7 +18,7 @@ use crate::decoder::{
1818
Decoder,
1919
ifd::{Value, Directory}, Image, stream::{
2020
ByteOrder, SmartReader,
21-
}, ChunkType, DecodingBuffer, DecodingResult, Limits,
21+
}, ChunkType, DecodingBuffer, DecodingResult,
2222
};
2323

2424
use stream::AsyncEndianReader;
@@ -453,7 +453,7 @@ impl<R: AsyncRead + AsyncSeek + RangeReader + Unpin + Send> Decoder<R> {
453453
pub async fn read_chunk_async(&mut self, chunk_index: u32) -> TiffResult<DecodingResult> {
454454
let data_dims = self.image().chunk_data_dimensions(chunk_index)?;
455455

456-
let mut result = self.result_buffer(data_dims.0 as usize, data_dims.1 as usize)?;
456+
let mut result = Self::result_buffer(data_dims.0 as usize, data_dims.1 as usize, self.image(), &self.limits)?;
457457

458458
self.read_chunk_to_buffer_async(result.as_buffer(0), chunk_index, data_dims.0 as usize)
459459
.await?;
@@ -465,7 +465,7 @@ impl<R: AsyncRead + AsyncSeek + RangeReader + Unpin + Send> Decoder<R> {
465465
pub async fn read_image_async(&mut self) -> TiffResult<DecodingResult> {
466466
let width = self.image().width;
467467
let height = self.image().height;
468-
let mut result = self.result_buffer(width as usize, height as usize)?;
468+
let mut result = Self::result_buffer(usize::try_from(width)?, usize::try_from(height)?, self.image(), &self.limits )?;
469469
if width == 0 || height == 0 {
470470
return Ok(result);
471471
}

src/decoder/ifd.rs

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::collections::HashMap;
44
use std::io::{self, Read, Seek};
5-
use std::mem;
65
use std::str;
76

87
use super::stream::{ByteOrder, EndianReader, SmartReader};
@@ -679,40 +678,6 @@ impl Entry {
679678
}
680679
Ok(List(v))
681680
}
682-
683-
#[inline]
684-
fn decode_offset<R, F>(
685-
&self,
686-
value_count: u64,
687-
bo: ByteOrder,
688-
bigtiff: bool,
689-
limits: &super::Limits,
690-
reader: &mut SmartReader<R>,
691-
decode_fn: F,
692-
) -> TiffResult<Value>
693-
where
694-
R: Read + Seek,
695-
F: Fn(&mut SmartReader<R>) -> TiffResult<Value>,
696-
{
697-
let value_count = usize::try_from(value_count)?;
698-
if value_count > limits.decoding_buffer_size / mem::size_of::<Value>() {
699-
return Err(TiffError::LimitsExceeded);
700-
}
701-
702-
let mut v = Vec::with_capacity(value_count);
703-
704-
let offset = if bigtiff {
705-
self.r(bo).read_u64()?
706-
} else {
707-
self.r(bo).read_u32()?.into()
708-
};
709-
reader.goto_offset(offset)?;
710-
711-
for _ in 0..value_count {
712-
v.push(decode_fn(reader)?)
713-
}
714-
Ok(List(v))
715-
}
716681
}
717682

718683
/// Extracts a list of BYTE tags stored in an offset

src/decoder/image.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use crate::{ColorType, TiffError, TiffFormatError, TiffResult, TiffUnsupportedEr
1010
use std::io::{self, Cursor, Read, Seek};
1111
use std::sync::Arc;
1212

13-
#[derive(Debug)]
14-
pub(crate) struct StripDecodeState {
13+
#[derive(Debug, Clone)]
14+
pub struct StripDecodeState {
1515
pub rows_per_strip: u32,
1616
}
1717

18-
#[derive(Debug)]
18+
#[derive(Debug, Clone)]
1919
/// Computed values useful for tile decoding
20-
pub(crate) struct TileAttributes {
20+
pub struct TileAttributes {
2121
pub image_width: usize,
2222
pub image_height: usize,
2323

@@ -58,8 +58,8 @@ impl TileAttributes {
5858
}
5959
}
6060

61-
#[derive(Debug)]
62-
pub(crate) struct Image {
61+
#[derive(Debug, Clone)]
62+
pub struct Image {
6363
pub ifd: Option<Directory>,
6464
pub width: u32,
6565
pub height: u32,
@@ -679,16 +679,14 @@ impl Image {
679679
}
680680
}
681681
} else {
682-
for (i, row) in buf
682+
for (_, row) in buf
683683
.chunks_mut(output_row_stride)
684684
.take(data_dims.1 as usize)
685685
.enumerate()
686686
{
687687
let row = &mut row[..data_row_bytes];
688688
reader.read_exact(row)?;
689689

690-
println!("chunk={chunk_index}, index={i}");
691-
692690
// Skip horizontal padding
693691
if chunk_row_bytes > data_row_bytes {
694692
let len = u64::try_from(chunk_row_bytes - data_row_bytes)?;

0 commit comments

Comments
 (0)