diff --git a/capabilities/web-security/tests/test_xss_verifier.py b/capabilities/web-security/tests/test_xss_verifier.py
new file mode 100644
index 0000000..a68d735
--- /dev/null
+++ b/capabilities/web-security/tests/test_xss_verifier.py
@@ -0,0 +1,348 @@
+"""Tests for XssVerifier — programmatic XSS verification via agent-browser canary."""
+
+from __future__ import annotations
+
+import json
+import sys
+from pathlib import Path
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+pytestmark = pytest.mark.asyncio
+
+# Climb from this file towards the filesystem root until a directory that
+# contains capabilities/web-security/tools is found, then put that tools
+# directory first on sys.path so the `xss_verifier` import below resolves.
+# If nothing matches, the climb stops at the filesystem root.
+_REPO_ROOT = Path(__file__).resolve()
+while not (_REPO_ROOT / "capabilities" / "web-security" / "tools").is_dir():
+    if _REPO_ROOT.parent == _REPO_ROOT:
+        break
+    _REPO_ROOT = _REPO_ROOT.parent
+sys.path.insert(0, str(_REPO_ROOT / "capabilities" / "web-security" / "tools"))
+
+from xss_verifier import XssVerifier
+
+
+# Fixture: hands each test a newly constructed XssVerifier (pytest's default
+# function scope builds one per test, so state does not carry between tests).
+@pytest.fixture
+def verifier() -> XssVerifier:
+ return XssVerifier()
+
+
+def _mock_eval(return_value: str) -> AsyncMock:
+    """Build an AsyncMock whose awaited call resolves to ``return_value``."""
+    stub = AsyncMock()
+    stub.return_value = return_value
+    return stub
+
+
+class TestToolDiscovery:
+    """Checks on the tool metadata returned by XssVerifier.get_tools()."""
+
+    def test_tools_discovered(self, verifier: XssVerifier) -> None:
+        """Exactly the three xss_* tools are exposed."""
+        discovered = set()
+        for entry in verifier.get_tools():
+            discovered.add(entry.name)
+        assert discovered == {"xss_inject_canary", "xss_verify", "xss_reset"}
+
+    def test_tools_have_catch(self, verifier: XssVerifier) -> None:
+        """Every exposed tool has its ``catch`` flag set to True."""
+        catch_flags = [entry.catch for entry in verifier.get_tools()]
+        assert all(flag is True for flag in catch_flags)
+
+    def test_inject_has_description(self, verifier: XssVerifier) -> None:
+        """The inject tool's description contains the word BEFORE."""
+        by_name = dict((entry.name, entry) for entry in verifier.get_tools())
+        inject_tool = by_name["xss_inject_canary"]
+        assert "BEFORE" in inject_tool.description
+
+    def test_verify_has_description(self, verifier: XssVerifier) -> None:
+        """The verify tool's description contains the word AFTER."""
+        by_name = dict((entry.name, entry) for entry in verifier.get_tools())
+        verify_tool = by_name["xss_verify"]
+        assert "AFTER" in verify_tool.description
+
+
+class TestInjectCanary:
+    """inject_canary() behaviour with the browser eval call patched out."""
+
+    @patch("xss_verifier._eval_js")
+    async def test_inject_arms_canary(
+        self, mock_eval: AsyncMock, verifier: XssVerifier
+    ) -> None:
+        """A successful injection reports 'armed' and stores a nonce."""
+        mock_eval.return_value = "armed"
+        message = await verifier.inject_canary()
+        assert "armed" in message.lower()
+        assert verifier._nonce is not None
+
+    @patch("xss_verifier._eval_js")
+    async def test_inject_generates_unique_nonce(
+        self, mock_eval: AsyncMock, verifier: XssVerifier
+    ) -> None:
+        """Two consecutive injections must not reuse the same nonce."""
+        mock_eval.return_value = "armed"
+        seen_nonces = []
+        for _ in range(2):
+            await verifier.inject_canary()
+            seen_nonces.append(verifier._nonce)
+        assert seen_nonces[1] != seen_nonces[0]
+
+    @patch("xss_verifier._eval_js")
+    async def test_inject_passes_global_args(
+        self, mock_eval: AsyncMock, verifier: XssVerifier
+    ) -> None:
+        """global_args given to inject_canary() are kept on the verifier."""
+        mock_eval.return_value = "armed"
+        session_args = ["--session-name", "app"]
+        await verifier.inject_canary(global_args=session_args)
+        assert verifier._global_args == ["--session-name", "app"]
+
+
+class TestVerify:
+ # Exercises XssVerifier.verify() against canned canary states returned by the
+ # patched xss_verifier._eval_js.
+ # NOTE(review): several payload_used values in this class are empty or broken
+ # across lines; presumably markup payloads were stripped from this copy of the
+ # file — confirm against the original before relying on them.
+
+ # verify() before any inject_canary() is expected to raise RuntimeError.
+ async def test_verify_without_inject_raises(self, verifier: XssVerifier) -> None:
+ with pytest.raises(RuntimeError, match="No canary injected"):
+ await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+
+ # A non-JSON reply from the eval call is expected to surface as RuntimeError.
+ @patch("xss_verifier._eval_js")
+ async def test_verify_unparseable_response_raises(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+
+ mock_eval.return_value = "not-json-at-all"
+ with pytest.raises(RuntimeError, match="Could not parse canary state"):
+ await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+
+ # One recorded alert with a matching nonce should produce a CONFIRMED verdict.
+ @patch("xss_verifier._eval_js")
+ async def test_confirmed_on_alert(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ # Arm canary
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce = verifier._nonce
+
+ # Verify with alert signal
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce,
+ "alerts": ["1"],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+ assert result.startswith("CONFIRMED")
+ assert "alert() called 1x" in result
+
+ # A recorded confirm() call alone should also produce CONFIRMED.
+ @patch("xss_verifier._eval_js")
+ async def test_confirmed_on_confirm_dialog(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce = verifier._nonce
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce,
+ "alerts": [],
+ "confirms": ["xss"],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="dom",
+ payload_used="
",
+ )
+ assert result.startswith("CONFIRMED")
+
+ # Only a scriptExecutions entry (no dialogs) should produce PARTIAL.
+ @patch("xss_verifier._eval_js")
+ async def test_partial_on_script_injection_only(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce = verifier._nonce
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce,
+ "alerts": [],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [
+ {"src": None, "inline": "fetch('https://evil.com')"}
+ ],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="stored",
+ payload_used="",
+ )
+ assert result.startswith("PARTIAL")
+ # NOTE(review): the text from here to the NOT_DETECTED assertions looks
+ # truncated — the end of this test and the start of a separate not-detected
+ # test appear to be missing; restore from the original file.
+ assert "",
+ )
+ assert result.startswith("NOT_DETECTED")
+ assert "HTML-encoded" in result
+
+ # A state of {"armed": false} should yield a CANARY_LOST verdict.
+ @patch("xss_verifier._eval_js")
+ async def test_canary_lost_on_navigation(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+
+ mock_eval.return_value = json.dumps({"armed": False})
+ result = await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+ assert "CANARY_LOST" in result
+
+ # An armed state carrying a different nonce should yield NONCE_MISMATCH.
+ @patch("xss_verifier._eval_js")
+ async def test_nonce_mismatch(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": "wrong_nonce",
+ "alerts": [],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+ assert "NONCE_MISMATCH" in result
+
+
+class TestReset:
+    """reset() must drop whatever a prior inject_canary() stored."""
+
+    @patch("xss_verifier._eval_js")
+    async def test_reset_clears_state(
+        self, mock_eval: AsyncMock, verifier: XssVerifier
+    ) -> None:
+        """After reset(), both the nonce and the saved global args are None."""
+        tracked_attrs = ("_nonce", "_global_args")
+
+        mock_eval.return_value = "armed"
+        await verifier.inject_canary(global_args=["--session-name", "test"])
+        for attr in tracked_attrs:
+            assert getattr(verifier, attr) is not None
+
+        outcome = await verifier.reset()
+        assert "reset" in outcome.lower()
+        for attr in tracked_attrs:
+            assert getattr(verifier, attr) is None
+
+
+class TestMultipleCycles:
+ # Runs inject -> verify -> reset followed by a second inject -> verify on the
+ # same verifier, checking that the second cycle gets a new nonce and its own
+ # verdict.
+ @patch("xss_verifier._eval_js")
+ async def test_inject_verify_reset_cycle(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ # Cycle 1: inject, verify (confirmed), reset
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce1 = verifier._nonce
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce1,
+ "alerts": ["1"],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="reflected", payload_used=""
+ )
+ assert result.startswith("CONFIRMED")
+
+ await verifier.reset()
+
+ # Cycle 2: inject, verify (not detected)
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce2 = verifier._nonce
+ assert nonce2 != nonce1
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce2,
+ "alerts": [],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ # NOTE(review): the payload_used literal below is split across two lines and
+ # the second line lacks its diff marker; presumably a markup payload was
+ # stripped from this copy — restore from the original file.
+ result = await verifier.verify(
+ xss_context="dom", payload_used="
"
+ )
+ assert result.startswith("NOT_DETECTED")
+
+
+class TestErrorPropagation:
+    """Errors raised by the patched eval call must reach the caller unchanged."""
+
+    @patch("xss_verifier._eval_js")
+    async def test_inject_propagates_eval_error(
+        self, mock_eval: AsyncMock, verifier: XssVerifier
+    ) -> None:
+        """inject_canary() lets a RuntimeError from the eval call through."""
+        failure = RuntimeError("agent-browser is not available")
+        mock_eval.side_effect = failure
+        with pytest.raises(RuntimeError, match="agent-browser is not available"):
+            await verifier.inject_canary()
+
+    @patch("xss_verifier._eval_js")
+    async def test_verify_propagates_eval_timeout(
+        self, mock_eval: AsyncMock, verifier: XssVerifier
+    ) -> None:
+        """verify() lets a timeout RuntimeError from the eval call through."""
+        # Arm first so verify() gets as far as the eval call.
+        mock_eval.return_value = "armed"
+        await verifier.inject_canary()
+
+        mock_eval.side_effect = RuntimeError("timed out after 60s")
+        verify_kwargs = {"xss_context": "reflected", "payload_used": ""}
+        with pytest.raises(RuntimeError, match="timed out"):
+            await verifier.verify(**verify_kwargs)
+
+
+class TestHandleToolCall:
+    """Drives a tool through handle_tool_call() instead of calling the method."""
+
+    @patch("xss_verifier._eval_js")
+    async def test_inject_via_handle_tool_call(
+        self, mock_eval: AsyncMock, verifier: XssVerifier
+    ) -> None:
+        """Dispatching xss_inject_canary yields an 'armed' message and no stop."""
+        from dreadnode.agents.tools import FunctionCall, ToolCall
+
+        mock_eval.return_value = "armed"
+        tools_by_name = {}
+        for entry in verifier.get_tools():
+            tools_by_name[entry.name] = entry
+
+        inject_call = FunctionCall(name="xss_inject_canary", arguments="{}")
+        request = ToolCall(id="call_inject", function=inject_call)
+        reply, should_stop = await tools_by_name["xss_inject_canary"].handle_tool_call(
+            request
+        )
+        assert should_stop is False
+        assert "armed" in reply.content.lower()
diff --git a/capabilities/web-security/tools/xss_verifier.py b/capabilities/web-security/tools/xss_verifier.py
new file mode 100644
index 0000000..fab07f6
--- /dev/null
+++ b/capabilities/web-security/tools/xss_verifier.py
@@ -0,0 +1,289 @@
+"""Programmatic XSS verification via agent-browser.
+
+Injects a JavaScript canary into the page that overrides dialog functions
+(alert, confirm, prompt) and monitors DOM mutations for injected script
+execution. The agent triggers its payload, then calls verify to check
+whether the canary caught real JavaScript execution — not just reflection.
+
+Requires agent-browser to be available (see agent_browser MCP server).
+"""
+
+import asyncio
+import json
+import os
+import secrets
+import shlex
+import shutil
+from typing import Annotated, Literal
+
+from dreadnode.agents.tools import Toolset, tool_method
+
+# Default timeout, in seconds, for a single agent-browser eval call; overridable
+# via the AGENT_BROWSER_TIMEOUT environment variable. Parsed once at import, so
+# a non-integer value raises ValueError when this module is loaded.
+_DEFAULT_TIMEOUT = int(os.environ.get("AGENT_BROWSER_TIMEOUT", "60"))
+
+
+def _resolve_command() -> list[str] | None:
+    """Work out the argv prefix used to launch agent-browser.
+
+    An AGENT_BROWSER_COMMAND environment variable, when set and non-empty, is
+    authoritative: it is shell-split and used if its first word is found on
+    PATH, otherwise nothing is returned and no fallback is tried. Without the
+    variable, an ``agent-browser`` executable is preferred, then ``npx``.
+
+    Returns:
+        The command as a list of arguments, or None when none is usable.
+    """
+    override = os.environ.get("AGENT_BROWSER_COMMAND")
+    if override:
+        argv = shlex.split(override)
+        return argv if argv and shutil.which(argv[0]) else None
+
+    fallbacks = (["agent-browser"], ["npx", "--yes", "agent-browser"])
+    for candidate in fallbacks:
+        if shutil.which(candidate[0]):
+            return candidate
+    return None
+
+
+async def _eval_js(
+    js: str,
+    *,
+    global_args: list[str] | None = None,
+    timeout: int = _DEFAULT_TIMEOUT,
+) -> str:
+    """Execute JavaScript in the browser page context via agent-browser eval.
+
+    Args:
+        js: JavaScript source passed as the argument of the ``eval`` subcommand.
+        global_args: Extra arguments placed between the resolved command and
+            ``eval``; nothing is added when None or empty.
+        timeout: Seconds to wait for the subprocess before giving up.
+
+    Returns:
+        The subprocess's stdout, decoded (undecodable bytes replaced) and
+        stripped of surrounding whitespace.
+
+    Raises:
+        RuntimeError: If no agent-browser command can be resolved, the call
+            exceeds ``timeout``, or the process exits with a non-zero status.
+    """
+    command = _resolve_command()
+    if not command:
+        raise RuntimeError(
+            "agent-browser is not available. Install with: npm i -g agent-browser"
+        )
+    argv = [*command, *(global_args or []), "eval", js]
+    proc = await asyncio.create_subprocess_exec(
+        *argv,
+        stdout=asyncio.subprocess.PIPE,
+        stderr=asyncio.subprocess.PIPE,
+    )
+    try:
+        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
+    except asyncio.TimeoutError as exc:
+        raise RuntimeError(f"agent-browser eval timed out after {timeout}s") from exc
+    finally:
+        # The process is still alive here only when waiting was interrupted:
+        # timeout, task cancellation, or another error. Kill and reap it so no
+        # orphaned subprocess is left behind.
+        if proc.returncode is None:
+            try:
+                proc.kill()
+            except ProcessLookupError:
+                pass  # exited between the returncode check and kill()
+            await proc.communicate()
+
+    out = stdout.decode(errors="replace")
+    err = stderr.decode(errors="replace")
+    if proc.returncode != 0:
+        # Prefer stderr for the failure detail; fall back to stdout if empty.
+        raise RuntimeError(
+            f"agent-browser eval failed (exit {proc.returncode}): {err or out}"
+        )
+    return out.strip()
+
+
+# JavaScript injected into the page to intercept dialog calls and script execution.
+# Uses a per-session nonce so multiple inject/verify cycles don't collide.
+# __NONCE__ is replaced via str.replace (not .format()) to avoid brace escaping.
+_CANARY_SCRIPT = """
+(function(nonce) {
+ if (window.__xssCanary && window.__xssCanary.nonce === nonce) return 'already_armed';
+ window.__xssCanary = {
+ nonce: nonce,
+ alerts: [],
+ confirms: [],
+ prompts: [],
+ scriptExecutions: [],
+ armed: true,
+ };
+ var c = window.__xssCanary;
+ window.alert = function(msg) { c.alerts.push(String(msg)); };
+ window.confirm = function(msg) { c.confirms.push(String(msg)); return false; };
+ window.prompt = function(msg) { c.prompts.push(String(msg)); return null; };
+
+ // MutationObserver: detect ' "
+ "or '
'.",
+ ],
+ global_args: Annotated[
+ list[str] | None,
+ "agent-browser global args. Must match the session used in xss_inject_canary.",
+ ] = None,
+ ) -> str:
+ """Check whether your XSS payload triggered JavaScript execution.
+
+ Call AFTER injecting the canary (xss_inject_canary) and triggering
+ your payload. Returns a structured verdict: CONFIRMED, PARTIAL,
+ or NOT_DETECTED.
+ """
+ if not self._nonce:
+ raise RuntimeError(
+ "No canary injected. Call xss_inject_canary first, "
+ "then trigger your payload, then call xss_verify."
+ )
+
+ args = global_args or self._global_args
+ raw = await _eval_js(_READ_CANARY, global_args=args)
+
+ try:
+ state = json.loads(raw)
+ except (json.JSONDecodeError, ValueError):
+ raise RuntimeError(f"Could not parse canary state: {raw[:500]}")
+
+ if not state.get("armed"):
+ return (
+ "CANARY_LOST — The canary is no longer present in the page. "
+ "This usually means the page navigated away after injection. "
+ "Re-inject the canary on the page where the payload renders, "
+ "then trigger the payload again."
+ )
+
+ if state.get("nonce") != self._nonce:
+ return (
+ "NONCE_MISMATCH — A different canary session is active. "
+ "Call xss_inject_canary again to start a fresh verification."
+ )
+
+ alerts = state.get("alerts", [])
+ confirms = state.get("confirms", [])
+ prompts = state.get("prompts", [])
+ scripts = state.get("scriptExecutions", [])
+
+ dialog_count = len(alerts) + len(confirms) + len(prompts)
+ script_count = len(scripts)
+ total_signals = dialog_count + script_count
+
+ if total_signals == 0:
+ return (
+ f"NOT_DETECTED — No JavaScript execution caught after payload.\n"
+ f" Payload: {payload_used}\n"
+ f" Context: {xss_context}\n"
+ f" Dialogs triggered: 0\n"
+ f" Scripts injected: 0\n\n"
+ "Possible causes:\n"
+ " - Payload was HTML-encoded or sanitized by the application\n"
+ " - CSP blocked inline script execution\n"
+ " - Payload is in a non-executing context (attribute, comment)\n"
+ " - Page navigated away before payload rendered (re-inject canary)\n"
+ " - DOM-based XSS may need user interaction to trigger"
+ )
+
+ evidence_lines = []
+ if alerts:
+ evidence_lines.append(f" alert() called {len(alerts)}x: {alerts[:5]}")
+ if confirms:
+ evidence_lines.append(
+ f" confirm() called {len(confirms)}x: {confirms[:5]}"
+ )
+ if prompts:
+ evidence_lines.append(f" prompt() called {len(prompts)}x: {prompts[:5]}")
+ if scripts:
+ for s in scripts[:3]:
+ src = s.get("src") or "(inline)"
+ inline = s.get("inline", "")[:100]
+ evidence_lines.append(f"