Skip to content

Fix XCom migration failing for NaN/Infinity float values#62686

Open
jedcunningham wants to merge 2 commits intoapache:mainfrom
astronomer:fix_xcom_migration_nan_inf
Open

Fix XCom migration failing for NaN/Infinity float values#62686
jedcunningham wants to merge 2 commits intoapache:mainfrom
astronomer:fix_xcom_migration_nan_inf

Conversation

@jedcunningham
Copy link
Member

XCom values containing float('nan'), float('inf'), or float('-inf') caused the database migration to silently corrupt data or fail outright when upgrading. Three bugs were present across backends:

  • Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced, leaving bare NaN/Infinity in the output and breaking the JSON cast.
  • Infinity and -Infinity were not handled at all — only NaN was.
  • Bare top-level values (a single NaN or Infinity, not inside a list or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce the wrong output (one of these was pre-existing from #57866).

An example of a failing Infinity in xcom:

2026-03-02T03:23:06.876232Z [info     ] Running upgrade 9fc3fc5de720 -> eed27faa34e3, Remove pickled data from xcom table. [alembic.runtime.migration] loc=m
igration.py:621
Traceback (most recent call last):
  File "/usr/python/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/usr/python/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type json
DETAIL:  Token "Infinity" is invalid.
CONTEXT:  JSON data, line 1: Infinity
…
sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type json
DETAIL:  Token "Infinity" is invalid.
CONTEXT:  JSON data, line 1: Infinity

[SQL:
            ALTER TABLE xcom
            ALTER COLUMN value TYPE JSONB
            USING CASE
                WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSONB)
                ELSE NULL
            END
            ]
(Background on this error at: https://sqlalche.me/e/20/9h9h)

A Dag that can be used to test:

from __future__ import annotations

try:
    from airflow.sdk import DAG, task
except ImportError:
    from airflow import DAG
    from airflow.decorators import task


@task
def push_nan_values():
    return {
        "nan_value": float("nan"),
        "nested": {
            "stats": {
                "count": 1.0,
                "mean": 0.196,
                "std": float("nan"),
            },
            "list_of_nans": [float("nan"), float("nan"), float("nan")],
        },
        "normal_value": 42,
    }


@task
def push_inf_values():
    return {
        "inf_value": float("inf"),
        "neg_inf_value": float("-inf"),
        "nested": {
            "stats": {
                "count": 1.0,
                "mean": 0.196,
                "min": float("-inf"),
                "max": float("inf"),
            },
            "list_of_infs": [float("inf"), float("-inf"), float("inf"), float("-inf")],
        },
        "normal_value": 42,
    }


@task
def push_top_level_nan():
    return float("nan")


@task
def push_top_level_inf():
    return float("inf")



with DAG(dag_id="nan_xcom", schedule=None):
    push_nan_values()


with DAG(dag_id="infinity_xcom", schedule=None):
    push_inf_values()


with DAG(dag_id="top_level_nan_xcom", schedule=None):
    push_top_level_nan()


with DAG(dag_id="top_level_inf_xcom", schedule=None):
    push_top_level_inf()


Was generative AI tooling used to co-author this PR?
  • Yes

Generated-by: Cursor

XCom values containing float('nan'), float('inf'), or float('-inf')
caused the database migration to silently corrupt data or fail
outright when upgrading. Three bugs were present across backends:

- Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced,
  leaving bare NaN/Infinity in the output and breaking the JSON cast.
- Infinity and -Infinity were not handled at all — only NaN was.
- Bare top-level values (a single NaN or Infinity, not inside a list
  or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce
the wrong output (one of these was pre-existing from apache#57866).
@uranusjr uranusjr added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Mar 2, 2026
@vatsrahul1001
Copy link
Contributor

I was trying to test with example dag mentioned in PR, but the worker is crashing

airflow celery worker
[Breeze:3.10.19] root@946185dbe688:/op
t/airflow$ airflow
celery worker
2026-03-02T11:41:32.934963Z [info     ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-03-02T11:41:32.935112Z [info     ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-03-02T11:41:32.935173Z [info     ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-03-02T11:41:32.935221Z [info     ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-03-02T11:41:32.935272Z [info     ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-03-02T11:41:32.935326Z [info     ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
2026-03-02T11:41:33.735984Z [info     ] starting stale bundle cleanup process [airflow.providers.celery.cli.celery_command] loc=celery_command.py:141
2026-03-02T11:41:33.740551Z [info     ] Starting log server on http://[::]:8793 [airflow.utils.serve_logs.core] loc=core.py:50
WARNING:  ASGI app factory detected. Using it, but please consider setting the --factory flag explicitly.
INFO:     Started server process [229]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://:8793 (Press CTRL+C to quit)
2026-03-02T11:41:35.700076Z [warning  ] urllib3 (2.6.3) or chardet (6.0.0.post1)/charset_normalizer (3.4.4) doesn't match a supported version! [py.warnings] category=RequestsDependencyWarning filename=/usr/python/lib/python3.10/site-packages/requests/__init__.py lineno=113
2026-03-02T11:41:36.213664Z [warning  ] You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0
 [py.warnings] category=SecurityWarning filename=/usr/python/lib/python3.10/site-packages/celery/platforms.py lineno=841

 -------------- celery@946185dbe688 v5.6.2 (recovery)
--- ***** -----
-- ******* ---- Linux-6.13.7-orbstack-00283-g9d1400e7e9c6-aarch64-with-glibc2.36 2026-03-02 11:41:36
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.providers.celery.executors.celery_executor:0xffffa430ff40
- ** ---------- .> transport:   redis://redis:6379/0
- ** ---------- .> results:     postgresql+psycopg2://postgres:**@postgres/airflow
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . execute_workload

2026-03-02T11:41:37.602510Z [info     ] Connected to redis://redis:6379/0 [celery.worker.consumer.connection] loc=connection.py:22
2026-03-02T11:41:37.604243Z [info     ] mingle: searching for neighbors [celery.worker.consumer.mingle] loc=mingle.py:40
2026-03-02T11:41:38.611854Z [info     ] mingle: all alone              [celery.worker.consumer.mingle] loc=mingle.py:49
2026-03-02T11:41:38.630845Z [info     ] celery@946185dbe688 ready.     [celery.apps.worker] loc=worker.py:176
2026-03-02T11:45:17.180575Z [info     ] Task execute_workload[ab776e19-daea-48d4-8d88-a489beb57c4d] received [celery.worker.strategy] loc=strategy.py:161
2026-03-02T11:45:17.218643Z [info     ] [ab776e19-daea-48d4-8d88-a489beb57c4d] Executing workload in Celery: token='eyJ***' dag_rel_path=PurePosixPath('nan_dags.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=infinity_xcom/run_id=manual__2026-03-02T11:45:16.004390+00:00/task_id=push_inf_values/attempt=1.log' ti=TaskInstanceDTO(id=UUID('019cae5d-e8fe-7f59-bd2f-78bd8d52eccb'), dag_version_id=UUID('019cae36-5158-72e0-9b4f-b4e9bee75d2e'), task_id='push_inf_values', dag_id='infinity_xcom', run_id='manual__2026-03-02T11:45:16.004390+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}) sentry_integration='sentry_sdk.integrations.celery.CeleryIntegration' type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils] loc=celery_executor_utils.py:193
2026-03-02T11:45:17.252991Z [info     ] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:2081
2026-03-02T11:45:19.129578Z [error    ] Task execute_workload[ab776e19-daea-48d4-8d88-a489beb57c4d] raised unexpected: ValueError('Out of range float values are not JSON compliant') [celery.app.trace] loc=trace.py:285
Traceback (most recent call last):
  File "/usr/python/lib/python3.10/site-packages/celery/app/trace.py", line 479, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/celery/app/trace.py", line 779, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py", line 202, in execute_workload
    supervise(
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 2100, in supervise
    exit_code = process.wait()
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 1060, in wait
    self._monitor_subprocess()
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 1125, in _monitor_subprocess
    alive = self._service_subprocess(max_wait_time=max_wait_time) is None
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 801, in _service_subprocess
    need_more = socket_handler(key.fileobj)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 1861, in cb
    gen.send(request)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 642, in handle_requests
    self._handle_request(msg, log, request.id)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 1339, in _handle_request
    self.client.xcoms.set(
  File "/opt/airflow/task-sdk/src/airflow/sdk/api/client.py", line 530, in set
    self.client.post(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", params=params, json=value)
  File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 1144, in post
    return self.request(
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 331, in wrapped_f
    return copy(f, *args, **kw)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 470, in __call__
    do = self.iter(retry_state=retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 371, in iter
    result = action(retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 393, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 473, in __call__
    result = fn(*args, **kwargs)
  File "/opt/airflow/task-sdk/src/airflow/sdk/api/client.py", line 961, in request
    return super().request(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 812, in request
    request = self.build_request(
  File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 378, in build_request
    return Request(
  File "/usr/python/lib/python3.10/site-packages/httpx/_models.py", line 408, in __init__
    headers, stream = encode_request(
  File "/usr/python/lib/python3.10/site-packages/httpx/_content.py", line 216, in encode_request
    return encode_json(json)
  File "/usr/python/lib/python3.10/site-packages/httpx/_content.py", line 177, in encode_json
    body = json_dumps(
  File "/usr/python/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/python/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/python/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
ValueError: Out of range float values are not JSON compliant


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:db-migrations PRs with DB migration backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch kind:documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants