diff --git a/src/oagi/__init__.py b/src/oagi/__init__.py index e864bda..13eea94 100644 --- a/src/oagi/__init__.py +++ b/src/oagi/__init__.py @@ -38,6 +38,10 @@ # Format: name -> (module_path, package_to_check, extra_name) # package_to_check is None if no optional dependency is required _LAZY_IMPORTS_DATA: dict[str, tuple[str, str | None, str | None]] = { + # Action converters (no optional dependencies) + "OagiActionConverter": ("oagi.converters.oagi", None, None), + "ConverterConfig": ("oagi.converters.base", None, None), + "BaseActionConverter": ("oagi.converters.base", None, None), # Desktop handlers (require pyautogui/PIL) "AsyncPyautoguiActionHandler": ( "oagi.handler.async_pyautogui_action_handler", @@ -88,6 +92,8 @@ from oagi.agent.default import AsyncDefaultAgent from oagi.agent.observer.agent_observer import AsyncAgentObserver from oagi.agent.tasker import TaskerAgent + from oagi.converters.base import BaseActionConverter, ConverterConfig + from oagi.converters.oagi import OagiActionConverter from oagi.handler.async_pyautogui_action_handler import AsyncPyautoguiActionHandler from oagi.handler.async_screenshot_maker import AsyncScreenshotMaker from oagi.handler.async_ydotool_action_handler import AsyncYdotoolActionHandler @@ -174,4 +180,8 @@ def __dir__() -> list[str]: "YdotoolConfig", # Lazy imports - Screen manager "ScreenManager", + # Lazy imports - Action converters + "OagiActionConverter", + "ConverterConfig", + "BaseActionConverter", ] diff --git a/src/oagi/converters/__init__.py b/src/oagi/converters/__init__.py new file mode 100644 index 0000000..00d8d99 --- /dev/null +++ b/src/oagi/converters/__init__.py @@ -0,0 +1,56 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) OpenAGI Foundation +# All rights reserved. +# +# This file is part of the official API project. +# Licensed under the MIT License. 
+# ----------------------------------------------------------------------------- +"""Action converters for VLM support. + +This module provides the base class and OAGI implementation for action converters. +Third parties can inherit from BaseActionConverter to create custom converters. + +Example usage: + from oagi.converters import OagiActionConverter, ConverterConfig + + # Configure for 1920x1080 sandbox + config = ConverterConfig(sandbox_width=1920, sandbox_height=1080) + converter = OagiActionConverter(config=config) + + # Convert OAGI actions to pyautogui strings + result = converter(actions) # list[str] + + # Convert to runtime API steps + for cmd in result: + step = converter.action_string_to_step(cmd) + # Execute step via runtime API... + +Creating custom converters: + from oagi.converters import BaseActionConverter, ConverterConfig + + class MyActionConverter(BaseActionConverter[MyAction]): + @property + def coord_width(self) -> int: + return 1000 # Your model's coordinate width + + @property + def coord_height(self) -> int: + return 1000 # Your model's coordinate height + + def _convert_single_action(self, action: MyAction) -> list[str]: + # Convert action to pyautogui command strings + ... + + def serialize_actions(self, actions: list[MyAction]) -> list[dict]: + # Serialize actions for trajectory logging + ... +""" + +from .base import BaseActionConverter, ConverterConfig +from .oagi import OagiActionConverter + +__all__ = [ + "BaseActionConverter", + "ConverterConfig", + "OagiActionConverter", +] diff --git a/src/oagi/converters/base.py b/src/oagi/converters/base.py new file mode 100644 index 0000000..be77942 --- /dev/null +++ b/src/oagi/converters/base.py @@ -0,0 +1,292 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) OpenAGI Foundation +# All rights reserved. +# +# This file is part of the official API project. +# Licensed under the MIT License. 
@dataclass
class ConverterConfig:
    """Tunable settings shared by the action converters.

    The option set is intended to line up with PyautoguiConfig so that
    converters and the local handler can be configured alike.
    """

    # Size of the sandbox screen that model coordinates are scaled into.
    sandbox_width: int = 1920
    sandbox_height: int = 1080

    # Duration passed to the generated pyautogui.dragTo() call.
    drag_duration: float = 0.5

    # Magnitude passed to the generated pyautogui.scroll() call
    # (positive for "up", negated for "down").
    scroll_amount: int = 2

    # Seconds used for a wait action that carries no explicit duration.
    wait_duration: float = 1.0

    # Interval passed to the generated pyautogui.hotkey() call.
    hotkey_interval: float = 0.1

    # Mode string handed to CapsLockManager.
    capslock_mode: str = "session"

    # If True, raise ValueError when coordinates are outside the valid range.
    # If False (default), clamp coordinates to the valid range.
    strict_coordinate_validation: bool = False
