diff --git a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py index 547e870d8aeb8..002d5206c0fb2 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py @@ -30,7 +30,7 @@ from tenacity import before_sleep_log, wait_random_exponential from airflow.api_fastapi.auth.tokens import JWTGenerator -from airflow.configuration import conf +from airflow.providers.common.compat.sdk import conf from airflow.providers.edge3.models.edge_worker import ( EdgeWorkerDuplicateException, EdgeWorkerVersionException, diff --git a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py index 864d6851ace18..63a7d9f840b17 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py @@ -33,7 +33,7 @@ from airflow import settings from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.cli.simple_table import AirflowConsole -from airflow.configuration import conf +from airflow.providers.common.compat.sdk import conf from airflow.providers.edge3.cli.dataclasses import MaintenanceMarker, WorkerStatus from airflow.providers.edge3.cli.signalling import ( EDGE_WORKER_PROCESS_NAME, diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 7ffd2b5aaffe6..53902f72480a3 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -34,8 +34,7 @@ from lockfile.pidlockfile import remove_existing_pidfile from airflow import __version__ as airflow_version -from airflow.configuration import conf -from airflow.providers.common.compat.sdk import timezone +from airflow.providers.common.compat.sdk import conf, timezone from airflow.providers.edge3 import __version__ as edge_provider_version from airflow.providers.edge3.cli.api_client import ( jobs_fetch, diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index 60392062469bc..62bd3155575bd 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -27,11 +27,10 @@ from sqlalchemy.exc import NoSuchTableError from sqlalchemy.orm import Session -from airflow.configuration import conf from airflow.executors import workloads from airflow.executors.base_executor import BaseExecutor from airflow.models.taskinstance import TaskInstance -from airflow.providers.common.compat.sdk import Stats, timezone +from airflow.providers.common.compat.sdk import Stats, conf, timezone from airflow.providers.edge3.models.edge_job import EdgeJobModel from airflow.providers.edge3.models.edge_logs import EdgeLogsModel from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, reset_metrics @@ -245,8 +244,8 @@ def _update_orphaned_jobs(self, session: Session) -> bool: def _purge_jobs(self, session: Session) -> bool: """Clean finished jobs.""" purged_marker = False - job_success_purge = conf.getint("edge", "job_success_purge") - job_fail_purge = conf.getint("edge", "job_fail_purge") + job_success_purge = conf.getint("edge", "job_success_purge", fallback=5) + job_fail_purge = conf.getint("edge", "job_fail_purge", fallback=60) jobs: Sequence[EdgeJobModel] = session.scalars( select(EdgeJobModel) .with_for_update(skip_locked=True) diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py index ad22956258c0e..20c9fa406803c 100644 --- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py +++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py @@ -20,9 +20,8 @@ import sys from typing import TYPE_CHECKING, Any -from airflow.configuration import conf from airflow.exceptions import AirflowConfigException -from airflow.providers.common.compat.sdk import AirflowPlugin +from airflow.providers.common.compat.sdk import AirflowPlugin, conf from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS from airflow.utils.session import NEW_SESSION, provide_session diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py b/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py index a29fd42b2d1e7..6e9ba46ba87cd 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py @@ -31,7 +31,7 @@ ) from airflow.api_fastapi.auth.tokens import JWTValidator -from airflow.configuration import conf +from airflow.providers.common.compat.sdk import conf from airflow.providers.edge3.worker_api.datamodels import JsonRpcRequestBase # noqa: TCH001 log = logging.getLogger(__name__) diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py index 064808b2b0484..0fd3662c0ae02 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py @@ -26,9 +26,9 @@ from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001 from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.configuration import conf from airflow.models.taskinstance import TaskInstance from airflow.models.taskinstancekey import TaskInstanceKey +from airflow.providers.common.compat.sdk import conf from airflow.providers.edge3.models.edge_logs import EdgeLogsModel from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import PushLogsBody, WorkerApiDocs diff --git a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py index c38dffed3e918..4bc0fb2f07c5f 100644 --- a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py +++ b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py @@ -23,9 +23,8 @@ import time_machine from sqlalchemy import delete, select -from airflow.configuration import conf from airflow.models.taskinstancekey import TaskInstanceKey -from airflow.providers.common.compat.sdk import Stats, timezone +from airflow.providers.common.compat.sdk import Stats, conf, timezone from airflow.providers.edge3.executors.edge_executor import EdgeExecutor from airflow.providers.edge3.models.edge_job import EdgeJobModel from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState