diff --git a/README.md b/README.md index e364f3c..ec896e7 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ The exporter is based on this [prometheus exporter for Airflow](https://github.c The plugin has been tested with: -- Airflow >= 1.10.4 +- Airflow >= 1.10.8 - Python 3.6+ The scheduler metrics assume that there is a DAG named `canary_dag`. In our setup, the `canary_dag` is a DAG which has a tasks which perform very simple actions such as establishing database connections. This DAG is used to test the uptime of the Airflow scheduler itself. diff --git a/airflow_prometheus_exporter/prometheus_exporter.py b/airflow_prometheus_exporter/prometheus_exporter.py index 232d458..398eeab 100644 --- a/airflow_prometheus_exporter/prometheus_exporter.py +++ b/airflow_prometheus_exporter/prometheus_exporter.py @@ -4,7 +4,7 @@ from contextlib import contextmanager from airflow.configuration import conf -from airflow.models import DagModel, DagRun, TaskInstance, TaskFail, XCom +from airflow.models import DagModel, DagRun, TaskInstance, TaskFail, XCom, DagTag from airflow.plugins_manager import AirflowPlugin from airflow.settings import RBAC, Session from airflow.utils.state import State @@ -39,15 +39,18 @@ def get_dag_state_info(): with session_scope(Session) as session: dag_status_query = ( session.query( + DagTag.name, DagRun.dag_id, DagRun.state, func.count(DagRun.state).label("count"), - ) - .group_by(DagRun.dag_id, DagRun.state) + ).outerjoin( + DagTag, DagTag.dag_id == DagRun.dag_id + ).group_by(DagTag.name, DagRun.dag_id, DagRun.state) .subquery() ) return ( session.query( + dag_status_query.c.name, dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.count, @@ -138,18 +141,21 @@ def get_task_state_info(): with session_scope(Session) as session: task_status_query = ( session.query( + DagTag.name, TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state, func.count(TaskInstance.dag_id).label("value"), - ) - .group_by( + ).outerjoin( + 
DagTag, DagTag.dag_id == TaskInstance.dag_id + ).group_by( DagTag.name, TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state ) .subquery() ) return ( session.query( + task_status_query.c.name, task_status_query.c.dag_id, task_status_query.c.task_id, task_status_query.c.state, @@ -360,11 +366,11 @@ def collect(self): t_state = GaugeMetricFamily( "airflow_task_status", "Shows the number of task instances with particular status", - labels=["dag_id", "task_id", "owner", "status"], + labels=["tag", "dag_id", "task_id", "owner", "status"], ) for task in task_info: t_state.add_metric( - [task.dag_id, task.task_id, task.owners, task.state or "none"], + [task.name or "none", task.dag_id, task.task_id, task.owners, task.state or "none"], task.value, ) yield t_state @@ -400,10 +406,10 @@ def collect(self): d_state = GaugeMetricFamily( "airflow_dag_status", "Shows the number of dag starts with this status", - labels=["dag_id", "owner", "status"], + labels=["tag", "dag_id", "owner", "status"], ) for dag in dag_info: - d_state.add_metric([dag.dag_id, dag.owners, dag.state], dag.count) + d_state.add_metric([dag.name or "none", dag.dag_id, dag.owners, dag.state], dag.count) yield d_state dag_duration = GaugeMetricFamily( diff --git a/setup.py b/setup.py index d8ad0be..6b7c7b8 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ readme = readme_file.read() install_requirements = [ - 'apache-airflow>=1.10.4', + 'apache-airflow>=1.10.8', 'prometheus_client>=0.4.2', ],