+ """ + self.config = config or ConverterConfig() + self.logger = logger + + # Initialize coordinate scaler + self._coord_scaler = CoordinateScaler( + source_width=self.coord_width, + source_height=self.coord_height, + target_width=self.config.sandbox_width, + target_height=self.config.sandbox_height, + ) + + # Initialize caps lock manager + self.caps_manager = CapsLockManager(mode=self.config.capslock_mode) + + # Track last cursor position (for actions without explicit coordinates) + self._last_x: int | None = None + self._last_y: int | None = None + + @property + @abstractmethod + def coord_width(self) -> int: + """Input coordinate space width (e.g., 1024 for XGA, 1000 for OAGI).""" + ... + + @property + @abstractmethod + def coord_height(self) -> int: + """Input coordinate space height (e.g., 768 for XGA, 1000 for OAGI).""" + ... + + @property + def scale_x(self) -> float: + """X scaling factor from input to sandbox coordinates.""" + return self._coord_scaler.scale_x + + @property + def scale_y(self) -> float: + """Y scaling factor from input to sandbox coordinates.""" + return self._coord_scaler.scale_y + + def scale_coordinate(self, x: int | float, y: int | float) -> tuple[int, int]: + """Scale coordinates from model space to sandbox space. + + Args: + x: X coordinate in model space + y: Y coordinate in model space + + Returns: + Tuple of (scaled_x, scaled_y) in sandbox space + """ + return self._coord_scaler.scale(x, y) + + def normalize_key(self, key: str) -> str: + """Normalize a key name to pyautogui format. + + Args: + key: Key name to normalize + + Returns: + Normalized key name + """ + return normalize_key(key) + + def parse_hotkey(self, hotkey_str: str, *, validate: bool = True) -> list[str]: + """Parse a hotkey string into a list of normalized key names. 
+ + Args: + hotkey_str: Hotkey string (e.g., "ctrl+c") + validate: If True, validate keys against PYAUTOGUI_VALID_KEYS + + Returns: + List of normalized key names + """ + return parse_hotkey(hotkey_str, validate=validate) + + def validate_keys(self, keys: list[str]) -> None: + """Validate that all keys are recognized by pyautogui. + + Args: + keys: List of key names to validate + + Raises: + ValueError: If any key is invalid + """ + validate_keys(keys) + + def _get_last_or_center(self) -> tuple[int, int]: + """Get last cursor position or screen center as fallback. + + Returns: + Tuple of (x, y) coordinates + """ + if self._last_x is not None and self._last_y is not None: + return self._last_x, self._last_y + return self.config.sandbox_width // 2, self.config.sandbox_height // 2 + + def _log_error(self, message: str) -> None: + """Log an error message if logger is available.""" + if self.logger: + self.logger.error(message) + + def _log_info(self, message: str) -> None: + """Log an info message if logger is available.""" + if self.logger: + self.logger.info(message) + + def _log_debug(self, message: str) -> None: + """Log a debug message if logger is available.""" + if self.logger: + self.logger.debug(message) + + def __call__(self, actions: list[T]) -> list[str]: + """Convert actions to list of pyautogui command strings. 
+ + Args: + actions: List of model-specific action objects + + Returns: + List of pyautogui command strings + + Raises: + RuntimeError: If all action conversions failed + """ + converted: list[str] = [] + failed: list[tuple[str, str]] = [] + skipped: list[str] = [] + + if not actions: + return converted + + for action in actions: + try: + action_strings = self._convert_single_action(action) + + if not action_strings: + # No-op action (e.g., screenshot, cursor_position) + action_type = getattr(action, "action_type", repr(action)) + skipped.append(str(action_type)) + continue + + converted.extend(action_strings) + + except Exception as e: + action_repr = repr(action) + self._log_error(f"Failed to convert action: {action_repr}, error: {e}") + failed.append((action_repr, str(e))) + + if skipped: + self._log_debug(f"Skipped no-op actions: {skipped}") + + if not converted and actions and failed: + raise RuntimeError( + f"All action conversions failed ({len(failed)}/{len(actions)}): {failed}" + ) + + return converted + + @abstractmethod + def _convert_single_action(self, action: T) -> list[str]: + """Convert a single action to pyautogui command string(s). + + Args: + action: Model-specific action object + + Returns: + List of pyautogui command strings (may be empty for no-op actions) + + Raises: + ValueError: If action format is invalid + """ + ... + + @abstractmethod + def serialize_actions(self, actions: list[T]) -> list[dict[str, Any]]: + """Serialize actions for trajectory logging. + + Args: + actions: List of model-specific action objects + + Returns: + List of serialized action dictionaries + """ + ... + + def action_string_to_step(self, action: str) -> dict[str, Any]: + """Convert an action string into a step for runtime/do API. 
+ + Args: + action: Action string (e.g., "pyautogui.click(x=100, y=200)") + + Returns: + Step dict for runtime API + """ + action_str = str(action).strip() + + # Special markers + upper = action_str.upper() + if upper in ["DONE", "FAIL"]: + return {"type": "sleep", "parameters": {"seconds": 0}} + + # WAIT(seconds) + wait_match = re.match( + r"^WAIT\((?P[0-9]*\.?[0-9]+)\)$", action_str, re.IGNORECASE + ) + if wait_match: + seconds = float(wait_match.group("sec")) + return {"type": "sleep", "parameters": {"seconds": seconds}} + + # pyautogui code path + if "pyautogui" in action_str.lower(): + return { + "type": "pyautogui", + "parameters": {"code": action_str}, + } + + # Default: shell command + return {"type": "execute", "parameters": {"command": action_str, "shell": True}} diff --git a/src/oagi/converters/oagi.py b/src/oagi/converters/oagi.py new file mode 100644 index 0000000..de428a0 --- /dev/null +++ b/src/oagi/converters/oagi.py @@ -0,0 +1,194 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) OpenAGI Foundation +# All rights reserved. +# +# This file is part of the official API project. +# Licensed under the MIT License. +# ----------------------------------------------------------------------------- +"""OAGI action converter. + +This module provides the OagiActionConverter for converting OAGI actions +to pyautogui command strings for remote execution. +""" + +from typing import Any + +from ..handler.utils import ( + parse_click_coords, + parse_drag_coords, + parse_scroll_coords, +) +from ..types import Action, ActionType +from .base import BaseActionConverter + +# OAGI uses normalized 0-1000 coordinate space +OAGI_COORD_SIZE = 1000 + + +class OagiActionConverter(BaseActionConverter[Action]): + """Convert OAGI actions to pyautogui command strings. + + This converter handles: + 1. Coordinate scaling from 0-1000 space to sandbox dimensions (1920x1080) + 2. 
Action format conversion from OAGI Action format to pyautogui strings + 3. Key name normalization for hotkey combinations + + The output can be converted to runtime API steps via action_string_to_step(). + """ + + @property + def coord_width(self) -> int: + return OAGI_COORD_SIZE + + @property + def coord_height(self) -> int: + return OAGI_COORD_SIZE + + def __call__(self, actions: list[Action]) -> list[str]: + """Convert OAGI actions to list of pyautogui command strings. + + Extends base implementation to handle action count and finish detection. + """ + converted: list[str] = [] + failed: list[tuple[str, str]] = [] + has_finish = False + + if not actions: + return converted + + for action in actions: + # Check for duplicate finish() during iteration + is_finish = action.type == ActionType.FINISH + if is_finish: + if has_finish: + raise ValueError( + "Duplicate finish() detected. " + "Only one finish() is allowed per action sequence." + ) + has_finish = True + + try: + converted.extend(self._convert_action(action)) + except Exception as e: + action_repr = f"{action.type.value}({action.argument})" + self._log_error(f"Failed to convert action: {action_repr}, error: {e}") + failed.append((action_repr, str(e))) + + if not converted and actions and failed: + raise RuntimeError( + f"All action conversions failed ({len(failed)}/{len(actions)}): {failed}" + ) + return converted + + def _convert_action(self, action: Action) -> list[str]: + """Convert action to list of pyautogui command strings. + + Handles action.count for repeat support. 
+ """ + count = action.count or 1 + single_actions = self._convert_single_action(action) + + # Repeat the actions count times + return single_actions * int(count) + + def _convert_single_action(self, action: Action) -> list[str]: + """Convert a single OAGI action to pyautogui command string(s).""" + action_type = action.type.value + argument = (action.argument or "").strip("()") + + drag_duration = self.config.drag_duration + scroll_amount = self.config.scroll_amount + wait_duration = self.config.wait_duration + hotkey_interval = self.config.hotkey_interval + strict = self.config.strict_coordinate_validation + + if action_type == ActionType.CLICK.value: + x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) + return [f"pyautogui.click(x={x}, y={y})"] + + if action_type == ActionType.LEFT_DOUBLE.value: + x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) + return [f"pyautogui.doubleClick(x={x}, y={y})"] + + if action_type == ActionType.LEFT_TRIPLE.value: + x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) + return [f"pyautogui.tripleClick(x={x}, y={y})"] + + if action_type == ActionType.RIGHT_SINGLE.value: + x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) + return [f"pyautogui.rightClick(x={x}, y={y})"] + + if action_type == ActionType.DRAG.value: + sx, sy, ex, ey = parse_drag_coords( + argument, self._coord_scaler, strict=strict + ) + return [ + f"pyautogui.moveTo({sx}, {sy})", + f"pyautogui.dragTo({ex}, {ey}, duration={drag_duration})", + ] + + if action_type == ActionType.HOTKEY.value: + keys = self.parse_hotkey(argument, validate=True) + valid_keys = [k for k in keys if k] + if not valid_keys: + raise ValueError( + f"Invalid hotkey format: '{argument}'. 
" + "Expected key names like 'ctrl+c', 'alt+tab'" + ) + # Check if this is a caps lock key press + if len(valid_keys) == 1 and valid_keys[0] == "capslock": + if self.caps_manager.should_use_system_capslock(): + return [f"pyautogui.hotkey('capslock', interval={hotkey_interval})"] + else: + self.caps_manager.toggle() + return [] # No pyautogui command for session mode + else: + keys_str = ", ".join(repr(k) for k in valid_keys) + return [f"pyautogui.hotkey({keys_str}, interval={hotkey_interval})"] + + if action_type == ActionType.TYPE.value: + text = argument.strip("\"'") + text = self.caps_manager.transform_text(text) + return [f"pyautogui.typewrite({text!r})"] + + if action_type == ActionType.SCROLL.value: + x, y, direction = parse_scroll_coords( + argument, self._coord_scaler, strict=strict + ) + amount = scroll_amount if direction == "up" else -scroll_amount + return [f"pyautogui.moveTo({x}, {y})", f"pyautogui.scroll({amount})"] + + if action_type == ActionType.WAIT.value: + try: + seconds = float(argument) if argument else wait_duration + except ValueError: + raise ValueError( + f"Invalid wait duration: '{argument}'. " + "Expected numeric value in seconds." + ) + return [f"WAIT({seconds})"] + + if action_type == ActionType.FINISH.value: + self._log_info("Task completion action -> DONE") + return ["DONE"] + + if action_type == ActionType.CALL_USER.value: + self._log_info("User intervention requested") + return [] + + raise ValueError( + f"Unknown action type: '{action_type}'. 
" + "Supported: click, left_double, left_triple, right_single, drag, " + "hotkey, type, scroll, wait, finish, call_user" + ) + + def serialize_actions(self, actions: list[Action]) -> list[dict[str, Any]]: + """Serialize OAGI actions for trajectory logging.""" + return [ + { + "type": action.type.value, + "argument": action.argument, + "count": action.count, + } + for action in (actions or []) + ] diff --git a/src/oagi/handler/pyautogui_action_handler.py b/src/oagi/handler/pyautogui_action_handler.py index 4a2853e..c0e1baa 100644 --- a/src/oagi/handler/pyautogui_action_handler.py +++ b/src/oagi/handler/pyautogui_action_handler.py @@ -17,6 +17,7 @@ from ..exceptions import check_optional_dependency from ..types import Action, ActionType, parse_coords, parse_drag_coords, parse_scroll from .capslock_manager import CapsLockManager +from .utils import CoordinateScaler, normalize_key, parse_hotkey check_optional_dependency("pyautogui", "PyautoguiActionHandler", "desktop") import pyautogui # noqa: E402 @@ -92,6 +93,15 @@ def __init__(self, config: PyautoguiConfig | None = None): self.caps_manager = CapsLockManager(mode=self.config.capslock_mode) # The origin position of coordinates (the top-left corner of the target screen) self.origin_x, self.origin_y = 0, 0 + # Initialize coordinate scaler (OAGI uses 0-1000 normalized coordinates) + self._coord_scaler = CoordinateScaler( + source_width=1000, + source_height=1000, + target_width=self.screen_width, + target_height=self.screen_height, + origin_x=self.origin_x, + origin_y=self.origin_y, + ) def reset(self): """Reset handler state. 
@@ -109,6 +119,9 @@ def set_target_screen(self, screen: Screen) -> None: """ self.screen_width, self.screen_height = screen.width, screen.height self.origin_x, self.origin_y = screen.x, screen.y + # Update coordinate scaler + self._coord_scaler.set_target_size(screen.width, screen.height) + self._coord_scaler.set_origin(screen.x, screen.y) def _denormalize_coords(self, x: float, y: float) -> tuple[int, int]: """Convert coordinates from 0-1000 range to actual screen coordinates. @@ -116,26 +129,7 @@ def _denormalize_coords(self, x: float, y: float) -> tuple[int, int]: Also handles corner coordinates to prevent PyAutoGUI fail-safe trigger. Corner coordinates (0,0), (0,max), (max,0), (max,max) are offset by 1 pixel. """ - screen_x = int(x * self.screen_width / 1000) - screen_y = int(y * self.screen_height / 1000) - - # Prevent fail-safe by adjusting corner coordinates - # Check if coordinates are at screen corners (with small tolerance) - if screen_x < 1: - screen_x = 1 - elif screen_x > self.screen_width - 1: - screen_x = self.screen_width - 1 - - if screen_y < 1: - screen_y = 1 - elif screen_y > self.screen_height - 1: - screen_y = self.screen_height - 1 - - # Add origin offset to convert relative to top-left corner - screen_x += self.origin_x - screen_y += self.origin_y - - return screen_x, screen_y + return self._coord_scaler.scale(x, y, prevent_failsafe=True) def _parse_coords(self, args_str: str) -> tuple[int, int]: """Extract x, y coordinates from argument string.""" @@ -163,28 +157,15 @@ def _parse_scroll(self, args_str: str) -> tuple[int, int, str]: def _normalize_key(self, key: str) -> str: """Normalize key names for consistency.""" - key = key.strip().lower() - # Normalize caps lock variations - hotkey_variations_mapping = { - "capslock": ["caps_lock", "caps", "capslock"], - "pgup": ["page_up", "pageup"], - "pgdn": ["page_down", "pagedown"], - } - for normalized, variations in hotkey_variations_mapping.items(): - if key in variations: - return normalized - 
# Remap ctrl to command on macOS if enabled - if self.config.macos_ctrl_to_cmd and sys.platform == "darwin" and key == "ctrl": - return "command" - return key + return normalize_key(key, macos_ctrl_to_cmd=self.config.macos_ctrl_to_cmd) def _parse_hotkey(self, args_str: str) -> list[str]: """Parse hotkey string into list of keys.""" - # Remove parentheses if present - args_str = args_str.strip("()") - # Split by '+' to get individual keys - keys = [self._normalize_key(key) for key in args_str.split("+")] - return keys + return parse_hotkey( + args_str, + macos_ctrl_to_cmd=self.config.macos_ctrl_to_cmd, + validate=False, # Don't validate, let pyautogui handle invalid keys + ) def _move_and_wait(self, x: int, y: int) -> None: """Move cursor to position and wait before clicking.""" diff --git a/src/oagi/handler/utils.py b/src/oagi/handler/utils.py index 8db1604..db344fc 100644 --- a/src/oagi/handler/utils.py +++ b/src/oagi/handler/utils.py @@ -5,6 +5,593 @@ # This file is part of the official API project. # Licensed under the MIT License. # ----------------------------------------------------------------------------- +"""Shared utilities for action handling and conversion. + +This module provides common functionality used by both PyautoguiActionHandler +(for local execution) and action converters (for remote execution). 
+""" + +import sys + +# ============================================================================= +# Key Normalization Mapping +# ============================================================================= + +# Minimal key mapping - only normalizes common variations to pyautogui names +# Matches original PyautoguiActionHandler.hotkey_variations_mapping behavior exactly: +# "capslock": ["caps_lock", "caps", "capslock"] -> capslock +# "pgup": ["page_up", "pageup"] -> pgup +# "pgdn": ["page_down", "pagedown"] -> pgdn +KEY_MAP: dict[str, str] = { + # Caps lock variations -> capslock + "caps_lock": "capslock", + "caps": "capslock", + # Page up variations -> pgup (short form, matching original) + "page_up": "pgup", + "pageup": "pgup", + # Page down variations -> pgdn (short form, matching original) + "page_down": "pgdn", + "pagedown": "pgdn", +} + +# Valid pyautogui key names +PYAUTOGUI_VALID_KEYS: frozenset[str] = frozenset( + { + # Alphabet keys + "a", + "b", + "c", + "d", + "e", + "f", + "g", + "h", + "i", + "j", + "k", + "l", + "m", + "n", + "o", + "p", + "q", + "r", + "s", + "t", + "u", + "v", + "w", + "x", + "y", + "z", + # Number keys + "0", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + # Function keys + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "f10", + "f11", + "f12", + "f13", + "f14", + "f15", + "f16", + "f17", + "f18", + "f19", + "f20", + "f21", + "f22", + "f23", + "f24", + # Navigation keys + "up", + "down", + "left", + "right", + "home", + "end", + "pageup", + "pagedown", + "pgup", + "pgdn", + # Editing keys + "backspace", + "delete", + "del", + "insert", + "enter", + "return", + "tab", + "space", + # Modifier keys (with left/right variants) + "shift", + "shiftleft", + "shiftright", + "ctrl", + "ctrlleft", + "ctrlright", + "alt", + "altleft", + "altright", + "option", + "optionleft", + "optionright", + "command", + "win", + "winleft", + "winright", + "fn", + # Lock keys + "capslock", + "numlock", + 
"scrolllock", + # Special keys + "esc", + "escape", + "pause", + "printscreen", + "prtsc", + "prtscr", + "prntscrn", + "print", + "apps", + "clear", + "sleep", + # Symbols + "!", + "@", + "#", + "$", + "%", + "^", + "&", + "*", + "(", + ")", + "-", + "_", + "=", + "+", + "[", + "]", + "{", + "}", + "\\", + "|", + ";", + ":", + "'", + '"', + ",", + ".", + "<", + ">", + "/", + "?", + "`", + "~", + # Numpad keys + "num0", + "num1", + "num2", + "num3", + "num4", + "num5", + "num6", + "num7", + "num8", + "num9", + "divide", + "multiply", + "subtract", + "add", + "decimal", + # Media keys + "volumeup", + "volumedown", + "volumemute", + "playpause", + "stop", + "nexttrack", + "prevtrack", + # Browser keys + "browserback", + "browserforward", + "browserrefresh", + "browserstop", + "browsersearch", + "browserfavorites", + "browserhome", + # Application launch keys + "launchapp1", + "launchapp2", + "launchmail", + "launchmediaselect", + } +) + + +# ============================================================================= +# Coordinate Scaling +# ============================================================================= + + +class CoordinateScaler: + """Handles coordinate scaling between different coordinate systems. + + This class provides reusable coordinate transformation logic used by both + PyautoguiActionHandler (local execution) and action converters (remote execution). 
+ + Args: + source_width: Width of the source coordinate space (e.g., 1000 for OAGI) + source_height: Height of the source coordinate space + target_width: Width of the target coordinate space (e.g., screen width) + target_height: Height of the target coordinate space + origin_x: X offset of the target coordinate origin (for multi-monitor) + origin_y: Y offset of the target coordinate origin (for multi-monitor) + """ + + def __init__( + self, + source_width: int, + source_height: int, + target_width: int, + target_height: int, + origin_x: int = 0, + origin_y: int = 0, + ): + self.source_width = source_width + self.source_height = source_height + self.target_width = target_width + self.target_height = target_height + self.origin_x = origin_x + self.origin_y = origin_y + self.scale_x = target_width / source_width + self.scale_y = target_height / source_height + + def scale( + self, + x: int | float, + y: int | float, + *, + clamp: bool = True, + prevent_failsafe: bool = False, + strict: bool = False, + ) -> tuple[int, int]: + """Scale coordinates from source to target space. + + Args: + x: X coordinate in source space + y: Y coordinate in source space + clamp: If True, clamp to valid target range + prevent_failsafe: If True, offset corner coordinates by 1 pixel + (prevents PyAutoGUI fail-safe trigger) + strict: If True, raise ValueError when coordinates are outside + valid source range [0, source_width] x [0, source_height] + + Returns: + Tuple of (target_x, target_y) in target coordinate space + + Raises: + ValueError: If strict=True and coordinates are outside valid range + """ + # Strict validation: check if coordinates are in valid source range + if strict: + if x < 0 or x > self.source_width: + raise ValueError( + f"x coordinate {x} out of valid range [0, {self.source_width}]. " + f"Coordinates must be normalized between 0 and {self.source_width}." 
+ ) + if y < 0 or y > self.source_height: + raise ValueError( + f"y coordinate {y} out of valid range [0, {self.source_height}]. " + f"Coordinates must be normalized between 0 and {self.source_height}." + ) + + scaled_x = round(x * self.scale_x) + scaled_y = round(y * self.scale_y) + + if clamp: + # Clamp to valid range + scaled_x = max(0, min(scaled_x, self.target_width - 1)) + scaled_y = max(0, min(scaled_y, self.target_height - 1)) + + if prevent_failsafe: + # Prevent PyAutoGUI fail-safe by adjusting corner coordinates + if scaled_x < 1: + scaled_x = 1 + elif scaled_x > self.target_width - 2: + scaled_x = self.target_width - 2 + if scaled_y < 1: + scaled_y = 1 + elif scaled_y > self.target_height - 2: + scaled_y = self.target_height - 2 + + # Add origin offset (for multi-monitor support) + return scaled_x + self.origin_x, scaled_y + self.origin_y + + def set_origin(self, origin_x: int, origin_y: int) -> None: + """Update the origin offset.""" + self.origin_x = origin_x + self.origin_y = origin_y + + def set_target_size(self, width: int, height: int) -> None: + """Update the target size and recalculate scale factors.""" + self.target_width = width + self.target_height = height + self.scale_x = width / self.source_width + self.scale_y = height / self.source_height + + +# ============================================================================= +# Key Normalization Functions +# ============================================================================= + + +def normalize_key(key: str, *, macos_ctrl_to_cmd: bool = False) -> str: + """Normalize a key name to pyautogui format. 
def validate_keys(keys: list[str]) -> None:
    """Validate that all keys are recognized by pyautogui.

    Empty strings are ignored; every other entry must be a member of
    PYAUTOGUI_VALID_KEYS.

    Args:
        keys: List of normalized key names

    Raises:
        ValueError: If any key is invalid, with helpful suggestions
    """
    invalid_keys = [k for k in keys if k and k not in PYAUTOGUI_VALID_KEYS]
    if not invalid_keys:
        return

    suggestions = []
    for invalid_key in invalid_keys:
        if invalid_key in ("ret",):
            suggestions.append(f"'{invalid_key}' -> use 'enter' or 'return'")
        elif invalid_key.startswith("num") and len(invalid_key) > 3:
            suggestions.append(
                f"'{invalid_key}' -> numpad keys use format 'num0'-'num9'"
            )
        else:
            suggestions.append(f"'{invalid_key}' is not a valid key name")

    error_msg = "Invalid key name(s) in hotkey: " + ", ".join(suggestions)

    # Sort before slicing. Slicing the frozenset first would pick an
    # arbitrary subset that changes with the interpreter's hash seed, so the
    # message would differ between runs. Multi-character names are preferred
    # because a plain sort would fill the sample with punctuation and digits.
    named_keys = sorted(k for k in PYAUTOGUI_VALID_KEYS if len(k) > 1)
    valid_sample = ", ".join(named_keys[:30])
    error_msg += f"\n\nValid keys include: {valid_sample}... (and more)"
    raise ValueError(error_msg)
