Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for version 2.42 for RoV Bigquery read/write #5033

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/aiplatform/vertex_ray/bigquery_datasink.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
)


# BigQuery write for Ray 2.33.0 and 2.9.3
# BigQuery write for Ray 2.42.0, 2.33.0, and 2.9.3
if Datasink is None:
_BigQueryDatasink = None
else:
Expand Down
113 changes: 56 additions & 57 deletions google/cloud/aiplatform/vertex_ray/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,68 +46,67 @@ def read_bigquery(
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
) -> Dataset:
"""Create a dataset from BigQuery.

The data to read from is specified via the ``project_id``, ``dataset``
and/or ``query`` parameters.

Args:
project_id: The name of the associated Google Cloud Project that hosts
the dataset to read.
dataset: The name of the dataset hosted in BigQuery in the format of
``dataset_id.table_id``. Both the dataset_id and table_id must exist
otherwise an exception will be raised.
query: The query to execute.
The dataset is created from the results of executing the query if provided.
Otherwise, the entire dataset is read. For query syntax guidelines, see
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax
parallelism:
2.9.3: The requested parallelism of the read. If -1, it will be
automatically chosen based on the available cluster resources
and estimated in-memory data size.
2.33.0: This argument is deprecated. Use ``override_num_blocks`` argument.
ray_remote_args: kwargs passed to ray.remote in the read tasks.
concurrency: Not supported in 2.9.3.
2.33.0: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
total number of tasks run or the total number of output blocks. By default,
concurrency is dynamically decided based on the available resources.
override_num_blocks: Not supported in 2.9.3.
2.33.0: Override the number of output blocks from all read tasks.
By default, the number of output blocks is dynamically decided based on
input data size and available resources. You shouldn't manually set this
value in most cases.

Returns:
Dataset producing rows from the results of executing the query
or reading the entire dataset on the specified BigQuery dataset.
"""
datasource = _BigQueryDatasource(
"""Create a dataset from BigQuery.

The data to read from is specified via the ``project_id``, ``dataset``
and/or ``query`` parameters.

Args:
project_id: The name of the associated Google Cloud Project that hosts the
dataset to read.
dataset: The name of the dataset hosted in BigQuery in the format of
``dataset_id.table_id``. Both the dataset_id and table_id must exist
otherwise an exception will be raised.
query: The query to execute. The dataset is created from the results of
executing the query if provided. Otherwise, the entire dataset is read.
For query syntax guidelines, see
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax
parallelism: 2.9.3: The requested parallelism of the read. If -1, it will
be automatically chosen based on the available cluster resources and
estimated in-memory data size. 2.33.0, 2.42.0: This argument is
deprecated. Use ``override_num_blocks`` argument.
ray_remote_args: kwargs passed to ray.remote in the read tasks.
concurrency: Not supported in 2.9.3. 2.33.0, 2.42.0: The maximum number of
Ray tasks to run concurrently. Set this to control number of tasks to
run concurrently. This doesn't change the total number of tasks run or
the total number of output blocks. By default, concurrency is
dynamically decided based on the available resources.
override_num_blocks: Not supported in 2.9.3. 2.33.0, 2.42.0: Override the
number of output blocks from all read tasks. By default, the number of
output blocks is dynamically decided based on input data size and
available resources. You shouldn't manually set this value in most
cases.

Returns:
Dataset producing rows from the results of executing the query
or reading the entire dataset on the specified BigQuery dataset.
"""
datasource = _BigQueryDatasource(
project_id=project_id,
dataset=dataset,
query=query,
)

if ray.__version__ == "2.9.3":
warnings.warn(_V2_9_WARNING_MESSAGE, DeprecationWarning, stacklevel=1)
# Concurrency and override_num_blocks are not supported in 2.9.3
return ray.data.read_datasource(
datasource=datasource,
parallelism=parallelism,
ray_remote_args=ray_remote_args,
)
elif ray.__version__ == "2.33.0":
return ray.data.read_datasource(
if ray.__version__ == "2.9.3":
warnings.warn(_V2_9_WARNING_MESSAGE, DeprecationWarning, stacklevel=1)
# Concurrency and override_num_blocks are not supported in 2.9.3
return ray.data.read_datasource(
datasource=datasource,
parallelism=parallelism,
ray_remote_args=ray_remote_args,
concurrency=concurrency,
override_num_blocks=override_num_blocks,
)
else:
raise ImportError(
elif ray.__version__ in ("2.33.0", "2.42.0"):
return ray.data.read_datasource(
datasource=datasource,
parallelism=parallelism,
ray_remote_args=ray_remote_args,
concurrency=concurrency,
override_num_blocks=override_num_blocks,
)
else:
raise ImportError(
f"[Ray on Vertex AI]: Unsupported version {ray.__version__}."
+ "Only 2.33.0 and 2.9.3 are supported."
+ "Only 2.42.0, 2.33.0, and 2.9.3 are supported."
)


Expand Down Expand Up @@ -135,19 +134,19 @@ def write_bigquery(
The default number of retries is 10.
ray_remote_args: kwargs passed to ray.remote in the write tasks.
overwrite_table: Not supported in 2.9.3.
2.33.0: Whether the write will overwrite the table if it already
2.33.0, 2.42.0: Whether the write will overwrite the table if it already
exists. The default behavior is to overwrite the table.
If false, will append to the table if it exists.
concurrency: Not supported in 2.9.3.
2.33.0: The maximum number of Ray tasks to run concurrently. Set this
2.33.0, 2.42.0: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
total number of tasks run or the total number of output blocks. By default,
concurrency is dynamically decided based on the available resources.
"""
if ray.__version__ == "2.4.0":
raise RuntimeError(_V2_4_WARNING_MESSAGE)

elif ray.__version__ == "2.9.3" or ray.__version__ == "2.33.0":
elif ray.__version__ in ("2.9.3", "2.33.0", "2.42.0"):
if ray.__version__ == "2.9.3":
warnings.warn(_V2_9_WARNING_MESSAGE, DeprecationWarning, stacklevel=1)
if ray_remote_args is None:
Expand All @@ -174,7 +173,7 @@ def write_bigquery(
datasink=datasink,
ray_remote_args=ray_remote_args,
)
elif ray.__version__ == "2.33.0":
elif ray.__version__ in ("2.33.0", "2.42.0"):
datasink = _BigQueryDatasink(
project_id=project_id,
dataset=dataset,
Expand All @@ -189,5 +188,5 @@ def write_bigquery(
else:
raise ImportError(
f"[Ray on Vertex AI]: Unsupported version {ray.__version__}."
+ "Only 2.33.0 and 2.9.3 are supported."
+ "Only 2.42.0, 2.33.0 and 2.9.3 are supported."
)
8 changes: 5 additions & 3 deletions google/cloud/aiplatform/vertex_ray/util/_validation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform.utils import resource_manager_utils

SUPPORTED_RAY_VERSIONS = immutabledict({"2.9": "2.9.3", "2.33": "2.33.0"})
SUPPORTED_RAY_VERSIONS = immutabledict({
"2.9": "2.9.3", "2.33": "2.33.0", "2.42": "2.42.0"
})
SUPPORTED_PY_VERSION = ["3.10"]
_V2_4_WARNING_MESSAGE = (
"After google-cloud-aiplatform>1.53.0, using Ray version = 2.4 will result in an error. "
"Please use Ray version = 2.33.0 (default)."
"Please use Ray version = 2.42.0 (default)."
)
_V2_9_WARNING_MESSAGE = (
"In March 2025, using Ray version = 2.9 will result in an error. "
"Please use Ray version = 2.33.0 (default) instead."
"Please use Ray version = 2.33.0 or 2.42.0 (default) instead."
)


Expand Down
11 changes: 7 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,22 @@
preview_extra_require = []

ray_extra_require = [
# Cluster only supports 2.9.3 and 2.33.0. Keep 2.4.0 for our testing environment.
# Cluster only supports 2.9.3, 2.33.0, and 2.42.0. Keep 2.4.0 for our
# testing environment.
# Note that testing is submiting a job in a cluster with Ray 2.9.3 remotely.
(
"ray[default] >= 2.4, <= 2.33.0,!= 2.5.*,!= 2.6.*,!= 2.7.*,!="
"ray[default] >= 2.4, <= 2.42.0,!= 2.5.*,!= 2.6.*,!= 2.7.*,!="
" 2.8.*,!=2.9.0,!=2.9.1,!=2.9.2, !=2.10.*, !=2.11.*, !=2.12.*, !=2.13.*, !="
" 2.14.*, !=2.15.*, !=2.16.*, !=2.17.*, !=2.18.*, !=2.19.*, !=2.20.*, !="
" 2.21.*, !=2.22.*, !=2.23.*, !=2.24.*, !=2.25.*, !=2.26.*, !=2.27.*, !="
" 2.28.*, !=2.29.*, !=2.30.*, !=2.31.*, !=2.32.*; python_version<'3.11'"
" 2.28.*, !=2.29.*, !=2.30.*, !=2.31.*, !=2.32.*, !=2.34.*, !=2.35.*, !="
" 2.36.*, !=2.37.*, !=2.38.*, !=2.39.*, !=2.40.*, !=2.41.*;"
" python_version<'3.11'"
),
# To avoid ImportError: cannot import name 'packaging' from 'pkg_resources'
"setuptools < 70.0.0",
# Ray Data v2.4 in Python 3.11 is broken, but got fixed in Ray v2.5.
"ray[default] >= 2.5, <= 2.33.0; python_version=='3.11'",
"ray[default] >= 2.5, <= 2.42.0; python_version=='3.11'",
"google-cloud-bigquery-storage",
"google-cloud-bigquery",
"pandas >= 1.0.0",
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/vertex_ray/test_cluster_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

_TEST_V2_4_WARNING_MESSAGE = (
"After google-cloud-aiplatform>1.53.0, using Ray version = 2.4 will result"
" in an error. Please use Ray version = 2.33.0 (default)."
" in an error. Please use Ray version = 2.33.0 or 2.42.0 (default) instead."
)


Expand Down