Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ MCP Client can access the following tools to interact with Windows:
- `Move`: Move mouse pointer or drag (set drag=True) to coordinates.
- `Shortcut`: Press keyboard shortcuts (`Ctrl+c`, `Alt+Tab`, etc).
- `Wait`: Pause for a defined duration.
- `WaitFor`: Wait until text, an active window, an element, or a focused element appears by polling UI state inside one tool call.
- `Screenshot`: Fast screenshot-first desktop capture with cursor position, active/open windows, and an image. Skips UI tree extraction for speed and should be the default first call when you mainly need visual context. Supports `display=[0]` or `display=[0,1]` to capture specific screens. After capture, a brief orange-red glowing border is drawn over the captured area as a visual confirmation (set `WINDOWS_MCP_DISABLE_FLASH=1` to disable).
- `Snapshot`: Full desktop state capture for workflows that need interactive element ids, scrollable regions, or `use_dom=True` browser extraction. Supports `use_vision=True` for including screenshots and `display=[0]` or `display=[0,1]` for limiting all returned Snapshot information to specific screens.
- `App`: Launch an application from the Start menu, resize or move its window, or switch between apps.
Expand Down
232 changes: 227 additions & 5 deletions src/windows_mcp/tools/input.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
"""Input tools — Click, Type, Scroll, Move, Shortcut, Wait."""
"""Input tools — Click, Type, Scroll, Move, Shortcut, Wait, WaitFor."""

import time
from typing import Literal
from collections.abc import Callable, Iterator
from typing import Any, Literal

from mcp.types import ToolAnnotations
from windows_mcp.infrastructure import with_analytics
from fastmcp import Context


def _resolve_label(desktop, label):
# Canonical condition names used by the WaitFor helpers in this module.
# Shorthand aliases and hyphenated spellings are mapped onto these names by
# _validate_wait_for_args before a condition is evaluated.
WaitForCondition = Literal[
    "text_exists",
    "active_window",
    "element_exists",
    "element_enabled",
    "focused_element",
]


def _resolve_label(desktop: Any, label: int) -> list[int]:
"""Resolve a UI element label to screen coordinates."""
if desktop.desktop_state is None:
raise ValueError("Desktop state is empty. Please call Snapshot first.")
Expand All @@ -18,7 +28,150 @@ def _resolve_label(desktop, label):
raise ValueError(f"Failed to find element with label {label}: {e}")


def register(mcp, *, get_desktop, get_analytics):
def _as_bool(value: bool | str) -> bool:
return value is True or (isinstance(value, str) and value.lower() == "true")


def _text_matches(value: object | None, expected: str | None) -> bool:
if expected is None:
return True
if value is None:
return False
return expected.casefold() in str(value).casefold()


def _metadata_text_matches(metadata: dict[str, object], expected: str | None) -> bool:
return any(_text_matches(value, expected) for value in metadata.values())


def _iter_nodes(desktop_state: Any) -> Iterator[Any]:
tree_state = getattr(desktop_state, "tree_state", None)
if tree_state is None:
return
yield from getattr(tree_state, "interactive_nodes", [])
yield from getattr(tree_state, "scrollable_nodes", [])


def _iter_text_sources(desktop_state: Any) -> Iterator[object]:
active_window = getattr(desktop_state, "active_window", None)
if active_window is not None:
yield active_window.name

for window in getattr(desktop_state, "windows", []):
yield window.name

tree_state = getattr(desktop_state, "tree_state", None)
if tree_state is None:
return

for node in _iter_nodes(desktop_state):
yield node.name
yield node.control_type
yield node.window_name
for value in getattr(node, "metadata", {}).values():
yield value

for node in getattr(tree_state, "dom_informative_nodes", []):
yield getattr(node, "text", "")


def _node_matches(node: Any, text: str | None, window_name: str | None) -> bool:
    """Check a UI node against an optional text filter and window filter.

    The text filter passes when it matches the node's name, its control
    type, or any of its metadata values. The window filter is matched
    against the node's ``window_name``. Both filters must pass; missing
    node attributes fall back to an empty string (or empty metadata).
    """
    metadata: dict[str, object] = getattr(node, "metadata", {})
    text_hit = any(
        _text_matches(getattr(node, field_name, ""), text)
        for field_name in ("name", "control_type")
    ) or _metadata_text_matches(metadata, text)
    if not text_hit:
        return False
    return _text_matches(getattr(node, "window_name", ""), window_name)


def _matches_wait_condition(
desktop_state: Any,
condition: WaitForCondition,
text: str | None,
window_name: str | None,
) -> tuple[bool, str]:
if condition == "text_exists":
for source in _iter_text_sources(desktop_state):
if _text_matches(source, text):
return True, f"text {text!r} appeared"
return False, f"text {text!r} was absent"

if condition == "active_window":
expected = window_name or text
active_window = getattr(desktop_state, "active_window", None)
active_name = active_window.name if active_window else ""
if _text_matches(active_name, expected):
return True, f"active window matched {active_name!r}"
return False, f"active window was {active_name!r}"

