diff --git a/agent/src/policy.py b/agent/src/policy.py index 1a50fbb9..a27e101e 100644 --- a/agent/src/policy.py +++ b/agent/src/policy.py @@ -110,6 +110,43 @@ def _load_shared_constants() -> dict: _ATS = _SHARED_CONSTANTS["approval_timeout_s"] FLOOR_TIMEOUT_S: int = int(_ATS["min"]) # §6 decision #6: rejected below this at load DEFAULT_TASK_TIMEOUT_S: int = int(_ATS["default"]) # §6 decision #6 default + + +def _validate_constants() -> None: + """Fail-fast on invariant violations in contracts/constants.json.""" + if FLOOR_TIMEOUT_S <= 0: + raise ValueError( + f"contracts/constants.json: approval_timeout_s.min must be > 0, got {FLOOR_TIMEOUT_S}" + ) + if DEFAULT_TASK_TIMEOUT_S < FLOOR_TIMEOUT_S: + raise ValueError( + f"contracts/constants.json: approval_timeout_s.default ({DEFAULT_TASK_TIMEOUT_S}) " + f"must be >= min ({FLOOR_TIMEOUT_S})" + ) + ats_max = int(_ATS["max"]) + if ats_max < DEFAULT_TASK_TIMEOUT_S: + raise ValueError( + f"contracts/constants.json: approval_timeout_s.max ({ats_max}) " + f"must be >= default ({DEFAULT_TASK_TIMEOUT_S})" + ) + if APPROVAL_GATE_CAP_MIN <= 0: + raise ValueError( + f"contracts/constants.json: approval_gate_cap.min must be > 0, " + f"got {APPROVAL_GATE_CAP_MIN}" + ) + if DEFAULT_APPROVAL_GATE_CAP < APPROVAL_GATE_CAP_MIN: + raise ValueError( + f"contracts/constants.json: approval_gate_cap.default ({DEFAULT_APPROVAL_GATE_CAP}) " + f"must be >= min ({APPROVAL_GATE_CAP_MIN})" + ) + if APPROVAL_GATE_CAP_MAX < DEFAULT_APPROVAL_GATE_CAP: + raise ValueError( + f"contracts/constants.json: approval_gate_cap.max ({APPROVAL_GATE_CAP_MAX}) " + f"must be >= default ({DEFAULT_APPROVAL_GATE_CAP})" + ) + + +_validate_constants() CACHE_MAX_ENTRIES: int = 50 # §12.9: decoupled from approvalGateCap CACHE_TTL_S: float = 60.0 # §12.8 sliding-window TTL on DENIED/TIMED_OUT POLICIES_MAX_BYTES: int = 64 * 1024 # finding #12: reject blueprints > 64 KB diff --git a/agent/tests/test_policy.py b/agent/tests/test_policy.py index 5908e7e8..37532e81 100644 --- a/agent/tests/test_policy.py +++ b/agent/tests/test_policy.py @@ -1,10 +1,13 @@ """Unit tests for policy.py — Cedar policy engine.""" +from unittest.mock import patch + import pytest cedarpy = pytest.importorskip("cedarpy") -from policy import PolicyDecision, PolicyEngine +import policy +from policy import PolicyDecision, PolicyEngine, _validate_constants class TestPolicyDecision: @@ -231,3 +234,66 @@ def test_task_type_property(self): def test_task_type_pr_review(self): engine = PolicyEngine(task_type="pr_review", repo="owner/repo") assert engine.task_type == "pr_review" + + +class TestConstantsSemanticValidation: + """Verify _validate_constants rejects invariant violations.""" + + def test_rejects_approval_timeout_min_zero(self): + with ( + patch.object(policy, "FLOOR_TIMEOUT_S", 0), + pytest.raises(ValueError, match=r"approval_timeout_s\.min must be > 0"), + ): + _validate_constants() + + def test_rejects_approval_timeout_min_negative(self): + with ( + patch.object(policy, "FLOOR_TIMEOUT_S", -1), + pytest.raises(ValueError, match=r"approval_timeout_s\.min must be > 0"), + ): + _validate_constants() + + def test_rejects_approval_timeout_default_below_min(self): + with ( + patch.object(policy, "FLOOR_TIMEOUT_S", 60), + patch.object(policy, "DEFAULT_TASK_TIMEOUT_S", 30), + pytest.raises(ValueError, match=r"approval_timeout_s\.default .* must be >= min"), + ): + _validate_constants() + + def test_rejects_approval_timeout_max_below_default(self): + with ( + patch.object(policy, "FLOOR_TIMEOUT_S", 30), + patch.object(policy, "DEFAULT_TASK_TIMEOUT_S", 300), + patch.object(policy, "_ATS", {"min": 30, "max": 100, "default": 300}), + pytest.raises(ValueError, match=r"approval_timeout_s\.max .* must be >= default"), + ): + _validate_constants() + + def test_rejects_approval_gate_cap_min_zero(self): + with ( + patch.object(policy, "APPROVAL_GATE_CAP_MIN", 0), + pytest.raises(ValueError, match=r"approval_gate_cap\.min must be > 0"), + ): + _validate_constants() + + def test_rejects_approval_gate_cap_default_below_min(self): + with ( + patch.object(policy, "APPROVAL_GATE_CAP_MIN", 10), + patch.object(policy, "DEFAULT_APPROVAL_GATE_CAP", 5), + pytest.raises(ValueError, match=r"approval_gate_cap\.default .* must be >= min"), + ): + _validate_constants() + + def test_rejects_approval_gate_cap_max_below_default(self): + with ( + patch.object(policy, "APPROVAL_GATE_CAP_MIN", 1), + patch.object(policy, "DEFAULT_APPROVAL_GATE_CAP", 100), + patch.object(policy, "APPROVAL_GATE_CAP_MAX", 50), + pytest.raises(ValueError, match=r"approval_gate_cap\.max .* must be >= default"), + ): + _validate_constants() + + def test_passes_with_valid_constants(self): + """Sanity: the real constants pass validation.""" + _validate_constants() diff --git a/scripts/check-constants-sync.ts b/scripts/check-constants-sync.ts index 759675d9..ed44b326 100644 --- a/scripts/check-constants-sync.ts +++ b/scripts/check-constants-sync.ts @@ -108,6 +108,21 @@ function main(): number { return 1; } + // Semantic invariants (belt-and-suspenders — agent also validates at import time) + const invariantErrors: string[] = []; + if (agc.min <= 0) invariantErrors.push('approval_gate_cap.min must be > 0'); + if (agc.default < agc.min) invariantErrors.push('approval_gate_cap.default must be >= min'); + if (agc.max < agc.default) invariantErrors.push('approval_gate_cap.max must be >= default'); + if (ats.min <= 0) invariantErrors.push('approval_timeout_s.min must be > 0'); + if (ats.default < ats.min) invariantErrors.push('approval_timeout_s.default must be >= min'); + if (ats.max < ats.default) invariantErrors.push('approval_timeout_s.max must be >= default'); + + if (invariantErrors.length > 0) { + console.error(`Semantic invariant violations in ${CONSTANTS_JSON}:\n`); + for (const e of invariantErrors) console.error(` - ${e}`); + return 1; + } + const drifts = findDriftInPython(POLICY_PY); if (drifts.length > 0) {