chore: use faster query_and_wait API in _read_gbq_colab #1777

Merged · 24 commits · Jun 3, 2025

Commits
ccdfecf
chore: use faster query_and_wait API in _read_gbq_colab
tswast May 28, 2025
ba81e0b
try to fix unit tests
tswast May 29, 2025
d33812f
more unit test fixes
tswast May 29, 2025
1dc80ee
more test fixes
tswast May 29, 2025
95c8890
Merge remote-tracking branch 'origin/main' into b405372623-read_gbq-a…
tswast May 29, 2025
63117a1
fix mypy
tswast May 29, 2025
fe696ae
Merge remote-tracking branch 'origin/main' into b405372623-read_gbq-a…
tswast May 30, 2025
bbc8294
fix metrics counter in read_gbq with allow_large_results=False
tswast May 30, 2025
12fd221
use managedarrowtable
tswast May 30, 2025
06661c9
Update bigframes/session/loader.py
tswast May 30, 2025
61bf139
Merge branch 'main' into b405372623-read_gbq-allow_large_results
tswast May 30, 2025
fc5df84
Merge remote-tracking branch 'origin/main' into b405372623-read_gbq-a…
tswast May 30, 2025
e22c737
Merge remote-tracking branch 'origin/main' into b405372623-read_gbq-a…
tswast Jun 2, 2025
64979d6
split out a few special case return values for read_gbq_query
tswast Jun 2, 2025
888fd3d
support slice node for repr
tswast Jun 2, 2025
2b75c49
fix failing system test
tswast Jun 2, 2025
63661be
move slice into semiexecutor and out of readlocalnode
tswast Jun 2, 2025
7cd7371
unit test for local executor
tswast Jun 2, 2025
da2bf08
split method instead of using reloads
tswast Jun 3, 2025
492ec81
Merge branch 'main' into b405372623-read_gbq-allow_large_results
tswast Jun 3, 2025
35aba37
fix reference to _start_query
tswast Jun 3, 2025
0a4e987
use limit rewrite for slice support
tswast Jun 3, 2025
bfb4ca2
do not use numpy for offsets
tswast Jun 3, 2025
758544d
Merge remote-tracking branch 'origin/main' into b405372623-read_gbq-a…
tswast Jun 3, 2025
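For context (not part of the diff): `Client.query_and_wait` in google-cloud-bigquery issues the query and waits for results in one call, typically with fewer API round trips than creating a `QueryJob` and polling it, which is what the new `query_with_job=False` path in `start_query_with_client` builds on. A minimal sketch of the two call styles, assuming an authenticated client and a trivial query:

```python
from google.cloud import bigquery

client = bigquery.Client()
sql = "SELECT 1 AS x"

# Job-based path: create a query job, then block on its completion.
job = client.query(sql, job_config=bigquery.QueryJobConfig())
rows_via_job = job.result()  # RowIterator; `job` carries stats, destination, etc.

# query_and_wait path: a single call that returns rows once the query finishes.
# Depending on client/server configuration it may not create a job the caller
# has to track, which saves round trips for small queries.
rows = client.query_and_wait(sql)
for row in rows:
    print(row["x"])
```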
Files changed
4 changes: 4 additions & 0 deletions bigframes/blob/_functions.py
@@ -95,6 +95,10 @@ def _create_udf(self):
sql,
job_config=bigquery.QueryJobConfig(),
metrics=self._session._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
)

return udf_name
1 change: 0 additions & 1 deletion bigframes/core/array_value.py
@@ -34,7 +34,6 @@
import bigframes.core.ordering as orderings
import bigframes.core.schema as schemata
import bigframes.core.tree_properties
import bigframes.core.utils
from bigframes.core.window_spec import WindowSpec
import bigframes.dtypes
import bigframes.exceptions as bfe
7 changes: 2 additions & 5 deletions bigframes/core/compile/compiler.py
@@ -22,10 +22,9 @@
import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
import bigframes_vendored.ibis.expr.types as ibis_types
import pyarrow as pa

from bigframes import dtypes, operations
from bigframes.core import expression
from bigframes.core import expression, pyarrow_utils
import bigframes.core.compile.compiled as compiled
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.configs as configs
@@ -172,9 +171,7 @@ def compile_readlocal(node: nodes.ReadLocalNode, *args):
pa_table = pa_table.rename_columns([item.id.sql for item in node.scan_list.items])

if offsets:
pa_table = pa_table.append_column(
offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
)
pa_table = pyarrow_utils.append_offsets(pa_table, offsets)
return compiled.UnorderedIR.from_polars(pa_table, bq_schema)


7 changes: 2 additions & 5 deletions bigframes/core/compile/sqlglot/compiler.py
@@ -18,10 +18,9 @@
import typing

from google.cloud import bigquery
import pyarrow as pa
import sqlglot.expressions as sge

from bigframes.core import expression, guid, identifiers, nodes, rewrite
from bigframes.core import expression, guid, identifiers, nodes, pyarrow_utils, rewrite
from bigframes.core.compile import configs
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
@@ -155,9 +154,7 @@ def compile_readlocal(self, node: nodes.ReadLocalNode, *args) -> ir.SQLGlotIR:

offsets = node.offsets_col.sql if node.offsets_col else None
if offsets:
pa_table = pa_table.append_column(
offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
)
pa_table = pyarrow_utils.append_offsets(pa_table, offsets)

return ir.SQLGlotIR.from_pyarrow(pa_table, node.schema, uid_gen=self.uid_gen)

2 changes: 1 addition & 1 deletion bigframes/core/local_data.py
@@ -295,7 +295,7 @@ def _adapt_chunked_array(


def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtype]:
"""Normalize the array to managed storage types. Preverse shapes, only transforms values."""
"""Normalize the array to managed storage types. Preserve shapes, only transforms values."""
if array.offset != 0: # Offset arrays don't have all operations implemented
return _adapt_arrow_array(pa.concat_arrays([array]))

10 changes: 10 additions & 0 deletions bigframes/core/nodes.py
@@ -154,6 +154,16 @@ def is_limit(self) -> bool:
and (self.stop > 0)
)

@property
def is_noop(self) -> bool:
"""Returns whether this node doesn't actually change the results."""
# TODO: Handle tail case.
return (
((not self.start) or (self.start == 0))
and (self.step == 1)
and ((self.stop is None) or (self.stop == self.row_count))
)

@property
def row_count(self) -> typing.Optional[int]:
child_length = self.child.row_count
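As context for the new property: it answers whether the slice keeps every row in its original order. A plain-Python restatement of the same condition, using bare values instead of a `SliceNode` (illustrative only):

```python
from typing import Optional

def slice_is_noop(
    start: Optional[int],
    stop: Optional[int],
    step: int,
    row_count: Optional[int],
) -> bool:
    # Mirrors SliceNode.is_noop: no leading offset, unit step, and no stop
    # (or a stop equal to the child's row count).
    return (
        (not start or start == 0)
        and step == 1
        and (stop is None or stop == row_count)
    )

assert slice_is_noop(None, None, 1, row_count=10)      # df[:] keeps everything
assert not slice_is_noop(0, 5, 1, row_count=10)        # df[:5] drops rows
assert not slice_is_noop(None, None, 2, row_count=10)  # df[::2] drops rows
```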
9 changes: 9 additions & 0 deletions bigframes/core/pyarrow_utils.py
@@ -85,3 +85,12 @@ def truncate_pyarrow_iterable(
else:
yield batch
total_yielded += batch.num_rows


def append_offsets(
pa_table: pa.Table,
offsets_col: str,
) -> pa.Table:
return pa_table.append_column(
offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64())
)
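A quick usage sketch of the new helper; the table contents and the `row_id` column name are made up for illustration:

```python
import pyarrow as pa

from bigframes.core import pyarrow_utils

table = pa.table({"name": ["a", "b", "c"]})
with_offsets = pyarrow_utils.append_offsets(table, offsets_col="row_id")

# An int64 column counting 0..num_rows-1 is appended at the end.
assert with_offsets.column("row_id").to_pylist() == [0, 1, 2]
```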
13 changes: 11 additions & 2 deletions bigframes/core/rewrite/scan_reduction.py
@@ -16,6 +16,7 @@
from typing import Optional

from bigframes.core import nodes
import bigframes.core.rewrite.slices


def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTableNode]:
@@ -28,7 +29,15 @@ def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTab
return None


def try_reduce_to_local_scan(node: nodes.BigFrameNode) -> Optional[nodes.ReadLocalNode]:
def try_reduce_to_local_scan(
node: nodes.BigFrameNode,
) -> Optional[tuple[nodes.ReadLocalNode, Optional[int]]]:
"""Create a ReadLocalNode with optional limit, if possible.

Similar to ReadApiSemiExecutor._try_adapt_plan.
"""
node, limit = bigframes.core.rewrite.slices.pull_out_limit(node)

if not all(
map(
lambda x: isinstance(x, (nodes.ReadLocalNode, nodes.SelectionNode)),
@@ -38,7 +47,7 @@ def try_reduce_to_local_scan(node: nodes.BigFrameNode) -> Optional[nodes.ReadLoc
return None
result = node.bottom_up(merge_scan)
if isinstance(result, nodes.ReadLocalNode):
return result
return result, limit
return None


3 changes: 3 additions & 0 deletions bigframes/core/rewrite/slices.py
@@ -57,6 +57,9 @@ def pull_out_limit(
if (prior_limit is not None) and (prior_limit < limit):
limit = prior_limit
return new_root, limit
if root.is_noop:
new_root, prior_limit = pull_out_limit(root.child)
return new_root, prior_limit
elif (
isinstance(root, (nodes.SelectionNode, nodes.ProjectionNode))
and root.row_preserving
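The new `is_noop` branch lets `pull_out_limit` look straight through a slice that keeps every row, so a limit sitting underneath it is still hoisted. A plain-list analogy, not the node API:

```python
rows = list(range(100))

# The trailing [:] is a no-op slice, so the limit of 10 beneath it is still
# the effective limit of the whole expression.
assert rows[:10][:] == rows[:10]
```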
18 changes: 14 additions & 4 deletions bigframes/core/schema.py
@@ -17,7 +17,7 @@
from dataclasses import dataclass
import functools
import typing
from typing import Sequence
from typing import Dict, List, Sequence

import google.cloud.bigquery
import pyarrow
@@ -47,14 +47,24 @@ def from_bq_table(
column_type_overrides: typing.Optional[
typing.Dict[str, bigframes.dtypes.Dtype]
] = None,
):
return ArraySchema.from_bq_schema(
table.schema, column_type_overrides=column_type_overrides
)

@classmethod
def from_bq_schema(
cls,
schema: List[google.cloud.bigquery.SchemaField],
column_type_overrides: typing.Optional[
Dict[str, bigframes.dtypes.Dtype]
] = None,
):
if column_type_overrides is None:
column_type_overrides = {}
items = tuple(
SchemaItem(name, column_type_overrides.get(name, dtype))
for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
table.schema
).items()
for name, dtype in bigframes.dtypes.bf_type_from_type_kind(schema).items()
)
return ArraySchema(items)

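Splitting `from_bq_schema` out of `from_bq_table` allows building a BigFrames schema from a bare list of `SchemaField`s (for example, the schema attached to a `RowIterator`) without needing a `Table` object. A small sketch; the field names and the dtype override are illustrative assumptions:

```python
import google.cloud.bigquery as bigquery
import pandas as pd
import pyarrow as pa

import bigframes.core.schema as schemata

bq_fields = [
    bigquery.SchemaField("id", "INT64"),
    bigquery.SchemaField("created_at", "TIMESTAMP"),
]

schema = schemata.ArraySchema.from_bq_schema(
    bq_fields,
    column_type_overrides={
        # Assumed override: pin one column to an Arrow-backed timestamp dtype.
        "created_at": pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
    },
)
```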
8 changes: 6 additions & 2 deletions bigframes/functions/_function_client.py
@@ -125,11 +125,15 @@ def _ensure_dataset_exists(self) -> None:
def _create_bq_function(self, create_function_ddl: str) -> None:
# TODO(swast): plumb through the original, user-facing api_name.
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
self._session.bqclient,
cast(bigquery.Client, self._session.bqclient),
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
location=None,
project=None,
timeout=None,
metrics=None,
query_with_job=True,
)
assert query_job is not None
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

def _format_function_options(self, function_options: dict) -> str:
19 changes: 16 additions & 3 deletions bigframes/session/__init__.py
@@ -537,6 +537,10 @@ def _read_gbq_colab(
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
# TODO(tswast): we may need to allow allow_large_results to be overwritten
# or possibly a general configuration object for an explicit
# destination table and write disposition.
allow_large_results=False,
)

@overload
@@ -1917,10 +1921,15 @@ def _start_query_ml_ddl(
# https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
job_config.destination_encryption_configuration = None
iterator, query_job = bf_io_bigquery.start_query_with_client(
self.bqclient, sql, job_config=job_config, metrics=self._metrics
self.bqclient,
sql,
job_config=job_config,
metrics=self._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
)

assert query_job is not None
return iterator, query_job

def _create_object_table(self, path: str, connection: str) -> str:
@@ -1943,6 +1952,10 @@ def _create_object_table(self, path: str, connection: str) -> str:
sql,
job_config=bigquery.QueryJobConfig(),
metrics=self._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
)

return table
60 changes: 48 additions & 12 deletions bigframes/session/_io/bigquery/__init__.py
@@ -22,7 +22,7 @@
import textwrap
import types
import typing
from typing import Dict, Iterable, Mapping, Optional, Tuple, Union
from typing import Dict, Iterable, Literal, Mapping, Optional, overload, Tuple, Union

import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
import google.api_core.exceptions
@@ -38,7 +38,6 @@


IO_ORDERING_ID = "bqdf_row_nums"
MAX_LABELS_COUNT = 64 - 8
_LIST_TABLES_LIMIT = 10000 # calls to bqclient.list_tables
# will be limited to this many tables

@@ -73,7 +72,12 @@ def create_job_configs_labels(
)
)
values = list(itertools.chain(job_configs_labels.values(), api_methods))
return dict(zip(labels[:MAX_LABELS_COUNT], values[:MAX_LABELS_COUNT]))
return dict(
zip(
labels[: log_adapter.MAX_LABELS_COUNT],
values[: log_adapter.MAX_LABELS_COUNT],
)
)


def create_export_data_statement(
@@ -223,8 +227,7 @@ def format_option(key: str, value: Union[bool, str]) -> str:
def add_and_trim_labels(job_config):
"""
Add additional labels to the job configuration and trim the total number of labels
to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64
labels per job.
to ensure they do not exceed MAX_LABELS_COUNT labels per job.
"""
api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
job_config.labels = create_job_configs_labels(
@@ -233,23 +236,54 @@
)


@overload
def start_query_with_client(
bq_client: bigquery.Client,
sql: str,
job_config: bigquery.job.QueryJobConfig,
*,
job_config: bigquery.QueryJobConfig,
location: Optional[str],
project: Optional[str],
timeout: Optional[float],
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
query_with_job: Literal[True],
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
...


@overload
def start_query_with_client(
bq_client: bigquery.Client,
sql: str,
*,
job_config: bigquery.QueryJobConfig,
location: Optional[str],
project: Optional[str],
timeout: Optional[float],
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
query_with_job: Literal[False],
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
...


def start_query_with_client(
bq_client: bigquery.Client,
sql: str,
*,
job_config: bigquery.QueryJobConfig,
location: Optional[str] = None,
project: Optional[str] = None,
timeout: Optional[float] = None,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
*,
query_with_job: bool = True,
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts query job and waits for results.
"""
try:
# Note: Ensure no additional labels are added to job_config after this point,
# as `add_and_trim_labels` ensures the label count does not exceed 64.
# Note: Ensure no additional labels are added to job_config after this
# point, as `add_and_trim_labels` ensures the label count does not
# exceed MAX_LABELS_COUNT.
add_and_trim_labels(job_config)
if not query_with_job:
results_iterator = bq_client.query_and_wait(
Expand Down Expand Up @@ -322,8 +356,8 @@ def delete_tables_matching_session_id(

def create_bq_dataset_reference(
bq_client: bigquery.Client,
location=None,
project=None,
location: Optional[str] = None,
project: Optional[str] = None,
) -> bigquery.DatasetReference:
"""Create and identify dataset(s) for temporary BQ resources.

@@ -352,14 +386,16 @@ def create_bq_dataset_reference(
location=location,
job_config=job_config,
project=project,
timeout=None,
metrics=None,
query_with_job=True,
)

# The anonymous dataset is used by BigQuery to write query results and
# session tables. BigQuery DataFrames also writes temp tables directly
# to the dataset, no BigQuery Session required. Note: there is a
# different anonymous dataset per location. See:
# https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
assert query_job is not None
query_destination = query_job.destination
return bigquery.DatasetReference(
query_destination.project,
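The two `@overload` declarations let type checkers give `query_with_job=True` callers a guaranteed `QueryJob`, while `query_with_job=False` callers see `Optional[QueryJob]` and must handle the job-less `query_and_wait` path. A stripped-down sketch of the same typing pattern; the names are illustrative, not the bigframes API:

```python
from typing import Literal, Optional, Tuple, overload

@overload
def run_query(sql: str, *, with_job: Literal[True]) -> Tuple[list, str]: ...

@overload
def run_query(sql: str, *, with_job: Literal[False]) -> Tuple[list, Optional[str]]: ...

def run_query(sql: str, *, with_job: bool = True) -> Tuple[list, Optional[str]]:
    # Placeholder body; a real implementation would call the BigQuery client.
    rows = ["row"]
    job_id = "job-123" if with_job else None
    return rows, job_id

rows, job_id = run_query("SELECT 1", with_job=True)     # job_id: str
rows, maybe_id = run_query("SELECT 1", with_job=False)  # maybe_id: Optional[str]
```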