Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import attrs
import structlog
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import load_only
from tabulate import tabulate
from uuid6 import uuid7
Expand All @@ -62,6 +63,7 @@
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.models.dagbundle import DagBundleModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
Expand Down Expand Up @@ -91,6 +93,9 @@
from airflow.sdk.api.client import Client


log = structlog.get_logger(__name__)


class DagParsingStat(NamedTuple):
"""Information on processing progress."""

Expand Down Expand Up @@ -1566,6 +1571,16 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats))
# Gated by config so large deployments can opt out of the extra DB round-trip.
# On failure an error counter is incremented so dashboards can alert on
# missing samples rather than silently showing a stale last value.
if conf.getboolean("scheduler", "emit_serialized_dag_count_metric", fallback=True):
try:
with create_session() as session:
stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session))
except SQLAlchemyError:
log.exception("Failed to emit serialized_dag.count metric")
stats.incr("serialized_dag.count_error")
Comment thread
gingeekrishna marked this conversation as resolved.


def process_parse_results(
Expand Down
15 changes: 15 additions & 0 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,21 @@ def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool:
"""
return session.scalar(select(literal(True)).where(cls.dag_id == dag_id).limit(1)) is not None

@classmethod
@provide_session
def get_count(cls, session: Session = NEW_SESSION) -> int:
    """
    Count the rows currently held in the serialized DAG table.

    The statement is a bare ``COUNT(*)`` over this model, which always
    yields a single row. ``scalar_one()`` is used on purpose: a result
    with zero rows or more than one row raises instead of being silently
    accepted, so an accidental change to the query shape is caught early.

    NOTE(review): this counts table rows, not distinct ``dag_id`` values.
    If the table can hold more than one row per DAG, confirm that a row
    count is the intended meaning of "number of serialized DAGs".

    :param session: ORM Session
    :return: The number of rows as an integer.
    """
    count_stmt = select(func.count()).select_from(cls)
    result = session.execute(count_stmt)
    return result.scalar_one()

@classmethod
@provide_session
def get_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> SerializedDAG | None:
Expand Down
57 changes: 57 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2790,3 +2790,60 @@ def test_bundle_version_data_stored_after_refresh(self, session):

assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data


class TestEmitMetrics:
    """Exercise the module-level ``emit_metrics`` function with its collaborators patched out."""

    def test_emit_metrics_emits_serialized_dag_count(self):
        """The ``serialized_dag.count`` gauge carries the value returned by ``get_count``."""
        from airflow.dag_processing.manager import emit_metrics

        conf_patch = mock.patch("airflow.dag_processing.manager.conf")
        session_patch = mock.patch("airflow.dag_processing.manager.create_session")
        count_patch = mock.patch(
            "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3
        )
        stats_patch = mock.patch("airflow.dag_processing.manager.stats")

        with conf_patch as mock_conf, session_patch, count_patch, stats_patch as mock_stats:
            # Force the config gate open so the count query path is taken.
            mock_conf.getboolean.return_value = True
            emit_metrics(parse_time=1.0, dag_file_stats=[])

        mock_stats.gauge.assert_any_call("serialized_dag.count", 3)

    def test_emit_metrics_logs_and_swallows_db_error(self):
        """A ``SQLAlchemyError`` from ``get_count`` is logged, counted, and not re-raised."""
        from sqlalchemy.exc import SQLAlchemyError

        from airflow.dag_processing.manager import emit_metrics

        conf_patch = mock.patch("airflow.dag_processing.manager.conf")
        session_patch = mock.patch("airflow.dag_processing.manager.create_session")
        count_patch = mock.patch(
            "airflow.dag_processing.manager.SerializedDagModel.get_count",
            side_effect=SQLAlchemyError("db failure"),
        )
        stats_patch = mock.patch("airflow.dag_processing.manager.stats")
        log_patch = mock.patch("airflow.dag_processing.manager.log")

        with conf_patch as mock_conf, session_patch, count_patch:
            with stats_patch as mock_stats, log_patch as mock_log:
                mock_conf.getboolean.return_value = True
                # Must return normally: the failure is handled inside emit_metrics.
                emit_metrics(parse_time=1.0, dag_file_stats=[])

        mock_log.exception.assert_called_once_with("Failed to emit serialized_dag.count metric")
        mock_stats.incr.assert_called_once_with("serialized_dag.count_error")

    def test_emit_metrics_skips_serialized_dag_count_when_disabled(self):
        """With the config flag off, ``get_count`` is never called and no count gauge is emitted."""
        from airflow.dag_processing.manager import emit_metrics

        conf_patch = mock.patch("airflow.dag_processing.manager.conf")
        count_patch = mock.patch("airflow.dag_processing.manager.SerializedDagModel.get_count")
        stats_patch = mock.patch("airflow.dag_processing.manager.stats")

        with conf_patch as mock_conf, count_patch as mock_get_count, stats_patch as mock_stats:
            mock_conf.getboolean.return_value = False
            emit_metrics(parse_time=1.0, dag_file_stats=[])

        mock_get_count.assert_not_called()
        # Other gauges may still fire; only the serialized DAG count must be absent.
        unwanted_call = mock.call("serialized_dag.count", mock.ANY)
        recorded_gauges = mock_stats.gauge.call_args_list
        assert not any(call == unwanted_call for call in recorded_gauges)
27 changes: 27 additions & 0 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,3 +912,30 @@ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bund

# The name must have been updated in the DB.
assert updated_alert.name == "updated name"

def test_get_count_returns_zero_on_empty_table(self, session):
    """After the serialized DAG table is cleared, get_count() reports zero."""
    db.clear_db_serialized_dags()
    remaining = SDM.get_count(session=session)
    assert remaining == 0

def test_get_count_returns_correct_value(self, dag_maker, session):
    """Creating two DAGs raises get_count() by exactly two over its starting value."""
    # Measure relative to whatever is already stored rather than assuming an empty table.
    count_before = SDM.get_count(session=session)
    for dag_id in ("dag_count_1", "dag_count_2"):
        with dag_maker(dag_id):
            pass
    # dag_maker writes SerializedDagModel rows on context exit; flush so they
    # are visible within this same session before counting again.
    session.flush()
    count_after = SDM.get_count(session=session)
    assert count_after == count_before + 2

def test_get_count_propagates_db_error(self, session):
    """An OperationalError raised by session.execute surfaces unchanged from get_count()."""
    from sqlalchemy.exc import OperationalError

    db_error = OperationalError("db failure", {}, Exception("db failure"))
    # Make the session itself fail so the error originates below get_count().
    with mock.patch.object(session, "execute", side_effect=db_error), pytest.raises(OperationalError):
        SDM.get_count(session=session)
Loading