if condition in {"element_exists", "element_enabled"}:
for node in _iter_nodes(desktop_state):
if _node_matches(node, text, window_name):
return True, f"element matched {getattr(node, 'name', '')!r}"
return False, "matching element was absent"

if condition == "focused_element":
for node in _iter_nodes(desktop_state):
metadata = getattr(node, "metadata", {})
if metadata.get("has_focused") and _node_matches(node, text, window_name):
return True, f"focused element matched {getattr(node, 'name', '')!r}"
return False, "matching focused element was absent"

raise ValueError(f"Unsupported WaitFor condition: {condition}")


def _validate_wait_for_args(
condition: str,
text: str | None,
window_name: str | None,
timeout: float,
interval: float,
) -> WaitForCondition:
normalized = condition.strip().lower().replace("-", "_")
aliases = {
"text": "text_exists",
"window": "active_window",
"element": "element_exists",
"enabled": "element_enabled",
"focused": "focused_element",
}
normalized = aliases.get(normalized, normalized)
valid_conditions = {
"text_exists",
"active_window",
"element_exists",
"element_enabled",
"focused_element",
}
if normalized not in valid_conditions:
raise ValueError(
"condition must be one of: text_exists, active_window, element_exists, "
"element_enabled, focused_element"
)

if timeout <= 0 or timeout > 120:
raise ValueError("timeout must be greater than 0 and at most 120 seconds")
if interval <= 0 or interval > 5:
raise ValueError("interval must be greater than 0 and at most 5 seconds")

if normalized == "text_exists" and not text:
raise ValueError("text is required when condition is text_exists")
if normalized == "active_window" and not (text or window_name):
raise ValueError("text or window_name is required when condition is active_window")
if normalized in {"element_exists", "element_enabled"} and not (text or window_name):
raise ValueError(
"text or window_name is required when condition is element_exists or element_enabled"
)

return normalized


def register(
mcp: Any,
*,
get_desktop: Callable[[], Any],
get_analytics: Callable[[], Any],
) -> None:
@mcp.tool(
name="Click",
description=(
Expand Down Expand Up @@ -122,7 +275,8 @@ def scroll_tool(
if response:
return response
return (
f"Scrolled {type} {direction} by {wheel_times} wheel times" + f" at ({loc[0]},{loc[1]})."
f"Scrolled {type} {direction} by {wheel_times} wheel times"
+ f" at ({loc[0]},{loc[1]})."
if loc
else ""
)
Expand Down Expand Up @@ -197,3 +351,71 @@ def shortcut_tool(shortcut: str, ctx: Context = None):
def wait_tool(duration: int, ctx: Context = None) -> str:
    # Block for the requested number of seconds, then report how long the
    # pause was. ``ctx`` is accepted but not used by the body.
    time.sleep(duration)
    message = f"Waited for {duration} seconds."
    return message

@mcp.tool(
    name="WaitFor",
    description=(
        "Waits until a UI condition is satisfied, polling the Windows accessibility tree "
        "inside the tool to avoid repeated Snapshot calls. Conditions: text_exists, "
        "active_window, element_exists, element_enabled, focused_element. Provide text "
        "and/or window_name depending on the condition. Set use_dom=True for browser DOM text."
    ),
    annotations=ToolAnnotations(
        title="WaitFor",
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
@with_analytics(get_analytics(), "WaitFor-Tool")
def wait_for_tool(
    condition: str,
    text: str | None = None,
    window_name: str | None = None,
    timeout: float = 10.0,
    interval: float = 0.25,
    use_dom: bool | str = False,
    ctx: Context = None,
) -> str:
    # Poll the desktop state until `condition` holds or `timeout` seconds
    # have elapsed. Arguments are validated first, so bad input raises
    # ValueError before any desktop access. On success a summary string is
    # returned; on expiry TimeoutError is raised with the last detail seen.
    normalized = _validate_wait_for_args(
        condition=condition,
        text=text,
        window_name=window_name,
        timeout=timeout,
        interval=interval,
    )
    desktop = get_desktop()
    include_dom = _as_bool(use_dom)

    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    begin = time.monotonic()
    deadline = begin + timeout
    detail = "condition was not evaluated"
    polls = 0

    while True:
        polls += 1
        # Only the UI tree (plus DOM on request) is captured on each poll;
        # vision and annotation are switched off.
        snapshot = desktop.get_state(
            use_vision=False,
            use_dom=include_dom,
            use_ui_tree=True,
            use_annotation=False,
        )
        satisfied, detail = _matches_wait_condition(
            desktop_state=snapshot,
            condition=normalized,
            text=text,
            window_name=window_name,
        )
        if satisfied:
            break

        time_left = deadline - time.monotonic()
        if time_left <= 0:
            raise TimeoutError(
                f"Timed out after {timeout:.2f}s waiting for '{normalized}': {detail}."
            )
        # Never sleep past the deadline, so one last poll happens right
        # around expiry.
        time.sleep(min(interval, time_left))

    elapsed = time.monotonic() - begin
    return (
        f"WaitFor condition '{normalized}' satisfied after "
        f"{elapsed:.2f}s and {polls} attempt(s): {detail}."
    )
Loading