Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions capabilities/web-security/agents/web-security.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Use tools proactively when they reduce uncertainty or verify a finding. Match th
- Use `get_callback_url` and `check_callbacks` for out-of-band testing (blind SSRF, blind XSS, DNS exfiltration).
- Use `list_free_phone_numbers` and `read_phone_inbox` when signup or MFA flows require SMS verification, unless prompted by the user. Free public numbers first — fall back to `request_private_number`/`poll_private_number` (paid API, needs key via `store_credential`) only when the target blocks public numbers.
- Use `generate_rebinding_hostname` and `list_rebinding_presets` for DNS rebinding SSRF bypass when IP filters validate resolved addresses before fetching.
- Use `log_image_output`, `log_audio_output`, and `log_video_output` when another tool has already written useful PoC media to disk and you need it attached to the current Dreadnode run as typed output. Use `log_file_artifact` when you want the raw file uploaded as an artifact instead of rendered media.
- Use `bbscope_find` at the start of an engagement to check if a target is covered by any bug bounty program and retrieve scope boundaries. Use `bbscope_program` to get full in-scope/out-of-scope details for a specific program. Use `bbscope_targets` to enumerate targets by type (wildcards, domains, URLs, IPs, CIDRs) for reconnaissance. Use `bbscope_updates` to find freshly added targets that may be under-tested.

### MCP tools
Expand Down
38 changes: 33 additions & 5 deletions capabilities/web-security/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,27 @@ class ToolMessage:
tool_call_id: str | None = None

class _Tool:
def __init__(self, instance: object, method: Any, metadata: dict[str, Any]) -> None:
def __init__(
self, instance: object, method: Any, metadata: dict[str, Any]
) -> None:
self._instance = instance
self._method = method
self.name = metadata["name"]
self.description = metadata["description"]
self.catch = metadata["catch"]
self.parameters_schema = _schema_for(method)

async def handle_tool_call(self, tool_call: ToolCall) -> tuple[ToolMessage, bool]:
async def handle_tool_call(
self, tool_call: ToolCall
) -> tuple[ToolMessage, bool]:
arguments = json.loads(tool_call.function.arguments or "{}")
result = await self._method(**arguments)
return ToolMessage(content=str(result), tool_call_id=tool_call.id), False

def _schema_for(method: Any) -> dict[str, Any]:
signature = inspect.signature(method)
properties = {
name: {"type": "string"}
for name in signature.parameters
if name != "self"
name: {"type": "string"} for name in signature.parameters if name != "self"
}
return {"type": "object", "properties": properties}

Expand Down Expand Up @@ -85,6 +87,32 @@ def get_tools(self):
agents.tools = tools
dreadnode.agents = agents

class _Media:
def __init__(self, data: object, caption: str | None = None, **_: Any) -> None:
self.data = data
self.caption = caption

class Image(_Media):
pass

class Audio(_Media):
pass

class Video(_Media):
pass

def log_output(name: str, value: object, **_: Any) -> None:
return None

def log_artifact(local_uri: object, **_: Any) -> None:
return None

dreadnode.Image = Image
dreadnode.Audio = Audio
dreadnode.Video = Video
dreadnode.log_output = log_output
dreadnode.log_artifact = log_artifact

