-
Notifications
You must be signed in to change notification settings - Fork 39
feat(server): bound database connection pool #236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ed28d63
029eaf1
3f231c9
6d33097
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,14 @@ | ||
| from collections.abc import AsyncGenerator | ||
| from typing import Any | ||
|
|
||
| from prometheus_client import Gauge | ||
| from sqlalchemy import event | ||
| from sqlalchemy.engine.url import make_url | ||
| from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine | ||
| from sqlalchemy.ext.asyncio.engine import AsyncEngine | ||
| from sqlalchemy.orm import DeclarativeBase | ||
|
|
||
| from .config import db_config | ||
| from .config import AgentControlServerDatabaseConfig, db_config | ||
|
|
||
|
|
||
| class Base(DeclarativeBase): | ||
|
|
@@ -13,11 +18,83 @@ class Base(DeclarativeBase): | |
| # Async SQLAlchemy setup for PostgreSQL | ||
| db_url = db_config.get_url() | ||
|
|
||
| async_engine = create_async_engine( | ||
| db_url, | ||
| echo=False, | ||
| SQLALCHEMY_CHECKED_OUT_CONNECTIONS = Gauge( | ||
| "agent_control_server_sqlalchemy_checked_out_connections", | ||
| "Number of checked out SQLAlchemy connections.", | ||
| ["pool_name"], | ||
| multiprocess_mode="livesum", | ||
| ) | ||
|
|
||
|
|
||
| def _supports_queue_pool_config(url: str) -> bool: | ||
| """Return whether SQLAlchemy QueuePool kwargs should be applied for this URL.""" | ||
| return make_url(url).get_backend_name() != "sqlite" | ||
|
|
||
|
|
||
| def _build_connect_args( | ||
| url: str, | ||
| config: AgentControlServerDatabaseConfig, | ||
| ) -> dict[str, Any]: | ||
| """Build driver-level connect args bounding connection setup and statement runtime. | ||
|
|
||
| Pool timeouts only bound how long a request waits for a connection; these | ||
| args bound how long a connection takes to establish and how long any one | ||
| statement may hold it. Drivers without known timeout args get none. | ||
| """ | ||
| driver = make_url(url).get_driver_name() | ||
| statement_timeout_ms = int(config.statement_timeout_seconds * 1000) | ||
| if driver == "psycopg": | ||
| connect_args: dict[str, Any] = {"connect_timeout": config.connect_timeout_seconds} | ||
| if statement_timeout_ms: | ||
| connect_args["options"] = f"-c statement_timeout={statement_timeout_ms}" | ||
| return connect_args | ||
| if driver == "asyncpg": | ||
| connect_args = {"timeout": float(config.connect_timeout_seconds)} | ||
| if statement_timeout_ms: | ||
| connect_args["server_settings"] = {"statement_timeout": str(statement_timeout_ms)} | ||
| return connect_args | ||
| return {} | ||
|
|
||
|
|
||
| def _build_async_engine_kwargs( | ||
| url: str, | ||
| config: AgentControlServerDatabaseConfig, | ||
| ) -> dict[str, Any]: | ||
| """Build async SQLAlchemy engine kwargs from database config.""" | ||
| kwargs: dict[str, Any] = {"echo": False} | ||
| if not _supports_queue_pool_config(url): | ||
| return kwargs | ||
|
|
||
| kwargs.update( | ||
| pool_pre_ping=True, | ||
| pool_size=config.pool_size, | ||
| max_overflow=config.max_overflow, | ||
| pool_timeout=config.pool_timeout_seconds, | ||
| pool_reset_on_return="rollback", | ||
| ) | ||
| connect_args = _build_connect_args(url, config) | ||
| if connect_args: | ||
| kwargs["connect_args"] = connect_args | ||
| return kwargs | ||
|
|
||
|
|
||
| def _instrument_connection_pool(engine: AsyncEngine) -> None: | ||
| """Track checked-out connections from the async engine's underlying pool.""" | ||
| # Create the labeled series eagerly so idle processes scrape as 0, not absent. | ||
| SQLALCHEMY_CHECKED_OUT_CONNECTIONS.labels("default").set(0) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The checkout/checkin event pair is the standard approach, but on a connection that's invalidated rather than returned, the matching checkin may not fire, leaking the counter over a long-lived process. pool_pre_ping=True reduces the frequency. If exactness matters, gauging off pool.checkedout() at scrape time avoids the drift; otherwise fine as-is. |
||
|
|
||
| @event.listens_for(engine.sync_engine.pool, "checkin") | ||
| def receive_checkin(dbapi_conn: Any, connection_record: Any) -> None: | ||
| SQLALCHEMY_CHECKED_OUT_CONNECTIONS.labels("default").dec() | ||
|
|
||
| @event.listens_for(engine.sync_engine.pool, "checkout") | ||
| def receive_checkout(dbapi_conn: Any, connection_record: Any, connection_proxy: Any) -> None: | ||
| SQLALCHEMY_CHECKED_OUT_CONNECTIONS.labels("default").inc() | ||
|
|
||
|
|
||
| async_engine = create_async_engine(db_url, **_build_async_engine_kwargs(db_url, db_config)) | ||
| _instrument_connection_pool(async_engine) | ||
|
|
||
| AsyncSessionLocal = async_sessionmaker( | ||
| bind=async_engine, | ||
| autoflush=False, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_build_connect_args returns {} for any driver other than psycopg/asyncpg, while pool kwargs still apply. It's documented ("Drivers without known timeout args get none") and a bare postgresql:// URL would fail create_async_engine anyway, so this is mostly theoretical — but a future valid async driver would lose timeouts without warning. A debug-level log when connect_args come back empty for a non-sqlite backend would make that visible.