From 440aede1a43f17dcb92c1c405e7316bcb03d0034 Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 00:56:23 +0530 Subject: [PATCH 01/10] fix: add SerializedDagModel.get_count() that raises on None instead of silently returning 0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit COUNT queries on aggregate columns always return an integer — a None result from session.scalar() signals a transient DB connectivity failure, not an empty table. Replacing `or 0` with an explicit RuntimeError ensures the failure surfaces loudly rather than being silently emitted as a zero count that triggers false on-call pages. * Add SerializedDagModel.get_count() classmethod with None-guard * Emit serialized_dag.count metric in dag_processing/manager.py emit_metrics(), wrapped in try/except so a DB error logs a warning but never kills the parse loop * Add unit tests for get_count() and emit_metrics() serialized_dag.count path Fixes: apache/airflow#66796 --- .../src/airflow/dag_processing/manager.py | 9 ++++++ .../src/airflow/models/serialized_dag.py | 17 +++++++++++ .../tests/unit/dag_processing/test_manager.py | 30 +++++++++++++++++++ .../tests/unit/models/test_serialized_dag.py | 19 ++++++++++++ 4 files changed, 75 insertions(+) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index ed5c61c604c79..3b37d8faefd79 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -62,6 +62,7 @@ from airflow.models.dag import DagModel from airflow.models.dagbag import DagPriorityParsingRequest from airflow.models.dagbundle import DagBundleModel +from airflow.models.serialized_dag import SerializedDagModel from airflow.models.dagwarning import DagWarning from airflow.models.db_callback_request import DbCallbackRequest from airflow.models.errors import ParseImportError @@ -91,6 +92,9 @@ from airflow.sdk.api.client import Client +log = structlog.get_logger(__name__) + + class DagParsingStat(NamedTuple): """Information on processing progress.""" @@ -1566,6 +1570,11 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): stats.gauge("dag_processing.total_parse_time", parse_time) stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats)) stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats)) + try: + with create_session() as session: + stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) + except Exception: + log.exception("Failed to emit serialized_dag.count metric") def process_parse_results( diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index d2d9d47d61c6b..505cae3ed301b 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -875,6 +875,23 @@ def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool: """ return session.scalar(select(literal(True)).where(cls.dag_id == dag_id).limit(1)) is not None + @classmethod + @provide_session + def get_count(cls, session: Session = NEW_SESSION) -> int: + """ + Return the total number of serialized DAGs stored in the database. + + :param session: ORM Session + :raises RuntimeError: if the database returns None for the COUNT query, which indicates + a transient connectivity issue rather than an empty table (COUNT always returns an int). + """ + result = session.scalar(select(func.count()).select_from(cls)) + if result is None: + raise RuntimeError( + "COUNT query on serialized_dag returned None — possible database connectivity issue" + ) + return result + @classmethod @provide_session def get_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> SerializedDAG | None: diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 33aaa82888830..383cc3c548e69 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2790,3 +2790,33 @@ def test_bundle_version_data_stored_after_refresh(self, session): assert manager._bundle_versions["mock_bundle"] == "newhash" assert manager._bundle_version_data["mock_bundle"] == test_data + + +class TestEmitMetrics: + """Tests for the emit_metrics module-level function.""" + + @pytest.mark.db_test + def test_emit_metrics_emits_serialized_dag_count(self, dag_maker, session): + """emit_metrics emits the serialized_dag.count gauge with the DB count.""" + from airflow.dag_processing.manager import emit_metrics + + with dag_maker("emit_count_dag"): + pass + + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: + emit_metrics(parse_time=1.0, dag_file_stats=[]) + + calls = {call[0][0]: call[0][1] for call in mock_stats.gauge.call_args_list} + assert "serialized_dag.count" in calls + assert calls["serialized_dag.count"] == 1 + + def test_emit_metrics_does_not_raise_on_db_error(self): + """emit_metrics logs and swallows RuntimeError from get_count on DB failure.""" + from airflow.dag_processing.manager import emit_metrics + + with mock.patch( + "airflow.dag_processing.manager.SerializedDagModel.get_count", + side_effect=RuntimeError("COUNT query on serialized_dag returned None"), + ): + with mock.patch("airflow.dag_processing.manager.stats"): + emit_metrics(parse_time=1.0, dag_file_stats=[]) diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index 54438f8e82fc9..bb395725839f5 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -912,3 +912,22 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund # The name must have been updated in the DB. assert updated_alert.name == "updated name" + + def test_get_count_returns_zero_on_empty_table(self, session): + """get_count() returns 0 when no serialized DAGs are stored.""" + assert SDM.get_count(session=session) == 0 + + def test_get_count_returns_correct_value(self, dag_maker, session): + """get_count() returns the exact number of serialized DAGs in the table.""" + with dag_maker("dag_count_1"): + pass + with dag_maker("dag_count_2"): + pass + count = SDM.get_count(session=session) + assert count == 2 + + def test_get_count_raises_on_none_result(self, session): + """get_count() raises RuntimeError when session.scalar returns None (simulates DB failure).""" + with mock.patch.object(session, "scalar", return_value=None): + with pytest.raises(RuntimeError, match="COUNT query on serialized_dag returned None"): + SDM.get_count(session=session) From 468c0a563c5e90ffb4f9426fa53bd3418c70f05d Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 01:03:10 +0530 Subject: [PATCH 02/10] review: address Copilot feedback on PR #67572 - Narrow except clause from Exception to (RuntimeError, SQLAlchemyError) so unexpected programming errors still propagate - Replace Unicode em dash with ASCII hyphen in RuntimeError message - Fix test_emit_metrics_emits_serialized_dag_count: mock get_count instead of relying on dag_maker rows being visible across a new session boundary - Update pytest.raises match string to reflect the corrected error message --- .../src/airflow/dag_processing/manager.py | 3 ++- .../src/airflow/models/serialized_dag.py | 2 +- .../tests/unit/dag_processing/test_manager.py | 19 +++++++++---------- .../tests/unit/models/test_serialized_dag.py | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 3b37d8faefd79..17dcd29d6922f 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -41,6 +41,7 @@ import attrs import structlog from sqlalchemy import select, update +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import load_only from tabulate import tabulate from uuid6 import uuid7 @@ -1573,7 +1574,7 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): try: with create_session() as session: stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) - except Exception: + except (RuntimeError, SQLAlchemyError): log.exception("Failed to emit serialized_dag.count metric") diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 505cae3ed301b..bcf7be3bf0ba6 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -888,7 +888,7 @@ def get_count(cls, session: Session = NEW_SESSION) -> int: result = session.scalar(select(func.count()).select_from(cls)) if result is None: raise RuntimeError( - "COUNT query on serialized_dag returned None — possible database connectivity issue" + "COUNT query on serialized_dag returned None - possible database connectivity issue" ) return result diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 383cc3c548e69..b386525c24aac 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2795,20 +2795,19 @@ def test_bundle_version_data_stored_after_refresh(self, session): class TestEmitMetrics: """Tests for the emit_metrics module-level function.""" - @pytest.mark.db_test - def test_emit_metrics_emits_serialized_dag_count(self, dag_maker, session): - """emit_metrics emits the serialized_dag.count gauge with the DB count.""" + def test_emit_metrics_emits_serialized_dag_count(self): + """emit_metrics emits the serialized_dag.count gauge with the value from get_count.""" from airflow.dag_processing.manager import emit_metrics - with dag_maker("emit_count_dag"): - pass - - with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: - emit_metrics(parse_time=1.0, dag_file_stats=[]) + with mock.patch( + "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3 + ): + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: + emit_metrics(parse_time=1.0, dag_file_stats=[]) calls = {call[0][0]: call[0][1] for call in mock_stats.gauge.call_args_list} assert "serialized_dag.count" in calls - assert calls["serialized_dag.count"] == 1 + assert calls["serialized_dag.count"] == 3 def test_emit_metrics_does_not_raise_on_db_error(self): """emit_metrics logs and swallows RuntimeError from get_count on DB failure.""" @@ -2816,7 +2815,7 @@ def test_emit_metrics_does_not_raise_on_db_error(self): with mock.patch( "airflow.dag_processing.manager.SerializedDagModel.get_count", - side_effect=RuntimeError("COUNT query on serialized_dag returned None"), + side_effect=RuntimeError("COUNT query on serialized_dag returned None - possible"), ): with mock.patch("airflow.dag_processing.manager.stats"): emit_metrics(parse_time=1.0, dag_file_stats=[]) diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index bb395725839f5..b4aa7efa127b6 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -929,5 +929,5 @@ def test_get_count_returns_correct_value(self, dag_maker, session): def test_get_count_raises_on_none_result(self, session): """get_count() raises RuntimeError when session.scalar returns None (simulates DB failure).""" with mock.patch.object(session, "scalar", return_value=None): - with pytest.raises(RuntimeError, match="COUNT query on serialized_dag returned None"): + with pytest.raises(RuntimeError, match="COUNT query on serialized_dag returned None - possible"): SDM.get_count(session=session) From bea29b93cae48416a9e47f8813f6e7b0d9a3df53 Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 01:10:12 +0530 Subject: [PATCH 03/10] review: address second round of Copilot feedback on PR #67572 - Switch get_count() from scalar()+None-check to scalar_one(): COUNT(*) always returns exactly one row, so scalar_one() is the correct API and lets SQLAlchemyError surface naturally on DB failure instead of returning None - Narrow except clause to SQLAlchemyError only (RuntimeError no longer raised) - Add performance comment on the COUNT(*) round-trip in emit_metrics() - Fix test_emit_metrics_logs_and_swallows_db_error: assert log.exception is called with the expected message, and use SQLAlchemyError as the side-effect - Rename test to reflect it now validates logging, not just absence of raise - Add session.flush() in test_get_count_returns_correct_value to make the dag_maker-written rows explicitly visible before asserting the count - Replace test_get_count_raises_on_none_result with test_get_count_propagates_db_error that mocks session.execute to raise SQLAlchemyError --- airflow-core/src/airflow/dag_processing/manager.py | 5 ++++- airflow-core/src/airflow/models/serialized_dag.py | 14 ++++++-------- .../tests/unit/dag_processing/test_manager.py | 13 +++++++++---- .../tests/unit/models/test_serialized_dag.py | 13 +++++++++---- 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 17dcd29d6922f..fca7312c0ba40 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1571,10 +1571,13 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): stats.gauge("dag_processing.total_parse_time", parse_time) stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats)) stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats)) + # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop. + # On large installations this is typically fast (index scan on the PK), but + # we isolate the call so that a transient DB error never kills the parse loop. try: with create_session() as session: stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) - except (RuntimeError, SQLAlchemyError): + except SQLAlchemyError: log.exception("Failed to emit serialized_dag.count metric") diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index bcf7be3bf0ba6..09dfeab0fc6e1 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -881,16 +881,14 @@ def get_count(cls, session: Session = NEW_SESSION) -> int: """ Return the total number of serialized DAGs stored in the database. + Uses ``scalar_one()`` so that a DB connectivity failure surfaces as a + ``SQLAlchemyError`` rather than a silent ``None`` return. ``COUNT(*)`` + always produces exactly one row, so ``NoResultFound`` is never raised + under normal conditions. + :param session: ORM Session - :raises RuntimeError: if the database returns None for the COUNT query, which indicates - a transient connectivity issue rather than an empty table (COUNT always returns an int). """ - result = session.scalar(select(func.count()).select_from(cls)) - if result is None: - raise RuntimeError( - "COUNT query on serialized_dag returned None - possible database connectivity issue" - ) - return result + return session.execute(select(func.count()).select_from(cls)).scalar_one() @classmethod @provide_session diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index b386525c24aac..4119e7108827b 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2809,13 +2809,18 @@ def test_emit_metrics_emits_serialized_dag_count(self): assert "serialized_dag.count" in calls assert calls["serialized_dag.count"] == 3 - def test_emit_metrics_does_not_raise_on_db_error(self): - """emit_metrics logs and swallows RuntimeError from get_count on DB failure.""" + def test_emit_metrics_logs_and_swallows_db_error(self): + """emit_metrics logs via log.exception and swallows SQLAlchemyError from get_count.""" + from sqlalchemy.exc import SQLAlchemyError + from airflow.dag_processing.manager import emit_metrics with mock.patch( "airflow.dag_processing.manager.SerializedDagModel.get_count", - side_effect=RuntimeError("COUNT query on serialized_dag returned None - possible"), + side_effect=SQLAlchemyError("db failure"), ): with mock.patch("airflow.dag_processing.manager.stats"): - emit_metrics(parse_time=1.0, dag_file_stats=[]) + with mock.patch("airflow.dag_processing.manager.log") as mock_log: + emit_metrics(parse_time=1.0, dag_file_stats=[]) + + mock_log.exception.assert_called_once_with("Failed to emit serialized_dag.count metric") diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index b4aa7efa127b6..bb833801a3e0d 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -923,11 +923,16 @@ def test_get_count_returns_correct_value(self, dag_maker, session): pass with dag_maker("dag_count_2"): pass + # dag_maker writes SerializedDagModel rows on context exit; flush to make + # them visible within the same session before asserting. + session.flush() count = SDM.get_count(session=session) assert count == 2 - def test_get_count_raises_on_none_result(self, session): - """get_count() raises RuntimeError when session.scalar returns None (simulates DB failure).""" - with mock.patch.object(session, "scalar", return_value=None): - with pytest.raises(RuntimeError, match="COUNT query on serialized_dag returned None - possible"): + def test_get_count_propagates_db_error(self, session): + """get_count() lets SQLAlchemyError propagate so callers can handle DB failures.""" + from sqlalchemy.exc import SQLAlchemyError + + with mock.patch.object(session, "execute", side_effect=SQLAlchemyError("db failure")): + with pytest.raises(SQLAlchemyError): SDM.get_count(session=session) From cc21af5f65c0e55bfb3a40655dbdd796f7c4f5b8 Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 01:34:00 +0530 Subject: [PATCH 04/10] review: address third round of Copilot feedback on PR #67572 - Rephrase COUNT(*) comment: remove the inaccurate "index scan on the PK" claim; note the query plan is DB-dependent and can be expensive, with throttling noted as a straightforward future follow-up - Use baseline+2 instead of hard-coded == 2 in test_get_count_returns_correct_value so the assertion holds even if other fixtures leave rows in the table - Replace dict-fold assertion with assert_any_call("serialized_dag.count", 3) to avoid masking duplicate emissions --- airflow-core/src/airflow/dag_processing/manager.py | 6 ++++-- airflow-core/tests/unit/dag_processing/test_manager.py | 4 +--- airflow-core/tests/unit/models/test_serialized_dag.py | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index fca7312c0ba40..8a5597bf1ee69 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1572,8 +1572,10 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats)) stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats)) # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop. - # On large installations this is typically fast (index scan on the PK), but - # we isolate the call so that a transient DB error never kills the parse loop. + # This can be expensive on large deployments (query plan is DB-dependent and + # may involve a full table scan). The call is isolated so that a transient + # DB error never kills the parse loop; throttling this metric in the future + # is a straightforward follow-up if the round-trip becomes a bottleneck. try: with create_session() as session: stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 4119e7108827b..a5a85590896d3 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2805,9 +2805,7 @@ def test_emit_metrics_emits_serialized_dag_count(self): with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: emit_metrics(parse_time=1.0, dag_file_stats=[]) - calls = {call[0][0]: call[0][1] for call in mock_stats.gauge.call_args_list} - assert "serialized_dag.count" in calls - assert calls["serialized_dag.count"] == 3 + mock_stats.gauge.assert_any_call("serialized_dag.count", 3) def test_emit_metrics_logs_and_swallows_db_error(self): """emit_metrics logs via log.exception and swallows SQLAlchemyError from get_count.""" diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index bb833801a3e0d..bbf2112aa0707 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -919,6 +919,7 @@ def test_get_count_returns_zero_on_empty_table(self, session): def test_get_count_returns_correct_value(self, dag_maker, session): """get_count() returns the exact number of serialized DAGs in the table.""" + baseline = SDM.get_count(session=session) with dag_maker("dag_count_1"): pass with dag_maker("dag_count_2"): @@ -926,8 +927,7 @@ def test_get_count_returns_correct_value(self, dag_maker, session): # dag_maker writes SerializedDagModel rows on context exit; flush to make # them visible within the same session before asserting. session.flush() - count = SDM.get_count(session=session) - assert count == 2 + assert SDM.get_count(session=session) == baseline + 2 def test_get_count_propagates_db_error(self, session): """get_count() lets SQLAlchemyError propagate so callers can handle DB failures.""" From e58ba6e025e5a7e1fdd071f9408010e65df73105 Mon Sep 17 00:00:00 2001 From: Radhakrishnan Pachyappan Date: Wed, 27 May 2026 01:37:21 +0530 Subject: [PATCH 05/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- airflow-core/tests/unit/models/test_serialized_dag.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index bbf2112aa0707..8e3e413038162 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -915,6 +915,8 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund def test_get_count_returns_zero_on_empty_table(self, session): """get_count() returns 0 when no serialized DAGs are stored.""" + session.execute(delete(SDM)) + session.commit() assert SDM.get_count(session=session) == 0 def test_get_count_returns_correct_value(self, dag_maker, session): From 4c4f960ca415b8540712f17a67876d9413790282 Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 01:40:25 +0530 Subject: [PATCH 06/10] review: address fourth round of Copilot feedback on PR #67572 - Fix get_count() docstring: scalar_one() is justified by "exactly-one-row semantics", not by connectivity-error behavior (both scalar() and scalar_one() raise on connectivity failures) - Narrow except clause from SQLAlchemyError to OperationalError: the intent is to swallow transient connectivity failures only; schema/programming errors from SQLAlchemy should still propagate - Update tests to use OperationalError consistently with the new except scope - Add docstring note to test_get_count_returns_zero_on_empty_table explaining it relies on the autouse setup_test_cases fixture for the empty-table state --- .../src/airflow/dag_processing/manager.py | 4 ++-- .../src/airflow/models/serialized_dag.py | 8 ++++---- .../tests/unit/dag_processing/test_manager.py | 4 ++-- .../tests/unit/models/test_serialized_dag.py | 16 +++++++++------- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 8a5597bf1ee69..301621b2373e0 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -41,7 +41,7 @@ import attrs import structlog from sqlalchemy import select, update -from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.exc import OperationalError from sqlalchemy.orm import load_only from tabulate import tabulate from uuid6 import uuid7 @@ -1579,7 +1579,7 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): try: with create_session() as session: stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) - except SQLAlchemyError: + except OperationalError: log.exception("Failed to emit serialized_dag.count metric") diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 09dfeab0fc6e1..3aed02e022760 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -881,10 +881,10 @@ def get_count(cls, session: Session = NEW_SESSION) -> int: """ Return the total number of serialized DAGs stored in the database. - Uses ``scalar_one()`` so that a DB connectivity failure surfaces as a - ``SQLAlchemyError`` rather than a silent ``None`` return. ``COUNT(*)`` - always produces exactly one row, so ``NoResultFound`` is never raised - under normal conditions. + Uses ``scalar_one()`` to enforce "exactly one row" semantics. + ``COUNT(*)`` always returns exactly one row, so ``scalar_one()`` makes + that contract explicit and will raise ``MultipleResultsFound`` if the + query shape ever changes unexpectedly. :param session: ORM Session """ diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index a5a85590896d3..50cbf06ad6057 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2809,13 +2809,13 @@ def test_emit_metrics_emits_serialized_dag_count(self): def test_emit_metrics_logs_and_swallows_db_error(self): """emit_metrics logs via log.exception and swallows SQLAlchemyError from get_count.""" - from sqlalchemy.exc import SQLAlchemyError + from sqlalchemy.exc import OperationalError from airflow.dag_processing.manager import emit_metrics with mock.patch( "airflow.dag_processing.manager.SerializedDagModel.get_count", - side_effect=SQLAlchemyError("db failure"), + side_effect=OperationalError("db failure", None, None), ): with mock.patch("airflow.dag_processing.manager.stats"): with mock.patch("airflow.dag_processing.manager.log") as mock_log: diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index 8e3e413038162..e2bf4b56f8c3a 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -914,9 +914,11 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund assert updated_alert.name == "updated name" def test_get_count_returns_zero_on_empty_table(self, session): - """get_count() returns 0 when no serialized DAGs are stored.""" - session.execute(delete(SDM)) - session.commit() + """get_count() returns 0 when no serialized DAGs are stored. + + The autouse ``setup_test_cases`` fixture calls ``db.clear_db_serialized_dags()`` + before this test runs, guaranteeing a known-empty starting state. + """ assert SDM.get_count(session=session) == 0 def test_get_count_returns_correct_value(self, dag_maker, session): @@ -932,9 +934,9 @@ def test_get_count_returns_correct_value(self, dag_maker, session): assert SDM.get_count(session=session) == baseline + 2 def test_get_count_propagates_db_error(self, session): - """get_count() lets SQLAlchemyError propagate so callers can handle DB failures.""" - from sqlalchemy.exc import SQLAlchemyError + """get_count() lets OperationalError propagate so callers can handle DB failures.""" + from sqlalchemy.exc import OperationalError - with mock.patch.object(session, "execute", side_effect=SQLAlchemyError("db failure")): - with pytest.raises(SQLAlchemyError): + with mock.patch.object(session, "execute", side_effect=OperationalError("db failure", None, None)): + with pytest.raises(OperationalError): SDM.get_count(session=session) From 06958fbae24398032dba79eb0a4cc10a40c10f3b Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 01:48:50 +0530 Subject: [PATCH 07/10] fix: catch SQLAlchemyError broadly in emit_metrics and add explicit db clear in test - Change `except OperationalError` to `except SQLAlchemyError` in emit_metrics() so all DB failures (disconnects, timeouts, etc.) are swallowed, not just OperationalError - Update test_emit_metrics_logs_and_swallows_db_error to use SQLAlchemyError side-effect to match the exception actually caught by the production code - Add explicit db.clear_db_serialized_dags() in test_get_count_returns_zero_on_empty_table to make the empty-table precondition self-contained and not rely solely on autouse fixture --- airflow-core/src/airflow/dag_processing/manager.py | 4 ++-- airflow-core/tests/unit/dag_processing/test_manager.py | 4 ++-- airflow-core/tests/unit/models/test_serialized_dag.py | 7 ++----- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 301621b2373e0..8a5597bf1ee69 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -41,7 +41,7 @@ import attrs import structlog from sqlalchemy import select, update -from sqlalchemy.exc import OperationalError +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import load_only from tabulate import tabulate from uuid6 import uuid7 @@ -1579,7 +1579,7 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): try: with create_session() as session: stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) - except OperationalError: + except SQLAlchemyError: log.exception("Failed to emit serialized_dag.count metric") diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 50cbf06ad6057..a5a85590896d3 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2809,13 +2809,13 @@ def test_emit_metrics_emits_serialized_dag_count(self): def test_emit_metrics_logs_and_swallows_db_error(self): """emit_metrics logs via log.exception and swallows SQLAlchemyError from get_count.""" - from sqlalchemy.exc import OperationalError + from sqlalchemy.exc import SQLAlchemyError from airflow.dag_processing.manager import emit_metrics with mock.patch( "airflow.dag_processing.manager.SerializedDagModel.get_count", - side_effect=OperationalError("db failure", None, None), + side_effect=SQLAlchemyError("db failure"), ): with mock.patch("airflow.dag_processing.manager.stats"): with mock.patch("airflow.dag_processing.manager.log") as mock_log: diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index e2bf4b56f8c3a..c49da278a3df8 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -914,11 +914,8 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund assert updated_alert.name == "updated name" def test_get_count_returns_zero_on_empty_table(self, session): - """get_count() returns 0 when no serialized DAGs are stored. - - The autouse ``setup_test_cases`` fixture calls ``db.clear_db_serialized_dags()`` - before this test runs, guaranteeing a known-empty starting state. - """ + """get_count() returns 0 when no serialized DAGs are stored.""" + db.clear_db_serialized_dags() assert SDM.get_count(session=session) == 0 def test_get_count_returns_correct_value(self, dag_maker, session): From 9e21b7ec198a5838bc054dc4b9391e50de688d2c Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 01:54:29 +0530 Subject: [PATCH 08/10] fix: emit error counter on serialized_dag.count failure and clarify comment - Add stats.incr("serialized_dag.count_error") in the except block so dashboards receive an explicit failure signal instead of silently showing a stale gauge value - Tighten the inline comment to mention the error-counter rationale - Update test to assert the error counter is incremented on SQLAlchemyError --- airflow-core/src/airflow/dag_processing/manager.py | 9 +++++---- airflow-core/tests/unit/dag_processing/test_manager.py | 5 +++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 8a5597bf1ee69..f78633951f651 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1572,15 +1572,16 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats)) stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats)) # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop. - # This can be expensive on large deployments (query plan is DB-dependent and - # may involve a full table scan). The call is isolated so that a transient - # DB error never kills the parse loop; throttling this metric in the future - # is a straightforward follow-up if the round-trip becomes a bottleneck. + # This can be expensive on large deployments; throttling this metric is a + # straightforward follow-up if the round-trip becomes a bottleneck. + # On failure, an error counter is incremented so dashboards can alert on + # missing samples rather than silently showing a stale last value. try: with create_session() as session: stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) except SQLAlchemyError: log.exception("Failed to emit serialized_dag.count metric") + stats.incr("serialized_dag.count_error") def process_parse_results( diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index a5a85590896d3..f70fa5c7b663c 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2808,7 +2808,7 @@ def test_emit_metrics_emits_serialized_dag_count(self): mock_stats.gauge.assert_any_call("serialized_dag.count", 3) def test_emit_metrics_logs_and_swallows_db_error(self): - """emit_metrics logs via log.exception and swallows SQLAlchemyError from get_count.""" + """emit_metrics logs, increments error counter, and swallows SQLAlchemyError from get_count.""" from sqlalchemy.exc import SQLAlchemyError from airflow.dag_processing.manager import emit_metrics @@ -2817,8 +2817,9 @@ def test_emit_metrics_logs_and_swallows_db_error(self): "airflow.dag_processing.manager.SerializedDagModel.get_count", side_effect=SQLAlchemyError("db failure"), ): - with mock.patch("airflow.dag_processing.manager.stats"): + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: with mock.patch("airflow.dag_processing.manager.log") as mock_log: emit_metrics(parse_time=1.0, dag_file_stats=[]) mock_log.exception.assert_called_once_with("Failed to emit serialized_dag.count metric") + mock_stats.incr.assert_called_once_with("serialized_dag.count_error") From e7a879ef87ca2ff7a60d64326cbd8e4e72572368 Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 01:59:35 +0530 Subject: [PATCH 09/10] fix: gate serialized_dag.count behind config, isolate tests, fix OperationalError orig - Wrap the COUNT(*) query in a conf.getboolean guard (scheduler.emit_serialized_dag_count_metric, default True) so large deployments can opt out of the extra DB round-trip without code changes - Patch create_session in both TestEmitMetrics tests so they are pure unit tests with no real DB session creation - Pass Exception("db failure") as the orig arg to OperationalError in test_get_count_propagates_db_error to match real-world error construction --- .../src/airflow/dag_processing/manager.py | 19 +++++++------- .../tests/unit/dag_processing/test_manager.py | 26 ++++++++++--------- .../tests/unit/models/test_serialized_dag.py | 4 ++- 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index f78633951f651..5dc07445f5109 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1571,17 +1571,16 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]): stats.gauge("dag_processing.total_parse_time", parse_time) stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats)) stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats)) - # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop. - # This can be expensive on large deployments; throttling this metric is a - # straightforward follow-up if the round-trip becomes a bottleneck. - # On failure, an error counter is incremented so dashboards can alert on + # Gated by config so large deployments can opt out of the extra DB round-trip. + # On failure an error counter is incremented so dashboards can alert on # missing samples rather than silently showing a stale last value. - try: - with create_session() as session: - stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) - except SQLAlchemyError: - log.exception("Failed to emit serialized_dag.count metric") - stats.incr("serialized_dag.count_error") + if conf.getboolean("scheduler", "emit_serialized_dag_count_metric", fallback=True): + try: + with create_session() as session: + stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) + except SQLAlchemyError: + log.exception("Failed to emit serialized_dag.count metric") + stats.incr("serialized_dag.count_error") def process_parse_results( diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index f70fa5c7b663c..bb329b94e9c7d 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2799,11 +2799,12 @@ def test_emit_metrics_emits_serialized_dag_count(self): """emit_metrics emits the serialized_dag.count gauge with the value from get_count.""" from airflow.dag_processing.manager import emit_metrics - with mock.patch( - "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3 - ): - with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: - emit_metrics(parse_time=1.0, dag_file_stats=[]) + with mock.patch("airflow.dag_processing.manager.create_session"): + with mock.patch( + "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3 + ): + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: + emit_metrics(parse_time=1.0, dag_file_stats=[]) mock_stats.gauge.assert_any_call("serialized_dag.count", 3) @@ -2813,13 +2814,14 @@ def test_emit_metrics_logs_and_swallows_db_error(self): from airflow.dag_processing.manager import emit_metrics - with mock.patch( - "airflow.dag_processing.manager.SerializedDagModel.get_count", - side_effect=SQLAlchemyError("db failure"), - ): - with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: - with mock.patch("airflow.dag_processing.manager.log") as mock_log: - emit_metrics(parse_time=1.0, dag_file_stats=[]) + with mock.patch("airflow.dag_processing.manager.create_session"): + with mock.patch( + "airflow.dag_processing.manager.SerializedDagModel.get_count", + side_effect=SQLAlchemyError("db failure"), + ): + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: + with mock.patch("airflow.dag_processing.manager.log") as mock_log: + emit_metrics(parse_time=1.0, dag_file_stats=[]) mock_log.exception.assert_called_once_with("Failed to emit serialized_dag.count metric") mock_stats.incr.assert_called_once_with("serialized_dag.count_error") diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index c49da278a3df8..3a05b16f5582b 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -934,6 +934,8 @@ def test_get_count_propagates_db_error(self, session): """get_count() lets OperationalError propagate so callers can handle DB failures.""" from sqlalchemy.exc import OperationalError - with mock.patch.object(session, "execute", side_effect=OperationalError("db failure", None, None)): + with mock.patch.object( + session, "execute", side_effect=OperationalError("db failure", None, Exception("db failure")) + ): with pytest.raises(OperationalError): SDM.get_count(session=session) From 0edd85245e81c7ab844f24781875d126908fb454 Mon Sep 17 00:00:00 2001 From: gingeekrishna Date: Wed, 27 May 2026 02:04:31 +0530 Subject: [PATCH 10/10] fix: pin conf mock in emit_metrics tests, add disabled-config test, fix OperationalError params - Mock conf.getboolean explicitly to True in existing emit_metrics tests so they are deterministic regardless of the runtime Airflow config - Add test_emit_metrics_skips_serialized_dag_count_when_disabled to cover the config=False branch: asserts get_count is not called and no gauge is emitted - Pass {} (not None) as the params arg to OperationalError to match real SQLAlchemy error construction and avoid brittle stringification behaviour --- .../tests/unit/dag_processing/test_manager.py | 50 +++++++++++++------ .../tests/unit/models/test_serialized_dag.py | 2 +- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index bb329b94e9c7d..f86c5f7be344f 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -2799,12 +2799,14 @@ def test_emit_metrics_emits_serialized_dag_count(self): """emit_metrics emits the serialized_dag.count gauge with the value from get_count.""" from airflow.dag_processing.manager import emit_metrics - with mock.patch("airflow.dag_processing.manager.create_session"): - with mock.patch( - "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3 - ): - with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: - emit_metrics(parse_time=1.0, dag_file_stats=[]) + with mock.patch("airflow.dag_processing.manager.conf") as mock_conf: + mock_conf.getboolean.return_value = True + with mock.patch("airflow.dag_processing.manager.create_session"): + with mock.patch( + "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3 + ): + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: + emit_metrics(parse_time=1.0, dag_file_stats=[]) mock_stats.gauge.assert_any_call("serialized_dag.count", 3) @@ -2814,14 +2816,34 @@ def test_emit_metrics_logs_and_swallows_db_error(self): from airflow.dag_processing.manager import emit_metrics - with mock.patch("airflow.dag_processing.manager.create_session"): - with mock.patch( - "airflow.dag_processing.manager.SerializedDagModel.get_count", - side_effect=SQLAlchemyError("db failure"), - ): - with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: - with mock.patch("airflow.dag_processing.manager.log") as mock_log: - emit_metrics(parse_time=1.0, dag_file_stats=[]) + with mock.patch("airflow.dag_processing.manager.conf") as mock_conf: + mock_conf.getboolean.return_value = True + with mock.patch("airflow.dag_processing.manager.create_session"): + with mock.patch( + "airflow.dag_processing.manager.SerializedDagModel.get_count", + side_effect=SQLAlchemyError("db failure"), + ): + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: + with mock.patch("airflow.dag_processing.manager.log") as mock_log: + emit_metrics(parse_time=1.0, dag_file_stats=[]) mock_log.exception.assert_called_once_with("Failed to emit serialized_dag.count metric") mock_stats.incr.assert_called_once_with("serialized_dag.count_error") + + def test_emit_metrics_skips_serialized_dag_count_when_disabled(self): + """emit_metrics does not query or emit serialized_dag.count when config is False.""" + from airflow.dag_processing.manager import emit_metrics + + with mock.patch("airflow.dag_processing.manager.conf") as mock_conf: + mock_conf.getboolean.return_value = False + with mock.patch( + "airflow.dag_processing.manager.SerializedDagModel.get_count" + ) as mock_get_count: + with mock.patch("airflow.dag_processing.manager.stats") as mock_stats: + emit_metrics(parse_time=1.0, dag_file_stats=[]) + + mock_get_count.assert_not_called() + assert not any( + call == mock.call("serialized_dag.count", mock.ANY) + for call in mock_stats.gauge.call_args_list + ) diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index 3a05b16f5582b..ae2fa339a66a1 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -935,7 +935,7 @@ def test_get_count_propagates_db_error(self, session): from sqlalchemy.exc import OperationalError with mock.patch.object( - session, "execute", side_effect=OperationalError("db failure", None, Exception("db failure")) + session, "execute", side_effect=OperationalError("db failure", {}, Exception("db failure")) ): with pytest.raises(OperationalError): SDM.get_count(session=session)