diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index e8a3968b3d..9db193a04e 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -416,6 +416,38 @@ def to_pandas(
         )
         return df, query_job
 
+    def to_pandas_batches(self):
+        """Download results one message at a time.
+
+        Columns of each yielded frame keep the names in
+        ``self.value_columns``; no column labels are applied here.
+
+        Yields:
+            pandas.DataFrame: One frame per Arrow table from the result
+            iterator, with this block's index applied.
+        """
+        dtypes = dict(zip(self.index_columns, self.index_dtypes))
+        dtypes.update(zip(self.value_columns, self.dtypes))
+        results_iterator, _ = self._expr.start_query()
+        for arrow_table in results_iterator.to_arrow_iterable(
+            bqstorage_client=self._expr._session.bqstoragereadclient
+        ):
+            df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
+            self._copy_index_to_pandas(df)
+            yield df
+
+    def _copy_index_to_pandas(self, df: pd.DataFrame):
+        """Set the index on pandas DataFrame to match this block.
+
+        Warning: This method modifies ``df`` inplace.
+        """
+        if self.index_columns:
+            df.set_index(list(self.index_columns), inplace=True)
+            # Pandas names is annotated as list[str] rather than the more
+            # general Sequence[Label] that BigQuery DataFrames has.
+            # See: https://github.com/pandas-dev/pandas-stubs/issues/804
+            df.index.names = self.index.names  # type: ignore
+
     def _compute_and_count(
         self,
         value_keys: Optional[Iterable[str]] = None,
@@ -489,10 +521,7 @@ def _compute_and_count(
         else:
             total_rows = results_iterator.total_rows
             df = self._to_dataframe(results_iterator)
-
-        if self.index_columns:
-            df.set_index(list(self.index_columns), inplace=True)
-            df.index.names = self.index.names  # type: ignore
+        self._copy_index_to_pandas(df)
 
         return df, total_rows, query_job
 
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 869075a970..3fd8319876 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -893,6 +893,23 @@ def to_pandas(
         self._set_internal_query_job(query_job)
         return df.set_axis(self._block.column_labels, axis=1, copy=False)
 
+    def to_pandas_batches(self) -> Iterable[pandas.DataFrame]:
+        """Stream DataFrame results to an iterable of pandas DataFrames.
+
+        Each batch carries this DataFrame's column labels, matching what
+        ``to_pandas()`` returns.
+
+        Returns:
+            Iterable[pandas.DataFrame]: Lazily evaluated batches of results.
+        """
+        column_labels = self._block.column_labels
+        # Relabel every batch the same way to_pandas() does; the block yields
+        # frames without the user-facing column labels applied.
+        return (
+            batch.set_axis(column_labels, axis=1, copy=False)
+            for batch in self._block.to_pandas_batches()
+        )
+
     def _compute_dry_run(self) -> bigquery.QueryJob:
         return self._block._compute_dry_run()
 
diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py
index 163127b546..1af00a2d01 100644
--- a/bigframes/session/_io/pandas.py
+++ b/bigframes/session/_io/pandas.py
@@ -46,20 +46,32 @@ def arrow_to_pandas(
             # Preserve NA/NaN distinction. Note: This is currently needed, even if we use
             # nullable Float64Dtype in the types_mapper. See:
             # https://github.com/pandas-dev/pandas/issues/55668
+            mask = pyarrow.compute.is_null(column)
+            nonnull = pyarrow.compute.fill_null(column, float("nan"))
             # Regarding type: ignore, this class has been public at this
             # location since pandas 1.2.0. See:
             # https://pandas.pydata.org/docs/dev/reference/api/pandas.arrays.FloatingArray.html
             pd_array = pandas.arrays.FloatingArray(  # type: ignore
-                column.to_numpy(),
-                pyarrow.compute.is_null(column).to_numpy(),
+                nonnull.to_numpy()
+                if isinstance(nonnull, pyarrow.ChunkedArray)
+                else nonnull.to_numpy(zero_copy_only=False),
+                mask.to_numpy()
+                if isinstance(mask, pyarrow.ChunkedArray)
+                else mask.to_numpy(zero_copy_only=False),
             )
             series = pandas.Series(pd_array, dtype=dtype)
         elif dtype == pandas.Int64Dtype():
             # Avoid out-of-bounds errors in Pandas 1.5.x, which incorrectly
             # casts to float64 in an intermediate step.
+            mask = pyarrow.compute.is_null(column)
+            nonnull = pyarrow.compute.fill_null(column, 0)
             pd_array = pandas.arrays.IntegerArray(
-                pyarrow.compute.fill_null(column, 0).to_numpy(),
-                pyarrow.compute.is_null(column).to_numpy(),
+                nonnull.to_numpy()
+                if isinstance(nonnull, pyarrow.ChunkedArray)
+                else nonnull.to_numpy(zero_copy_only=False),
+                mask.to_numpy()
+                if isinstance(mask, pyarrow.ChunkedArray)
+                else mask.to_numpy(zero_copy_only=False),
             )
             series = pandas.Series(pd_array, dtype=dtype)
         elif isinstance(dtype, pandas.ArrowDtype):
diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py
index d60083a837..8f5d706f62 100644
--- a/tests/system/small/test_dataframe_io.py
+++ b/tests/system/small/test_dataframe_io.py
@@ -83,6 +83,14 @@ def test_to_pandas_array_struct_correct_result(session):
     )
 
 
+def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
+    """Verify to_pandas_batches() API returns the expected dtypes."""
+    expected = scalars_df_default_index.dtypes
+    for df in scalars_df_default_index.to_pandas_batches():
+        actual = df.dtypes
+        pd.testing.assert_series_equal(actual, expected)
+
+
 @pytest.mark.parametrize(
     ("index"),
     [True, False],
diff --git a/tests/unit/session/test_io_pandas.py b/tests/unit/session/test_io_pandas.py
index 8b95977ec3..0f6f5dae03 100644
--- a/tests/unit/session/test_io_pandas.py
+++ b/tests/unit/session/test_io_pandas.py
@@ -231,6 +231,62 @@
             ),
             id="scalar-dtypes",
         ),
+        pytest.param(
+            pyarrow.Table.from_pydict(
+                {
+                    "bool": pyarrow.chunked_array(
+                        [[True, None], [True, False]],
+                        type=pyarrow.bool_(),
+                    ),
+                    "bytes": pyarrow.chunked_array(
+                        [[b"123", None], [b"abc", b"xyz"]],
+                        type=pyarrow.binary(),
+                    ),
+                    "float": pyarrow.chunked_array(
+                        [[1.0, None], [float("nan"), -1.0]],
+                        type=pyarrow.float64(),
+                    ),
+                    "int": pyarrow.chunked_array(
+                        [[1, None], [-1, 2**63 - 1]],
+                        type=pyarrow.int64(),
+                    ),
+                    "string": pyarrow.chunked_array(
+                        [["123", None], ["abc", "xyz"]],
+                        type=pyarrow.string(),
+                    ),
+                }
+            ),
+            {
+                "bool": "boolean",
+                "bytes": "object",
+                "float": pandas.Float64Dtype(),
+                "int": pandas.Int64Dtype(),
+                "string": "string[pyarrow]",
+            },
+            pandas.DataFrame(
+                {
+                    "bool": pandas.Series([True, None, True, False], dtype="boolean"),
+                    "bytes": [b"123", None, b"abc", b"xyz"],
+                    "float": pandas.Series(
+                        pandas.arrays.FloatingArray(  # type: ignore
+                            numpy.array(
+                                [1.0, float("nan"), float("nan"), -1.0], dtype="float64"
+                            ),
+                            numpy.array([False, True, False, False], dtype="bool"),
+                        ),
+                        dtype=pandas.Float64Dtype(),
+                    ),
+                    "int": pandas.Series(
+                        [1, None, -1, 2**63 - 1],
+                        dtype=pandas.Int64Dtype(),
+                    ),
+                    "string": pandas.Series(
+                        ["123", None, "abc", "xyz"], dtype="string[pyarrow]"
+                    ),
+                }
+            ),
+            id="scalar-dtypes-chunked_array",
+        ),
         pytest.param(
             pyarrow.Table.from_pydict(
                 {