@@ -8,8 +8,8 @@ use std::sync::Arc;
8
8
use byteorder:: { BigEndian , LittleEndian , ReadBytesExt } ;
9
9
use bytes:: buf:: Reader ;
10
10
use bytes:: { Buf , Bytes } ;
11
- use futures:: future:: { BoxFuture , FutureExt , TryFutureExt } ;
12
- use object_store :: ObjectStore ;
11
+ use futures:: future:: { BoxFuture , FutureExt } ;
12
+ use futures :: TryFutureExt ;
13
13
14
14
use crate :: error:: { AsyncTiffError , AsyncTiffResult } ;
15
15
@@ -67,45 +67,75 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
67
67
}
68
68
}
69
69
70
- // #[cfg(feature = "tokio")]
71
- // impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Debug + Send + Sync> AsyncFileReader
72
- // for T
73
- // {
74
- // fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
75
- // use tokio::io::{AsyncReadExt, AsyncSeekExt};
76
-
77
- // async move {
78
- // self.seek(std::io::SeekFrom::Start(range.start)).await?;
79
-
80
- // let to_read = (range.end - range.start).try_into().unwrap();
81
- // let mut buffer = Vec::with_capacity(to_read);
82
- // let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
83
- // if read != to_read {
84
- // return Err(AsyncTiffError::EndOfFile(to_read, read));
85
- // }
86
-
87
- // Ok(buffer.into())
88
- // }
89
- // .boxed()
90
- // }
91
- // }
70
+ /// A wrapper for things that implement [AsyncRead] and [AsyncSeek] to also implement
71
+ /// [AsyncFileReader].
72
+ ///
73
+ /// This wrapper is needed because `AsyncRead` and `AsyncSeek` require mutable access to seek and
74
+ /// read data, while the `AsyncFileReader` trait requires immutable access to read data.
75
+ ///
76
+ /// This wrapper stores the inner reader in a `Mutex`.
77
+ ///
78
+ /// [AsyncRead]: tokio::io::AsyncRead
79
+ /// [AsyncSeek]: tokio::io::AsyncSeek
80
+ #[ cfg( feature = "tokio" ) ]
81
+ #[ derive( Debug ) ]
82
+ pub struct TokioReader < T : tokio:: io:: AsyncRead + tokio:: io:: AsyncSeek + Unpin + Send + Debug > (
83
+ tokio:: sync:: Mutex < T > ,
84
+ ) ;
85
+
86
+ #[ cfg( feature = "tokio" ) ]
87
+ impl < T : tokio:: io:: AsyncRead + tokio:: io:: AsyncSeek + Unpin + Send + Debug > TokioReader < T > {
88
+ /// Create a new TokioReader from a reader.
89
+ pub fn new ( inner : T ) -> Self {
90
+ Self ( tokio:: sync:: Mutex :: new ( inner) )
91
+ }
92
+ }
93
+
94
+ #[ cfg( feature = "tokio" ) ]
95
+ impl < T : tokio:: io:: AsyncRead + tokio:: io:: AsyncSeek + Unpin + Send + Debug > AsyncFileReader
96
+ for TokioReader < T >
97
+ {
98
+ fn get_bytes ( & self , range : Range < u64 > ) -> BoxFuture < ' _ , AsyncTiffResult < Bytes > > {
99
+ use std:: io:: SeekFrom ;
100
+ use tokio:: io:: { AsyncReadExt , AsyncSeekExt } ;
101
+
102
+ async move {
103
+ let mut file = self . 0 . lock ( ) . await ;
104
+
105
+ file. seek ( SeekFrom :: Start ( range. start ) ) . await ?;
106
+
107
+ let to_read = range. end - range. start ;
108
+ let mut buffer = Vec :: with_capacity ( to_read as usize ) ;
109
+ let read = file. read ( & mut buffer) . await ? as u64 ;
110
+ if read != to_read {
111
+ return Err ( AsyncTiffError :: EndOfFile ( to_read, read) ) ;
112
+ }
113
+
114
+ Ok ( buffer. into ( ) )
115
+ }
116
+ . boxed ( )
117
+ }
118
+ }
92
119
93
120
/// An AsyncFileReader that reads from an [`ObjectStore`] instance.
121
+ #[ cfg( feature = "object_store" ) ]
94
122
#[ derive( Clone , Debug ) ]
95
123
pub struct ObjectReader {
96
- store : Arc < dyn ObjectStore > ,
124
+ store : Arc < dyn object_store :: ObjectStore > ,
97
125
path : object_store:: path:: Path ,
98
126
}
99
127
128
+ #[ cfg( feature = "object_store" ) ]
100
129
impl ObjectReader {
101
130
/// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path
102
131
///
103
132
/// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
104
- pub fn new ( store : Arc < dyn ObjectStore > , path : object_store:: path:: Path ) -> Self {
133
+ pub fn new ( store : Arc < dyn object_store :: ObjectStore > , path : object_store:: path:: Path ) -> Self {
105
134
Self { store, path }
106
135
}
107
136
}
108
137
138
+ #[ cfg( feature = "object_store" ) ]
109
139
impl AsyncFileReader for ObjectReader {
110
140
fn get_bytes ( & self , range : Range < u64 > ) -> BoxFuture < ' _ , AsyncTiffResult < Bytes > > {
111
141
let range = range. start as _ ..range. end as _ ;
@@ -134,19 +164,22 @@ impl AsyncFileReader for ObjectReader {
134
164
}
135
165
136
166
/// An AsyncFileReader that reads from a URL using reqwest.
167
+ #[ cfg( feature = "reqwest" ) ]
137
168
#[ derive( Debug , Clone ) ]
138
169
pub struct ReqwestReader {
139
170
client : reqwest:: Client ,
140
171
url : reqwest:: Url ,
141
172
}
142
173
174
+ #[ cfg( feature = "reqwest" ) ]
143
175
impl ReqwestReader {
144
176
/// Construct a new ReqwestReader from a reqwest client and URL.
145
177
pub fn new ( client : reqwest:: Client , url : reqwest:: Url ) -> Self {
146
178
Self { client, url }
147
179
}
148
180
}
149
181
182
+ #[ cfg( feature = "reqwest" ) ]
150
183
impl AsyncFileReader for ReqwestReader {
151
184
fn get_bytes ( & self , range : Range < u64 > ) -> BoxFuture < ' _ , AsyncTiffResult < Bytes > > {
152
185
let url = self . url . clone ( ) ;
0 commit comments