Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@

from airflow import settings
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.common.compat.sdk import conf
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations

Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are unresolved conflict markers here.

Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,27 @@
from celery import states as celery_states
from deprecated import deprecated

<<<<<<< HEAD
from airflow.configuration import conf
=======
from airflow.cli.cli_config import (
ARG_DAEMON,
ARG_LOG_FILE,
ARG_PID,
ARG_SKIP_SERVE_LOGS,
ARG_STDERR,
ARG_STDOUT,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
)
>>>>>>> e0223d0393 (fix(providers/celery): Migrate conf imports to SDK compatibility layer)
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.common.compat.sdk import AirflowTaskTimeout, Stats
from airflow.providers.common.compat.sdk import AirflowTaskTimeout, Stats, conf
from airflow.utils.state import TaskInstanceState

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -79,6 +95,19 @@ def __getattr__(name):
airflow celery worker
"""

from airflow.cli.cli_config import (
ARG_DAEMON,
ARG_LOG_FILE,
ARG_PID,
ARG_SKIP_SERVE_LOGS,
ARG_STDERR,
ARG_STDOUT,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
)

class CeleryExecutor(BaseExecutor):
"""
Expand Down Expand Up @@ -108,15 +137,15 @@ def __init__(self, *args, **kwargs):
# Celery doesn't support bulk sending the tasks (which can become a bottleneck on bigger clusters)
# so we use a multiprocessing pool to speed this up.
# How many worker processes are created for checking celery task state.
self._sync_parallelism = conf.getint("celery", "SYNC_PARALLELISM")
self._sync_parallelism = conf.getint("celery", "SYNC_PARALLELISM", fallback=0)
if self._sync_parallelism == 0:
self._sync_parallelism = max(1, cpu_count() - 1)
from airflow.providers.celery.executors.celery_executor_utils import BulkStateFetcher

self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism)
self.tasks = {}
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries")
self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries", fallback=3)

def start(self) -> None:
self.log.debug("Starting Celery Executor using %s processes for syncing", self._sync_parallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@
from celery.signals import import_modules as celery_import_modules
from sqlalchemy import select

from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout, Stats, timeout
from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout, Stats, conf, timeout
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
Expand Down Expand Up @@ -77,7 +76,7 @@

TaskTuple = tuple[TaskInstanceKey, CommandType, str | None, Any | None]

OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout")
OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout", fallback=1.0)

# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state"
Expand All @@ -97,7 +96,7 @@ def get_celery_configuration() -> dict[str, Any]:
@providers_configuration_loaded
def _get_celery_app() -> Celery:
"""Init providers before importing the configuration, so the _SECRET and _CMD options work."""
celery_app_name = conf.get("celery", "CELERY_APP_NAME")
celery_app_name = conf.get("celery", "CELERY_APP_NAME", fallback="airflow.executors.celery_executor")

return Celery(celery_app_name, config_source=get_celery_configuration())

Expand Down Expand Up @@ -139,8 +138,8 @@ def on_celery_import_modules(*args, **kwargs):
def execute_workload(input: str) -> None:
from pydantic import TypeAdapter

from airflow.configuration import conf
from airflow.executors import workloads
from airflow.providers.common.compat.sdk import conf
from airflow.sdk.execution_time.supervisor import supervise

decoder = TypeAdapter[workloads.All](workloads.All)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@

from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.celery.executors.celery_executor import AIRFLOW_V_3_0_PLUS, CeleryExecutor
from airflow.providers.common.compat.sdk import conf

try:
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
except ImportError as e:
raise AirflowOptionalProviderFeatureException(e)

from airflow.utils.providers_configuration_loader import providers_configuration_loaded

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@
import re
import ssl

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.common.compat.sdk import AirflowException, conf


def _broker_supports_visibility_timeout(url):
Expand Down Expand Up @@ -117,7 +116,7 @@ def _broker_supports_visibility_timeout(url):

def _get_celery_ssl_active() -> bool:
try:
return conf.getboolean("celery", "SSL_ACTIVE")
return conf.getboolean("celery", "SSL_ACTIVE", fallback=False)
except AirflowConfigException:
log.warning("Celery Executor will run without SSL")
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@
from kubernetes.client import models as k8s
from uuid6 import uuid7

from airflow.configuration import conf
from airflow.executors import workloads
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout
from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout, conf
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.state import State

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import pytest

from airflow.cli import cli_parser
from airflow.configuration import conf
from airflow.executors import executor_loader
from airflow.providers.celery.cli import celery_command
from airflow.providers.celery.cli.celery_command import _run_stale_bundle_cleanup
from airflow.providers.common.compat.sdk import conf

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
from celery.result import AsyncResult
from kombu.asynchronous import set_event_loop

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.providers.celery.executors import celery_executor, celery_executor_utils, default_celery
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.common.compat.sdk import conf
from airflow.utils.state import State

from tests_common.test_utils import db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import pytest

from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest
from airflow.configuration import conf
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.providers.common.compat.sdk import conf

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

Expand Down
Loading