Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Add ability to update dag run note in PATCH dag_run endpoint #43508

Merged
Merged
10 changes: 9 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2154,7 +2154,15 @@ components:
DAGRunPatchBody:
properties:
state:
$ref: '#/components/schemas/DAGRunPatchStates'
anyOf:
- $ref: '#/components/schemas/DAGRunPatchStates'
- type: 'null'
note:
anyOf:
- type: string
maxLength: 1000
- type: 'null'
title: Note
type: object
required:
- state
Expand Down
33 changes: 24 additions & 9 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from fastapi import Depends, HTTPException, Query, Request
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload
from typing_extensions import Annotated

from airflow.api.common.mark_tasks import (
Expand Down Expand Up @@ -78,7 +78,10 @@ async def patch_dag_run_state(
update_mask: list[str] | None = Query(None),
) -> DAGRunResponse:
"""Modify a DAG Run."""
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
ALLOWED_FIELD_MASK = ["state", "note"]
dag_run = session.scalar(
select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_run_note))
)
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
Expand All @@ -90,21 +93,33 @@ async def patch_dag_run_state(
raise HTTPException(404, f"Dag with id {dag_id} was not found")

if update_mask:
if update_mask != ["state"]:
raise HTTPException(400, "Only `state` field can be updated through the REST API")
for each in update_mask:
if each not in ALLOWED_FIELD_MASK:
raise HTTPException(400, f"Invalid field `{each}` in update mask")
else:
update_mask = ["state"]
update_mask = ALLOWED_FIELD_MASK

for attr_name in update_mask:
attr_value = getattr(patch_body, attr_name)
if attr_name == "state":
state = getattr(patch_body, attr_name)
if state == DAGRunPatchStates.SUCCESS:
if attr_value is None:
raise HTTPException(400, "state cannot be empty when it is included in the update mask")
if attr_value == DAGRunPatchStates.SUCCESS:
set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True)
elif state == DAGRunPatchStates.QUEUED:
elif attr_value == DAGRunPatchStates.QUEUED:
set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True)
else:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True)

elif attr_name == "note":
# Once Authentication is implemented in this FastAPI app,
# user id will be added when updating dag run note
# Refer to https://github.com/apache/airflow/issues/43534
if dag_run.dag_run_note is None:
dag_run.note = (attr_value, None)
else:
dag_run.dag_run_note.content = attr_value
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
dag_run.dag_run_note.user_id = None
session.commit()
dag_run = session.get(DagRun, dag_run.id)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/serializers/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class DAGRunPatchStates(str, Enum):
class DAGRunPatchBody(BaseModel):
"""DAG Run Serializer for PATCH requests."""

state: DAGRunPatchStates
state: DAGRunPatchStates | None
note: str | None = Field(None, max_length=1000)


class DAGRunResponse(BaseModel):
Expand Down
21 changes: 20 additions & 1 deletion airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,26 @@ export const $DAGResponse = {
export const $DAGRunPatchBody = {
properties: {
state: {
$ref: "#/components/schemas/DAGRunPatchStates",
anyOf: [
{
$ref: "#/components/schemas/DAGRunPatchStates",
},
{
type: "null",
},
],
},
note: {
anyOf: [
{
type: "string",
maxLength: 1000,
},
{
type: "null",
},
],
title: "Note",
},
},
type: "object",
Expand Down
3 changes: 2 additions & 1 deletion airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ export type DAGResponse = {
* DAG Run Serializer for PATCH requests.
*/
export type DAGRunPatchBody = {
state: DAGRunPatchStates;
state: DAGRunPatchStates | null;
note?: string | null;
};

/**
Expand Down