
[Do not merge] Upsert metric for execution #85

Open · RissyRan wants to merge 1 commit into master from upsert_metric
Conversation

@RissyRan (Collaborator) commented on Jan 23, 2024

Description

Context: at a high level, the prod stage should produce only one metric record per execution (e.g., for a DAG scheduled daily, there should be exactly one record for 2024-01-04, one for 2024-01-05, and so on). In the current implementation, if we clear an old execution, a new metric record is inserted into the BigQuery table, so duplicate metrics occur for a single execution.

This PR upserts metrics for an execution by deleting the existing records (if any) and then inserting the new metrics; a rough sketch follows the list below.

  • Update process_id from a random UUID to an ID derived from benchmark_id and run_id
  • Add a delete() method, with a known BigQuery limitation (rows in the streaming buffer cannot be deleted for up to 90 minutes after insertion)
  • Add a unit test for the method
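
To make the first two bullets concrete, here is a minimal sketch of the approach, not the PR's exact code: the helper names are assumptions, and the `uuid` key column is taken from the job_history rows in the test logs below. Those 64-hex-character IDs are consistent with a SHA-256 digest, though the actual derivation in the PR may differ.

```python
# Minimal sketch (assumed names, not the PR's exact code): derive a
# deterministic ID from benchmark_id + run_id, then delete any existing rows
# for that ID before re-inserting, which mimics an upsert.
import hashlib
import logging

from google.cloud import bigquery


def generate_row_uuid(benchmark_id: str, run_id: str) -> str:
  # The same (benchmark_id, run_id) pair always maps to the same ID, so
  # re-running an execution replaces its metrics instead of duplicating them.
  return hashlib.sha256(f"{benchmark_id}{run_id}".encode()).hexdigest()


def delete(client: bigquery.Client, table: str, row_id: str) -> None:
  # DELETE is a DML statement: it fails while the target rows are still in
  # the streaming buffer (up to ~90 minutes after insertion).
  job_config = bigquery.QueryJobConfig(
      query_parameters=[
          bigquery.ScalarQueryParameter("row_id", "STRING", row_id)
      ]
  )
  query_job = client.query(
      f"DELETE FROM `{table}` WHERE uuid = @row_id", job_config=job_config
  )
  try:
    query_job.result()
    logging.info(
        "No matching records or successfully deleted records in %s with id %s.",
        table,
        row_id,
    )
  except Exception as e:
    raise RuntimeError(
        f"Failed to delete records in {table} with id {row_id} and error: "
        f"`{e}` Please note you cannot delete or update a table in the "
        "streaming buffer period, which can last up to 90 min after data "
        "insertion."
    ) from e
```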

Tests

Please describe the tests that you ran on Cloud VM to verify changes.

Instruction and/or command lines to reproduce your tests: ...
Uploaded to Airflow and tested.

List links for your tests (use go/shortn-gen for any internal link): ...
If there are no matching records, or existing records can be deleted (both cases tested), we get these logs (link):

[2024-01-24, 01:51:31 UTC] {metric.py:444} INFO - The run_model state is success, and the job status is success.
[2024-01-24, 01:51:31 UTC] {logging_mixin.py:150} INFO - Test run rows: [TestRun(job_history=JobHistoryRow(uuid='c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde', timestamp=datetime.datetime(2024, 1, 24, 1, 51, 31, 557515), owner='Shiva S.', job_name='flax_resnet_imagenet-v2-8', job_status=0), metric_history=[], metadata_history=[MetadataHistoryRow(job_uuid='c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde', metadata_key='run_id', metadata_value='manual__2024-01-22T19:29:46.899501+00:00'), MetadataHistoryRow(job_uuid='c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde', metadata_key='prev_start_date_success', metadata_value='2024-01-22T19:28:52.539430+00:00'), MetadataHistoryRow(job_uuid='c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde', metadata_key='airflow_dag_run_link', metadata_value='https://7ef24502ca144d038c5754964af60450-dot-us-central1.composer.googleusercontent.com/dags/flax_latest_supported/grid?dag_run_id=manual__2024-01-22T19%3A29%3A46.899501%2B00%3A00&task_id=flax_resnet_imagenet-v2-8.post_process.process_metrics')])]
[2024-01-24, 01:51:34 UTC] {bigquery.py:140} INFO - No matching records or successfully deleted records in cloud-ml-auto-solutions.xlml_dataset.job_history with id c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde.
[2024-01-24, 01:51:37 UTC] {bigquery.py:140} INFO - No matching records or successfully deleted records in cloud-ml-auto-solutions.xlml_dataset.metric_history with id c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde.
[2024-01-24, 01:51:39 UTC] {bigquery.py:140} INFO - No matching records or successfully deleted records in cloud-ml-auto-solutions.xlml_dataset.metadata_history with id c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde.
[2024-01-24, 01:51:39 UTC] {bigquery.py:182} INFO - Inserting 1 rows into BigQuery table cloud-ml-auto-solutions.xlml_dataset.job_history.
[2024-01-24, 01:51:39 UTC] {bigquery.py:189} INFO - Successfully added rows to Bigquery.
[2024-01-24, 01:51:39 UTC] {bigquery.py:182} INFO - Inserting 3 rows into BigQuery table cloud-ml-auto-solutions.xlml_dataset.metadata_history.
[2024-01-24, 01:51:39 UTC] {bigquery.py:189} INFO - Successfully added rows to Bigquery.

If we are within the 90-minute streaming buffer period and cannot delete, we get the following logs (link) and the task fails:

[2024-01-23, 20:13:08 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/gcs/dags/xlml/utils/bigquery.py", line 138, in delete
    result = query_job.result()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1580, in result
    do_get_result()
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 372, in retry_wrapped_func
    return retry_target(
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 207, in retry_target
    result = target()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1570, in do_get_result
    super(QueryJob, self).result(retry=retry, timeout=timeout)
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 922, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 UPDATE or DELETE statement over table cloud-ml-auto-solutions.xlml_dataset.job_history would affect rows in the streaming buffer, which is not supported

Location: US
Job ID: 6300ca55-b3dc-47a2-b0a3-ced29c8377d7


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/decorators/base.py", line 220, in execute
    return_value = super().execute(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 181, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 198, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/xlml/utils/metric.py", line 536, in process_metrics
    bigquery_metric.delete(row_ids)
  File "/home/airflow/gcs/dags/xlml/utils/bigquery.py", line 144, in delete
    raise RuntimeError(
RuntimeError: Failed to delete records in cloud-ml-auto-solutions.xlml_dataset.job_history with id c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde and error: `400 UPDATE or DELETE statement over table cloud-ml-auto-solutions.xlml_dataset.job_history would affect rows in the streaming buffer, which is not supported

Location: US
Job ID: 6300ca55-b3dc-47a2-b0a3-ced29c8377d7
` Please note you cannot delete or update table in the streaming buffer period, which can last up to 90 min after data insertion.
[2024-01-23, 20:13:08 UTC] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=flax_latest_supported, task_id=flax_resnet_imagenet-v2-8.post_process.process_metrics, execution_date=20240122T192946, start_date=20240123T201306, end_date=20240123T201308
[2024-01-23, 20:13:09 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 21959 for task flax_resnet_imagenet-v2-8.post_process.process_metrics (Failed to delete records in cloud-ml-auto-solutions.xlml_dataset.job_history with id c7fb75ba32200c2aa3f2e6b285c8248a998e43782061361409ed936df541dcde and error: `400 UPDATE or DELETE statement over table cloud-ml-auto-solutions.xlml_dataset.job_history would affect rows in the streaming buffer, which is not supported

Location: US
Job ID: 6300ca55-b3dc-47a2-b0a3-ced29c8377d7
` Please note you cannot delete or update table in the streaming buffer period, w

Checklist

Before submitting this PR, please make sure (put X in square brackets):

  • I have performed a self-review of my code.
  • I have necessary comments in my code, particularly in hard-to-understand areas.
  • I have run one-shot tests and provided workload links above if applicable.
  • I have made or will make corresponding changes to the doc if needed.

@will-cromar (Contributor) left a comment

Unfortunately, BQ doesn't support upserts directly. I found a couple of answers on the forums and Stack Overflow recommending MERGE to mimic an upsert. Would that be more consistent than check/delete/insert? https://www.googlecloudcommunity.com/gc/Data-Analytics/How-to-UPSERT-in-BigQuery/m-p/547777
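
For reference, such a MERGE-based upsert might look like the sketch below. The staging table `job_history_staging` is hypothetical, and the column list is taken from the JobHistoryRow fields visible in the test logs above.

```python
# Hypothetical MERGE-based upsert: load new rows into a staging table first,
# then merge them into job_history keyed on uuid.
from google.cloud import bigquery

client = bigquery.Client()
merge_sql = """
MERGE `cloud-ml-auto-solutions.xlml_dataset.job_history` AS target
USING `cloud-ml-auto-solutions.xlml_dataset.job_history_staging` AS source
ON target.uuid = source.uuid
WHEN MATCHED THEN
  UPDATE SET timestamp = source.timestamp, job_status = source.job_status
WHEN NOT MATCHED THEN
  INSERT (uuid, timestamp, owner, job_name, job_status)
  VALUES (source.uuid, source.timestamp, source.owner, source.job_name,
          source.job_status)
"""
client.query(merge_sql).result()
```

Note that MERGE is also a DML statement, so it would presumably hit the same streaming-buffer restriction shown in the failure logs above whenever the matched rows were inserted recently.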

@RissyRan RissyRan changed the title Upsert metric for execution [WIP] Upsert metric for execution Jan 23, 2024
@RissyRan (Collaborator, Author) replied:

> Unfortunately, BQ doesn't support upserts directly. I found a couple of answers on the forums and Stack Overflow recommending MERGE to mimic an upsert. Would that be more consistent than check/delete/insert? https://www.googlecloudcommunity.com/gc/Data-Analytics/How-to-UPSERT-in-BigQuery/m-p/547777

I saw that thread too. But per the official BigQuery tutorial, the MERGE statement won't work (link), while the Storage Write API does (please see here). If we want to adopt that method, we need the following 3 steps: 1) open a stream; 2) send a protocol buffer; 3) close the stream (see here; a rough sketch follows below). That seems like a big change to our existing system, so I doubt we should onboard it at this moment. Thoughts?
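
For context on the size of that change, the three-step Storage Write API flow might look roughly like the sketch below, following the pattern in Google's documentation. `metric_record_pb2` is a hypothetical module that would have to be generated from a .proto schema matching the BigQuery table, which is a large part of the onboarding cost.

```python
# Rough sketch of the Storage Write API flow: 1) open a stream, 2) send
# protocol buffers, 3) close/commit the stream. `metric_record_pb2` is a
# hypothetical generated proto module, not part of this repo.
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

import metric_record_pb2  # hypothetical: generated from the table's .proto


def write_rows(project: str, dataset: str, table: str, rows) -> None:
  client = bigquery_storage_v1.BigQueryWriteClient()
  parent = client.table_path(project, dataset, table)

  # 1) Open a PENDING stream; its rows only become visible after commit.
  stream = types.WriteStream()
  stream.type_ = types.WriteStream.Type.PENDING
  stream = client.create_write_stream(parent=parent, write_stream=stream)

  # Declare the row schema (a proto descriptor) once, in the first request.
  proto_schema = types.ProtoSchema()
  proto_descriptor = descriptor_pb2.DescriptorProto()
  metric_record_pb2.MetricRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
  proto_schema.proto_descriptor = proto_descriptor

  template = types.AppendRowsRequest()
  template.write_stream = stream.name
  proto_data = types.AppendRowsRequest.ProtoData()
  proto_data.writer_schema = proto_schema
  template.proto_rows = proto_data
  append_stream = writer.AppendRowsStream(client, template)

  # 2) Send serialized protocol buffers.
  proto_rows = types.ProtoRows()
  for row in rows:  # each row is a metric_record_pb2.MetricRecord
    proto_rows.serialized_rows.append(row.SerializeToString())
  request = types.AppendRowsRequest()
  proto_data = types.AppendRowsRequest.ProtoData()
  proto_data.rows = proto_rows
  request.proto_rows = proto_data
  append_stream.send(request).result()

  # 3) Close the stream, then finalize and commit it atomically.
  append_stream.close()
  client.finalize_write_stream(name=stream.name)
  commit = types.BatchCommitWriteStreamsRequest()
  commit.parent = parent
  commit.write_stream_names = [stream.name]
  client.batch_commit_write_streams(commit)
```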

@RissyRan RissyRan force-pushed the upsert_metric branch 2 times, most recently from f43279f to e35e0e1, on January 24, 2024 02:03
@RissyRan RissyRan changed the title [WIP] Upsert metric for execution Upsert metric for execution Jan 24, 2024
@RissyRan RissyRan requested a review from zpcore January 24, 2024 19:43
@RissyRan RissyRan force-pushed the upsert_metric branch 3 times, most recently from 7472154 to 0f50195, on January 26, 2024 03:49
@RissyRan RissyRan changed the title Upsert metric for execution Update uuid to be unique for a specific execution Jan 26, 2024
@RissyRan RissyRan changed the title Update uuid to be unique for a specific execution Update uuid in dataset to be unique for a specific run Jan 26, 2024
@RissyRan RissyRan changed the title Update uuid in dataset to be unique for a specific run Upsert metric for execution Jan 26, 2024
@RissyRan RissyRan changed the title Upsert metric for execution [Do not merge] Upsert metric for execution Feb 2, 2024
@RissyRan RissyRan self-assigned this Feb 27, 2024