def _reject_combined_actions(argument: str, action: str) -> None:
    """Raise ValueError if *argument* chains actions with 'and' / 'then'."""
    lowered = argument.lower()
    if " and " in lowered or " then " in lowered:
        raise ValueError(
            f"Invalid {action} format: '{argument}'. "
            "Cannot combine multiple actions with 'and' or 'then'."
        )


def _parse_finite_floats(tokens: list[str]) -> list[float]:
    """Convert coordinate tokens to floats, rejecting NaN and infinities.

    Raises:
        ValueError: If a token is not numeric or is not a finite number
    """
    # Function-scope import keeps this helper self-contained.
    import math

    values = []
    for token in tokens:
        value = float(token.strip())
        # float() accepts "inf" and "nan"; neither is a usable coordinate,
        # and int(inf) raises OverflowError, which callers do not catch.
        if not math.isfinite(value):
            raise ValueError(f"coordinate '{token.strip()}' is not a finite number")
        values.append(value)
    return values


def parse_click_coords(
    argument: str,
    scaler: "CoordinateScaler",
    *,
    prevent_failsafe: bool = False,
    strict: bool = False,
) -> tuple[int, int]:
    """Parse click coordinates from argument string.

    Only the first two comma-separated fields are used; any further
    fields are ignored.

    Args:
        argument: Coordinate string in format "x, y"
        scaler: Object whose ``scale(x, y, prevent_failsafe=..., strict=...)``
            returns the transformed ``(x, y)``
        prevent_failsafe: Forwarded to ``scaler.scale``
        strict: Forwarded to ``scaler.scale``

    Returns:
        Tuple of (x, y) as returned by ``scaler.scale``

    Raises:
        ValueError: If the argument is missing, malformed, non-numeric,
            non-finite, or rejected by the scaler
    """
    # Treat a missing argument like an empty one so it is reported as a
    # ValueError instead of failing on .lower() with AttributeError.
    text = argument or ""
    _reject_combined_actions(text, "click")

    parts = text.split(",") if text else []
    if len(parts) < 2:
        raise ValueError(
            f"Invalid click coordinate format: '{argument}'. "
            "Expected 'x, y' (comma-separated numeric values)"
        )

    try:
        x, y = _parse_finite_floats(parts[:2])
        return scaler.scale(x, y, prevent_failsafe=prevent_failsafe, strict=strict)
    except (ValueError, IndexError) as e:
        raise ValueError(
            f"Failed to parse click coords '{argument}': {e}. "
            "Coordinates must be comma-separated numeric values."
        ) from e