sys.modules["dreadnode"] = dreadnode
sys.modules["dreadnode.agents"] = agents
sys.modules["dreadnode.agents.tools"] = tools
Expand Down
191 changes: 191 additions & 0 deletions capabilities/web-security/tests/test_media_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""Tests for multimedia logging tools."""

from __future__ import annotations

from pathlib import Path
import importlib.util

import pytest


MODULE_PATH = Path(__file__).resolve().parent.parent / "tools" / "media_logging.py"
SPEC = importlib.util.spec_from_file_location("media_logging", MODULE_PATH)
assert SPEC and SPEC.loader
MODULE = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(MODULE)

MediaLogging = MODULE.MediaLogging


@pytest.fixture
def toolset() -> MediaLogging:
return MediaLogging()


@pytest.fixture
def media_files(tmp_path: Path) -> dict[str, Path]:
image_path = tmp_path / "sample.png"
image_path.write_bytes(b"png-bytes")

audio_path = tmp_path / "sample.wav"
audio_path.write_bytes(b"wav-bytes")

video_path = tmp_path / "sample.mp4"
video_path.write_bytes(b"mp4-bytes")

artifact_path = tmp_path / "notes.txt"
artifact_path.write_text("artifact", encoding="utf-8")

return {
"image": image_path,
"audio": audio_path,
"video": video_path,
"artifact": artifact_path,
}


class TestToolDiscovery:
def test_tools_discovered(self, toolset: MediaLogging) -> None:
names = {tool.name for tool in toolset.get_tools()}
assert names == {
"log_image_output",
"log_audio_output",
"log_video_output",
"log_file_artifact",
}


class TestMediaLogging:
@pytest.mark.asyncio
async def test_log_image_output(
self,
toolset: MediaLogging,
media_files: dict[str, Path],
monkeypatch: pytest.MonkeyPatch,
) -> None:
captured: dict[str, object] = {}

def fake_log_output(name: str, value: object, **_: object) -> None:
captured["name"] = name
captured["value"] = value

monkeypatch.setattr(MODULE.dn, "log_output", fake_log_output)

result = await toolset.log_image_output(
"screenshot/home", str(media_files["image"]), caption="Home"
)
assert result == {
"kind": "image",
"path": str(media_files["image"]),
"name": "screenshot/home",
"caption": "Home",
}
assert captured["name"] == "screenshot/home"
assert isinstance(captured["value"], MODULE.dn.Image)
assert captured["value"].data == media_files["image"]

@pytest.mark.asyncio
async def test_log_audio_output(
self,
toolset: MediaLogging,
media_files: dict[str, Path],
monkeypatch: pytest.MonkeyPatch,
) -> None:
captured: dict[str, object] = {}

def fake_log_output(name: str, value: object, **_: object) -> None:
captured["name"] = name
captured["value"] = value

monkeypatch.setattr(MODULE.dn, "log_output", fake_log_output)

result = await toolset.log_audio_output(
"audio/sample", str(media_files["audio"]), caption="Sample"
)
assert result == {
"kind": "audio",
"path": str(media_files["audio"]),
"name": "audio/sample",
"caption": "Sample",
}
assert captured["name"] == "audio/sample"
assert isinstance(captured["value"], MODULE.dn.Audio)
assert captured["value"].data == media_files["audio"]

@pytest.mark.asyncio
async def test_log_video_output(
self,
toolset: MediaLogging,
media_files: dict[str, Path],
monkeypatch: pytest.MonkeyPatch,
) -> None:
captured: dict[str, object] = {}

def fake_log_output(name: str, value: object, **_: object) -> None:
captured["name"] = name
captured["value"] = value

monkeypatch.setattr(MODULE.dn, "log_output", fake_log_output)

result = await toolset.log_video_output(
"video/demo", str(media_files["video"]), caption="Demo"
)
assert result == {
"kind": "video",
"path": str(media_files["video"]),
"name": "video/demo",
"caption": "Demo",
}
assert captured["name"] == "video/demo"
assert isinstance(captured["value"], MODULE.dn.Video)
assert captured["value"].data == media_files["video"]

@pytest.mark.asyncio
async def test_log_file_artifact(
self,
toolset: MediaLogging,
media_files: dict[str, Path],
monkeypatch: pytest.MonkeyPatch,
) -> None:
captured: dict[str, object] = {}

def fake_log_artifact(local_uri: object, **kwargs: object) -> None:
captured["path"] = local_uri
captured["name"] = kwargs.get("name")

monkeypatch.setattr(MODULE.dn, "log_artifact", fake_log_artifact)

result = await toolset.log_file_artifact(
str(media_files["artifact"]), name="notes.txt"
)
assert result == {
"kind": "artifact",
"path": str(media_files["artifact"]),
"name": "notes.txt",
}
assert captured["path"] == media_files["artifact"]
assert captured["name"] == "notes.txt"

@pytest.mark.asyncio
async def test_missing_file_raises(self, toolset: MediaLogging) -> None:
with pytest.raises(FileNotFoundError):
await toolset.log_image_output("missing", "/tmp/nope.png")

with pytest.raises(FileNotFoundError):
await toolset.log_audio_output("missing", "/tmp/nope.wav")

with pytest.raises(FileNotFoundError):
await toolset.log_video_output("missing", "/tmp/nope.mp4")

with pytest.raises(FileNotFoundError):
await toolset.log_file_artifact("/tmp/nope.bin")

@pytest.mark.asyncio
async def test_directory_path_raises(
self, toolset: MediaLogging, tmp_path: Path
) -> None:
directory = tmp_path / "dir"
directory.mkdir()

with pytest.raises(ValueError):
await toolset.log_file_artifact(str(directory))
94 changes: 94 additions & 0 deletions capabilities/web-security/tools/media_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Typed multimedia logging helpers for Dreadnode runs.

These tools mirror the old v1 ``web-agent`` screenshot-ingest idea while using
the current SDK primitives directly. They accept existing local files and log
them either as typed outputs (`Image`, `Audio`, `Video`) or as uploaded
artifacts.
"""

