Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from tenacity import before_sleep_log, wait_random_exponential

from airflow.api_fastapi.auth.tokens import JWTGenerator
from airflow.configuration import conf
from airflow.providers.common.compat.sdk import conf
from airflow.providers.edge3.models.edge_worker import (
EdgeWorkerDuplicateException,
EdgeWorkerVersionException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.providers.common.compat.sdk import conf
from airflow.providers.edge3.cli.dataclasses import MaintenanceMarker, WorkerStatus
from airflow.providers.edge3.cli.signalling import (
EDGE_WORKER_PROCESS_NAME,
Expand Down
3 changes: 1 addition & 2 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
from lockfile.pidlockfile import remove_existing_pidfile

from airflow import __version__ as airflow_version
from airflow.configuration import conf
from airflow.providers.common.compat.sdk import timezone
from airflow.providers.common.compat.sdk import conf, timezone
from airflow.providers.edge3 import __version__ as edge_provider_version
from airflow.providers.edge3.cli.api_client import (
jobs_fetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.orm import Session

from airflow.configuration import conf
from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.providers.common.compat.sdk import Stats, timezone
from airflow.providers.common.compat.sdk import Stats, conf, timezone
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, reset_metrics
Expand Down Expand Up @@ -245,8 +244,8 @@ def _update_orphaned_jobs(self, session: Session) -> bool:
def _purge_jobs(self, session: Session) -> bool:
"""Clean finished jobs."""
purged_marker = False
job_success_purge = conf.getint("edge", "job_success_purge")
job_fail_purge = conf.getint("edge", "job_fail_purge")
job_success_purge = conf.getint("edge", "job_success_purge", fallback=5)
job_fail_purge = conf.getint("edge", "job_fail_purge", fallback=60)
jobs: Sequence[EdgeJobModel] = session.scalars(
select(EdgeJobModel)
.with_for_update(skip_locked=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import sys
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.providers.common.compat.sdk import AirflowPlugin
from airflow.providers.common.compat.sdk import AirflowPlugin, conf
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS
from airflow.utils.session import NEW_SESSION, provide_session

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
)

from airflow.api_fastapi.auth.tokens import JWTValidator
from airflow.configuration import conf
from airflow.providers.common.compat.sdk import conf
from airflow.providers.edge3.worker_api.datamodels import JsonRpcRequestBase # noqa: TCH001

log = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import conf
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import PushLogsBody, WorkerApiDocs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
import time_machine
from sqlalchemy import delete, select

from airflow.configuration import conf
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import Stats, timezone
from airflow.providers.common.compat.sdk import Stats, conf, timezone
from airflow.providers.edge3.executors.edge_executor import EdgeExecutor
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState
Expand Down
Loading