diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index ed5c61c604c79..5dc07445f5109 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -41,6 +41,7 @@
 import attrs
 import structlog
 from sqlalchemy import select, update
+from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.orm import load_only
 from tabulate import tabulate
 from uuid6 import uuid7
@@ -62,6 +63,7 @@
 from airflow.models.dag import DagModel
 from airflow.models.dagbag import DagPriorityParsingRequest
 from airflow.models.dagbundle import DagBundleModel
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ParseImportError
@@ -91,6 +93,9 @@
     from airflow.sdk.api.client import Client
 
 
+log = structlog.get_logger(__name__)
+
+
 class DagParsingStat(NamedTuple):
     """Information on processing progress."""
 
@@ -1566,6 +1571,19 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]):
     stats.gauge("dag_processing.total_parse_time", parse_time)
     stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
     stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats))
+    # Gated by config so large deployments can opt out of the extra DB round-trip.
+    # Only the DB read sits inside ``try``; the gauge is emitted in ``else`` so a
+    # failed read increments the error counter instead of publishing a sample,
+    # letting dashboards alert on missing data rather than a stale last value.
+    if conf.getboolean("scheduler", "emit_serialized_dag_count_metric", fallback=True):
+        try:
+            with create_session() as session:
+                serialized_dag_count = SerializedDagModel.get_count(session=session)
+        except SQLAlchemyError:
+            log.exception("Failed to emit serialized_dag.count metric")
+            stats.incr("serialized_dag.count_error")
+        else:
+            stats.gauge("serialized_dag.count", serialized_dag_count)
 
 
 def process_parse_results(
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index d2d9d47d61c6b..3aed02e022760 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -875,6 +875,26 @@ def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool:
         """
         return session.scalar(select(literal(True)).where(cls.dag_id == dag_id).limit(1)) is not None
 
+    @classmethod
+    @provide_session
+    def get_count(cls, session: Session = NEW_SESSION) -> int:
+        """
+        Return the number of rows in the serialized DAG table.
+
+        ``COUNT(*)`` without ``GROUP BY`` always yields exactly one row, so
+        ``scalar_one()`` makes that contract explicit and raises
+        ``NoResultFound`` / ``MultipleResultsFound`` if the query shape ever
+        changes unexpectedly.
+
+        NOTE(review): this counts rows, not distinct ``dag_id`` values; if the
+        table can hold more than one row per DAG, confirm which number the
+        metric should report.
+
+        :param session: ORM Session
+        :return: Row count as reported by ``COUNT(*)``.
+        """
+        return session.execute(select(func.count()).select_from(cls)).scalar_one()
+
     @classmethod
     @provide_session
     def get_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> SerializedDAG | None:
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 33aaa82888830..f86c5f7be344f 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -2790,3 +2790,60 @@ def test_bundle_version_data_stored_after_refresh(self, session):
 
         assert manager._bundle_versions["mock_bundle"] == "newhash"
         assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+    """Tests for the emit_metrics module-level function."""
+
+    def test_emit_metrics_emits_serialized_dag_count(self):
+        """emit_metrics emits the serialized_dag.count gauge with the value from get_count."""
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch("airflow.dag_processing.manager.conf") as mock_conf:
+            mock_conf.getboolean.return_value = True
+            with mock.patch("airflow.dag_processing.manager.create_session"):
+                with mock.patch(
+                    "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3
+                ):
+                    with mock.patch("airflow.dag_processing.manager.stats") as mock_stats:
+                        emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+        mock_stats.gauge.assert_any_call("serialized_dag.count", 3)
+
+    def test_emit_metrics_logs_and_swallows_db_error(self):
+        """emit_metrics logs, increments error counter, and swallows SQLAlchemyError from get_count."""
+        from sqlalchemy.exc import SQLAlchemyError
+
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch("airflow.dag_processing.manager.conf") as mock_conf:
+            mock_conf.getboolean.return_value = True
+            with mock.patch("airflow.dag_processing.manager.create_session"):
+                with mock.patch(
+                    "airflow.dag_processing.manager.SerializedDagModel.get_count",
+                    side_effect=SQLAlchemyError("db failure"),
+                ):
+                    with mock.patch("airflow.dag_processing.manager.stats") as mock_stats:
+                        with mock.patch("airflow.dag_processing.manager.log") as mock_log:
+                            emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+        mock_log.exception.assert_called_once_with("Failed to emit serialized_dag.count metric")
+        mock_stats.incr.assert_called_once_with("serialized_dag.count_error")
+
+    def test_emit_metrics_skips_serialized_dag_count_when_disabled(self):
+        """emit_metrics does not query or emit serialized_dag.count when config is False."""
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch("airflow.dag_processing.manager.conf") as mock_conf:
+            mock_conf.getboolean.return_value = False
+            with mock.patch(
+                "airflow.dag_processing.manager.SerializedDagModel.get_count"
+            ) as mock_get_count:
+                with mock.patch("airflow.dag_processing.manager.stats") as mock_stats:
+                    emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+        mock_get_count.assert_not_called()
+        assert not any(
+            call == mock.call("serialized_dag.count", mock.ANY)
+            for call in mock_stats.gauge.call_args_list
+        )
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index 54438f8e82fc9..ae2fa339a66a1 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -912,3 +912,30 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund
 
         # The name must have been updated in the DB.
         assert updated_alert.name == "updated name"
+
+    def test_get_count_returns_zero_on_empty_table(self, session):
+        """get_count() returns 0 when no serialized DAGs are stored."""
+        db.clear_db_serialized_dags()
+        assert SDM.get_count(session=session) == 0
+
+    def test_get_count_returns_correct_value(self, dag_maker, session):
+        """get_count() returns the exact number of serialized DAGs in the table."""
+        baseline = SDM.get_count(session=session)
+        with dag_maker("dag_count_1"):
+            pass
+        with dag_maker("dag_count_2"):
+            pass
+        # dag_maker writes SerializedDagModel rows on context exit; flush to make
+        # them visible within the same session before asserting.
+        session.flush()
+        assert SDM.get_count(session=session) == baseline + 2
+
+    def test_get_count_propagates_db_error(self, session):
+        """get_count() lets OperationalError propagate so callers can handle DB failures."""
+        from sqlalchemy.exc import OperationalError
+
+        with mock.patch.object(
+            session, "execute", side_effect=OperationalError("db failure", {}, Exception("db failure"))
+        ):
+            with pytest.raises(OperationalError):
+                SDM.get_count(session=session)