def parse_drag_coords(
    argument: str,
    scaler: "CoordinateScaler",
    *,
    prevent_failsafe: bool = False,
    strict: bool = False,
) -> tuple[int, int, int, int]:
    """Parse drag coordinates from argument string.

    Args:
        argument: Coordinate string in format "x1, y1, x2, y2"
        scaler: Object whose ``scale(x, y, prevent_failsafe=..., strict=...)``
            returns the transformed ``(x, y)``
        prevent_failsafe: Forwarded to ``scaler.scale``
        strict: Forwarded to ``scaler.scale``

    Returns:
        Tuple of (x1, y1, x2, y2): the scaled start point followed by the
        scaled end point

    Raises:
        ValueError: If the argument is missing, does not have exactly four
            fields, is non-numeric, non-finite, or rejected by the scaler
    """
    text = argument or ""
    _reject_combined_actions(text, "drag")

    parts = text.split(",") if text else []
    if len(parts) != 4:
        raise ValueError(
            f"Invalid drag coordinate format: '{argument}'. "
            "Expected 'x1, y1, x2, y2' (4 comma-separated numeric values)"
        )

    try:
        sx, sy, ex, ey = _parse_finite_floats(parts)
        x1, y1 = scaler.scale(sx, sy, prevent_failsafe=prevent_failsafe, strict=strict)
        x2, y2 = scaler.scale(ex, ey, prevent_failsafe=prevent_failsafe, strict=strict)
        return x1, y1, x2, y2
    except (ValueError, IndexError) as e:
        raise ValueError(
            f"Failed to parse drag coords '{argument}': {e}. "
            "Coordinates must be comma-separated numeric values."
        ) from e


