
Malformed stored log entry crashes task log fetch API with unhandled ValidationError #69304

Description

@hkc-8010

Apache Airflow version

main (Airflow 3.x)

What happened

When a task logs a non-string value as the sole log message (for example self.log.info(some_list) with no %s placeholder), Airflow 3's structured logging stores that value verbatim as the log event. When the log backend persists structured logs as JSON (as the Elasticsearch and OpenSearch remote log handlers do), that event field is stored as a JSON array/object instead of a string.
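A minimal illustration of why the stored event ends up as an array rather than a string. It uses only the standard library `json` module and a hypothetical record dict. The field names other than `event` are assumptions, not the exact schema the Elasticsearch/OpenSearch handlers write:

```python
import json

# Hypothetical structured log record, roughly what a call like
# self.log.info(["a", "b"]) yields: the list is kept as the event as-is.
record = {
    "event": ["a", "b"],
    "level": "info",
    "timestamp": "2024-01-01T00:00:00+00:00",
}

# Persisting the record as JSON keeps each field's native type,
# so "event" is written as a JSON array, not as a string.
stored = json.dumps(record)
print(stored)

# Reading it back produces a Python list for "event" again.
restored = json.loads(stored)
print(type(restored["event"]).__name__)
```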

When the log is later read back through the task log API, both ElasticsearchTaskHandler._read() and OpensearchTaskHandler._read() construct a StructuredLogMessage directly from the stored fields:

StructuredLogMessage(**_build_log_fields(hit.to_dict()))

StructuredLogMessage.event (airflow-core/src/airflow/utils/log/file_task_handler.py) is a required str field with no coercion. A non-string stored value therefore raises pydantic.ValidationError, which is not caught anywhere in the read path (_read() → read_log_chunks/read_log_stream → the /dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs/{try_number} endpoint). The exception propagates unhandled, and the entire log-fetch request for that task attempt fails with HTTP 500 instead of degrading gracefully for the one malformed line.
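The failure mode can be sketched without Airflow or pydantic. `StrictLogMessage` and `MessageValidationError` are hypothetical stand-ins for StructuredLogMessage and pydantic.ValidationError, and only the `event` field is modelled. The point is that building every message directly, with no per-hit guard, lets one malformed hit abort the whole batch:

```python
from dataclasses import dataclass


class MessageValidationError(ValueError):
    """Hypothetical stand-in for pydantic.ValidationError."""


@dataclass
class StrictLogMessage:
    """Hypothetical stand-in for StructuredLogMessage: event must be a str."""

    event: str

    def __post_init__(self) -> None:
        # Mimic pydantic rejecting a list/dict for a plain `str` field.
        if not isinstance(self.event, str):
            raise MessageValidationError(
                f"event: Input should be a valid string, got {type(self.event).__name__}"
            )


def read_unguarded(hits: list[dict]) -> list[StrictLogMessage]:
    # Same shape as the handlers' read path: construct each message directly,
    # so the first malformed hit raises and nothing is returned.
    return [StrictLogMessage(**hit) for hit in hits]


hits = [{"event": "task started"}, {"event": ["a", "b"]}, {"event": "task finished"}]
error_message = None
try:
    read_unguarded(hits)
except MessageValidationError as exc:
    error_message = str(exc)
print(f"whole read failed: {error_message}")
```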

What you think should happen instead

A single malformed stored log entry should not fail the entire log-fetch request. Airflow already has a precedent for this exact situation: _log_stream_to_parsed_log_stream in file_task_handler.py catches ValidationError when parsing a stored log line and falls back to a stringified StructuredLogMessage. The Elasticsearch and OpenSearch task log handlers should apply the same fallback.
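A minimal sketch of the proposed per-entry fallback, again using hypothetical stand-ins (`StrictLogMessage`, `MessageValidationError`) instead of the real StructuredLogMessage and pydantic.ValidationError. It does not reproduce the actual body of _log_stream_to_parsed_log_stream. Rendering the bad value with `json.dumps` is a choice made here; `str()` would also produce a readable line:

```python
import json
from dataclasses import dataclass


class MessageValidationError(ValueError):
    """Hypothetical stand-in for pydantic.ValidationError."""


@dataclass
class StrictLogMessage:
    """Hypothetical stand-in for StructuredLogMessage: event must be a str."""

    event: str

    def __post_init__(self) -> None:
        if not isinstance(self.event, str):
            raise MessageValidationError("event: Input should be a valid string")


def build_message(fields: dict) -> StrictLogMessage:
    """Build one message, degrading to a stringified event on bad input."""
    try:
        return StrictLogMessage(**fields)
    except MessageValidationError:
        # Keep the line, but render the stored value as text. json.dumps keeps
        # list/dict structure readable; default=str covers non-JSON types.
        return StrictLogMessage(event=json.dumps(fields.get("event"), default=str))


def read_guarded(hits: list[dict]) -> list[StrictLogMessage]:
    # One malformed hit now affects only its own line, not the whole request.
    return [build_message(hit) for hit in hits]


messages = read_guarded([{"event": "task started"}, {"event": ["a", "b"]}])
print([m.event for m in messages])
```

Catching only the validation error (not every exception) keeps unrelated bugs visible, which matches the intent of degrading just the one malformed line.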

How to reproduce

  1. Run a task that logs a non-string object as the sole message argument, e.g. self.log.info(["a", "b"]), with the Elasticsearch or OpenSearch remote logging backend configured.
  2. Attempt to view that task's logs in the Airflow UI or via the task instance logs API.
  3. The request fails with a 500 and a traceback ending in pydantic_core._pydantic_core.ValidationError: 1 validation error for StructuredLogMessage / event / Input should be a valid string.

Operating System

N/A (server-side)

Versions of Apache Airflow Providers

apache-airflow-providers-elasticsearch, apache-airflow-providers-opensearch (both affected independently — each has its own copy of the same unguarded pattern)

Deployment

Other

Deployment details

No response

Anything else

This affects both providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py and providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py independently — they have separately duplicated _build_log_fields() implementations, not a shared one. Planning to submit two separate PRs, one per provider, both referencing this issue.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
