
Commit f495c84

chore: use faster query_and_wait API in _read_gbq_colab (#1777)
* chore: use faster query_and_wait API in _read_gbq_colab
* try to fix unit tests
* more unit test fixes
* more test fixes
* fix mypy
* fix metrics counter in read_gbq with allow_large_results=False
* use managedarrowtable
* Update bigframes/session/loader.py
* split out a few special case return values for read_gbq_query
* support slice node for repr
* fix failing system test
* move slice into semiexecutor and out of readlocalnode
* unit test for local executor
* split method instead of using reloads
* fix reference to _start_query
* use limit rewrite for slice support
* do not use numpy for offsets
1 parent e480d29 commit f495c84

28 files changed (+665, -117 lines)
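The headline change swaps the client's two-step query-then-poll flow for its query_and_wait API wherever results can bypass a destination table. For context, a minimal sketch of the difference (assuming google-cloud-bigquery 3.15+ and an already-configured client; not code from this commit):

    from google.cloud import bigquery

    client = bigquery.Client()

    # Classic path: create a QueryJob resource, then poll it for results.
    job = client.query("SELECT 1 AS x")
    rows = job.result()

    # query_and_wait: a single call that can skip job creation entirely for
    # short queries, so results come back with fewer round trips. Note that
    # no QueryJob may exist afterward, hence the Optional[QueryJob] returns
    # threaded through this commit.
    rows = client.query_and_wait("SELECT 1 AS x")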

bigframes/blob/_functions.py

Lines changed: 4 additions & 0 deletions

@@ -95,6 +95,10 @@ def _create_udf(self):
             sql,
             job_config=bigquery.QueryJobConfig(),
             metrics=self._session._metrics,
+            location=None,
+            project=None,
+            timeout=None,
+            query_with_job=True,
         )
 
         return udf_name

bigframes/core/array_value.py

Lines changed: 0 additions & 1 deletion

@@ -34,7 +34,6 @@
 import bigframes.core.ordering as orderings
 import bigframes.core.schema as schemata
 import bigframes.core.tree_properties
-import bigframes.core.utils
 from bigframes.core.window_spec import WindowSpec
 import bigframes.dtypes
 import bigframes.exceptions as bfe

bigframes/core/compile/compiler.py

Lines changed: 2 additions & 5 deletions

@@ -22,10 +22,9 @@
 import bigframes_vendored.ibis.expr.api as ibis_api
 import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
 import bigframes_vendored.ibis.expr.types as ibis_types
-import pyarrow as pa
 
 from bigframes import dtypes, operations
-from bigframes.core import expression
+from bigframes.core import expression, pyarrow_utils
 import bigframes.core.compile.compiled as compiled
 import bigframes.core.compile.concat as concat_impl
 import bigframes.core.compile.configs as configs

@@ -172,9 +171,7 @@ def compile_readlocal(node: nodes.ReadLocalNode, *args):
     pa_table = pa_table.rename_columns([item.id.sql for item in node.scan_list.items])
 
     if offsets:
-        pa_table = pa_table.append_column(
-            offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
-        )
+        pa_table = pyarrow_utils.append_offsets(pa_table, offsets)
     return compiled.UnorderedIR.from_polars(pa_table, bq_schema)

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 2 additions & 5 deletions

@@ -18,10 +18,9 @@
 import typing
 
 from google.cloud import bigquery
-import pyarrow as pa
 import sqlglot.expressions as sge
 
-from bigframes.core import expression, guid, identifiers, nodes, rewrite
+from bigframes.core import expression, guid, identifiers, nodes, pyarrow_utils, rewrite
 from bigframes.core.compile import configs
 import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
 import bigframes.core.compile.sqlglot.sqlglot_ir as ir

@@ -155,9 +154,7 @@ def compile_readlocal(self, node: nodes.ReadLocalNode, *args) -> ir.SQLGlotIR:
 
         offsets = node.offsets_col.sql if node.offsets_col else None
         if offsets:
-            pa_table = pa_table.append_column(
-                offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
-            )
+            pa_table = pyarrow_utils.append_offsets(pa_table, offsets)
 
         return ir.SQLGlotIR.from_pyarrow(pa_table, node.schema, uid_gen=self.uid_gen)

bigframes/core/local_data.py

Lines changed: 1 addition & 1 deletion

@@ -295,7 +295,7 @@ def _adapt_chunked_array(
 
 
 def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtype]:
-    """Normalize the array to managed storage types. Preverse shapes, only transforms values."""
+    """Normalize the array to managed storage types. Preserve shapes, only transforms values."""
     if array.offset != 0:  # Offset arrays don't have all operations implemented
         return _adapt_arrow_array(pa.concat_arrays([array]))

bigframes/core/nodes.py

Lines changed: 10 additions & 0 deletions

@@ -154,6 +154,16 @@ def is_limit(self) -> bool:
             and (self.stop > 0)
         )
 
+    @property
+    def is_noop(self) -> bool:
+        """Returns whether this node doesn't actually change the results."""
+        # TODO: Handle tail case.
+        return (
+            ((not self.start) or (self.start == 0))
+            and (self.step == 1)
+            and ((self.stop is None) or (self.stop == self.row_count))
+        )
+
     @property
     def row_count(self) -> typing.Optional[int]:
         child_length = self.child.row_count
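To make the new predicate concrete, here is a standalone sketch of the same logic outside the node class (hypothetical helper, illustration only):

    from typing import Optional

    def slice_is_noop(
        start: Optional[int], stop: Optional[int], step: int, row_count: Optional[int]
    ) -> bool:
        # Mirrors SliceNode.is_noop: every row is kept when the slice starts
        # at the beginning, steps forward one row at a time, and never stops
        # early.
        return (
            ((not start) or (start == 0))
            and (step == 1)
            and ((stop is None) or (stop == row_count))
        )

    assert slice_is_noop(None, None, 1, row_count=100)  # df[:]
    assert slice_is_noop(0, 100, 1, row_count=100)      # df[0:100] over 100 rows
    assert not slice_is_noop(0, 10, 1, row_count=100)   # df[:10] is a limit, not a no-op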

bigframes/core/pyarrow_utils.py

Lines changed: 9 additions & 0 deletions

@@ -85,3 +85,12 @@ def truncate_pyarrow_iterable(
         else:
             yield batch
             total_yielded += batch.num_rows
+
+
+def append_offsets(
+    pa_table: pa.Table,
+    offsets_col: str,
+) -> pa.Table:
+    return pa_table.append_column(
+        offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64())
+    )
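A quick usage sketch for the new helper (assuming only that pyarrow is installed; the column name is arbitrary):

    import pyarrow as pa

    from bigframes.core import pyarrow_utils

    table = pa.table({"value": ["a", "b", "c"]})
    with_offsets = pyarrow_utils.append_offsets(table, offsets_col="rowid")
    print(with_offsets.column("rowid").to_pylist())  # [0, 1, 2]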

bigframes/core/rewrite/scan_reduction.py

Lines changed: 11 additions & 2 deletions

@@ -16,6 +16,7 @@
 from typing import Optional
 
 from bigframes.core import nodes
+import bigframes.core.rewrite.slices
 
 
 def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTableNode]:

@@ -28,7 +29,15 @@ def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTab
     return None
 
 
-def try_reduce_to_local_scan(node: nodes.BigFrameNode) -> Optional[nodes.ReadLocalNode]:
+def try_reduce_to_local_scan(
+    node: nodes.BigFrameNode,
+) -> Optional[tuple[nodes.ReadLocalNode, Optional[int]]]:
+    """Create a ReadLocalNode with optional limit, if possible.
+
+    Similar to ReadApiSemiExecutor._try_adapt_plan.
+    """
+    node, limit = bigframes.core.rewrite.slices.pull_out_limit(node)
     if not all(
         map(
             lambda x: isinstance(x, (nodes.ReadLocalNode, nodes.SelectionNode)),

@@ -38,7 +47,7 @@ def try_reduce_to_local_scan(node: nodes.BigFrameNode) -> Optional[nodes.ReadLoc
         return None
     result = node.bottom_up(merge_scan)
     if isinstance(result, nodes.ReadLocalNode):
-        return result
+        return result, limit
     return None
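Callers of try_reduce_to_local_scan now unpack a tuple instead of a bare node. A hedged sketch of the new calling convention, assuming plan is a BigFrameNode built over local data:

    from bigframes.core.rewrite.scan_reduction import try_reduce_to_local_scan

    result = try_reduce_to_local_scan(plan)
    if result is not None:
        local_node, limit = result  # limit is None when no LIMIT was pulled out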

bigframes/core/rewrite/slices.py

Lines changed: 3 additions & 0 deletions

@@ -57,6 +57,9 @@ def pull_out_limit(
             if (prior_limit is not None) and (prior_limit < limit):
                 limit = prior_limit
             return new_root, limit
+        if root.is_noop:
+            new_root, prior_limit = pull_out_limit(root.child)
+            return new_root, prior_limit
     elif (
         isinstance(root, (nodes.SelectionNode, nodes.ProjectionNode))
         and root.row_preserving
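Why this matters for the limit rewrite used elsewhere in this commit: a no-op slice sitting above a real limit no longer blocks extraction. A hypothetical plan fragment, node shapes illustrative only:

    # Before this change, pull_out_limit stopped at the outer slice:
    #
    #   SliceNode(start=None, stop=None, step=1)   # no-op: keeps every row
    #     SliceNode(start=None, stop=10, step=1)   # effectively LIMIT 10
    #       ReadLocalNode(...)
    #
    # With the new is_noop branch, the recursion skips the outer node and
    # returns (ReadLocalNode(...), 10), so executors can apply a LIMIT
    # rewrite instead of materializing the slice.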

bigframes/core/schema.py

Lines changed: 14 additions & 4 deletions

@@ -17,7 +17,7 @@
 from dataclasses import dataclass
 import functools
 import typing
-from typing import Sequence
+from typing import Dict, List, Sequence
 
 import google.cloud.bigquery
 import pyarrow

@@ -47,14 +47,24 @@ def from_bq_table(
         column_type_overrides: typing.Optional[
             typing.Dict[str, bigframes.dtypes.Dtype]
         ] = None,
+    ):
+        return ArraySchema.from_bq_schema(
+            table.schema, column_type_overrides=column_type_overrides
+        )
+
+    @classmethod
+    def from_bq_schema(
+        cls,
+        schema: List[google.cloud.bigquery.SchemaField],
+        column_type_overrides: typing.Optional[
+            Dict[str, bigframes.dtypes.Dtype]
+        ] = None,
     ):
         if column_type_overrides is None:
             column_type_overrides = {}
         items = tuple(
             SchemaItem(name, column_type_overrides.get(name, dtype))
-            for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
-                table.schema
-            ).items()
+            for name, dtype in bigframes.dtypes.bf_type_from_type_kind(schema).items()
         )
         return ArraySchema(items)
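The split lets callers build an ArraySchema from a bare field list, with no Table object in hand. A minimal sketch, assuming standard scalar field types:

    import google.cloud.bigquery as bigquery

    from bigframes.core.schema import ArraySchema

    fields = [
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("name", "STRING"),
    ]
    schema = ArraySchema.from_bq_schema(fields)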

bigframes/functions/_function_client.py

Lines changed: 6 additions & 2 deletions

@@ -125,11 +125,15 @@ def _ensure_dataset_exists(self) -> None:
     def _create_bq_function(self, create_function_ddl: str) -> None:
         # TODO(swast): plumb through the original, user-facing api_name.
         _, query_job = bigframes.session._io.bigquery.start_query_with_client(
-            self._session.bqclient,
+            cast(bigquery.Client, self._session.bqclient),
             create_function_ddl,
             job_config=bigquery.QueryJobConfig(),
+            location=None,
+            project=None,
+            timeout=None,
+            metrics=None,
+            query_with_job=True,
         )
-        assert query_job is not None
         logger.info(f"Created bigframes function {query_job.ddl_target_routine}")
 
     def _format_function_options(self, function_options: dict) -> str:

bigframes/session/__init__.py

Lines changed: 16 additions & 3 deletions

@@ -537,6 +537,10 @@ def _read_gbq_colab(
             index_col=bigframes.enums.DefaultIndexKind.NULL,
             force_total_order=False,
             dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
+            # TODO(tswast): we may need to allow allow_large_results to be overwritten
+            # or possibly a general configuration object for an explicit
+            # destination table and write disposition.
+            allow_large_results=False,
         )
 
     @overload

@@ -1917,10 +1921,15 @@ def _start_query_ml_ddl(
         # https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
         job_config.destination_encryption_configuration = None
         iterator, query_job = bf_io_bigquery.start_query_with_client(
-            self.bqclient, sql, job_config=job_config, metrics=self._metrics
+            self.bqclient,
+            sql,
+            job_config=job_config,
+            metrics=self._metrics,
+            location=None,
+            project=None,
+            timeout=None,
+            query_with_job=True,
         )
-
-        assert query_job is not None
         return iterator, query_job
 
     def _create_object_table(self, path: str, connection: str) -> str:

@@ -1943,6 +1952,10 @@ def _create_object_table(self, path: str, connection: str) -> str:
             sql,
             job_config=bigquery.QueryJobConfig(),
             metrics=self._metrics,
+            location=None,
+            project=None,
+            timeout=None,
+            query_with_job=True,
         )
 
         return table
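For orientation, the private entry point this commit speeds up, sketched against a public dataset (call shape hedged; _read_gbq_colab is internal API and its exact signature lives in bigframes/session/__init__.py):

    # Assuming an existing Session `session`:
    df = session._read_gbq_colab(
        "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 5"
    )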

bigframes/session/_io/bigquery/__init__.py

Lines changed: 48 additions & 12 deletions

@@ -22,7 +22,7 @@
 import textwrap
 import types
 import typing
-from typing import Dict, Iterable, Mapping, Optional, Tuple, Union
+from typing import Dict, Iterable, Literal, Mapping, Optional, overload, Tuple, Union
 
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import google.api_core.exceptions

@@ -38,7 +38,6 @@
 
 
 IO_ORDERING_ID = "bqdf_row_nums"
-MAX_LABELS_COUNT = 64 - 8
 _LIST_TABLES_LIMIT = 10000  # calls to bqclient.list_tables
 # will be limited to this many tables
 

@@ -73,7 +72,12 @@ def create_job_configs_labels(
         )
     )
     values = list(itertools.chain(job_configs_labels.values(), api_methods))
-    return dict(zip(labels[:MAX_LABELS_COUNT], values[:MAX_LABELS_COUNT]))
+    return dict(
+        zip(
+            labels[: log_adapter.MAX_LABELS_COUNT],
+            values[: log_adapter.MAX_LABELS_COUNT],
+        )
+    )
 
 
 def create_export_data_statement(

@@ -223,8 +227,7 @@ def format_option(key: str, value: Union[bool, str]) -> str:
 def add_and_trim_labels(job_config):
     """
     Add additional labels to the job configuration and trim the total number of labels
-    to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64
-    labels per job.
+    to ensure they do not exceed MAX_LABELS_COUNT labels per job.
     """
     api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
     job_config.labels = create_job_configs_labels(

@@ -233,23 +236,54 @@
     )
 
 
+@overload
 def start_query_with_client(
     bq_client: bigquery.Client,
     sql: str,
-    job_config: bigquery.job.QueryJobConfig,
+    *,
+    job_config: bigquery.QueryJobConfig,
+    location: Optional[str],
+    project: Optional[str],
+    timeout: Optional[float],
+    metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
+    query_with_job: Literal[True],
+) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+    ...
+
+
+@overload
+def start_query_with_client(
+    bq_client: bigquery.Client,
+    sql: str,
+    *,
+    job_config: bigquery.QueryJobConfig,
+    location: Optional[str],
+    project: Optional[str],
+    timeout: Optional[float],
+    metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
+    query_with_job: Literal[False],
+) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
+    ...
+
+
+def start_query_with_client(
+    bq_client: bigquery.Client,
+    sql: str,
+    *,
+    job_config: bigquery.QueryJobConfig,
     location: Optional[str] = None,
     project: Optional[str] = None,
     timeout: Optional[float] = None,
     metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
-    *,
     query_with_job: bool = True,
 ) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
     """
     Starts query job and waits for results.
     """
     try:
-        # Note: Ensure no additional labels are added to job_config after this point,
-        # as `add_and_trim_labels` ensures the label count does not exceed 64.
+        # Note: Ensure no additional labels are added to job_config after this
+        # point, as `add_and_trim_labels` ensures the label count does not
+        # exceed MAX_LABELS_COUNT.
         add_and_trim_labels(job_config)
         if not query_with_job:
             results_iterator = bq_client.query_and_wait(

@@ -322,8 +356,8 @@ def delete_tables_matching_session_id(
 
 def create_bq_dataset_reference(
     bq_client: bigquery.Client,
-    location=None,
-    project=None,
+    location: Optional[str] = None,
+    project: Optional[str] = None,
 ) -> bigquery.DatasetReference:
     """Create and identify dataset(s) for temporary BQ resources.

@@ -352,14 +386,16 @@
         location=location,
         job_config=job_config,
         project=project,
+        timeout=None,
+        metrics=None,
+        query_with_job=True,
     )
 
     # The anonymous dataset is used by BigQuery to write query results and
     # session tables. BigQuery DataFrames also writes temp tables directly
     # to the dataset, no BigQuery Session required. Note: there is a
     # different anonymous dataset per location. See:
     # https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
-    assert query_job is not None
     query_destination = query_job.destination
     return bigquery.DatasetReference(
         query_destination.project,
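The overloads give mypy a precise return type per mode, which is why the "assert query_job is not None" lines disappear across this commit. A hedged usage sketch, assuming a configured bq_client:

    from google.cloud import bigquery

    from bigframes.session._io.bigquery import start_query_with_client

    # Fast path: query_and_wait under the hood; the job handle may be None.
    rows, maybe_job = start_query_with_client(
        bq_client,
        "SELECT 1 AS x",
        job_config=bigquery.QueryJobConfig(),
        location=None,
        project=None,
        timeout=None,
        query_with_job=False,
    )

    # Legacy path: always creates a job, so the second element is typed as
    # bigquery.QueryJob and callers need no assert.
    rows, job = start_query_with_client(
        bq_client,
        "SELECT 1 AS x",
        job_config=bigquery.QueryJobConfig(),
        location=None,
        project=None,
        timeout=None,
        query_with_job=True,
    )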
