Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,12 @@ tests/test_data/rapl/*
credentials*
.codecarbon.config*
scripts/agent-vm.personal.config.sh

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield
1 change: 1 addition & 0 deletions codecarbon/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Optional integrations for frameworks and platforms."""
13 changes: 13 additions & 0 deletions codecarbon/integrations/fastapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""FastAPI integration: middleware and lifespan helpers."""

from codecarbon.integrations.fastapi.lifespan import create_codecarbon_lifespan
from codecarbon.integrations.fastapi.middleware import (
CodeCarbonMiddleware,
add_codecarbon_middleware,
)

__all__ = [
"CodeCarbonMiddleware",
"add_codecarbon_middleware",
"create_codecarbon_lifespan",
]
119 changes: 119 additions & 0 deletions codecarbon/integrations/fastapi/_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Configurable response headers from emissions measurements."""

from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from typing import Union

from starlette.requests import Request
from starlette.responses import Response

from codecarbon.output_methods.emissions_data import EmissionsData

HeaderConfig = Union[bool, str, Sequence[str], Mapping[str, str], None]
HeaderFormatter = Callable[[EmissionsData, Request], Mapping[str, str]]

FIELD_UNITS: dict[str, str] = {
"emissions": "kg",
"emissions_rate": "kg-per-s",
"duration": "s",
"energy_consumed": "kwh",
"cpu_energy": "kwh",
"gpu_energy": "kwh",
"ram_energy": "kwh",
"water_consumed": "l",
"cpu_power": "w",
"gpu_power": "w",
"ram_power": "w",
"cpu_utilization_percent": "percent",
"gpu_utilization_percent": "percent",
"ram_utilization_percent": "percent",
"ram_used_gb": "gb",
"pue": "ratio",
"wue": "l-per-kwh",
}

HEADER_PRESETS: dict[str, dict[str, str]] = {
"emissions": {"emissions": "X-CodeCarbon-Emissions-kg"},
"default": {
"emissions": "X-CodeCarbon-Emissions-kg",
"energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh",
"duration": "X-CodeCarbon-Duration-s",
"emissions_rate": "X-CodeCarbon-Emissions-Rate-kg-per-s",
},
"energy": {
"emissions": "X-CodeCarbon-Emissions-kg",
"energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh",
"cpu_energy": "X-CodeCarbon-Cpu-Energy-kwh",
"gpu_energy": "X-CodeCarbon-Gpu-Energy-kwh",
"ram_energy": "X-CodeCarbon-Ram-Energy-kwh",
"duration": "X-CodeCarbon-Duration-s",
},
"power": {
"emissions": "X-CodeCarbon-Emissions-kg",
"cpu_power": "X-CodeCarbon-Cpu-Power-w",
"gpu_power": "X-CodeCarbon-Gpu-Power-w",
"ram_power": "X-CodeCarbon-Ram-Power-w",
"duration": "X-CodeCarbon-Duration-s",
},
}

FULL_HEADER_FIELDS: tuple[str, ...] = tuple(FIELD_UNITS.keys())


def _auto_header_name(field: str) -> str:
unit = FIELD_UNITS.get(field, "")
title = "-".join(part.capitalize() for part in field.split("_"))
suffix = f"-{unit}" if unit else ""
return f"X-CodeCarbon-{title}{suffix}"


def resolve_header_mapping(config: HeaderConfig) -> dict[str, str]:
"""Normalize ``response_headers`` settings to ``{field_name: header_name}``.

Args:
config: ``None`` or ``False`` for no headers; ``True`` for the emissions preset;
a preset name (``emissions``, ``default``, ``energy``, ``power``, ``full``);
a sequence of field names (auto header names); or an explicit mapping.

Returns:
Mapping from :class:`~codecarbon.output_methods.emissions_data.EmissionsData`
attribute names to HTTP header names.

Raises:
ValueError: If ``config`` is a string that is not a known preset (other than
``full``).
"""
if config is None or config is False:
return {}
if config is True:
return dict(HEADER_PRESETS["emissions"])
if isinstance(config, str):
preset = HEADER_PRESETS.get(config)
if preset is None:
if config == "full":
return {field: _auto_header_name(field) for field in FULL_HEADER_FIELDS}
raise ValueError(f"Unknown response_headers preset: {config!r}")
return dict(preset)
if isinstance(config, Mapping):
return dict(config)
return {field: _auto_header_name(field) for field in config}


def apply_response_headers(
response: Response,
emissions_data: EmissionsData,
header_mapping: Mapping[str, str],
) -> None:
"""Write selected emission fields onto an HTTP response as headers.

Args:
response: Outgoing Starlette response (headers are updated in place).
emissions_data: Values read via ``getattr`` for each key in ``header_mapping``.
header_mapping: Field name to HTTP header name; unknown fields are skipped.
"""
for field, header_name in header_mapping.items():
if not hasattr(emissions_data, field):
continue
value = getattr(emissions_data, field)
response.headers[header_name] = str(value)
135 changes: 135 additions & 0 deletions codecarbon/integrations/fastapi/_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Route naming and endpoint filter helpers for FastAPI/Starlette."""

from collections.abc import Callable, Iterable
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from starlette.requests import Request

DEFAULT_EXCLUDE: frozenset[str] = frozenset(
{
"/docs",
"/redoc",
"/openapi.json",
"/health",
"/healthz",
"/ready",
"/live",
}
)

HTTP_METHODS = frozenset(
{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", "TRACE", "CONNECT"}
)


def get_endpoint_path(request: "Request") -> str:
"""Return the mounted route template or the raw URL path.

Args:
request: Current Starlette/FastAPI request.

Returns:
Route template such as ``/items/{item_id}``, or ``request.url.path``.
"""
route = request.scope.get("route")
if route is not None:
return route.path
return request.url.path


def build_endpoint_key(request: "Request") -> str:
"""Build a stable endpoint identifier such as ``GET /predict``.

Args:
request: Current Starlette/FastAPI request.

Returns:
HTTP method plus route template or URL path.
"""
return f"{request.method} {get_endpoint_path(request)}"


def is_method_pattern(pattern: str) -> bool:
"""Return True when ``pattern`` is ``METHOD /path``."""
method, _, path = pattern.partition(" ")
return method in HTTP_METHODS and path.startswith("/")


def matches_exclude(
pattern: str,
url_path: str,
endpoint_key: str,
endpoint_path: str,
) -> bool:
"""Return True when an exclude pattern matches the request."""
if is_method_pattern(pattern):
return endpoint_key == pattern
if not pattern.startswith("/"):
return endpoint_key == pattern
return (
url_path == pattern
or url_path.startswith(f"{pattern}/")
or endpoint_path == pattern
)


def matches_include(pattern: str, endpoint_key: str, endpoint_path: str) -> bool:
"""Return True when an include pattern matches the request."""
if is_method_pattern(pattern):
return endpoint_key == pattern
if pattern.startswith("/"):
return endpoint_path == pattern
return endpoint_key == pattern


def should_track_request(
request: "Request",
include: Iterable[str] | None,
exclude: Iterable[str],
) -> bool:
"""Return True when the request should be measured.

Patterns use one of two forms:

* ``METHOD /route/template`` — one HTTP method on one route (e.g. ``GET /predict``)
* ``/route/template`` — any method on that route, or a URL path prefix when excluding

Args:
request: Current Starlette/FastAPI request.
include: When set, only matching endpoints are tracked.
exclude: Endpoints or URL prefixes to skip.

Returns:
True when CodeCarbon should track this request.
"""
url_path = request.url.path
endpoint_key = build_endpoint_key(request)
endpoint_path = get_endpoint_path(request)
for pattern in exclude:
if matches_exclude(pattern, url_path, endpoint_key, endpoint_path):
return False
if include is None:
return True
return any(
matches_include(pattern, endpoint_key, endpoint_path) for pattern in include
)


def build_task_name(
request: "Request",
formatter: Callable[["Request"], str] | None = None,
) -> str:
"""Derive a stable label like ``GET /items/{item_id}`` for task-scoped tracking.

Args:
request: Current Starlette/FastAPI request.
formatter: Optional function that returns the task name instead of the default.

Returns:
Method plus route template when a route is mounted on the request scope,
otherwise method plus the raw URL path.
"""
if formatter is not None:
return formatter(request)
return build_endpoint_key(request)
38 changes: 38 additions & 0 deletions codecarbon/integrations/fastapi/lifespan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Lifespan helpers for sharing one ``EmissionsTracker`` across requests."""

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from codecarbon import EmissionsTracker


@asynccontextmanager
async def create_codecarbon_lifespan(
app: Any,
*,
project_name: str = "codecarbon-fastapi",
**tracker_kwargs: Any,
) -> AsyncIterator[None]:
"""Start a tracker for the app lifetime and expose it on ``app.state``.

Args:
app: Starlette/FastAPI application with ``state`` namespace.
project_name: ``project_name`` for :class:`~codecarbon.EmissionsTracker`.
**tracker_kwargs: Extra constructor kwargs for the tracker.

Yields:
``None`` while the app runs.
"""
merged = dict(tracker_kwargs)
merged.setdefault("allow_multiple_runs", True)
tracker = EmissionsTracker(project_name=project_name, **merged)
tracker.start()
app.state.codecarbon_tracker = tracker
try:
yield
finally:
tracker.stop()
app.state.codecarbon_tracker = None
Loading
Loading