def parse_scroll_coords(
    argument: str,
    scaler: "CoordinateScaler",
    *,
    prevent_failsafe: bool = False,
    strict: bool = False,
) -> tuple[int, int, str]:
    """Parse scroll coordinates and direction from argument string.

    Args:
        argument: Scroll string in format "x, y, direction"
        scaler: Object whose ``scale(x, y, prevent_failsafe=..., strict=...)``
            returns the transformed ``(x, y)``
        prevent_failsafe: Forwarded to ``scaler.scale``
        strict: Forwarded to ``scaler.scale``

    Returns:
        Tuple of (x, y, direction) where direction is 'up' or 'down'
        (lower-cased)

    Raises:
        ValueError: If the argument is missing, does not have exactly three
            fields, has non-numeric or non-finite coordinates, names a
            direction other than up/down, or is rejected by the scaler
    """

    def failure(cause: Exception) -> ValueError:
        return ValueError(
            f"Failed to parse scroll coords '{argument}': {cause}. "
            "Format: 'x, y, direction'"
        )

    parts = [p.strip() for p in (argument or "").split(",")]
    if len(parts) != 3:
        raise ValueError(
            f"Invalid scroll format: '{argument}'. "
            "Expected 'x, y, direction' (e.g., '500, 300, up')"
        )

    # Numbers are checked before the direction, so a bad number is
    # reported first when both are wrong.
    try:
        x, y = _parse_finite_floats(parts[:2])
    except ValueError as e:
        raise failure(e) from e

    # Validated outside any try block so this error is never re-wrapped
    # and does not have to be recognised by its message text.
    direction = parts[2].lower()
    if direction not in ("up", "down"):
        raise ValueError(
            f"Invalid scroll direction: '{direction}'. Use 'up' or 'down'."
        )

    try:
        scaled_x, scaled_y = scaler.scale(
            x, y, prevent_failsafe=prevent_failsafe, strict=strict
        )
    except (ValueError, IndexError) as e:
        raise failure(e) from e
    return scaled_x, scaled_y, direction
