Skip to content

Remove our own tokio runtime and use the one provided by pyo3-async-runtimes #441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions obstore/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ use pyo3::exceptions::{PyIOError, PyStopAsyncIteration, PyStopIteration};
use pyo3::prelude::*;
use pyo3::types::PyString;
use pyo3::{intern, IntoPyObjectExt};
use pyo3_async_runtimes::tokio::future_into_py;
use pyo3_async_runtimes::tokio::{future_into_py, get_runtime};
use pyo3_bytes::PyBytes;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Lines};
use tokio::sync::Mutex;

use crate::attributes::PyAttributes;
use crate::list::PyObjectMeta;
use crate::runtime::get_runtime;
use crate::tags::PyTagSet;

#[pyfunction]
Expand All @@ -28,7 +27,7 @@ pub(crate) fn open_reader(
buffer_size: usize,
) -> PyObjectStoreResult<PyReadableFile> {
let store = store.into_inner();
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let (reader, meta) =
py.allow_threads(|| runtime.block_on(create_reader(store, path, buffer_size)))?;
Ok(PyReadableFile::new(reader, meta, false))
Expand Down Expand Up @@ -105,7 +104,7 @@ impl PyReadableFile {
let out = future_into_py(py, read(reader, size))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let out = py.allow_threads(|| runtime.block_on(read(reader, size)))?;
out.into_py_any(py)
}
Expand All @@ -121,7 +120,7 @@ impl PyReadableFile {
let out = future_into_py(py, readline(reader))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let out = py.allow_threads(|| runtime.block_on(readline(reader)))?;
out.into_py_any(py)
}
Expand All @@ -135,7 +134,7 @@ impl PyReadableFile {
let out = future_into_py(py, readlines(reader, hint))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let out = py.allow_threads(|| runtime.block_on(readlines(reader, hint)))?;
out.into_py_any(py)
}
Expand Down Expand Up @@ -163,7 +162,7 @@ impl PyReadableFile {
let out = future_into_py(py, seek(reader, pos))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let out = py.allow_threads(|| runtime.block_on(seek(reader, pos)))?;
out.into_py_any(py)
}
Expand All @@ -184,7 +183,7 @@ impl PyReadableFile {
let out = future_into_py(py, tell(reader))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let out = py.allow_threads(|| runtime.block_on(tell(reader)))?;
out.into_py_any(py)
}
Expand Down Expand Up @@ -267,7 +266,7 @@ impl PyLinesReader {
}

fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<String> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let lines = self.0.clone();
py.allow_threads(|| runtime.block_on(next_line(lines, false)))
}
Expand Down Expand Up @@ -376,7 +375,7 @@ impl PyWritableFile {
traceback: Option<PyObject>,
) -> PyResult<()> {
let writer = self.writer.clone();
let runtime = get_runtime(py)?;
let runtime = get_runtime();
if exc_type.is_some() {
py.allow_threads(|| runtime.block_on(abort_writer(writer)))?;
} else {
Expand All @@ -395,7 +394,7 @@ impl PyWritableFile {
traceback: Option<PyObject>,
) -> PyResult<Bound<'py, PyAny>> {
let writer = self.writer.clone();
let runtime = get_runtime(py)?;
let runtime = get_runtime();
if exc_type.is_some() {
future_into_py(py, abort_writer(writer))
} else {
Expand All @@ -409,7 +408,7 @@ impl PyWritableFile {
let out = future_into_py(py, close_writer(writer))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
py.allow_threads(|| runtime.block_on(close_writer(writer)))?;
Ok(py.None())
}
Expand All @@ -436,7 +435,7 @@ impl PyWritableFile {
let out = future_into_py(py, is_closed(writer))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let out = py.allow_threads(|| runtime.block_on(is_closed(writer)))?;
out.into_py_any(py)
}
Expand All @@ -448,7 +447,7 @@ impl PyWritableFile {
let out = future_into_py(py, flush(writer))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
py.allow_threads(|| runtime.block_on(flush(writer)))?;
Ok(py.None())
}
Expand All @@ -460,7 +459,7 @@ impl PyWritableFile {
let out = future_into_py(py, write(writer, buffer))?;
Ok(out.unbind())
} else {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let out = py.allow_threads(|| runtime.block_on(write(writer, buffer)))?;
out.into_py_any(py)
}
Expand Down
4 changes: 2 additions & 2 deletions obstore/src/copy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use object_store::ObjectStore;
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};

use crate::runtime::get_runtime;
use crate::utils::PyNone;

#[pyfunction]
Expand All @@ -14,7 +14,7 @@ pub(crate) fn copy(
to: String,
overwrite: bool,
) -> PyObjectStoreResult<()> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let from_ = from_.into();
let to = to.into();
py.allow_threads(|| {
Expand Down
4 changes: 2 additions & 2 deletions obstore/src/delete.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use futures::{StreamExt, TryStreamExt};
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};

use crate::path::PyPaths;
use crate::runtime::get_runtime;
use crate::utils::PyNone;

#[pyfunction]
pub(crate) fn delete(py: Python, store: PyObjectStore, paths: PyPaths) -> PyObjectStoreResult<()> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let store = store.into_inner();
py.allow_threads(|| {
match paths {
Expand Down
14 changes: 7 additions & 7 deletions obstore/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use futures::StreamExt;
use object_store::{Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore};
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_bytes::PyBytes;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
use tokio::sync::Mutex;

use crate::attributes::PyAttributes;
use crate::list::PyObjectMeta;
use crate::runtime::get_runtime;

/// 10MB default chunk size
const DEFAULT_BYTES_CHUNK_SIZE: usize = 10 * 1024 * 1024;
Expand Down Expand Up @@ -154,7 +154,7 @@ impl PyGetResult {
.unwrap()
.take()
.ok_or(PyValueError::new_err("Result has already been disposed."))?;
let runtime = get_runtime(py)?;
let runtime = get_runtime();
py.allow_threads(|| {
let bytes = runtime.block_on(get_result.bytes())?;
Ok::<_, PyObjectStoreError>(PyBytes::new(bytes))
Expand Down Expand Up @@ -282,8 +282,8 @@ impl PyBytesStream {
)
}

fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<PyBytesWrapper> {
let runtime = get_runtime(py)?;
fn __next__(&self) -> PyResult<PyBytesWrapper> {
let runtime = get_runtime();
let stream = self.stream.clone();
runtime.block_on(next_stream(stream, self.min_chunk_size, true))
}
Expand Down Expand Up @@ -329,7 +329,7 @@ pub(crate) fn get(
path: String,
options: Option<PyGetOptions>,
) -> PyObjectStoreResult<PyGetResult> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
py.allow_threads(|| {
let path = &path.into();
let fut = if let Some(options) = options {
Expand Down Expand Up @@ -372,7 +372,7 @@ pub(crate) fn get_range(
end: Option<u64>,
length: Option<u64>,
) -> PyObjectStoreResult<pyo3_bytes::PyBytes> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let range = params_to_range(start, end, length)?;
py.allow_threads(|| {
let out = runtime.block_on(store.as_ref().get_range(&path.into(), range))?;
Expand Down Expand Up @@ -426,7 +426,7 @@ pub(crate) fn get_ranges(
ends: Option<Vec<u64>>,
lengths: Option<Vec<u64>>,
) -> PyObjectStoreResult<Vec<pyo3_bytes::PyBytes>> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let ranges = params_to_ranges(starts, ends, lengths)?;
py.allow_threads(|| {
let out = runtime.block_on(store.as_ref().get_ranges(&path.into(), &ranges))?;
Expand Down
4 changes: 2 additions & 2 deletions obstore/src/head.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};

use crate::list::PyObjectMeta;
use crate::runtime::get_runtime;

#[pyfunction]
pub fn head(py: Python, store: PyObjectStore, path: String) -> PyObjectStoreResult<PyObjectMeta> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let store = store.into_inner();

py.allow_threads(|| {
Expand Down
1 change: 0 additions & 1 deletion obstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ mod list;
mod path;
mod put;
mod rename;
mod runtime;
mod scheme;
mod signer;
mod tags;
Expand Down
13 changes: 6 additions & 7 deletions obstore/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ use pyo3::prelude::*;
use pyo3::types::PyDict;
use pyo3::{intern, IntoPyObjectExt};
use pyo3_arrow::{PyRecordBatch, PyTable};
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
use tokio::sync::Mutex;

use crate::runtime::get_runtime;

pub(crate) struct PyObjectMeta(ObjectMeta);

impl PyObjectMeta {
Expand Down Expand Up @@ -105,8 +104,8 @@ impl PyListStream {
slf
}

fn collect(&self, py: Python) -> PyResult<PyListIterResult> {
let runtime = get_runtime(py)?;
fn collect(&self) -> PyResult<PyListIterResult> {
let runtime = get_runtime();
let stream = self.stream.clone();
runtime.block_on(collect_stream(stream, self.return_arrow))
}
Expand All @@ -124,8 +123,8 @@ impl PyListStream {
)
}

fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<PyListIterResult> {
let runtime = get_runtime(py)?;
fn __next__(&self) -> PyResult<PyListIterResult> {
let runtime = get_runtime();
let stream = self.stream.clone();
runtime.block_on(next_stream(
stream,
Expand Down Expand Up @@ -435,7 +434,7 @@ pub(crate) fn list_with_delimiter(
prefix: Option<String>,
return_arrow: bool,
) -> PyObjectStoreResult<PyListResult> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
py.allow_threads(|| {
let out = runtime.block_on(list_with_delimiter_materialize(
store.into_inner(),
Expand Down
5 changes: 2 additions & 3 deletions obstore/src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyDict;
use pyo3::{intern, IntoPyObjectExt};
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_bytes::PyBytes;
use pyo3_file::PyFileLikeObject;
use pyo3_object_store::{PyObjectStore, PyObjectStoreResult};

use crate::attributes::PyAttributes;
use crate::runtime::get_runtime;
use crate::tags::PyTagSet;

pub(crate) struct PyPutMode(PutMode);
Expand Down Expand Up @@ -294,7 +294,6 @@ impl<'py> IntoPyObject<'py> for PyPutResult {
#[pyo3(signature = (store, path, file, *, attributes=None, tags=None, mode=None, use_multipart=None, chunk_size=5242880, max_concurrency=12))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn put(
py: Python,
store: PyObjectStore,
path: String,
mut file: PutInput,
Expand Down Expand Up @@ -324,7 +323,7 @@ pub(crate) fn put(
}
}

let runtime = get_runtime(py)?;
let runtime = get_runtime();
if use_multipart {
runtime.block_on(put_multipart_inner(
store.into_inner(),
Expand Down
4 changes: 2 additions & 2 deletions obstore/src/rename.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use object_store::ObjectStore;
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};

use crate::runtime::get_runtime;
use crate::utils::PyNone;

#[pyfunction]
Expand All @@ -14,7 +14,7 @@ pub(crate) fn rename(
to: String,
overwrite: bool,
) -> PyObjectStoreResult<()> {
let runtime = get_runtime(py)?;
let runtime = get_runtime();
let from_ = from_.into();
let to = to.into();
py.allow_threads(|| {
Expand Down
38 changes: 0 additions & 38 deletions obstore/src/runtime.rs

This file was deleted.

Loading
Loading