diff --git a/obstore/src/buffered.rs b/obstore/src/buffered.rs index 49eafb9..8d22143 100644 --- a/obstore/src/buffered.rs +++ b/obstore/src/buffered.rs @@ -8,7 +8,7 @@ use pyo3::exceptions::{PyIOError, PyStopAsyncIteration, PyStopIteration}; use pyo3::prelude::*; use pyo3::types::PyString; use pyo3::{intern, IntoPyObjectExt}; -use pyo3_async_runtimes::tokio::future_into_py; +use pyo3_async_runtimes::tokio::{future_into_py, get_runtime}; use pyo3_bytes::PyBytes; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Lines}; @@ -16,7 +16,6 @@ use tokio::sync::Mutex; use crate::attributes::PyAttributes; use crate::list::PyObjectMeta; -use crate::runtime::get_runtime; use crate::tags::PyTagSet; #[pyfunction] @@ -28,7 +27,7 @@ pub(crate) fn open_reader( buffer_size: usize, ) -> PyObjectStoreResult { let store = store.into_inner(); - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let (reader, meta) = py.allow_threads(|| runtime.block_on(create_reader(store, path, buffer_size)))?; Ok(PyReadableFile::new(reader, meta, false)) @@ -105,7 +104,7 @@ impl PyReadableFile { let out = future_into_py(py, read(reader, size))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let out = py.allow_threads(|| runtime.block_on(read(reader, size)))?; out.into_py_any(py) } @@ -121,7 +120,7 @@ impl PyReadableFile { let out = future_into_py(py, readline(reader))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let out = py.allow_threads(|| runtime.block_on(readline(reader)))?; out.into_py_any(py) } @@ -135,7 +134,7 @@ impl PyReadableFile { let out = future_into_py(py, readlines(reader, hint))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let out = py.allow_threads(|| runtime.block_on(readlines(reader, hint)))?; out.into_py_any(py) } @@ -163,7 +162,7 @@ impl PyReadableFile { let out = future_into_py(py, seek(reader, pos))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let out = py.allow_threads(|| runtime.block_on(seek(reader, pos)))?; out.into_py_any(py) } @@ -184,7 +183,7 @@ impl PyReadableFile { let out = future_into_py(py, tell(reader))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let out = py.allow_threads(|| runtime.block_on(tell(reader)))?; out.into_py_any(py) } @@ -267,7 +266,7 @@ impl PyLinesReader { } fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let lines = self.0.clone(); py.allow_threads(|| runtime.block_on(next_line(lines, false))) } @@ -376,7 +375,7 @@ impl PyWritableFile { traceback: Option, ) -> PyResult<()> { let writer = self.writer.clone(); - let runtime = get_runtime(py)?; + let runtime = get_runtime(); if exc_type.is_some() { py.allow_threads(|| runtime.block_on(abort_writer(writer)))?; } else { @@ -395,7 +394,7 @@ impl PyWritableFile { traceback: Option, ) -> PyResult> { let writer = self.writer.clone(); - let runtime = get_runtime(py)?; + let runtime = get_runtime(); if exc_type.is_some() { future_into_py(py, abort_writer(writer)) } else { @@ -409,7 +408,7 @@ impl PyWritableFile { let out = future_into_py(py, close_writer(writer))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); py.allow_threads(|| runtime.block_on(close_writer(writer)))?; Ok(py.None()) } @@ -436,7 +435,7 @@ impl PyWritableFile { let out = future_into_py(py, is_closed(writer))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let out = py.allow_threads(|| runtime.block_on(is_closed(writer)))?; out.into_py_any(py) } @@ -448,7 +447,7 @@ impl PyWritableFile { let out = future_into_py(py, flush(writer))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); py.allow_threads(|| runtime.block_on(flush(writer)))?; Ok(py.None()) } @@ -460,7 +459,7 @@ impl PyWritableFile { let out = future_into_py(py, write(writer, buffer))?; Ok(out.unbind()) } else { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let out = py.allow_threads(|| runtime.block_on(write(writer, buffer)))?; out.into_py_any(py) } diff --git a/obstore/src/copy.rs b/obstore/src/copy.rs index 0e6ebe0..85eb1ab 100644 --- a/obstore/src/copy.rs +++ b/obstore/src/copy.rs @@ -1,8 +1,8 @@ use object_store::ObjectStore; use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; -use crate::runtime::get_runtime; use crate::utils::PyNone; #[pyfunction] @@ -14,7 +14,7 @@ pub(crate) fn copy( to: String, overwrite: bool, ) -> PyObjectStoreResult<()> { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let from_ = from_.into(); let to = to.into(); py.allow_threads(|| { diff --git a/obstore/src/delete.rs b/obstore/src/delete.rs index 4e7132d..da2ab53 100644 --- a/obstore/src/delete.rs +++ b/obstore/src/delete.rs @@ -1,14 +1,14 @@ use futures::{StreamExt, TryStreamExt}; use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; use crate::path::PyPaths; -use crate::runtime::get_runtime; use crate::utils::PyNone; #[pyfunction] pub(crate) fn delete(py: Python, store: PyObjectStore, paths: PyPaths) -> PyObjectStoreResult<()> { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let store = store.into_inner(); py.allow_threads(|| { match paths { diff --git a/obstore/src/get.rs b/obstore/src/get.rs index 3072092..07823c8 100644 --- a/obstore/src/get.rs +++ b/obstore/src/get.rs @@ -9,13 +9,13 @@ use futures::StreamExt; use object_store::{Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore}; use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError}; use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_bytes::PyBytes; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; use tokio::sync::Mutex; use crate::attributes::PyAttributes; use crate::list::PyObjectMeta; -use crate::runtime::get_runtime; /// 10MB default chunk size const DEFAULT_BYTES_CHUNK_SIZE: usize = 10 * 1024 * 1024; @@ -154,7 +154,7 @@ impl PyGetResult { .unwrap() .take() .ok_or(PyValueError::new_err("Result has already been disposed."))?; - let runtime = get_runtime(py)?; + let runtime = get_runtime(); py.allow_threads(|| { let bytes = runtime.block_on(get_result.bytes())?; Ok::<_, PyObjectStoreError>(PyBytes::new(bytes)) @@ -282,8 +282,8 @@ impl PyBytesStream { ) } - fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult { - let runtime = get_runtime(py)?; + fn __next__(&self) -> PyResult { + let runtime = get_runtime(); let stream = self.stream.clone(); runtime.block_on(next_stream(stream, self.min_chunk_size, true)) } @@ -329,7 +329,7 @@ pub(crate) fn get( path: String, options: Option, ) -> PyObjectStoreResult { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); py.allow_threads(|| { let path = &path.into(); let fut = if let Some(options) = options { @@ -372,7 +372,7 @@ pub(crate) fn get_range( end: Option, length: Option, ) -> PyObjectStoreResult { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let range = params_to_range(start, end, length)?; py.allow_threads(|| { let out = runtime.block_on(store.as_ref().get_range(&path.into(), range))?; @@ -426,7 +426,7 @@ pub(crate) fn get_ranges( ends: Option>, lengths: Option>, ) -> PyObjectStoreResult> { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let ranges = params_to_ranges(starts, ends, lengths)?; py.allow_threads(|| { let out = runtime.block_on(store.as_ref().get_ranges(&path.into(), &ranges))?; diff --git a/obstore/src/head.rs b/obstore/src/head.rs index f6c628e..e33bec3 100644 --- a/obstore/src/head.rs +++ b/obstore/src/head.rs @@ -1,12 +1,12 @@ use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; use crate::list::PyObjectMeta; -use crate::runtime::get_runtime; #[pyfunction] pub fn head(py: Python, store: PyObjectStore, path: String) -> PyObjectStoreResult { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let store = store.into_inner(); py.allow_threads(|| { diff --git a/obstore/src/lib.rs b/obstore/src/lib.rs index a73b556..1e10db0 100644 --- a/obstore/src/lib.rs +++ b/obstore/src/lib.rs @@ -11,7 +11,6 @@ mod list; mod path; mod put; mod rename; -mod runtime; mod scheme; mod signer; mod tags; diff --git a/obstore/src/list.rs b/obstore/src/list.rs index b8dbfe5..365b08a 100644 --- a/obstore/src/list.rs +++ b/obstore/src/list.rs @@ -15,11 +15,10 @@ use pyo3::prelude::*; use pyo3::types::PyDict; use pyo3::{intern, IntoPyObjectExt}; use pyo3_arrow::{PyRecordBatch, PyTable}; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; use tokio::sync::Mutex; -use crate::runtime::get_runtime; - pub(crate) struct PyObjectMeta(ObjectMeta); impl PyObjectMeta { @@ -105,8 +104,8 @@ impl PyListStream { slf } - fn collect(&self, py: Python) -> PyResult { - let runtime = get_runtime(py)?; + fn collect(&self) -> PyResult { + let runtime = get_runtime(); let stream = self.stream.clone(); runtime.block_on(collect_stream(stream, self.return_arrow)) } @@ -124,8 +123,8 @@ impl PyListStream { ) } - fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult { - let runtime = get_runtime(py)?; + fn __next__(&self) -> PyResult { + let runtime = get_runtime(); let stream = self.stream.clone(); runtime.block_on(next_stream( stream, @@ -435,7 +434,7 @@ pub(crate) fn list_with_delimiter( prefix: Option, return_arrow: bool, ) -> PyObjectStoreResult { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); py.allow_threads(|| { let out = runtime.block_on(list_with_delimiter_materialize( store.into_inner(), diff --git a/obstore/src/put.rs b/obstore/src/put.rs index 64157db..69590a8 100644 --- a/obstore/src/put.rs +++ b/obstore/src/put.rs @@ -16,12 +16,12 @@ use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; use pyo3::types::PyDict; use pyo3::{intern, IntoPyObjectExt}; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_bytes::PyBytes; use pyo3_file::PyFileLikeObject; use pyo3_object_store::{PyObjectStore, PyObjectStoreResult}; use crate::attributes::PyAttributes; -use crate::runtime::get_runtime; use crate::tags::PyTagSet; pub(crate) struct PyPutMode(PutMode); @@ -294,7 +294,6 @@ impl<'py> IntoPyObject<'py> for PyPutResult { #[pyo3(signature = (store, path, file, *, attributes=None, tags=None, mode=None, use_multipart=None, chunk_size=5242880, max_concurrency=12))] #[allow(clippy::too_many_arguments)] pub(crate) fn put( - py: Python, store: PyObjectStore, path: String, mut file: PutInput, @@ -324,7 +323,7 @@ pub(crate) fn put( } } - let runtime = get_runtime(py)?; + let runtime = get_runtime(); if use_multipart { runtime.block_on(put_multipart_inner( store.into_inner(), diff --git a/obstore/src/rename.rs b/obstore/src/rename.rs index e71fdc1..0a5a1d0 100644 --- a/obstore/src/rename.rs +++ b/obstore/src/rename.rs @@ -1,8 +1,8 @@ use object_store::ObjectStore; use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; -use crate::runtime::get_runtime; use crate::utils::PyNone; #[pyfunction] @@ -14,7 +14,7 @@ pub(crate) fn rename( to: String, overwrite: bool, ) -> PyObjectStoreResult<()> { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let from_ = from_.into(); let to = to.into(); py.allow_threads(|| { diff --git a/obstore/src/runtime.rs b/obstore/src/runtime.rs deleted file mode 100644 index 6bbdda8..0000000 --- a/obstore/src/runtime.rs +++ /dev/null @@ -1,38 +0,0 @@ -use pyo3::exceptions::PyValueError; -use pyo3::prelude::*; -use pyo3::sync::GILOnceCell; -use tokio::runtime::Runtime; - -static RUNTIME: GILOnceCell = GILOnceCell::new(); -static PID: GILOnceCell = GILOnceCell::new(); - -/// Construct a tokio runtime for sync requests -/// -/// This constructs a runtime with default tokio settings (e.g. [`Runtime::new`]). -/// -/// This runtime can possibly be used in the store creation process (e.g. in the AWS case, for -/// finding shared credentials), and thus any downstream applications may wish to reuse the same -/// runtime. -/// -/// Downstream consumers may explicitly want to depend on tokio and add `rt-multi-thread` as a -/// tokio feature flag to opt-in to the multi-threaded tokio runtime. -pub fn get_runtime(py: Python<'_>) -> PyResult<&'static Runtime> { - let pid = std::process::id(); - let runtime_pid = *PID.get_or_init(py, || pid); - if pid != runtime_pid { - panic!( - "Forked process detected - current PID is {} but the tokio runtime was created by {}. The tokio \ - runtime does not support forked processes https://github.com/tokio-rs/tokio/issues/4301. If you are \ - seeing this message while using Python multithreading make sure to use the `spawn` or `forkserver` \ - mode.", - pid, runtime_pid - ); - } - - let runtime = RUNTIME.get_or_try_init(py, || { - Runtime::new().map_err(|err| { - PyValueError::new_err(format!("Could not create tokio runtime. {}", err)) - }) - })?; - Ok(runtime) -} diff --git a/obstore/src/signer.rs b/obstore/src/signer.rs index 3e3ed48..cdaeb81 100644 --- a/obstore/src/signer.rs +++ b/obstore/src/signer.rs @@ -13,6 +13,7 @@ use pyo3::exceptions::PyValueError; use pyo3::intern; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; +use pyo3_async_runtimes::tokio::get_runtime; use pyo3_object_store::{ MaybePrefixedStore, PyAzureStore, PyGCSStore, PyObjectStoreError, PyObjectStoreResult, PyS3Store, PyUrl, @@ -20,7 +21,6 @@ use pyo3_object_store::{ use url::Url; use crate::path::PyPaths; -use crate::runtime::get_runtime; #[derive(Debug)] pub(crate) enum SignCapableStore { @@ -155,7 +155,7 @@ pub(crate) fn sign( paths: PyPaths, expires_in: Duration, ) -> PyObjectStoreResult { - let runtime = get_runtime(py)?; + let runtime = get_runtime(); let method = method.0; py.allow_threads(|| match paths {