from __future__ import annotations

from pathlib import Path
from typing import Annotated, Any

import dreadnode as dn
from dreadnode.agents.tools import Toolset, tool_method


def _existing_file(path: str) -> Path:
file_path = Path(path)
if not file_path.exists():
raise FileNotFoundError(f"File does not exist: {file_path}")
if not file_path.is_file():
raise ValueError(f"Path is not a file: {file_path}")
return file_path


def _result(
kind: str, path: Path, name: str | None = None, caption: str | None = None
) -> dict[str, Any]:
result: dict[str, Any] = {"kind": kind, "path": str(path)}
if name:
result["name"] = name
if caption:
result["caption"] = caption
return result


class MediaLogging(Toolset):
"""Log images, audio, video, and arbitrary files to the current Dreadnode run."""

@tool_method(name="log_image_output", catch=True)
async def log_image_output(
self,
name: Annotated[str, "Output name to log under the current task or run."],
path: Annotated[str, "Path to an existing local image file."],
caption: Annotated[
str | None, "Optional caption shown with the image output."
] = None,
) -> dict[str, Any]:
"""Log an existing image file as a typed Dreadnode output."""
file_path = _existing_file(path)
dn.log_output(name, dn.Image(file_path, caption=caption))
return _result("image", file_path, name=name, caption=caption)

@tool_method(name="log_audio_output", catch=True)
async def log_audio_output(
self,
name: Annotated[str, "Output name to log under the current task or run."],
path: Annotated[str, "Path to an existing local audio file."],
caption: Annotated[
str | None, "Optional caption shown with the audio output."
] = None,
) -> dict[str, Any]:
"""Log an existing audio file as a typed Dreadnode output."""
file_path = _existing_file(path)
dn.log_output(name, dn.Audio(file_path, caption=caption))
return _result("audio", file_path, name=name, caption=caption)

@tool_method(name="log_video_output", catch=True)
async def log_video_output(
self,
name: Annotated[str, "Output name to log under the current task or run."],
path: Annotated[str, "Path to an existing local video file."],
caption: Annotated[
str | None, "Optional caption shown with the video output."
] = None,
) -> dict[str, Any]:
"""Log an existing video file as a typed Dreadnode output."""
file_path = _existing_file(path)
dn.log_output(name, dn.Video(file_path, caption=caption))
return _result("video", file_path, name=name, caption=caption)

@tool_method(name="log_file_artifact", catch=True)
async def log_file_artifact(
self,
path: Annotated[
str, "Path to an existing local file to upload as an artifact."
],
name: Annotated[str | None, "Optional artifact name override."] = None,
) -> dict[str, Any]:
"""Upload an existing local file as a Dreadnode artifact."""
file_path = _existing_file(path)
dn.log_artifact(file_path, name=name)
return _result("artifact", file_path, name=name)
Loading