Add Cloud Composer Vertex AI Integration DAG #605
Draft: olex-snk wants to merge 18 commits into `master` from `osaienko/kubeflow_with_airflow_integration_demo`
Commits (18):
- `7b1c177` added_cloud_composer_vertex_ai_integration_dag (olex-snk)
- `9a8a638` reformatted and fixed pylint errors (olex-snk)
- `966213b` added params template (olex-snk)
- `b3ea16f` refactored (olex-snk)
- `2b32f34` updated Readme file (olex-snk)
- `7bc887a` style fixes (olex-snk)
- `6ac17ac` wip-adding-solution-notebook (olex-snk)
- `99ed4ae` moved_DAG_to_examples (olex-snk)
- `6b1418b` added readme for dag example (olex-snk)
- `ce84d9a` fixed solution description (olex-snk)
- `b4e01a4` wip (olex-snk)
- `6e2ada8` WIP_cleaning_and_refactoring (olex-snk)
- `647071d` WIP_refactoring (olex-snk)
- `6781cf7` refactored as a notebook solution (olex-snk)
- `e3952aa` Merge branch 'master' into osaienko/kubeflow_with_airflow_integration… (olex-snk)
- `2575f23` fixed typos (olex-snk)
- `9385c42` updated license (olex-snk)
- `74bcb7d` addded_dags_folder_creation (olex-snk)
`notebooks/kubeflow_pipelines/integration/cloud_composer/README.md` (90 additions, 0 deletions)
# Orchestrating Vertex AI Pipelines with Cloud Composer

---

This README provides an overview, setup instructions, and functionality details for an Apache Airflow DAG designed to integrate with Google Cloud's **Vertex AI Pipelines**. The DAG demonstrates a comprehensive data preparation workflow before orchestrating the execution and management of a Kubeflow pipeline on Vertex AI.
## Purpose

The primary purpose of this DAG is to showcase a robust MLOps workflow orchestrated by **Cloud Composer (Airflow)**. Specifically, it illustrates:

1. **Data Ingestion (GCS to BigQuery)**: Loading raw data from a Google Cloud Storage (GCS) bucket into a BigQuery table. This step emulates a basic ETL pipeline orchestrated by Cloud Composer.
2. **Data Export (BigQuery to GCS)**: Exporting processed or prepared data from BigQuery back to GCS. This is a common pattern for creating datasets ready for consumption by ML training jobs or Vertex AI Pipelines.
3. **Vertex AI Pipeline Execution**: Triggering a pre-compiled Kubeflow Pipeline (defined in a YAML file on GCS) on **Vertex AI Pipelines**. This allows for the execution of complex machine learning workflows, including data preprocessing, model training, evaluation, and deployment, using data prepared in the preceding steps.
4. **Pipeline Job Management**: Demonstrating how to programmatically retrieve the status of a running Vertex AI Pipeline job and subsequently delete it. This is crucial for monitoring and cleanup in automated MLOps environments.
## Setup

To set up and run this DAG, you'll need the following:

### Google Cloud Project Configuration

1. **Enable APIs**: Ensure the following Google Cloud APIs are enabled in your project:
    * **Cloud Composer API**
    * **Vertex AI API**
    * **BigQuery API**
    * **Cloud Storage API**
2. **Service Account Permissions**: The service account associated with your Cloud Composer environment must have the necessary permissions to:
    * Read from the specified GCS bucket (`storage.objects.get`, `storage.objects.list`).
    * Write to BigQuery (`bigquery.datasets.create`, `bigquery.tables.create`, `bigquery.tables.updateData`).
    * Read from BigQuery (`bigquery.tables.getData`, `bigquery.tables.list`).
    * Write to GCS (`storage.objects.create`, `storage.objects.delete`).
    * Run and manage Vertex AI Pipeline jobs (`aiplatform.pipelineJobs.create`, `aiplatform.pipelineJobs.get`, `aiplatform.pipelineJobs.delete`).
3. **Cloud Composer Environment**: You need an active Cloud Composer environment.
4. **Google Cloud Storage (GCS) Bucket**:
    * The DAG expects a GCS bucket named `asl-public` containing the source dataset `data/covertype/dataset.csv`. You can replace this with your own bucket and data.
    * You'll also need a pre-compiled Kubeflow Pipeline YAML file stored on GCS. Update the `VERTEX_AI_PIPELINE_YAML` variable in the DAG with the path to your compiled pipeline (e.g., `gs://your-bucket/path/to/covertype_kfp_pipeline.yaml`).
    * The DAG will export data to the path specified by `GCS_TRAIN_DATASET_PATH` (e.g., `gs://.../train_export.csv`). Ensure the bucket exists and the service account has write permissions.
### DAG Configuration

1. **Update Placeholders** (an illustrative sketch of these constants follows this list):
    * `PROJECT_ID`: Replace `"...project id..."` with your actual Google Cloud **Project ID**.
    * `REGION`: The region for your Vertex AI operations (e.g., `"us-central1"`).
    * `GCS_VERTEX_AI_PIPELINE_YAML`: Replace `"gs://.../covertype_kfp_pipeline.yaml"` with the actual GCS path to your compiled Kubeflow pipeline YAML file.
    * `GCS_SOURCE_DATASET_PATH` and `GCS_BUCKET_NAME`: Adjust if your source data is in a different location.
    * `GCS_TRAIN_DATASET_PATH`: Update this to your desired GCS path for the exported training data.
    * `BIGQUERY_DATASET_ID` and `TABLE_ID`: Customize these if you prefer different BigQuery dataset and table names.
2. **Upload DAG**: Upload the DAG file (`composer_vertex_ai_pipelines.py`) to the `dags` folder of your Cloud Composer environment.
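For orientation, the placeholders above could be defined near the top of the DAG file along the following lines. This is only a sketch: the constant names follow the README text, but every value shown is an assumed placeholder rather than an excerpt from `composer_vertex_ai_pipelines.py`.

```python
# Illustrative placeholder values only; replace them with your own settings.
PROJECT_ID = "your-gcp-project-id"
REGION = "us-central1"
GCS_BUCKET_NAME = "asl-public"
GCS_SOURCE_DATASET_PATH = "data/covertype/dataset.csv"
GCS_VERTEX_AI_PIPELINE_YAML = "gs://your-bucket/path/to/covertype_kfp_pipeline.yaml"
GCS_TRAIN_DATASET_PATH = "gs://your-bucket/data/covertype/train_export.csv"
BIGQUERY_DATASET_ID = "airflow_demo_dataset"
TABLE_ID = "covertype"
```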
### Kubeflow Pipeline (Pre-requisite)

This DAG assumes you have a pre-compiled Kubeflow Pipeline YAML file. This YAML file is typically generated from a Kubeflow Pipelines SDK definition in Python and compiled using `kfp.compiler.Compiler().compile()`. Ensure this compiled YAML is accessible at the `VERTEX_AI_PIPELINE_YAML` path specified in the DAG. The pipeline should be designed to accept `training_file_path` as a parameter.
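As a reminder of what that compilation step can look like, here is a minimal KFP v2 sketch. The pipeline and component names, as well as the output filename, are hypothetical; the only detail taken from this README is that the pipeline accepts a `training_file_path` parameter.

```python
from kfp import compiler, dsl


@dsl.component
def prepare_data(training_file_path: str) -> str:
    # Hypothetical placeholder component; a real pipeline would contain
    # preprocessing, training, evaluation, and deployment steps.
    return training_file_path


@dsl.pipeline(name="covertype-kfp-pipeline")
def covertype_pipeline(training_file_path: str):
    # The Airflow DAG passes the exported GCS CSV path into this parameter.
    prepare_data(training_file_path=training_file_path)


if __name__ == "__main__":
    # Produces the compiled YAML referenced by the DAG's pipeline path setting.
    compiler.Compiler().compile(
        pipeline_func=covertype_pipeline,
        package_path="covertype_kfp_pipeline.yaml",
    )
```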
## Functionality

The `demo_vertex_ai_pipeline_integration` DAG consists of the following tasks (each entry is followed below by an illustrative sketch of the operator call; the sketches omit the surrounding `with DAG(...)` block for brevity):

1. **`load_csv_to_bigquery`**:
    * **Operator**: `GCSToBigQueryOperator`
    * **Purpose**: This task transfers a CSV file (`data/covertype/dataset.csv` from the `asl-public` bucket) to a specified BigQuery table (`airflow_demo_dataset.covertype`).
    * **Configuration**: It's configured to create the table if it doesn't exist and truncate it if it does, ensuring a fresh load for each run. It also handles skipping a header row.
    * **Trigger**: This is the initial task, executing first.
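A minimal sketch of this load step; the parameter values are assumptions based on the description above, not copied from the DAG:

```python
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Loads the source CSV into BigQuery, recreating the table contents on every
# run and skipping the header row.
load_csv_to_bigquery = GCSToBigQueryOperator(
    task_id="load_csv_to_bigquery",
    bucket="asl-public",
    source_objects=["data/covertype/dataset.csv"],
    destination_project_dataset_table="airflow_demo_dataset.covertype",
    source_format="CSV",
    skip_leading_rows=1,
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",
)
```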
2. **`bigquery_to_gcs_export`**:
    * **Operator**: `BigQueryToGCSOperator`
    * **Purpose**: This task exports the data from the BigQuery table (`airflow_demo_dataset.covertype`) to a CSV file in GCS, specified by `params.gcs_train_dataset_path`. This emulates a data preparation step where data is transformed and then made available for downstream ML processes.
    * **Trigger**: Executes once `load_csv_to_bigquery` successfully completes.
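A sketch of the export step, assuming `gcs_train_dataset_path` is exposed through DAG-level `params`:

```python
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

# Exports the BigQuery table to the training CSV location in GCS.
# The destination URI is rendered from the DAG-level params at runtime.
bigquery_to_gcs_export = BigQueryToGCSOperator(
    task_id="bigquery_to_gcs_export",
    source_project_dataset_table="airflow_demo_dataset.covertype",
    destination_cloud_storage_uris=["{{ params.gcs_train_dataset_path }}"],
    export_format="CSV",
    print_header=True,
)
```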
3. **`start_vertex_ai_pipeline`**:
    * **Operator**: `RunPipelineJobOperator`
    * **Purpose**: After the data is exported to GCS, this task triggers a new **Vertex AI Pipeline** job using the specified compiled pipeline YAML file from GCS.
    * **Parameter Passing**: It passes the GCS path of the exported training data as the `training_file_path` parameter to the Kubeflow pipeline.
    * **Dynamic Naming**: The `display_name` is dynamically generated with a timestamp to ensure uniqueness for each pipeline run.
    * **XCom**: The `pipeline_job_id` of the triggered pipeline is pushed to XCom, allowing subsequent tasks to reference this specific job.
    * **Trigger**: Executes once `bigquery_to_gcs_export` successfully completes.
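A sketch of the submission step. The project, region, template path, and training data path are assumed placeholders, and the timestamp here is generated when the DAG file is parsed; the actual DAG may construct its `display_name` differently.

```python
from datetime import datetime

from airflow.providers.google.cloud.operators.vertex_ai.pipeline_job import (
    RunPipelineJobOperator,
)

# Submits the compiled KFP YAML to Vertex AI Pipelines and passes the exported
# CSV as the pipeline's training_file_path parameter.
start_vertex_ai_pipeline = RunPipelineJobOperator(
    task_id="start_vertex_ai_pipeline",
    project_id="your-gcp-project-id",  # assumed PROJECT_ID placeholder
    region="us-central1",              # assumed REGION placeholder
    display_name=f"covertype-pipeline-{datetime.now():%Y%m%d%H%M%S}",
    template_path="gs://your-bucket/path/to/covertype_kfp_pipeline.yaml",
    parameter_values={
        # Assumed to match the GCS_TRAIN_DATASET_PATH used for the export.
        "training_file_path": "gs://your-bucket/data/covertype/train_export.csv",
    },
)
```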
4. **`vertex_ai_pipline_status`**:
    * **Operator**: `GetPipelineJobOperator`
    * **Purpose**: This task retrieves detailed information and the current status of the Vertex AI Pipeline job initiated by the previous task. It uses the `pipeline_job_id` pulled from XCom.
    * **Trigger**: Executes once `start_vertex_ai_pipeline` successfully completes.
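A sketch of the status check, assuming the `pipeline_job_id` argument accepts a Jinja template so the value pushed to XCom by the previous task can be pulled at runtime:

```python
from airflow.providers.google.cloud.operators.vertex_ai.pipeline_job import (
    GetPipelineJobOperator,
)

# Looks up the pipeline job created by start_vertex_ai_pipeline using the
# pipeline_job_id it pushed to XCom.
vertex_ai_pipline_status = GetPipelineJobOperator(
    task_id="vertex_ai_pipline_status",
    project_id="your-gcp-project-id",  # assumed placeholder
    region="us-central1",              # assumed placeholder
    pipeline_job_id=(
        "{{ task_instance.xcom_pull(task_ids='start_vertex_ai_pipeline', "
        "key='pipeline_job_id') }}"
    ),
)
```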
5. **`delete_vertex_ai_pipeline_job`**:
    * **Operator**: `DeletePipelineJobOperator`
    * **Purpose**: This task cleans up the Vertex AI Pipeline job by deleting it. This is important for managing resources and keeping your Vertex AI environment tidy.
    * **Trigger Rule**: `TriggerRule.ALL_DONE` ensures this task runs regardless of whether the preceding tasks succeeded or failed, as long as they have all completed their execution. This is a robust approach for cleanup tasks.
    * **Trigger**: Executes once `vertex_ai_pipline_status` completes (or if any previous task fails, due to the `ALL_DONE` trigger rule).
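A sketch of the cleanup step, with the same XCom-templating assumption as above:

```python
from airflow.providers.google.cloud.operators.vertex_ai.pipeline_job import (
    DeletePipelineJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

# Deletes the pipeline job record once all upstream tasks have finished,
# regardless of whether they succeeded or failed.
delete_vertex_ai_pipeline_job = DeletePipelineJobOperator(
    task_id="delete_vertex_ai_pipeline_job",
    project_id="your-gcp-project-id",  # assumed placeholder
    region="us-central1",              # assumed placeholder
    pipeline_job_id=(
        "{{ task_instance.xcom_pull(task_ids='start_vertex_ai_pipeline', "
        "key='pipeline_job_id') }}"
    ),
    trigger_rule=TriggerRule.ALL_DONE,
)
```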
---

This DAG provides a comprehensive example of integrating your MLOps data preparation and pipeline execution workflows with Google Cloud's Vertex AI platform, all orchestrated by Cloud Composer.
Binary file added (+375 KB): ...beflow_pipelines/integration/cloud_composer/img/airflow_vertexai_monitoring.png
I think deleting the pipeline is not necessary. Vertex AI Pipelines is a serverless service, and its resources are automatically shut down after execution.
This deletion step appears to delete the job record (not the resource) from the pipeline history, which is not ideal for logging purposes.