
Commit

Rearrange dag flow (#27)
* DocAI Form Parser microservice (#12)

* DocAI form parser processor integration

* form processor build container image script

* DocAI form parser code integration

* DocAI Form Parser fixes

* Changes:
 - Re-sync'd the development constraints (shared) based on the form
   parser requirements.in
 - Moved requirements.txt to requirements.in for form parser
 - Updated tasks.py to also generate requirements.txt from
   requirements.in (see the sketch after this list)
 - Reformatted terraform from pre-commit
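
A hedged sketch of what such a task could look like, assuming tasks.py is an Invoke task file; the task name and the exact pip-compile flags are assumptions, not taken from the repository:

```python
# tasks.py: regenerate the pinned requirements.txt from requirements.in.
# Assumes pip-tools is installed in the environment.
from invoke import task


@task
def compile_requirements(c):
    c.run("pip-compile requirements.in --output-file requirements.txt")
```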

---------

Co-authored-by: Mark Scannell <[email protected]>

* downstream tasks depend only on the supported files being moved, but will wait for the pdf form processor to finish (#13)

* made downstream tasks depend only on file moves, but wait for the pdf form files to be moved

* ignore pylint import errors

* form-parser-metadata-load-bigquery

* Composer task to trigger Doc AI Form Parser Cloud Run - first version

* form-parser-metadata-load-bigquery (#18)

* form-parser-metadata-load-bigquery

* fixes in form parser

* Updated README.md

* Updated DPU to EKS and user agent string and label for revenue tracking

* skip pre-commit

* removing pre-commit check for terraform fmt

---------

Co-authored-by: Dharmesh Patel <[email protected]>

* Updated Ref. Arch. diagram and added DATAFLOW.md

* Changed labels to eks-solution

* Composer task to trigger DocAI Form Parser and metadata update for Form parser

* Updated labels for tracking (#19)

* DocAI form API microservice trigger from Cloud Composer

* fix in form parser

* Fixed a type issue caused by assigning a `str | None` value to a `str`-typed variable when reading environment variables. This is done by using `os.environ[]` instead of the `os.environ.get()` method; the lookup fails fast if the environment variable does not exist.
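
A minimal sketch of the difference (the variable names are illustrative; `DPU_PROCESS_BUCKET` is one of the variables the DAG reads):

```python
import os

# os.environ.get() returns `str | None`; a missing variable only surfaces later,
# wherever the value is first used as a plain `str`.
maybe_bucket = os.environ.get("DPU_PROCESS_BUCKET")

# os.environ[] raises KeyError immediately if the variable is unset, so the task
# fails fast and the resulting value is always typed as `str`.
bucket = os.environ["DPU_PROCESS_BUCKET"]
```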

* batch deletion based on batch-id (#16)

* location parameter and batch-id based deletion

* Updated the README.md for batch delete.

* updated the delete_doc.sh script

---------

Co-authored-by: Dharmesh Patel <[email protected]>

* Parallelized form parsing and docs parsing, including importing to the data store. (#21)

Co-authored-by: Eyal Ben Ivri <[email protected]>

* refactored many of the operations in the DAG into a utils package, to reduce complexity and code in the DAG file and to move logic into other files, where it is separated from the Airflow runtime.
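
As an illustration of the pattern, here is a sketch of one such helper, based on the folder-name logic previously inlined in the DAG; the real signature in the utils package may differ:

```python
# utils/file_utils.py: plain Python, no Airflow imports, unit-testable in isolation.
import random
import string
from datetime import datetime


def get_random_process_folder_name() -> str:
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
    return f"docs-processing-{datetime.now().strftime('%d-%m-%Y')}-{suffix}"


if __name__ == "__main__":
    # The DAG callable now just calls this helper and pushes the result to XCom.
    print(get_random_process_folder_name())
```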

* reordered dag steps and dependencies to optimize runtime
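
For context, a minimal, illustrative DAG showing how such ordering is expressed; the DAG id and task ids are placeholders, not the real ones (the actual chains are in docs_processing_orchestrator.py below):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG("ordering_sketch", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    move_files = DummyOperator(task_id="move_files")
    parse_forms = DummyOperator(task_id="parse_forms")
    process_docs = DummyOperator(task_id="process_docs")

    # A list on the right-hand side fans out: form parsing and general document
    # processing both start once files are moved, and run in parallel.
    move_files >> [parse_forms, process_docs]
```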

* removed commented out step

* added license information to new files.

* Copy all files in the `src` folder to the `dags` folder in GCS

---------

Co-authored-by: anuradha-bajpai-google <[email protected]>
Co-authored-by: Mark Scannell <[email protected]>
Co-authored-by: Charlie Wang <[email protected]>
Co-authored-by: Dharmesh Patel <[email protected]>
Co-authored-by: Eyal Ben Ivri <[email protected]>
8 people authored and eeaton committed Aug 30, 2024
1 parent df7ddd3 commit b0927d8
Showing 12 changed files with 390 additions and 67 deletions.
13 changes: 13 additions & 0 deletions components/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13 changes: 13 additions & 0 deletions components/dpu-workflow/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13 changes: 13 additions & 0 deletions components/dpu-workflow/src/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
168 changes: 114 additions & 54 deletions components/dpu-workflow/src/docs_processing_orchestrator.py
@@ -14,29 +14,37 @@

# pylint: disable=import-error

from collections import defaultdict
import os
import random
import string
import sys
from datetime import datetime, timedelta

from airflow import DAG # type: ignore
from airflow.models.param import Param # type: ignore
from airflow.operators.dummy import DummyOperator # type: ignore
from airflow.operators.python import (
BranchPythonOperator, # type: ignore
PythonOperator,
)
from airflow.utils.trigger_rule import TriggerRule # type: ignore
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator # type: ignore
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator # type: ignore
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator # type: ignore
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator # type: ignore
from google.cloud import storage

from utils import pdf_classifier, file_utils, datastore_utils, cloud_run_utils

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

@@ -49,22 +57,17 @@
"retries": 0,
}

def get_supported_file_types(**context):
file_list = context["ti"].xcom_pull(task_ids="list_all_input_files")
file_type_to_processor = context["params"]["supported_files"]
files_by_type = file_utils.supported_files_by_type(file_list,
file_type_to_processor)
context["ti"].xcom_push(key="types_to_process", value=files_by_type)


@@ -77,16 +80,16 @@ def has_files_to_process(**context):


def generate_process_folder(**context):
process_folder = f"docs-processing-{datetime.now().strftime('%d-%m-%Y')}-{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"
process_folder = file_utils.get_random_process_folder_name()
context["ti"].xcom_push(key="process_folder", value=process_folder)


def generate_mv_params(**context):
files_to_process = context["ti"].xcom_pull(key="types_to_process")
process_folder = context["ti"].xcom_pull(key="process_folder")
input_folder = context["params"]["input_folder"]
input_folder_with_prefix = f"{input_folder}/" if input_folder else ""
process_bucket = os.environ.get("DPU_PROCESS_BUCKET")

parameter_obj_list = file_utils.get_mv_params(files_to_process,
input_folder,
process_bucket, process_folder)
return parameter_obj_list


def data_store_import_docs(**context):
data_store_region = os.environ.get("DPU_DATA_STORE_REGION")
bq_table = context["ti"].xcom_pull(key="bigquery_table")

operation_name = datastore_utils.import_docs_to_datastore(
bq_table,
data_store_region,
os.environ.get("DPU_DATA_STORE_ID")
)
return operation_name


def generate_process_job_params(**context):
@@ -148,31 +159,13 @@
doc_processor_job_name = os.environ.get("DOC_PROCESSOR_JOB_NAME")
gcs_reject_bucket = os.environ.get("DPU_REJECT_BUCKET")

process_job_params = cloud_run_utils.get_process_job_params(bq_table,
doc_processor_job_name,
gcs_reject_bucket, mv_params)
return process_job_params


def generate_output_table_name(**context):
process_folder = context["ti"].xcom_pull(key="process_folder")
output_table_name = process_folder.replace("-", "_")
context["ti"].xcom_push(key="output_table_name", value=output_table_name)
@@ -181,19 +174,32 @@ def generete_output_table_name(**context):
def generate_form_process_job_params(**context):
# Build BigQuery table id <project_id>.<dataset_id>.<table_id>
bq_table = context["ti"].xcom_pull(key="bigquery_table")

# Build GCS input and output prefix - gs://<process_bucket_name>/<process_folder>/pdf_forms/<input|output>
process_bucket = os.environ.get("DPU_PROCESS_BUCKET")
process_folder = context["ti"].xcom_pull(key="process_folder")

form_parser_job_params = cloud_run_utils.forms_parser_job_params(
bq_table,
process_bucket,
process_folder,
)
return form_parser_job_params


def generate_pdf_forms_folder(**context):
Expand All @@ -211,6 +217,7 @@ def generate_pdf_forms_list(**context):
location = context["params"]["pdf_classifier_location"]
processor_id = context["params"]["pdf_classifier_processor_id"]

return pdf_classifier.get_forms_list(processor_id, project_id, location,
process_bucket, process_folder)


with DAG(
@@ -257,7 +268,6 @@ def generate_pdf_forms_list(**context):
"supported_files": Param(
[
{"file-suffix": "pdf", "processor": "agent-builder"},
{"file-suffix": "docx", "processor": "agent-builder"},
{"file-suffix": "txt", "processor": "agent-builder"},
{"file-suffix": "html", "processor": "agent-builder"},
{"file-suffix": "msg", "processor": "dpu-doc-processor"},
@@ -349,17 +359,20 @@ def generate_pdf_forms_list(**context):
move_object=True,
).expand_kwargs(generate_pdf_forms_l.output)


forms_pdf_moved_or_skipped = DummyOperator(
task_id="forms_pdf_moved_or_skipped", trigger_rule=TriggerRule.ALL_DONE
)

create_output_table_name = PythonOperator(
task_id="create_output_table_name",
python_callable=generate_output_table_name,
provide_context=True,
)

@@ -418,14 +431,15 @@ def generate_pdf_forms_list(**context):
provide_context=True,
)

execute_forms_parser = CloudRunExecuteJobOperator.partial(
# Calling os.environ[] instead of using the .get method to verify we
# have set environment variables, not allowing None values.
project_id=os.environ["GCP_PROJECT"],
region=os.environ["DPU_REGION"],
task_id="execute_forms_parser",
job_name=os.environ["FORMS_PARSER_JOB_NAME"],
deferrable=False,
).expand_kwargs(
# Converting to a list, since expand_kwargs expects a list,
# even though we have only one task to execute.
[create_form_process_job_params.output]
)

(
# initial common actions - ends with a decision whether to continue
# to basic processing, or stop working
GCS_Files
>> process_supported_types
>> has_files
>> [create_process_folder, skip_bucket_creation]
) # pyright: ignore[reportOperatorIssue]
(
# In the case we continue working, moving documents to processing
# folder, and creating an output table where metadata will be saved
create_process_folder
>> create_output_table_name
>> create_output_table
>> generate_files_move_parameters
>> move_to_processing

# We then want to see if there are any forms, since those will be
# handled differently.
>> generate_pdf_forms_l
>> move_forms
>> forms_pdf_moved_or_skipped
)

(
# Continue to process forms, depending on move_forms executed
# successfully. This doesn't have to wait for general processing.
move_forms
>> create_form_process_job_params
>> execute_forms_parser
>> import_forms_to_data_store
)

(
# General document processing has to wait for the forms to
# move/skipped, since we don't want to process the forms using this job.
forms_pdf_moved_or_skipped
>> create_process_job_params
>> execute_doc_processors
>> import_docs_to_data_store
)


