diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b646eb..b57f95d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [2.4.0] - 2026-01-18 + +### Fixed +- **RFC-002 Alignment**: TrustLevel enum values now match RFC-002 §5 exactly +- **BadgeClaims**: Aligned claim field names with RFC-002 specification + +### Added +- **MCP Service Client**: RFC-006/RFC-007 operations via MCP protocol +- **MCP gRPC Client**: Server identity operations + ## [2.3.1] - 2025-01-14 ### Fixed diff --git a/capiscio_sdk/badge.py b/capiscio_sdk/badge.py index a68ad2f..1b99e8f 100644 --- a/capiscio_sdk/badge.py +++ b/capiscio_sdk/badge.py @@ -64,16 +64,30 @@ class VerifyMode(Enum): class TrustLevel(Enum): - """Trust level as defined in RFC-002.""" + """Trust level as defined in RFC-002 §5. + + Levels: + LEVEL_0 (SS): Self-Signed - did:key, iss == sub. Development only. + LEVEL_1 (REG): Registered - Account registration with CA. + LEVEL_2 (DV): Domain Validated - DNS/HTTP domain ownership proof. + LEVEL_3 (OV): Organization Validated - Legal entity verification. + LEVEL_4 (EV): Extended Validated - Manual review + security audit. + """ + + LEVEL_0 = "0" + """Self-Signed (SS) - did:key, iss == sub. Development only.""" LEVEL_1 = "1" - """Domain Validated (DV) - Basic verification.""" + """Registered (REG) - Account registration with CA.""" LEVEL_2 = "2" - """Organization Validated (OV) - Business verification.""" + """Domain Validated (DV) - DNS/HTTP domain ownership proof.""" LEVEL_3 = "3" - """Extended Validation (EV) - Rigorous vetting.""" + """Organization Validated (OV) - Legal entity verification.""" + + LEVEL_4 = "4" + """Extended Validated (EV) - Manual review + security audit.""" @classmethod def from_string(cls, value: str) -> "TrustLevel": @@ -81,7 +95,7 @@ def from_string(cls, value: str) -> "TrustLevel": for level in cls: if level.value == value: return level - raise ValueError(f"Unknown trust level: {value}") + raise ValueError(f"Unknown trust level: {value}. Valid levels: 0 (SS), 1 (REG), 2 (DV), 3 (OV), 4 (EV)") @dataclass @@ -90,15 +104,24 @@ class BadgeClaims: Attributes: jti: Unique badge identifier (UUID). - issuer: Badge issuer URL (CA). + issuer: Badge issuer URL (CA) or did:key for self-signed. subject: Agent DID (did:web format). audience: Optional list of intended audience URLs. issued_at: When the badge was issued. expires_at: When the badge expires. - trust_level: Trust level (1=DV, 2=OV, 3=EV). + trust_level: Trust level per RFC-002 §5: + - 0 (SS): Self-Signed - Development only + - 1 (REG): Registered - Account registration + - 2 (DV): Domain Validated - DNS/HTTP proof + - 3 (OV): Organization Validated - Legal entity + - 4 (EV): Extended Validated - Security audit domain: Agent's verified domain. agent_name: Human-readable agent name. agent_id: Extracted agent ID from subject DID. + ial: Identity Assurance Level (RFC-002 §7.2.1): + - "0": Account-attested (no key proof) + - "1": Proof of Possession (key holder verified, has cnf claim) + raw_claims: Original JWT claims dict for advanced access. """ jti: str @@ -110,6 +133,8 @@ class BadgeClaims: domain: str agent_name: str = "" audience: List[str] = field(default_factory=list) + ial: str = "0" # RFC-002 §7.2.1: Default IAL-0 (account-attested) + raw_claims: Optional[dict] = field(default=None, repr=False) # For advanced access @property def agent_id(self) -> str: @@ -133,6 +158,11 @@ def is_not_yet_valid(self) -> bool: @classmethod def from_dict(cls, data: dict) -> "BadgeClaims": """Create BadgeClaims from a dictionary.""" + # Handle audience - can be string or list + aud = data.get("aud", []) + if isinstance(aud, str): + aud = [aud] if aud else [] + return cls( jti=data.get("jti", ""), issuer=data.get("iss", ""), @@ -142,12 +172,14 @@ def from_dict(cls, data: dict) -> "BadgeClaims": trust_level=TrustLevel.from_string(data.get("trust_level", "1")), domain=data.get("domain", ""), agent_name=data.get("agent_name", ""), - audience=data.get("aud", []), + audience=aud, + ial=data.get("ial", "0"), # RFC-002 §7.2.1 + raw_claims=data, # Preserve for advanced access (cnf, key, etc.) ) def to_dict(self) -> dict: """Convert to dictionary.""" - return { + result = { "jti": self.jti, "iss": self.issuer, "sub": self.subject, @@ -157,7 +189,31 @@ def to_dict(self) -> dict: "domain": self.domain, "agent_name": self.agent_name, "aud": self.audience, + "ial": self.ial, } + return result + + @property + def has_key_binding(self) -> bool: + """Check if this badge has IAL-1 key binding (cnf claim). + + Per RFC-002 §7.2.1, IAL-1 badges include a 'cnf' (confirmation) claim + that cryptographically binds the badge to the agent's private key. + """ + if self.raw_claims is None: + return self.ial == "1" + return "cnf" in self.raw_claims + + @property + def confirmation_key(self) -> Optional[dict]: + """Get the confirmation key (cnf claim) if present. + + Returns the JWK thumbprint or key from the cnf claim for IAL-1 badges. + Returns None for IAL-0 badges or if cnf is not present. + """ + if self.raw_claims is None: + return None + return self.raw_claims.get("cnf") @dataclass @@ -454,7 +510,11 @@ async def request_badge( ca_url: Certificate Authority URL (default: CapiscIO registry). api_key: API key for authentication with the CA. domain: Agent's domain (required for verification). - trust_level: Requested trust level (1=DV, 2=OV, 3=EV). + trust_level: Requested trust level per RFC-002 §5: + - 1 (REG): Registered - Account registration + - 2 (DV): Domain Validated - DNS/HTTP proof + - 3 (OV): Organization Validated - Legal entity + - 4 (EV): Extended Validated - Security audit audience: Optional audience restrictions for the badge. timeout: Request timeout in seconds (not used with gRPC). diff --git a/capiscio_sdk/integrations/fastapi.py b/capiscio_sdk/integrations/fastapi.py index 91b5bb4..0c15a71 100644 --- a/capiscio_sdk/integrations/fastapi.py +++ b/capiscio_sdk/integrations/fastapi.py @@ -1,5 +1,5 @@ """FastAPI integration for Capiscio SimpleGuard.""" -from typing import Callable, Awaitable, Any, Dict +from typing import Callable, Awaitable, Any, Dict, List, Optional try: from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request @@ -15,22 +15,34 @@ class CapiscioMiddleware(BaseHTTPMiddleware): """ Middleware to enforce A2A identity verification on incoming requests. + + Args: + app: The ASGI application. + guard: SimpleGuard instance for verification. + exclude_paths: List of paths to skip verification (e.g., ["/health", "/.well-known/agent-card.json"]). """ - def __init__(self, app: ASGIApp, guard: SimpleGuard) -> None: + def __init__( + self, + app: ASGIApp, + guard: SimpleGuard, + exclude_paths: Optional[List[str]] = None + ) -> None: super().__init__(app) self.guard = guard + self.exclude_paths = exclude_paths or [] async def dispatch( self, request: Request, call_next: Callable[[Request], Awaitable[Response]] ) -> Response: - # Allow health checks or public endpoints if needed - # For now, we assume everything under /agent/ needs protection - # But let's just check for the header. - + # Allow CORS preflight if request.method == "OPTIONS": return await call_next(request) + + # Skip verification for excluded paths + if request.url.path in self.exclude_paths: + return await call_next(request) # RFC-002 §9.1: X-Capiscio-Badge header auth_header = request.headers.get("X-Capiscio-Badge") diff --git a/pyproject.toml b/pyproject.toml index 66a9c7d..432472c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "capiscio-sdk" -version = "2.3.1" +version = "2.4.0" description = "Runtime security middleware for A2A agents" readme = "README.md" requires-python = ">=3.10" diff --git a/tests/unit/test_badge.py b/tests/unit/test_badge.py index f636f7d..dd8806e 100644 --- a/tests/unit/test_badge.py +++ b/tests/unit/test_badge.py @@ -25,21 +25,29 @@ class TestTrustLevel: """Tests for TrustLevel enum.""" def test_from_string_valid(self): - """Test parsing valid trust levels.""" - assert TrustLevel.from_string("1") == TrustLevel.LEVEL_1 - assert TrustLevel.from_string("2") == TrustLevel.LEVEL_2 - assert TrustLevel.from_string("3") == TrustLevel.LEVEL_3 + """Test parsing valid trust levels (RFC-002 §5).""" + assert TrustLevel.from_string("0") == TrustLevel.LEVEL_0 # Self-Signed (SS) + assert TrustLevel.from_string("1") == TrustLevel.LEVEL_1 # Registered (REG) + assert TrustLevel.from_string("2") == TrustLevel.LEVEL_2 # Domain Validated (DV) + assert TrustLevel.from_string("3") == TrustLevel.LEVEL_3 # Organization Validated (OV) + assert TrustLevel.from_string("4") == TrustLevel.LEVEL_4 # Extended Validated (EV) def test_from_string_invalid(self): """Test parsing invalid trust level raises error.""" with pytest.raises(ValueError, match="Unknown trust level"): - TrustLevel.from_string("4") + TrustLevel.from_string("5") # Invalid - only 0-4 exist + with pytest.raises(ValueError, match="Unknown trust level"): + TrustLevel.from_string("99") + with pytest.raises(ValueError, match="Unknown trust level"): + TrustLevel.from_string("invalid") def test_value_property(self): """Test value property returns string.""" + assert TrustLevel.LEVEL_0.value == "0" assert TrustLevel.LEVEL_1.value == "1" assert TrustLevel.LEVEL_2.value == "2" assert TrustLevel.LEVEL_3.value == "3" + assert TrustLevel.LEVEL_4.value == "4" class TestVerifyMode: @@ -167,6 +175,72 @@ def test_to_dict(self): assert data["domain"] == "example.com" assert data["agent_name"] == "Test Agent" assert data["aud"] == ["https://service.example.com"] + assert data["ial"] == "0" # Default IAL-0 + + def test_ial_claim_parsing(self): + """Test IAL claim is correctly parsed from dict (RFC-002 §7.2.1).""" + # IAL-0 badge (account-attested, no key proof) + data_ial0 = { + "jti": "badge-ial0", + "iss": "https://registry.capisc.io", + "sub": "did:web:registry.capisc.io:agents:agent1", + "iat": 1704067200, + "exp": 1735689600, + "trust_level": "1", + "domain": "example.com", + "ial": "0", + } + claims_ial0 = BadgeClaims.from_dict(data_ial0) + assert claims_ial0.ial == "0" + assert not claims_ial0.has_key_binding + + # IAL-1 badge (proof of possession, with cnf claim) + data_ial1 = { + "jti": "badge-ial1", + "iss": "https://registry.capisc.io", + "sub": "did:web:registry.capisc.io:agents:agent2", + "iat": 1704067200, + "exp": 1735689600, + "trust_level": "1", + "domain": "example.com", + "ial": "1", + "cnf": {"jkt": "sha256-thumbprint-of-key"}, + } + claims_ial1 = BadgeClaims.from_dict(data_ial1) + assert claims_ial1.ial == "1" + assert claims_ial1.has_key_binding + assert claims_ial1.confirmation_key == {"jkt": "sha256-thumbprint-of-key"} + + def test_raw_claims_preserved(self): + """Test raw_claims dict is preserved for advanced access.""" + data = { + "jti": "badge-raw", + "iss": "https://registry.capisc.io", + "sub": "did:web:registry.capisc.io:agents:test", + "iat": 1704067200, + "exp": 1735689600, + "trust_level": "2", + "domain": "example.com", + "custom_claim": "custom_value", # Non-standard claim + } + claims = BadgeClaims.from_dict(data) + assert claims.raw_claims is not None + assert claims.raw_claims.get("custom_claim") == "custom_value" + + def test_audience_string_to_list(self): + """Test audience string is converted to list.""" + data = { + "jti": "badge-aud", + "iss": "https://registry.capisc.io", + "sub": "did:web:registry.capisc.io:agents:test", + "iat": 1704067200, + "exp": 1735689600, + "trust_level": "1", + "domain": "example.com", + "aud": "https://single-audience.example.com", # String, not list + } + claims = BadgeClaims.from_dict(data) + assert claims.audience == ["https://single-audience.example.com"] class TestVerifyOptions: diff --git a/tests/unit/test_fastapi_integration.py b/tests/unit/test_fastapi_integration.py index 718b4c3..f85405b 100644 --- a/tests/unit/test_fastapi_integration.py +++ b/tests/unit/test_fastapi_integration.py @@ -104,3 +104,44 @@ def test_middleware_invalid_signature(client, mock_guard): assert response.status_code == 403 assert "Access Denied" in response.json()["error"] + + +def test_middleware_exclude_paths(): + """Test that exclude_paths parameter allows bypassing verification.""" + mock_guard = MagicMock() + mock_guard.agent_id = "test-agent" + + app = FastAPI() + app.add_middleware( + CapiscioMiddleware, + guard=mock_guard, + exclude_paths=["/health", "/.well-known/agent-card.json"] + ) + + @app.get("/health") + async def health(): + return {"status": "ok"} + + @app.get("/.well-known/agent-card.json") + async def agent_card(): + return {"name": "Test Agent"} + + @app.post("/protected") + async def protected(): + return {"secret": "data"} + + client = TestClient(app) + + # Excluded paths should work without header + response = client.get("/health") + assert response.status_code == 200 + assert response.json()["status"] == "ok" + + response = client.get("/.well-known/agent-card.json") + assert response.status_code == 200 + assert response.json()["name"] == "Test Agent" + + # Non-excluded paths should require header + response = client.post("/protected", json={}) + assert response.status_code == 401 + assert "Missing X-Capiscio-Badge" in response.json()["error"]