- - Also handles corner coordinates to prevent PyAutoGUI fail-safe trigger. - Corner coordinates (0,0), (0,max), (max,0), (max,max) are offset by 1 pixel. - """ - screen_x = int(x * self.screen_width / 1000) - screen_y = int(y * self.screen_height / 1000) - - # Prevent fail-safe by adjusting corner coordinates - # Check if coordinates are at screen corners (with small tolerance) - if screen_x < 1: - screen_x = 1 - elif screen_x > self.screen_width - 1: - screen_x = self.screen_width - 1 - - if screen_y < 1: - screen_y = 1 - elif screen_y > self.screen_height - 1: - screen_y = self.screen_height - 1 - - # Add origin offset to convert relative to top-left corner - screen_x += self.origin_x - screen_y += self.origin_y - - return screen_x, screen_y + """Convert coordinates from 0-1000 range to actual screen coordinates.""" + screen_x, screen_y = self._coord_scaler.scale(x, y, prevent_failsafe=True) + # Add origin offset for multi-screen support + return screen_x + self.origin_x, screen_y + self.origin_y def _normalize_key(self, key: str) -> str: """Normalize key names for consistency.""" - key = key.strip().lower() - # Normalize caps lock variations - hotkey_variations_mapping = { - "capslock": ["caps_lock", "caps", "capslock"], - "pgup": ["page_up", "pageup"], - "pgdn": ["page_down", "pagedown"], - } - for normalized, variations in hotkey_variations_mapping.items(): - if key in variations: - return normalized - return key + return normalize_key(key) def _parse_coords(self, args_str: str) -> tuple[int, int]: """Extract x, y coordinates from argument string.""" @@ -234,11 +217,7 @@ def _parse_scroll(self, args_str: str) -> tuple[int, int, str]: def _parse_hotkey(self, args_str: str) -> list[str]: """Parse hotkey string into list of keys.""" - # Remove parentheses if present - args_str = args_str.strip("()") - # Split by '+' to get individual keys - keys = [self._normalize_key(key) for key in args_str.split("+")] - return keys + return parse_hotkey(args_str.strip("()"), 
validate=False) def __call__(self, actions: list[Action]) -> None: """Execute the provided list of actions.""" diff --git a/tests/test_oagi_action_converter.py b/tests/test_oagi_action_converter.py new file mode 100644 index 0000000..e14fb6d --- /dev/null +++ b/tests/test_oagi_action_converter.py @@ -0,0 +1,171 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) OpenAGI Foundation +# All rights reserved. +# +# This file is part of the official API project. +# Licensed under the MIT License. +# ----------------------------------------------------------------------------- + +import pytest + +from oagi.converters import ConverterConfig, OagiActionConverter +from oagi.types import Action, ActionType + + +@pytest.fixture +def config(): + return ConverterConfig(sandbox_width=1920, sandbox_height=1080) + + +@pytest.fixture +def converter(config): + return OagiActionConverter(config=config) + + +class TestCoordinateBasedActions: + @pytest.mark.parametrize( + "action_type,argument,expected_cmd", + [ + (ActionType.CLICK, "500, 300", "pyautogui.click(x=960, y=324)"), + (ActionType.LEFT_DOUBLE, "400, 250", "pyautogui.doubleClick(x=768, y=270)"), + (ActionType.LEFT_TRIPLE, "350, 200", "pyautogui.tripleClick(x=672, y=216)"), + ( + ActionType.RIGHT_SINGLE, + "600, 400", + "pyautogui.rightClick(x=1152, y=432)", + ), + ], + ) + def test_click_actions(self, converter, action_type, argument, expected_cmd): + action = Action(type=action_type, argument=argument, count=1) + result = converter([action]) + assert len(result) == 1 + assert result[0] == expected_cmd + + +class TestDragAction: + def test_drag_generates_two_commands(self, converter, config): + action = Action(type=ActionType.DRAG, argument="100, 100, 500, 300", count=1) + result = converter([action]) + assert len(result) == 2 + assert "pyautogui.moveTo(192, 108)" in result[0] + assert ( + f"pyautogui.dragTo(960, 324, duration={config.drag_duration})" in result[1] + ) + + +class 
TestHotkeyAction: + def test_hotkey_conversion(self, converter, config): + action = Action(type=ActionType.HOTKEY, argument="ctrl+c", count=1) + result = converter([action]) + assert len(result) == 1 + assert ( + f"pyautogui.hotkey('ctrl', 'c', interval={config.hotkey_interval})" + in result[0] + ) + + +class TestTypeAction: + def test_type_conversion(self, converter): + action = Action(type=ActionType.TYPE, argument="Hello World", count=1) + result = converter([action]) + assert len(result) == 1 + assert "pyautogui.typewrite" in result[0] + assert "Hello World" in result[0] + + +class TestScrollAction: + @pytest.mark.parametrize("direction,expected_amount", [("up", 2), ("down", -2)]) + def test_scroll_conversion(self, converter, direction, expected_amount): + action = Action( + type=ActionType.SCROLL, argument=f"500, 300, {direction}", count=1 + ) + result = converter([action]) + assert len(result) == 2 + assert "pyautogui.moveTo(960, 324)" in result[0] + assert f"pyautogui.scroll({expected_amount})" in result[1] + + +class TestSpecialActions: + def test_wait_action(self, converter, config): + action = Action(type=ActionType.WAIT, argument="", count=1) + result = converter([action]) + assert f"WAIT({config.wait_duration})" in result[0] + + def test_finish_action(self, converter): + action = Action(type=ActionType.FINISH, argument="", count=1) + result = converter([action]) + assert result[0] == "DONE" + + +class TestActionStringToStep: + def test_pyautogui_command(self, converter): + step = converter.action_string_to_step("pyautogui.click(x=100, y=200)") + assert step["type"] == "pyautogui" + assert step["parameters"]["code"] == "pyautogui.click(x=100, y=200)" + + def test_wait_command(self, converter): + step = converter.action_string_to_step("WAIT(5)") + assert step["type"] == "sleep" + assert step["parameters"]["seconds"] == 5.0 + + def test_done_command(self, converter): + step = converter.action_string_to_step("DONE") + assert step["type"] == "sleep" + assert 
step["parameters"]["seconds"] == 0 + + +class TestMultipleActions: + def test_action_count(self, converter): + action = Action(type=ActionType.CLICK, argument="500, 300", count=3) + result = converter([action]) + # Each click generates 1 command, repeated 3 times + assert len(result) == 3 + # All should be the same click command + assert all(cmd == "pyautogui.click(x=960, y=324)" for cmd in result) + + +class TestStrictCoordinateValidation: + @pytest.fixture + def strict_converter(self): + config = ConverterConfig( + sandbox_width=1920, + sandbox_height=1080, + strict_coordinate_validation=True, + ) + return OagiActionConverter(config=config) + + @pytest.mark.parametrize( + "argument,match_pattern", + [ + ("-10, 500", "x coordinate .* out of valid range"), + ("500, -10", "y coordinate .* out of valid range"), + ("1050, 500", "x coordinate .* out of valid range"), + ("500, 1050", "y coordinate .* out of valid range"), + ], + ) + def test_strict_mode_rejects_out_of_range( + self, strict_converter, argument, match_pattern + ): + action = Action(type=ActionType.CLICK, argument=argument, count=1) + with pytest.raises(RuntimeError, match=match_pattern): + strict_converter([action]) + + def test_non_strict_mode_clamps_out_of_range(self, converter): + action = Action(type=ActionType.CLICK, argument="1050, 1050", count=1) + result = converter([action]) + assert "pyautogui.click(x=1919, y=1079)" in result[0] + + @pytest.mark.parametrize( + "action_type,argument", + [ + (ActionType.DRAG, "500, 500, 1100, 500"), + (ActionType.SCROLL, "1100, 500, up"), + ], + ) + def test_strict_mode_for_other_actions( + self, strict_converter, action_type, argument + ): + action = Action(type=action_type, argument=argument, count=1) + with pytest.raises(RuntimeError, match="x coordinate .* out of valid range"): + strict_converter([action]) diff --git a/tests/test_pyautogui_action_handler.py b/tests/test_pyautogui_action_handler.py index 2d164ad..bed2ec2 100644 --- 
a/tests/test_pyautogui_action_handler.py +++ b/tests/test_pyautogui_action_handler.py @@ -245,15 +245,15 @@ class TestCornerCoordinatesHandling: [ # Top-left corner ("0, 0", (1, 1)), - ("1, 1", (1, 1)), + ("1, 1", (2, 1)), # Top-right corner (assuming 1920x1080 screen) - ("1000, 0", (1919, 1)), + ("1000, 0", (1918, 1)), ("999, 1", (1918, 1)), # Bottom-left corner - ("0, 1000", (1, 1079)), - ("1, 999", (1, 1078)), + ("0, 1000", (1, 1078)), + ("1, 999", (2, 1078)), # Bottom-right corner - ("1000, 1000", (1919, 1079)), + ("1000, 1000", (1918, 1078)), ("999, 999", (1918, 1078)), # Middle coordinates should not be affected ("500, 500", (960, 540)), @@ -280,7 +280,7 @@ def test_drag_with_corner_coordinates(self, mock_pyautogui, config): # Should adjust corner coordinates to prevent fail-safe mock_pyautogui.moveTo.assert_called_once_with(1, 1) mock_pyautogui.dragTo.assert_called_once_with( - 1919, 1079, duration=config.drag_duration, button="left" + 1918, 1078, duration=config.drag_duration, button="left" ) def test_scroll_with_corner_coordinates(self, mock_pyautogui, config): @@ -310,8 +310,8 @@ def test_multiple_clicks_at_corners(self, mock_pyautogui): # Check moveTo was called with the adjusted corner coordinates moveTo_calls = mock_pyautogui.moveTo.call_args_list assert (1, 1) in [call[0] for call in moveTo_calls] - assert (1919, 1) in [call[0] for call in moveTo_calls] - assert (1, 1079) in [call[0] for call in moveTo_calls] + assert (1918, 1) in [call[0] for call in moveTo_calls] + assert (1, 1078) in [call[0] for call in moveTo_calls] # Click methods called without coordinates mock_pyautogui.doubleClick.assert_called_once_with() mock_pyautogui.tripleClick.assert_called_once_with()