fix: include location in Session-based temporary storage manager DDL queries #1780


Merged
9 commits merged on May 30, 2025
9 changes: 7 additions & 2 deletions bigframes/session/bigquery_session.py
@@ -84,7 +84,9 @@ def create_temp_table(

ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}"

job = self.bqclient.query(ddl, job_config=job_config)
job = self.bqclient.query(
ddl, job_config=job_config, location=self.location
)
job.result()
# return the fully qualified table, so it can be used outside of the session
return job.destination
@@ -94,7 +96,10 @@ def close(self):
             self._sessiondaemon.stop()
 
         if self._session_id is not None and self.bqclient is not None:
-            self.bqclient.query_and_wait(f"CALL BQ.ABORT_SESSION('{self._session_id}')")
+            self.bqclient.query_and_wait(
+                f"CALL BQ.ABORT_SESSION('{self._session_id}')",
+                location=self.location,
+            )
 
     def _get_session_id(self) -> str:
         if self._session_id:
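Why the location argument matters: both statements above run against a BigQuery session, and session state (including `_SESSION` temp tables) lives in a single location. Without an explicit `location=`, google-cloud-bigquery routes jobs that reference no dataset to the client's default location, typically the US multi-region, so the session cannot be found. Below is a minimal standalone sketch of the same pattern with google-cloud-bigquery; the project name and `europe-west1` are placeholder values for illustration, not taken from this PR:

from google.cloud import bigquery

# "my-project" and "europe-west1" are placeholders for this sketch.
client = bigquery.Client(project="my-project")

# Start a BigQuery session in a specific, non-default location.
start_job = client.query(
    "SELECT 1",
    job_config=bigquery.QueryJobConfig(create_session=True),
    location="europe-west1",
)
start_job.result()
session_id = start_job.session_info.session_id

# Session-scoped statements must be routed to the same location;
# without the explicit argument the job lands in the default (US)
# location and the session's `_SESSION` temp tables are not found.
session_config = bigquery.QueryJobConfig(
    connection_properties=[bigquery.ConnectionProperty("session_id", session_id)]
)
client.query_and_wait(
    "CREATE TEMP TABLE `_SESSION`.example (x INT64)",
    job_config=session_config,
    location="europe-west1",
)

# Clean up, again pinning the location.
client.query_and_wait(
    f"CALL BQ.ABORT_SESSION('{session_id}')",
    location="europe-west1",
)

These last two calls mirror exactly what the diff above pins: the temp-table DDL and the BQ.ABORT_SESSION cleanup.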
23 changes: 0 additions & 23 deletions tests/system/large/test_location.py
@@ -15,7 +15,6 @@
 import typing
 
 from google.cloud import bigquery
-from google.cloud.bigquery_storage import types as bqstorage_types
 import pandas
 import pandas.testing
 import pytest
@@ -63,28 +62,6 @@ def _assert_bq_execution_location(
         expected_result, result.to_pandas(), check_dtype=False, check_index_type=False
     )
 
-    # Ensure BQ Storage Read client operation succeeds
-    table = result.query_job.destination
-    requested_session = bqstorage_types.ReadSession(  # type: ignore[attr-defined]
-        table=f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}",
-        data_format=bqstorage_types.DataFormat.ARROW,  # type: ignore[attr-defined]
-    )
-    read_session = session.bqstoragereadclient.create_read_session(
-        parent=f"projects/{table.project}",
-        read_session=requested_session,
-        max_stream_count=1,
-    )
-    reader = session.bqstoragereadclient.read_rows(read_session.streams[0].name)
-    frames = []
-    for message in reader.rows().pages:
-        frames.append(message.to_dataframe())
-    read_dataframe = pandas.concat(frames)
-    # normalize before comparing since we lost some of the bigframes column
-    # naming abstractions in the direct read of the destination table
-    read_dataframe = read_dataframe.set_index("name")
-    read_dataframe.columns = result.columns
-    pandas.testing.assert_frame_equal(expected_result, read_dataframe)
 
 
 def test_bq_location_default():
     session = bigframes.Session()
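The BQ Storage read check above is deleted outright rather than replaced. For readers wanting regression coverage of the routing fix itself, here is a hedged sketch of one way such a check might look, in the style of the tests in this file; it assumes the public `bigframes.BigQueryOptions(location=...)` option, and the region name is a placeholder:

import bigframes


def test_session_queries_run_in_configured_location():
    # "asia-northeast1" is a placeholder; any non-US region exercises
    # the location routing that this PR fixes.
    session = bigframes.Session(
        bigframes.BigQueryOptions(location="asia-northeast1")
    )
    df = session.read_gbq("SELECT 'a' AS name, 1 AS value")
    df.to_pandas()  # force execution so a query job exists
    # The executed job should run in the session's location rather
    # than the client's default (US) location.
    assert df.query_job.location.casefold() == "asia-northeast1"
    session.close()

Asserting on query_job.location would catch a regression where session DDL silently runs in the default location instead of the configured one.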