diff --git a/notebooks/kubeflow_pipelines/integration/cloud_composer/README.md b/notebooks/kubeflow_pipelines/integration/cloud_composer/README.md
new file mode 100644
index 000000000..73ac02c15
--- /dev/null
+++ b/notebooks/kubeflow_pipelines/integration/cloud_composer/README.md
@@ -0,0 +1,90 @@
+# Orchestrating Vertex AI Pipelines with Cloud Composer
+
+---
+
+This README provides an overview, setup instructions, and functionality details for an Apache Airflow DAG that integrates with Google Cloud's **Vertex AI Pipelines**. The DAG demonstrates a comprehensive data preparation workflow before orchestrating the execution and management of a Kubeflow pipeline on Vertex AI.
+
+## Purpose
+
+The primary purpose of this DAG is to showcase a robust MLOps workflow orchestrated by **Cloud Composer (Airflow)**. Specifically, it illustrates:
+
+1. **Data Ingestion (GCS to BigQuery)**: Loading raw data from a Google Cloud Storage (GCS) bucket into a BigQuery table. This step emulates a basic ETL pipeline orchestrated by Cloud Composer.
+2. **Data Export (BigQuery to GCS)**: Exporting processed or prepared data from BigQuery back to GCS. This is a common pattern for creating datasets ready for consumption by ML training jobs or Vertex AI Pipelines.
+3. **Vertex AI Pipeline Execution**: Triggering a pre-compiled Kubeflow Pipeline (defined in a YAML file on GCS) on **Vertex AI Pipelines**. This allows for the execution of complex machine learning workflows, including data preprocessing, model training, evaluation, and deployment, using data prepared in the preceding steps.
+4. **Pipeline Job Management**: Demonstrating how to programmatically retrieve the status of a running Vertex AI Pipeline job and subsequently delete it. This is crucial for monitoring and cleanup in automated MLOps environments.
+
+## Setup
+
+To set up and run this DAG, you'll need the following:
+
+### Google Cloud Project Configuration
+
+1. **Enable APIs**: Ensure the following Google Cloud APIs are enabled in your project:
+    * **Cloud Composer API**
+    * **Vertex AI API**
+    * **BigQuery API**
+    * **Cloud Storage API**
+2. **Service Account Permissions**: The service account associated with your Cloud Composer environment must have the necessary permissions to:
+    * Read from the specified GCS bucket (`storage.objects.get`, `storage.objects.list`).
+    * Write to BigQuery (`bigquery.datasets.create`, `bigquery.tables.create`, `bigquery.tables.updateData`).
+    * Read from BigQuery (`bigquery.tables.getData`, `bigquery.tables.list`).
+    * Write to GCS (`storage.objects.create`, `storage.objects.delete`).
+    * Run and manage Vertex AI Pipeline jobs (`aiplatform.pipelineJobs.create`, `aiplatform.pipelineJobs.get`, `aiplatform.pipelineJobs.delete`).
+3. **Cloud Composer Environment**: You need an active Cloud Composer environment.
+4. **Google Cloud Storage (GCS) Bucket**:
+    * The DAG expects a GCS bucket named `asl-public` containing the source dataset `data/covertype/dataset.csv`. You can replace this with your own bucket and data.
+    * You'll also need a pre-compiled Kubeflow Pipeline YAML file stored on GCS. Update the `GCS_VERTEX_AI_PIPELINE_YAML` variable in the DAG with the path to your compiled pipeline (e.g., `gs://your-bucket/path/to/covertype_kfp_pipeline.yaml`).
+    * The DAG will export data to the path specified by `TRAINING_FILE_PATH` (e.g., `gs://.../train_export.csv`). Ensure the bucket exists and the service account has write permissions.
+
+### DAG Configuration
+
+1. 
**Update Placeholders**:
+    * `PROJECT_ID`: Replace `"my_project_id"` with your actual Google Cloud **Project ID**.
+    * `REGION`: The region for your Vertex AI operations (e.g., `"us-central1"`).
+    * `GCS_VERTEX_AI_PIPELINE_YAML`: Replace `"gs://.../covertype_kfp_pipeline.yaml"` with the actual GCS path to your compiled Kubeflow pipeline YAML file.
+    * `GCS_SOURCE_DATASET_PATH` and `GCS_BUCKET_NAME`: Adjust these if your source data is in a different location.
+    * `TRAINING_FILE_PATH`: Update this to your desired GCS path for the exported training data.
+    * `BIGQUERY_DATASET_ID` and `TABLE_ID`: Customize these if you prefer different BigQuery dataset and table names.
+2. **Upload DAG**: Upload the DAG file (`composer_vertex_ai_pipelines.py`) to the `dags` folder of your Cloud Composer environment.
+
+### Kubeflow Pipeline (Prerequisite)
+
+This DAG assumes you have a pre-compiled Kubeflow Pipeline YAML file. This YAML file is typically generated from a Kubeflow Pipelines SDK definition in Python and compiled using `kfp.compiler.Compiler().compile()`. Ensure the compiled YAML is accessible at the `GCS_VERTEX_AI_PIPELINE_YAML` path specified in the DAG. The pipeline should be designed to accept `training_file_path` as a parameter.
+
+## Functionality
+
+The `composer_vertex_ai_pipelines` DAG consists of the following tasks:
+
+1. **`load_csv_to_bigquery`**:
+    * **Operator**: `GCSToBigQueryOperator`
+    * **Purpose**: This task transfers a CSV file (`data/covertype/dataset.csv` from the `asl-public` bucket) to a specified BigQuery table (`airflow_demo_dataset.covertype`).
+    * **Configuration**: It's configured to create the table if it doesn't exist and truncate it if it does, ensuring a fresh load for each run. It also skips the header row.
+    * **Trigger**: This is the initial task, executing first.
+
+2. **`bigquery_to_gcs_export`**:
+    * **Operator**: `BigQueryToGCSOperator`
+    * **Purpose**: This task exports the data from the BigQuery table (`airflow_demo_dataset.covertype`) to a CSV file in GCS, specified by `TRAINING_FILE_PATH`. This emulates a data preparation step where data is transformed and then made available for downstream ML processes.
+    * **Trigger**: Executes once `load_csv_to_bigquery` successfully completes.
+
+3. **`start_vertex_ai_pipeline`**:
+    * **Operator**: `RunPipelineJobOperator`
+    * **Purpose**: After the data is exported to GCS, this task triggers a new **Vertex AI Pipeline** job using the specified compiled pipeline YAML file from GCS.
+    * **Parameter Passing**: It passes the GCS path of the exported training data as the `training_file_path` parameter to the Kubeflow pipeline.
+    * **Dynamic Naming**: The `display_name` is dynamically generated with a timestamp to ensure uniqueness for each pipeline run.
+    * **XCom**: The `pipeline_job_id` of the triggered pipeline is pushed to XCom, allowing subsequent tasks to reference this specific job.
+    * **Trigger**: Executes once `bigquery_to_gcs_export` successfully completes.
+
+4. **`vertex_ai_pipline_status`**:
+    * **Operator**: `GetPipelineJobOperator`
+    * **Purpose**: This task retrieves detailed information and the current status of the Vertex AI Pipeline job initiated by the previous task. It uses the `pipeline_job_id` pulled from XCom.
+    * **Trigger**: Executes once `start_vertex_ai_pipeline` successfully completes.
+
+5. **`delete_vertex_ai_pipeline_job`**:
+    * **Operator**: `DeletePipelineJobOperator`
+    * **Purpose**: This task cleans up the Vertex AI Pipeline job by deleting it.
This is important for managing resources and keeping your Vertex AI environment tidy. + * **Trigger Rule**: `TriggerRule.ALL_DONE` ensures this task runs regardless of whether the preceding tasks succeeded or failed, as long as they have all completed their execution. This is a robust approach for cleanup tasks. + * **Trigger**: Executes once `vertex_ai_pipline_status` completes (or if any previous task fails, due to `ALL_DONE` trigger rule). + +--- + +This DAG provides a comprehensive example for integrating your MLOps data preparation and pipeline execution workflows with Google Cloud's powerful Vertex AI platform, all orchestrated seamlessly using Cloud Composer. diff --git a/notebooks/kubeflow_pipelines/integration/cloud_composer/img/airflow_vertexai_monitoring.png b/notebooks/kubeflow_pipelines/integration/cloud_composer/img/airflow_vertexai_monitoring.png new file mode 100644 index 000000000..b81b178f5 Binary files /dev/null and b/notebooks/kubeflow_pipelines/integration/cloud_composer/img/airflow_vertexai_monitoring.png differ diff --git a/notebooks/kubeflow_pipelines/integration/cloud_composer/solutions/cloud_composer_orchestration_vertex.ipynb b/notebooks/kubeflow_pipelines/integration/cloud_composer/solutions/cloud_composer_orchestration_vertex.ipynb new file mode 100644 index 000000000..a382abe45 --- /dev/null +++ b/notebooks/kubeflow_pipelines/integration/cloud_composer/solutions/cloud_composer_orchestration_vertex.ipynb @@ -0,0 +1,475 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Orchestrating Vertex AI Pipelines with Cloud Composer" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook provides an example of integrating Cloud Composer with Vertex AI for automated MLOps workflows.\n", + "It demonstrates a common MLOps workflow on Google Cloud, leveraging **Cloud Composer** (managed Apache Airflow) \n", + "to orchestrate data loading, transformation, and the execution of a **Vertex AI Pipeline**.\n", + "\n", + "**Learning Objectives:**\n", + "1. Learn how to create a custom Directed acyclic graph (DAG) for Cloud Composer\n", + "2. Learn how to use Airflow operators to trigger Vertex AI Pipeline and monitor jobs status\n", + "3. Learn how to orchestrate Vertex AI Pipelines with existing ETL (Extract, Transform, Load) pipeline" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Important Notes:**\n", + "Airflow DAGs are typically uploaded directly to your Cloud Composer environment's GCS DAGs folder. Airflow workers then discover and parse these files. You do *not* run the DAG code directly from this notebook to execute the Airflow workflow." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Directed acyclic graph (DAG) overview and implementation details\n", + "\n", + "The provided DAG performs the following steps:\n", + "\n", + "1. **Load Data to BigQuery**: (`load_csv_to_bigquery`): Downloads a CSV dataset from Google Cloud Storage (GCS) and loads it into a BigQuery table. 
\n", + " ***This step emulates an ETL (Extract, Transform, Load) process for preparing data*** and load it to BigQuery.\n", + " * **Operator**: `GCSToBigQueryOperator`\n", + " * **Purpose**: This task transfers a CSV file (`data/covertype/dataset.csv` from the `asl-public` bucket) to a specified BigQuery table (`airflow_demo_dataset.covertype`).\n", + " * **Configuration**: It's configured to create the table if it doesn't exist and truncate it if it does, ensuring a fresh load for each run. It also handles skipping a header row.\n", + " * **Trigger**: This is the initial task, executing first.\n", + "\n", + "2. **Export Data from BigQuery to GCS** (`bigquery_to_gcs_export`): Exports the processed data from BigQuery back to GCS. This step prepares the data in a format suitable for consumption by a Vertex AI Pipeline.\n", + " * **Operator**: `BigQueryToGCSOperator`\n", + " * **Purpose**: This task exports the data from the BigQuery table (`airflow_demo_dataset.covertype`) to a CSV file in GCS, specified by `params.gcs_train_dataset_path`. This emulates a data preparation step where data is transformed and then made available for downstream ML processes.\n", + " * **Trigger**: Executes once `load_csv_to_bigquery` successfully completes.\n", + "\n", + "3. **Run Vertex AI Pipeline** (`start_vertex_ai_pipeline`): Triggers a pre-compiled Kubeflow Pipeline (KFP) on Vertex AI using a YAML file stored in GCS. This pipeline can encapsulate various machine learning tasks like training, evaluation, and deployment.\n", + " * **Operator**: `RunPipelineJobOperator`\n", + " * **Purpose**: After the data is exported to GCS, this task triggers a new **Vertex AI Pipeline** job using the specified compiled pipeline YAML file from GCS.\n", + " * **Parameter Passing**: It passes the GCS path of the exported training data (`params.gcs_train_dataset_path`) as the `training_file_path` parameter to the Kubeflow pipeline.\n", + " * **Dynamic Naming**: The `display_name` is dynamically generated with a timestamp to ensure uniqueness for each pipeline run.\n", + " * **XCom**: The `pipeline_job_id` of the triggered pipeline is pushed to XCom, allowing subsequent tasks to reference this specific job.\n", + " * **Trigger**: Executes once `bigquery_to_gcs_export` successfully completes.\n", + "\n", + "4. **Get Vertex AI Pipeline Status** (`vertex_ai_pipline_status`): Retrieves the status and details of the running Vertex AI Pipeline job.\n", + " * **Operator**: `GetPipelineJobOperator`\n", + " * **Purpose**: This task retrieves detailed information and the current status of the Vertex AI Pipeline job initiated by the previous task. It uses the `pipeline_job_id` pulled from XCom.\n", + " * **Trigger**: Executes once `start_vertex_ai_pipeline` successfully completes.\n", + "\n", + "5. **Delete Vertex AI Pipeline Job** (`delete_vertex_ai_pipeline_job`): Cleans up by deleting the Vertex AI Pipeline job.\n", + " * **Operator**: `DeletePipelineJobOperator`\n", + " * **Purpose**: This task cleans up the Vertex AI Pipeline job by deleting it. This is important for managing resources and keeping your Vertex AI environment tidy.\n", + " * **Trigger Rule**: `TriggerRule.ALL_DONE` ensures this task runs regardless of whether the preceding tasks succeeded or failed, as long as they have all completed their execution. 
This is a robust approach for cleanup tasks.\n",
+    "   * **Trigger**: Executes once `vertex_ai_pipline_status` completes (or if any previous task fails, due to `ALL_DONE` trigger rule).\n",
+    "\n",
+    "---\n",
+    "\n",
+    "## Prerequisites\n",
+    "\n",
+    "Before deploying and running this DAG, ensure you have the following:\n",
+    "\n",
+    "* A **Google Cloud Project** with billing enabled.\n",
+    "* A **Cloud Composer environment** provisioned in your GCP project. (This notebook assumes the Cloud Composer instance has already been created by following the instructions in [Run an Apache Airflow DAG in Cloud Composer](https://cloud.google.com/composer/docs/composer-3/run-apache-airflow-dag). If you haven't done so yet, please create a Cloud Composer instance using those instructions.)\n",
+    "* **Vertex AI API** enabled in your GCP project.\n",
+    "* **BigQuery API** enabled in your GCP project.\n",
+    "* A compiled **Kubeflow Pipeline YAML file** uploaded to a GCS bucket (e.g., `gs://your-bucket/covertype_kfp_pipeline.yaml`). This file should define all the steps of your Vertex AI Pipeline. It is recommended to use the lab \"Continuous Training with Kubeflow Pipeline and Vertex AI\" (the `asl-ml-immersion/notebooks/kubeflow_pipelines/pipelines/solutions/kfp_pipeline_vertex_lightweight.ipynb` notebook) to create `covertype_kfp_pipeline.yaml`.\n",
+    "\n",
+    "---\n",
+    "\n",
+    "## Setup and Configuration\n",
+    "\n",
+    "1. **Update Placeholders**:\n",
+    "   In the next notebook cell, replace the placeholder values with your specific project details:\n",
+    "\n",
+    "   * `PROJECT_ID`: Replace `\"my_project_id\"` with your actual Google Cloud Project ID.\n",
+    "   * `GCS_VERTEX_AI_PIPELINE_YAML`: Replace `gs://.../covertype_kfp_pipeline.yaml` with the GCS path to your compiled Kubeflow Pipeline YAML file.\n",
+    "   * `TRAINING_FILE_PATH`: Update `gs://.../train_export.csv` to the desired GCS path for the exported training data.\n",
+    "   * `BIGQUERY_DATASET_ID`: Replace `airflow_demo_dataset` with the ID of your BigQuery dataset. If it doesn't exist, it will be created by the DAG.\n",
+    "\n",
+    "2. **Ensure IAM Permissions**:\n",
+    "   The service account associated with your Cloud Composer environment must have the necessary IAM roles to:\n",
+    "   * Read from and write to **BigQuery**.\n",
+    "   * Read from and write to **Cloud Storage**.\n",
+    "   * Create, run, and manage **Vertex AI Pipeline Jobs**.\n",
+    "\n",
+    "   Recommended roles include:\n",
+    "   * `BigQuery Data Editor`\n",
+    "   * `Storage Object Admin`\n",
+    "   * `Vertex AI User`\n"
+   ]
+  },
+  {
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2025-06-17T21:46:26.743439Z",
+     "start_time": "2025-06-17T21:46:26.621382Z"
+    }
+   },
+   "cell_type": "code",
+   "source": [
+    "# Create the ./dags folder\n",
+    "!mkdir dags"
+   ],
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "mkdir: dags: File exists\r\n"
+     ]
+    }
+   ],
+   "execution_count": 7
+  },
+  {
+   "metadata": {},
+   "cell_type": "markdown",
+   "source": "### Create the composer_vertex_ai_pipelines.py file:"
+  },
+  {
+   "cell_type": "code",
+   "metadata": {
+    "tags": [],
+    "ExecuteTime": {
+     "end_time": "2025-06-17T21:47:23.299627Z",
+     "start_time": "2025-06-17T21:47:23.294799Z"
+    }
+   },
+   "source": [
+    "%%writefile ./dags/composer_vertex_ai_pipelines.py\n",
+    "# Copyright 2025 Google LLC\n",
+    "\n",
+    "# Licensed under the Apache License, Version 2.0 (the \"License\"); you may not\n",
+    "# use this file except in compliance with the License. You may obtain a copy of\n",
+    "# the License at\n",
+    "\n",
+    "# https://www.apache.org/licenses/LICENSE-2.0\n",
+    "\n",
+    "# Unless required by applicable law or agreed to in writing, software\n",
+    "# distributed under the License is distributed on an \"AS IS\"\n",
+    "# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either\n",
+    "# express or implied. See the License for the specific language governing\n",
+    "# permissions and limitations under the License.\n",
+    "\"\"\"An example Cloud Composer DAG for Vertex AI Pipelines integration.\"\"\"\n",
+    "\n",
+    "import datetime\n",
+    "\n",
+    "from airflow import DAG\n",
+    "from airflow.providers.google.cloud.operators.vertex_ai.pipeline_job import (\n",
+    "    DeletePipelineJobOperator,\n",
+    "    GetPipelineJobOperator,\n",
+    "    RunPipelineJobOperator,\n",
+    ")\n",
+    "from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (\n",
+    "    BigQueryToGCSOperator,\n",
+    ")\n",
+    "from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (\n",
+    "    GCSToBigQueryOperator,\n",
+    ")\n",
+    "from airflow.utils.trigger_rule import TriggerRule\n",
+    "\n",
+    "# Replace with your actual project and region\n",
+    "# TODO: Put your project id here\n",
+    "PROJECT_ID = \"my_project_id\"\n",
+    "REGION = \"us-central1\"\n",
+    "\n",
+    "# TODO: Change path to the covertype_kfp_pipeline.yaml file:\n",
+    "GCS_VERTEX_AI_PIPELINE_YAML = \"gs:// ... /dags/covertype_kfp_pipeline.yaml\"\n",
+    "\n",
+    "GCS_SOURCE_DATASET_PATH = \"data/covertype/dataset.csv\"\n",
+    "GCS_BUCKET_NAME = \"asl-public\"\n",
+    "\n",
+    "# TODO: Put your BigQuery dataset id here:\n",
+    "BIGQUERY_DATASET_ID = \"airflow_demo_dataset\"\n",
+    "TABLE_ID = \"covertype\"\n",
+    "\n",
+    "# TODO: Put path for a train dataset\n",
+    "TRAINING_FILE_PATH = \"gs://.../train_export.csv\"\n",
+    "\n",
+    "# Note: schema_fields expects a plain list of field dicts\n",
+    "BIGQUERY_TABLE_SCHEMA = [\n",
+    "    {\"name\": \"Elevation\", \"type\": \"INTEGER\", \"mode\": \"NULLABLE\"},\n",
+    "    {\"name\": \"Aspect\", \"type\": \"INTEGER\", \"mode\": \"NULLABLE\"},\n",
+    "    {\"name\": \"Slope\", \"type\": \"INTEGER\", \"mode\": \"NULLABLE\"},\n",
+    "    {\n",
+    "        \"name\": \"Horizontal_Distance_To_Hydrology\",\n",
+    "        \"type\": \"INTEGER\",\n",
+    "        \"mode\": \"NULLABLE\",\n",
+    "    },\n",
+    "    {\n",
+    "        \"name\": \"Vertical_Distance_To_Hydrology\",\n",
+    "        \"type\": \"INTEGER\",\n",
+    "        \"mode\": \"NULLABLE\",\n",
+    "    },\n",
+    "    {\n",
+    "        \"name\": \"Horizontal_Distance_To_Roadways\",\n",
+    "        \"type\": \"INTEGER\",\n",
+    "        \"mode\": \"NULLABLE\",\n",
+    "    },\n",
+    "    {\"name\": \"Hillshade_9am\", \"type\": \"INTEGER\", \"mode\": \"NULLABLE\"},\n",
+    "    {\"name\": \"Hillshade_Noon\", \"type\": \"INTEGER\", \"mode\": \"NULLABLE\"},\n",
+    "    {\"name\": \"Hillshade_3pm\", \"type\": \"INTEGER\", \"mode\": \"NULLABLE\"},\n",
+    "    {\n",
+    "        \"name\": \"Horizontal_Distance_To_Fire_Points\",\n",
+    "        \"type\": \"INTEGER\",\n",
+    "        \"mode\": \"NULLABLE\",\n",
+    "    },\n",
+    "    {\"name\": \"Wilderness_Area\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"},\n",
+    "    {\"name\": \"Soil_Type\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"},\n",
+    "    {\"name\": \"Cover_Type\", \"type\": \"INTEGER\", \"mode\": \"NULLABLE\"},\n",
+    "]\n",
+    "\n",
+    "default_args = {\n",
+    "    \"retries\": 0,  # Disable retries\n",
+    "}\n",
+    "\n",
+    "with DAG(\n",
+    "    dag_id=\"composer_vertex_ai_pipelines\",\n",
+    "    start_date=datetime.datetime(2025, 1, 1),\n",
+    "    schedule=None,\n",
+    "    catchup=False,\n",
+    "    default_args=default_args,\n",
+    "    tags=[\"vertex_ai\", \"pipeline\", \"ml\"],\n",
+    ") as dag:\n",
+    "\n",
+    "    # Load dataset from GCS to BigQuery (emulating a basic ETL process)\n",
+    "    load_gcs_to_bigquery = GCSToBigQueryOperator(\n",
+    "        task_id=\"load_csv_to_bigquery\",\n",
+    "        bucket=GCS_BUCKET_NAME,\n",
+    "        source_objects=[GCS_SOURCE_DATASET_PATH],\n",
+    "        destination_project_dataset_table=f\"{BIGQUERY_DATASET_ID}.{TABLE_ID}\",\n",
+    "        # Optional: Define schema, remove if auto-detect works for you\n",
+    "        schema_fields=BIGQUERY_TABLE_SCHEMA,\n",
+    "        # Or \"NEWLINE_DELIMITED_JSON\", \"PARQUET\", \"AVRO\", etc.\n",
+    "        source_format=\"CSV\",\n",
+    "        # Creates the table if it doesn't exist\n",
+    "        create_disposition=\"CREATE_IF_NEEDED\",\n",
+    "        # Overwrites the table if it exists. Use \"WRITE_APPEND\" to append.\n",
+    "        write_disposition=\"WRITE_TRUNCATE\",\n",
+    "        skip_leading_rows=1,  # For CSVs with a header row\n",
+    "        field_delimiter=\",\",  # For CSVs\n",
+    "    )\n",
+    "\n",
+    "    # Export dataset from BigQuery to GCS\n",
+    "    bigquery_to_gcs = BigQueryToGCSOperator(\n",
+    "        task_id=\"bigquery_to_gcs_export\",\n",
+    "        source_project_dataset_table=f\"{BIGQUERY_DATASET_ID}.{TABLE_ID}\",\n",
+    "        # The operator expects a list of destination URIs\n",
+    "        destination_cloud_storage_uris=[TRAINING_FILE_PATH],\n",
+    "        export_format=\"CSV\",\n",
+    "        print_header=True,\n",
+    "    )\n",
+    "\n",
+    "    # Trigger the pipeline with a compiled YAML file stored on GCS\n",
+    "    run_vertex_ai_pipeline = RunPipelineJobOperator(\n",
+    "        task_id=\"start_vertex_ai_pipeline\",\n",
+    "        project_id=PROJECT_ID,\n",
+    "        region=REGION,\n",
+    "        template_path=GCS_VERTEX_AI_PIPELINE_YAML,\n",
+    "        # Example of passing parameters to the Kubeflow pipeline to override defaults:\n",
+    "        parameter_values={\n",
+    "            \"training_file_path\": TRAINING_FILE_PATH,\n",
+    "        },\n",
+    "        # Disable caching so every run executes all pipeline steps\n",
+    "        enable_caching=False,\n",
+    "        # Unique display name\n",
+    "        display_name=\"triggered-demo-pipeline-{{ ts_nodash }}\",\n",
+    "    )\n",
+    "\n",
+    "    # Get VertexAI pipeline job information\n",
+    "    get_vertexai_ai_pipline_status = GetPipelineJobOperator(\n",
+    "        task_id=\"vertex_ai_pipline_status\",\n",
+    "        project_id=PROJECT_ID,\n",
+    "        region=REGION,\n",
+    "        pipeline_job_id=\"{{ task_instance.xcom_pull(\"\n",
+    "        \"task_ids='start_vertex_ai_pipeline', \"\n",
+    "        \"key='pipeline_job_id') }}\",\n",
+    "    )\n",
+    "\n",
+    "    # Delete VertexAI pipeline job\n",
+    "    delete_pipeline_job = DeletePipelineJobOperator(\n",
+    "        task_id=\"delete_vertex_ai_pipeline_job\",\n",
+    "        project_id=PROJECT_ID,\n",
+    "        region=REGION,\n",
+    "        pipeline_job_id=\"{{ task_instance.xcom_pull(\"\n",
+    "        \"task_ids='start_vertex_ai_pipeline', \"\n",
+    "        \"key='pipeline_job_id') }}\",\n",
+    "        trigger_rule=TriggerRule.ALL_DONE,\n",
+    "    )\n",
+    "\n",
+    "    # Combine all steps into a DAG\n",
+    "    (\n",
+    "        load_gcs_to_bigquery\n",
+    "        >> bigquery_to_gcs\n",
+    "        >> run_vertex_ai_pipeline\n",
+    "        >> get_vertexai_ai_pipline_status\n",
+    "        >> delete_pipeline_job\n",
+    "    )"
+   ],
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Overwriting ./dags/composer_vertex_ai_pipelines.py\n"
+     ]
+    }
+   ],
+   "execution_count": 9
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Airflow DAG Code\n",
+    "Inspect the saved Airflow DAG file (`./dags/composer_vertex_ai_pipelines.py`) that you intend to upload to your Cloud Composer environment.",
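+    "\n",
+    "\n",
+    "As a quick sanity check, you can print the generated file directly from this notebook (assuming the `%%writefile` cell above ran successfully):\n",
+    "\n",
+    "```python\n",
+    "print(open(\"./dags/composer_vertex_ai_pipelines.py\").read())\n",
+    "```"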
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Uploading the DAG to Cloud Composer Storage\n",
+    "\n",
+    "To deploy this DAG to your Cloud Composer environment, you need to upload it to the DAGs folder in your Composer's associated Cloud Storage bucket.\n",
+    "\n",
+    "**Before running this cell, make sure you have identified your Composer DAGs bucket:**\n",
+    "This is typically named `gs://us-central1-YOUR_COMPOSER_ENV_NAME-HASH-bucket/dags/`. \n",
+    "You can find this in the Cloud Composer console."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "metadata": {
+    "tags": [],
+    "ExecuteTime": {
+     "end_time": "2025-06-11T10:57:13.581816Z",
+     "start_time": "2025-06-11T10:57:13.578706Z"
+    }
+   },
+   "source": [
+    "# TODO: put your Cloud Composer Bucket here:\n",
+    "CLOUD_COMPOSER_BUCKET = \"gs://...-bucket\"\n",
+    "CLOUD_COMPOSER_DAGS_PATH = f\"{CLOUD_COMPOSER_BUCKET}/dags\"\n",
+    "%env CLOUD_COMPOSER_DAGS_PATH={CLOUD_COMPOSER_DAGS_PATH}"
+   ],
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "env: CLOUD_COMPOSER_DAGS_PATH=gs://...-bucket/dags\n"
+     ]
+    }
+   ],
+   "execution_count": 1
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "Copying file://./dags/composer_vertex_ai_pipelines.py [Content-Type=text/x-python]...\n",
+      "/ [1 files][  6.2 KiB/  6.2 KiB]                                                \n",
+      "Operation completed over 1 objects/6.2 KiB.                                     \n"
+     ]
+    }
+   ],
+   "source": [
+    "%%bash\n",
+    "\n",
+    "gsutil cp ./dags/composer_vertex_ai_pipelines.py $CLOUD_COMPOSER_DAGS_PATH"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Running the DAG\n",
+    "\n",
+    "You can trigger the DAG manually from the Airflow UI:\n",
+    "\n",
+    "1. Navigate to your Cloud Composer environment in the Google Cloud Console.\n",
+    "2. Click on the \"Airflow UI\" link.\n",
+    "3. In the Airflow UI, find the `composer_vertex_ai_pipelines` DAG.\n",
+    "4. Toggle the DAG to \"On\" if it's not already.\n",
+    "5. Click the \"Trigger DAG\" button.\n",
+    "\n",
+    "You can also run the DAG on a schedule by setting the `schedule` parameter in the DAG definition (it is currently `None`).\n",
+    "\n",
+    "---\n",
+    "\n",
+    "## Monitoring\n",
+    "\n",
+    "Monitor the DAG run from the Airflow UI. You can view the status of each task, logs, and XCom values. For Vertex AI Pipeline job details, you can refer to the Vertex AI section in the Google Cloud Console.\n",
+    "\n",
+    "---\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Copyright 2025 Google LLC\n",
+    "\n",
+    "Licensed under the Apache License, Version 2.0 (the \"License\");\n",
+    "you may not use this file except in compliance with the License.\n",
+    "You may obtain a copy of the License at\n",
+    "\n",
+    "    https://www.apache.org/licenses/LICENSE-2.0\n",
+    "\n",
+    "Unless required by applicable law or agreed to in writing, software\n",
+    "distributed under the License is distributed on an \"AS IS\" BASIS,\n",
+    "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
+    "See the License for the specific language governing permissions and\n",
+    "limitations under the License."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "environment": { + "kernel": "conda-base-py", + "name": "workbench-notebooks.m129", + "type": "gcloud", + "uri": "us-docker.pkg.dev/deeplearning-platform-release/gcr.io/workbench-notebooks:m129" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "conda-base-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.16" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}