
Commit c95980a

[Data] Add BigQueryDatasource (#37380)
Ray Datasets supports parallelized reads and writes across many data formats (CSV, Parquet, Arrow, etc.) and sources (S3, GCS, etc.), but BigQuery does not have full coverage yet. As of Ray 2.5.1, only reads from BigQuery are supported, via read_sql() (docs ref). That path relies on the basic BigQuery Query API and has two issues:

1. read_sql() uses the basic BigQuery Query API (library link), which is slower than the BigQuery Storage API (library link). This is most noticeable for medium-sized datasets (e.g., 10 million rows), where a Storage Read can be around 5-10x faster than read_sql().

2. BigQuery does not guarantee result ordering, so read_sql() can return incorrect results, because it relies on OFFSET to parallelize reads through the basic Query API. (The BigQuery Storage API is designed for parallelized use cases and creates independent read/write streams.)

This PR introduces a BigQuery datasource that handles both reads and writes. Reads use the BigQuery Storage API for the reasons above. Writes use a Load Job rather than the BigQuery Storage Write API, since the latter requires serializing each block/table to protobuf, which, when done in Python, is not faster than the Load Job write.

---------

Signed-off-by: Matthew Tang <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
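As a rough usage sketch of the APIs added by this commit (the project and table names are hypothetical, read_bigquery is assumed to forward project_id, dataset, and query to the reader introduced below, and the BigQuery client libraries and credentials are assumed to be set up):

import ray

# Read a table through the BigQuery Storage Read API (one read task per stream).
ds = ray.data.read_bigquery(
    project_id="my-gcp-project",           # hypothetical project ID
    dataset="my_dataset.my_source_table",  # dataset_id.table_id
)

# A SQL query can be read instead; query and dataset are mutually exclusive.
# ds = ray.data.read_bigquery(project_id="my-gcp-project", query="SELECT ...")

# Write back to BigQuery; each block is loaded with a Parquet load job.
ds.write_bigquery(
    project_id="my-gcp-project",
    dataset="my_dataset.my_destination_table",
)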
1 parent 5abc4ab commit c95980a

11 files changed (+625 −59 lines)


doc/source/data/api/input_output.rst

Lines changed: 9 additions & 0 deletions
@@ -140,6 +140,15 @@ MongoDB
     read_mongo
     Dataset.write_mongo
 
+BigQuery
+--------
+
+.. autosummary::
+   :toctree: doc/
+
+   read_bigquery
+   Dataset.write_bigquery
+
 SQL Databases
 -------------
 
python/ray/data/BUILD

Lines changed: 8 additions & 0 deletions
@@ -73,6 +73,14 @@ py_test(
     deps = ["//:ray_lib", ":conftest"],
 )
 
+py_test(
+    name = "test_bigquery",
+    size = "large",
+    srcs = ["tests/test_bigquery.py"],
+    tags = ["team:data", "exclusive", "data_integration"],
+    deps = ["//:ray_lib", ":conftest"],
+)
+
 py_test(
     name = "test_actor_pool_map_operator",
     size = "small",

python/ray/data/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@
     range,
     range_table,
     range_tensor,
+    read_bigquery,
     read_binary_files,
     read_csv,
     read_databricks_tables,

python/ray/data/_internal/logical/util.py

Lines changed: 2 additions & 0 deletions
@@ -16,6 +16,7 @@
 # The white list of operator names allowed to be recorded.
 _op_name_white_list = [
     # Read
+    "ReadBigQuery",
     "ReadRange",
     "ReadMongo",
     "ReadParquet",
@@ -35,6 +36,7 @@
     "FromNumpy",
     "FromPandas",
     # Write
+    "WriteBigQuery",
     "WriteParquet",
     "WriteJSON",
     "WriteCSV",

python/ray/data/dataset.py

Lines changed: 46 additions & 0 deletions
@@ -114,6 +114,7 @@
     DataContext,
 )
 from ray.data.datasource import (
+    BigQueryDatasource,
     BlockWritePathProvider,
     Connection,
     CSVDatasource,
@@ -3280,6 +3281,7 @@ def write_sql(
             sql=sql,
         )
 
+    @PublicAPI(stability="alpha")
     @ConsumptionAPI
     def write_mongo(
         self,
@@ -3348,6 +3350,50 @@ def write_mongo(
             collection=collection,
         )
 
+    @ConsumptionAPI
+    def write_bigquery(
+        self,
+        project_id: str,
+        dataset: str,
+        ray_remote_args: Dict[str, Any] = None,
+    ) -> None:
+        """Write the dataset to a BigQuery dataset table.
+
+        To control the number of parallel write tasks, use ``.repartition()``
+        before calling this method.
+
+        Examples:
+            .. testcode::
+                :skipif: True
+
+                import ray
+                import pandas as pd
+
+                docs = [{"title": "BigQuery Datasource test"} for key in range(4)]
+                ds = ray.data.from_pandas(pd.DataFrame(docs))
+                ds.write_bigquery(
+                    project_id="my_project_id",
+                    dataset="my_dataset.my_table",
+                )
+
+        Args:
+            project_id: The name of the associated Google Cloud Project that hosts
+                the destination dataset. For more information, see
+                `Creating and managing projects <https://cloud.google.com/resource-manager/docs/creating-managing-projects>`_.  # noqa: E501
+            dataset: The name of the dataset in the format of ``dataset_id.table_id``.
+                The dataset is created if it doesn't already exist, and the table is
+                overwritten if it already exists.
+            ray_remote_args: Kwargs passed to ``ray.remote`` in the write tasks.
+        """
+        self.write_datasource(
+            BigQueryDatasource(),
+            ray_remote_args=ray_remote_args,
+            dataset=dataset,
+            project_id=project_id,
+        )
+
     @ConsumptionAPI(pattern="Time complexity:")
     def write_datasource(
         self,
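Since the write path issues one BigQuery load job per block, the docstring's note about calling .repartition() before writing maps directly onto write parallelism; a minimal sketch, assuming an existing Dataset ds and the hypothetical names from the example above:

# Ten blocks -> ten write tasks, each loading its block into BigQuery as a separate load job.
ds.repartition(10).write_bigquery(
    project_id="my_project_id",
    dataset="my_dataset.my_table",
)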

python/ray/data/datasource/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -1,3 +1,4 @@
+from ray.data.datasource.bigquery_datasource import BigQueryDatasource
 from ray.data.datasource.binary_datasource import BinaryDatasource
 from ray.data.datasource.csv_datasource import CSVDatasource
 from ray.data.datasource.datasource import (
@@ -48,6 +49,7 @@
 __all__ = [
     "BaseFileMetadataProvider",
     "BinaryDatasource",
+    "BigQueryDatasource",
     "BlockWritePathProvider",
     "Connection",
     "CSVDatasource",
python/ray/data/datasource/bigquery_datasource.py

Lines changed: 194 additions & 0 deletions
@@ -0,0 +1,194 @@
import logging
import os
import tempfile
import time
import uuid
from typing import Any, Dict, List, Optional

import pyarrow.parquet as pq

from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.util import _check_import
from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.data.datasource.datasource import Datasource, Reader, ReadTask, WriteResult
from ray.types import ObjectRef
from ray.util.annotations import PublicAPI

logger = logging.getLogger(__name__)

MAX_RETRY_CNT = 10
RATE_LIMIT_EXCEEDED_SLEEP_TIME = 11


class _BigQueryDatasourceReader(Reader):
    def __init__(
        self,
        project_id: str,
        dataset: Optional[str] = None,
        query: Optional[str] = None,
        parallelism: Optional[int] = -1,
        **kwargs: Optional[Dict[str, Any]],
    ):
        self._project_id = project_id
        self._dataset = dataset
        self._query = query
        self._kwargs = kwargs

        if query is not None and dataset is not None:
            raise ValueError(
                "Query and dataset kwargs cannot both be provided "
                + "(must be mutually exclusive)."
            )

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        from google.cloud import bigquery, bigquery_storage

        def _read_single_partition(stream) -> Block:
            client = bigquery_storage.BigQueryReadClient()
            reader = client.read_rows(stream.name)
            return reader.to_arrow()

        if self._query:
            query_client = bigquery.Client(project=self._project_id)
            query_job = query_client.query(self._query)
            query_job.result()
            destination = str(query_job.destination)
            dataset_id = destination.split(".")[-2]
            table_id = destination.split(".")[-1]
        else:
            self._validate_dataset_table_exist(self._project_id, self._dataset)
            dataset_id = self._dataset.split(".")[0]
            table_id = self._dataset.split(".")[1]

        bqs_client = bigquery_storage.BigQueryReadClient()
        table = f"projects/{self._project_id}/datasets/{dataset_id}/tables/{table_id}"

        if parallelism == -1:
            parallelism = None
        requested_session = bigquery_storage.types.ReadSession(
            table=table,
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )
        read_session = bqs_client.create_read_session(
            parent=f"projects/{self._project_id}",
            read_session=requested_session,
            max_stream_count=parallelism,
        )

        read_tasks = []
        logger.info("Created streams: " + str(len(read_session.streams)))
        # Only compare when parallelism was requested (it may be None for auto).
        if parallelism is not None and len(read_session.streams) < parallelism:
            logger.info(
                "The number of streams created by the "
                + "BigQuery Storage Read API is less than the requested "
                + "parallelism due to the size of the dataset."
            )

        for stream in read_session.streams:
            # Create a metadata block object to store schema, etc.
            metadata = BlockMetadata(
                num_rows=None,
                size_bytes=None,
                schema=None,
                input_files=None,
                exec_stats=None,
            )

            # Create the read task and pass the no-arg wrapper and metadata in.
            # The default argument binds the current stream to each lambda.
            read_task = ReadTask(
                lambda stream=stream: [_read_single_partition(stream)],
                metadata,
            )
            read_tasks.append(read_task)

        return read_tasks

    def estimate_inmemory_data_size(self) -> Optional[int]:
        return None

    def _validate_dataset_table_exist(self, project_id: str, dataset: str) -> None:
        from google.api_core import exceptions
        from google.cloud import bigquery

        client = bigquery.Client(project=project_id)
        dataset_id = dataset.split(".")[0]
        try:
            client.get_dataset(dataset_id)
        except exceptions.NotFound:
            raise ValueError(
                "Dataset {} is not found. Please ensure that it exists.".format(
                    dataset_id
                )
            )

        try:
            client.get_table(dataset)
        except exceptions.NotFound:
            raise ValueError(
                "Table {} is not found. Please ensure that it exists.".format(dataset)
            )

@PublicAPI(stability="alpha")
class BigQueryDatasource(Datasource):
    def create_reader(self, **kwargs) -> Reader:
        _check_import(self, module="google.cloud", package="bigquery")
        _check_import(self, module="google.cloud", package="bigquery_storage")
        _check_import(self, module="google.api_core", package="exceptions")
        return _BigQueryDatasourceReader(**kwargs)

    def write(
        self,
        blocks: List[ObjectRef[Block]],
        ctx: TaskContext,
        project_id: str,
        dataset: str,
    ) -> WriteResult:
        from google.api_core import exceptions
        from google.cloud import bigquery

        def _write_single_block(block: Block, project_id: str, dataset: str):
            block = BlockAccessor.for_block(block).to_arrow()

            client = bigquery.Client(project=project_id)
            job_config = bigquery.LoadJobConfig(autodetect=True)
            job_config.source_format = bigquery.SourceFormat.PARQUET
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

            with tempfile.TemporaryDirectory() as temp_dir:
                fp = os.path.join(temp_dir, f"block_{uuid.uuid4()}.parquet")
                pq.write_table(block, fp, compression="SNAPPY")

                retry_cnt = 0
                while retry_cnt < MAX_RETRY_CNT:
                    with open(fp, "rb") as source_file:
                        job = client.load_table_from_file(
                            source_file, dataset, job_config=job_config
                        )
                    retry_cnt += 1
                    try:
                        logger.info(job.result())
                        break
                    except exceptions.Forbidden as e:
                        logger.info("Rate limit exceeded... Sleeping to try again")
                        logger.debug(e)
                        time.sleep(RATE_LIMIT_EXCEEDED_SLEEP_TIME)

        # Set up datasets to write
        client = bigquery.Client(project=project_id)
        dataset_id = dataset.split(".", 1)[0]
        try:
            client.create_dataset(f"{project_id}.{dataset_id}", timeout=30)
            logger.info("Created dataset " + dataset_id)
        except exceptions.Conflict:
            logger.info(
                f"Dataset {dataset_id} already exists. "
                "The table will be overwritten if it already exists."
            )

        # Delete table if it already exists
        client.delete_table(f"{project_id}.{dataset}", not_found_ok=True)

        for block in blocks:
            _write_single_block(block, project_id, dataset)
        return "ok"
