
Conversation

olex-snk

This pull request introduces an example of a Cloud Composer DAG that orchestrates a Vertex AI pipeline, including loading data into BigQuery, triggering the Vertex AI pipeline, and managing the pipeline job lifecycle.

@olex-snk olex-snk marked this pull request as draft May 19, 2025 12:01
@olex-snk olex-snk requested review from takumiohym and removed request for takumiohym May 22, 2025 15:16

@olex-snk olex-snk changed the title added_cloud_composer_vertex_ai_integration_dag WIP: Add Cloud Composer Vertex AI Integration DAG Jun 4, 2025
@olex-snk olex-snk self-assigned this Jun 11, 2025
@olex-snk olex-snk marked this pull request as ready for review June 11, 2025 11:22
@olex-snk olex-snk changed the title WIP: Add Cloud Composer Vertex AI Integration DAG Add Cloud Composer Vertex AI Integration DAG Jun 16, 2025
@@ -0,0 +1,443 @@
{

@takumiohym takumiohym Jun 17, 2025


Regarding Prerequisites:

You can delete these items since we can assume they are already done in the ASL environment:

  • Google Cloud Project with billing enabled.
  • Vertex AI API enabled in your GCP project.
  • BigQuery API enabled in your GCP project.

Also, could you make this a separate cell and write a step-by-step guide in this notebook? If additional IAM setup is required, write the command in the setup script.

  • Cloud Composer environment provisioned in your GCP project. (This notebook assumes the Cloud Composer instance has already been created by following the instructions in the "Run an Apache Airflow DAG in Cloud Composer" lab. If you haven't run it, please create the Cloud Composer instance using those instructions.)
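For that cell, a quick existence check could be something like the following; the location is a placeholder and should match wherever the Composer environment was created:

%%bash
# List existing Composer environments (assumes us-central1; change if yours differs).
gcloud composer environments list --locations=us-central1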

Regarding the yaml file, I think it's also better to make it a separate step-by-step guide and write commands to 1) create a bucket if one doesn't exist, 2) run the asl-ml-immersion/notebooks/kubeflow_pipelines/pipelines/solutions/kfp_pipeline_vertex_lightweight.ipynb notebook, and 3) copy the yaml file from the solution directory to the GCS bucket.

  • A compiled Kubeflow Pipeline YAML file uploaded to a GCS bucket (e.g., gs://your-bucket/covertype_kfp_pipeline.yaml). This file should define all the steps of your Vertex AI Pipeline. It's recommended to use the lab "Continuous Training with Kubeflow Pipeline and Vertex AI" (asl-ml-immersion/notebooks/kubeflow_pipelines/pipelines/solutions/kfp_pipeline_vertex_lightweight.ipynb) to create covertype_kfp_pipeline.yaml.

1) Create a bucket.

import os

PROJECT = !(gcloud config get-value core/project)
PROJECT = PROJECT[0]
BUCKET = PROJECT  # defaults to PROJECT
REGION = "us-central1"  # assumed region; adjust as needed

os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION

%%bash
# Create the bucket only if it doesn't already exist.
exists=$(gsutil ls -d | grep -w gs://${BUCKET}/)

if [ -n "$exists" ]; then
    echo -e "Bucket gs://${BUCKET} already exists."
else
    echo "Creating a new GCS bucket."
    gsutil mb -l ${REGION} gs://${BUCKET}
    echo -e "\nHere are your current buckets:"
    gsutil ls
fi

2) Run the asl-ml-immersion/notebooks/kubeflow_pipelines/pipelines/solutions/kfp_pipeline_vertex_lightweight.ipynb notebook to compile covertype_kfp_pipeline.yaml.

3) Copy the yaml file.

!gsutil cp ../../../pipelines/solutions/covertype_kfp_pipeline.yaml gs://$BUCKET

Also, in Setup and Configuration:

1) GCS_VERTEX_AI_PIPELINE_YAML and GCS_TRAIN_DATASET_PATH can be prefilled with the bucket name created above.

It seems the pipeline fails if BIGQUERY_DATASET_ID doesn't exist. Please add a step to create the dataset with the bq mk command (see the sketch after point 2 below).

2) The IAM section can be removed. If additional IAM setup is required, add it to the setup script.
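A minimal sketch of the prefilled paths and the dataset-creation step, assuming the PROJECT/BUCKET variables defined earlier and a hypothetical dataset ID of covertype_dataset (align these names with the notebook's configuration cell):

# Hypothetical values; adjust to match the notebook's configuration variables.
BIGQUERY_DATASET_ID = "covertype_dataset"
GCS_VERTEX_AI_PIPELINE_YAML = f"gs://{BUCKET}/covertype_kfp_pipeline.yaml"
GCS_TRAIN_DATASET_PATH = f"gs://{BUCKET}/data/training/"  # assumed layout

# Create the BigQuery dataset (this errors if the dataset already exists).
!bq mk --dataset $PROJECT:$BIGQUERY_DATASET_ID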



@@ -0,0 +1,443 @@
{

@takumiohym takumiohym Jun 17, 2025


It seems this DAG doesn't contain tasks to create the validation dataset CSV?
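If the DAG exports the training CSV from BigQuery, a matching validation-export task could look roughly like this; the table name, bucket path, and task id are placeholders, not names from the PR, and PROJECT_ID, BIGQUERY_DATASET_ID, and BUCKET are assumed to be defined in the DAG file:

from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

# Hypothetical task: export a validation split from BigQuery to GCS as CSV.
export_validation_dataset = BigQueryToGCSOperator(
    task_id="export_validation_dataset_to_gcs",
    source_project_dataset_table=f"{PROJECT_ID}.{BIGQUERY_DATASET_ID}.covertype_validation",
    destination_cloud_storage_uris=[f"gs://{BUCKET}/data/validation/dataset.csv"],
    export_format="CSV",
    print_header=False,
)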



@@ -0,0 +1,443 @@
{

@takumiohym takumiohym Jun 17, 2025


Explain where to find the DAG bucket path, or explicitly import the Python file using the gcloud composer environments storage dags import command.
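The import step could be shown along these lines; the environment name, location, and DAG file name are placeholders, not values from the PR:

%%bash
# Upload the DAG file to the Composer environment's DAGs bucket.
gcloud composer environments storage dags import \
  --environment my-composer-env \
  --location us-central1 \
  --source composer_vertex_ai_integration_dag.py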



* **Trigger**: Executes once `start_vertex_ai_pipeline` successfully completes.

5. **`delete_vertex_ai_pipeline_job`**:
* **Operator**: `DeletePipelineJobOperator`

I think deleting the pipeline job is not necessary. Vertex AI Pipelines is a serverless service, and the resources are automatically shut down after execution.
This deletion step appears to delete the job record (not the resources) from the pipeline history, which is not ideal for logging purposes.

@olex-snk olex-snk marked this pull request as draft July 15, 2025 16:29
@takumiohym takumiohym added the new label Jul 15, 2025