diff --git a/capabilities/web-security/agents/web-security.md b/capabilities/web-security/agents/web-security.md index d1f628c..8c9f624 100644 --- a/capabilities/web-security/agents/web-security.md +++ b/capabilities/web-security/agents/web-security.md @@ -86,6 +86,7 @@ Use tools proactively when they reduce uncertainty or verify a finding. Match th - Use `get_callback_url` and `check_callbacks` for out-of-band testing (blind SSRF, blind XSS, DNS exfiltration). - Use `list_free_phone_numbers` and `read_phone_inbox` when signup or MFA flows require SMS verification, unless prompted by the user. Free public numbers first — fall back to `request_private_number`/`poll_private_number` (paid API, needs key via `store_credential`) only when the target blocks public numbers. - Use `generate_rebinding_hostname` and `list_rebinding_presets` for DNS rebinding SSRF bypass when IP filters validate resolved addresses before fetching. +- Use `log_image_output`, `log_audio_output`, and `log_video_output` when another tool has already written useful PoC media to disk and you need it attached to the current Dreadnode run as typed output. Use `log_file_artifact` when you want the raw file uploaded as an artifact instead of rendered media. - Use `bbscope_find` at the start of an engagement to check if a target is covered by any bug bounty program and retrieve scope boundaries. Use `bbscope_program` to get full in-scope/out-of-scope details for a specific program. Use `bbscope_targets` to enumerate targets by type (wildcards, domains, URLs, IPs, CIDRs) for reconnaissance. Use `bbscope_updates` to find freshly added targets that may be under-tested. ### MCP tools diff --git a/capabilities/web-security/tests/conftest.py b/capabilities/web-security/tests/conftest.py index cdda035..0aeefb4 100644 --- a/capabilities/web-security/tests/conftest.py +++ b/capabilities/web-security/tests/conftest.py @@ -34,7 +34,9 @@ class ToolMessage: tool_call_id: str | None = None class _Tool: - def __init__(self, instance: object, method: Any, metadata: dict[str, Any]) -> None: + def __init__( + self, instance: object, method: Any, metadata: dict[str, Any] + ) -> None: self._instance = instance self._method = method self.name = metadata["name"] @@ -42,7 +44,9 @@ def __init__(self, instance: object, method: Any, metadata: dict[str, Any]) -> N self.catch = metadata["catch"] self.parameters_schema = _schema_for(method) - async def handle_tool_call(self, tool_call: ToolCall) -> tuple[ToolMessage, bool]: + async def handle_tool_call( + self, tool_call: ToolCall + ) -> tuple[ToolMessage, bool]: arguments = json.loads(tool_call.function.arguments or "{}") result = await self._method(**arguments) return ToolMessage(content=str(result), tool_call_id=tool_call.id), False @@ -50,9 +54,7 @@ async def handle_tool_call(self, tool_call: ToolCall) -> tuple[ToolMessage, bool def _schema_for(method: Any) -> dict[str, Any]: signature = inspect.signature(method) properties = { - name: {"type": "string"} - for name in signature.parameters - if name != "self" + name: {"type": "string"} for name in signature.parameters if name != "self" } return {"type": "object", "properties": properties} @@ -85,6 +87,32 @@ def get_tools(self): agents.tools = tools dreadnode.agents = agents + class _Media: + def __init__(self, data: object, caption: str | None = None, **_: Any) -> None: + self.data = data + self.caption = caption + + class Image(_Media): + pass + + class Audio(_Media): + pass + + class Video(_Media): + pass + + def log_output(name: str, value: object, **_: Any) -> None: + return None + + def log_artifact(local_uri: object, **_: Any) -> None: + return None + + dreadnode.Image = Image + dreadnode.Audio = Audio + dreadnode.Video = Video + dreadnode.log_output = log_output + dreadnode.log_artifact = log_artifact + sys.modules["dreadnode"] = dreadnode sys.modules["dreadnode.agents"] = agents sys.modules["dreadnode.agents.tools"] = tools diff --git a/capabilities/web-security/tests/test_media_logging.py b/capabilities/web-security/tests/test_media_logging.py new file mode 100644 index 0000000..fbd1786 --- /dev/null +++ b/capabilities/web-security/tests/test_media_logging.py @@ -0,0 +1,191 @@ +"""Tests for multimedia logging tools.""" + +from __future__ import annotations + +from pathlib import Path +import importlib.util + +import pytest + + +MODULE_PATH = Path(__file__).resolve().parent.parent / "tools" / "media_logging.py" +SPEC = importlib.util.spec_from_file_location("media_logging", MODULE_PATH) +assert SPEC and SPEC.loader +MODULE = importlib.util.module_from_spec(SPEC) +SPEC.loader.exec_module(MODULE) + +MediaLogging = MODULE.MediaLogging + + +@pytest.fixture +def toolset() -> MediaLogging: + return MediaLogging() + + +@pytest.fixture +def media_files(tmp_path: Path) -> dict[str, Path]: + image_path = tmp_path / "sample.png" + image_path.write_bytes(b"png-bytes") + + audio_path = tmp_path / "sample.wav" + audio_path.write_bytes(b"wav-bytes") + + video_path = tmp_path / "sample.mp4" + video_path.write_bytes(b"mp4-bytes") + + artifact_path = tmp_path / "notes.txt" + artifact_path.write_text("artifact", encoding="utf-8") + + return { + "image": image_path, + "audio": audio_path, + "video": video_path, + "artifact": artifact_path, + } + + +class TestToolDiscovery: + def test_tools_discovered(self, toolset: MediaLogging) -> None: + names = {tool.name for tool in toolset.get_tools()} + assert names == { + "log_image_output", + "log_audio_output", + "log_video_output", + "log_file_artifact", + } + + +class TestMediaLogging: + @pytest.mark.asyncio + async def test_log_image_output( + self, + toolset: MediaLogging, + media_files: dict[str, Path], + monkeypatch: pytest.MonkeyPatch, + ) -> None: + captured: dict[str, object] = {} + + def fake_log_output(name: str, value: object, **_: object) -> None: + captured["name"] = name + captured["value"] = value + + monkeypatch.setattr(MODULE.dn, "log_output", fake_log_output) + + result = await toolset.log_image_output( + "screenshot/home", str(media_files["image"]), caption="Home" + ) + assert result == { + "kind": "image", + "path": str(media_files["image"]), + "name": "screenshot/home", + "caption": "Home", + } + assert captured["name"] == "screenshot/home" + assert isinstance(captured["value"], MODULE.dn.Image) + assert captured["value"].data == media_files["image"] + + @pytest.mark.asyncio + async def test_log_audio_output( + self, + toolset: MediaLogging, + media_files: dict[str, Path], + monkeypatch: pytest.MonkeyPatch, + ) -> None: + captured: dict[str, object] = {} + + def fake_log_output(name: str, value: object, **_: object) -> None: + captured["name"] = name + captured["value"] = value + + monkeypatch.setattr(MODULE.dn, "log_output", fake_log_output) + + result = await toolset.log_audio_output( + "audio/sample", str(media_files["audio"]), caption="Sample" + ) + assert result == { + "kind": "audio", + "path": str(media_files["audio"]), + "name": "audio/sample", + "caption": "Sample", + } + assert captured["name"] == "audio/sample" + assert isinstance(captured["value"], MODULE.dn.Audio) + assert captured["value"].data == media_files["audio"] + + @pytest.mark.asyncio + async def test_log_video_output( + self, + toolset: MediaLogging, + media_files: dict[str, Path], + monkeypatch: pytest.MonkeyPatch, + ) -> None: + captured: dict[str, object] = {} + + def fake_log_output(name: str, value: object, **_: object) -> None: + captured["name"] = name + captured["value"] = value + + monkeypatch.setattr(MODULE.dn, "log_output", fake_log_output) + + result = await toolset.log_video_output( + "video/demo", str(media_files["video"]), caption="Demo" + ) + assert result == { + "kind": "video", + "path": str(media_files["video"]), + "name": "video/demo", + "caption": "Demo", + } + assert captured["name"] == "video/demo" + assert isinstance(captured["value"], MODULE.dn.Video) + assert captured["value"].data == media_files["video"] + + @pytest.mark.asyncio + async def test_log_file_artifact( + self, + toolset: MediaLogging, + media_files: dict[str, Path], + monkeypatch: pytest.MonkeyPatch, + ) -> None: + captured: dict[str, object] = {} + + def fake_log_artifact(local_uri: object, **kwargs: object) -> None: + captured["path"] = local_uri + captured["name"] = kwargs.get("name") + + monkeypatch.setattr(MODULE.dn, "log_artifact", fake_log_artifact) + + result = await toolset.log_file_artifact( + str(media_files["artifact"]), name="notes.txt" + ) + assert result == { + "kind": "artifact", + "path": str(media_files["artifact"]), + "name": "notes.txt", + } + assert captured["path"] == media_files["artifact"] + assert captured["name"] == "notes.txt" + + @pytest.mark.asyncio + async def test_missing_file_raises(self, toolset: MediaLogging) -> None: + with pytest.raises(FileNotFoundError): + await toolset.log_image_output("missing", "/tmp/nope.png") + + with pytest.raises(FileNotFoundError): + await toolset.log_audio_output("missing", "/tmp/nope.wav") + + with pytest.raises(FileNotFoundError): + await toolset.log_video_output("missing", "/tmp/nope.mp4") + + with pytest.raises(FileNotFoundError): + await toolset.log_file_artifact("/tmp/nope.bin") + + @pytest.mark.asyncio + async def test_directory_path_raises( + self, toolset: MediaLogging, tmp_path: Path + ) -> None: + directory = tmp_path / "dir" + directory.mkdir() + + with pytest.raises(ValueError): + await toolset.log_file_artifact(str(directory)) diff --git a/capabilities/web-security/tools/media_logging.py b/capabilities/web-security/tools/media_logging.py new file mode 100644 index 0000000..78fe1e1 --- /dev/null +++ b/capabilities/web-security/tools/media_logging.py @@ -0,0 +1,94 @@ +"""Typed multimedia logging helpers for Dreadnode runs. + +These tools mirror the old v1 ``web-agent`` screenshot-ingest idea while using +the current SDK primitives directly. They accept existing local files and log +them either as typed outputs (`Image`, `Audio`, `Video`) or as uploaded +artifacts. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Annotated, Any + +import dreadnode as dn +from dreadnode.agents.tools import Toolset, tool_method + + +def _existing_file(path: str) -> Path: + file_path = Path(path) + if not file_path.exists(): + raise FileNotFoundError(f"File does not exist: {file_path}") + if not file_path.is_file(): + raise ValueError(f"Path is not a file: {file_path}") + return file_path + + +def _result( + kind: str, path: Path, name: str | None = None, caption: str | None = None +) -> dict[str, Any]: + result: dict[str, Any] = {"kind": kind, "path": str(path)} + if name: + result["name"] = name + if caption: + result["caption"] = caption + return result + + +class MediaLogging(Toolset): + """Log images, audio, video, and arbitrary files to the current Dreadnode run.""" + + @tool_method(name="log_image_output", catch=True) + async def log_image_output( + self, + name: Annotated[str, "Output name to log under the current task or run."], + path: Annotated[str, "Path to an existing local image file."], + caption: Annotated[ + str | None, "Optional caption shown with the image output." + ] = None, + ) -> dict[str, Any]: + """Log an existing image file as a typed Dreadnode output.""" + file_path = _existing_file(path) + dn.log_output(name, dn.Image(file_path, caption=caption)) + return _result("image", file_path, name=name, caption=caption) + + @tool_method(name="log_audio_output", catch=True) + async def log_audio_output( + self, + name: Annotated[str, "Output name to log under the current task or run."], + path: Annotated[str, "Path to an existing local audio file."], + caption: Annotated[ + str | None, "Optional caption shown with the audio output." + ] = None, + ) -> dict[str, Any]: + """Log an existing audio file as a typed Dreadnode output.""" + file_path = _existing_file(path) + dn.log_output(name, dn.Audio(file_path, caption=caption)) + return _result("audio", file_path, name=name, caption=caption) + + @tool_method(name="log_video_output", catch=True) + async def log_video_output( + self, + name: Annotated[str, "Output name to log under the current task or run."], + path: Annotated[str, "Path to an existing local video file."], + caption: Annotated[ + str | None, "Optional caption shown with the video output." + ] = None, + ) -> dict[str, Any]: + """Log an existing video file as a typed Dreadnode output.""" + file_path = _existing_file(path) + dn.log_output(name, dn.Video(file_path, caption=caption)) + return _result("video", file_path, name=name, caption=caption) + + @tool_method(name="log_file_artifact", catch=True) + async def log_file_artifact( + self, + path: Annotated[ + str, "Path to an existing local file to upload as an artifact." + ], + name: Annotated[str | None, "Optional artifact name override."] = None, + ) -> dict[str, Any]: + """Upload an existing local file as a Dreadnode artifact.""" + file_path = _existing_file(path) + dn.log_artifact(file_path, name=name) + return _result("artifact", file_path, name=name)