diff --git a/src/oagi/__init__.py b/src/oagi/__init__.py index 13eea94..880571f 100644 --- a/src/oagi/__init__.py +++ b/src/oagi/__init__.py @@ -39,8 +39,16 @@ # package_to_check is None if no optional dependency is required _LAZY_IMPORTS_DATA: dict[str, tuple[str, str | None, str | None]] = { # Action converters (no optional dependencies) - "OagiActionConverter": ("oagi.converters.oagi", None, None), - "ConverterConfig": ("oagi.converters.base", None, None), + "PyautoguiActionConvertor": ( + "oagi.converters.pyautogui_action_converter", + None, + None, + ), + "OagiActionConverter": ( + "oagi.converters.pyautogui_action_converter", + None, + None, + ), "BaseActionConverter": ("oagi.converters.base", None, None), # Desktop handlers (require pyautogui/PIL) "AsyncPyautoguiActionHandler": ( @@ -55,11 +63,7 @@ "pyautogui", "desktop", ), - "PyautoguiConfig": ( - "oagi.handler.pyautogui_action_handler", - "pyautogui", - "desktop", - ), + "PyautoguiConfig": ("oagi.handler.utils", None, None), "ScreenshotMaker": ("oagi.handler.screenshot_maker", "PIL", "desktop"), # Agent modules (lazy to avoid circular imports) "AsyncDefaultAgent": ("oagi.agent.default", None, None), @@ -92,18 +96,19 @@ from oagi.agent.default import AsyncDefaultAgent from oagi.agent.observer.agent_observer import AsyncAgentObserver from oagi.agent.tasker import TaskerAgent - from oagi.converters.base import BaseActionConverter, ConverterConfig - from oagi.converters.oagi import OagiActionConverter + from oagi.converters.base import BaseActionConverter + from oagi.converters.pyautogui_action_converter import ( + OagiActionConverter, + PyautoguiActionConvertor, + ) from oagi.handler.async_pyautogui_action_handler import AsyncPyautoguiActionHandler from oagi.handler.async_screenshot_maker import AsyncScreenshotMaker from oagi.handler.async_ydotool_action_handler import AsyncYdotoolActionHandler from oagi.handler.pil_image import PILImage - from oagi.handler.pyautogui_action_handler import ( - 
PyautoguiActionHandler, - PyautoguiConfig, - ) + from oagi.handler.pyautogui_action_handler import PyautoguiActionHandler from oagi.handler.screen_manager import ScreenManager from oagi.handler.screenshot_maker import ScreenshotMaker + from oagi.handler.utils import PyautoguiConfig from oagi.handler.ydotool_action_handler import YdotoolActionHandler, YdotoolConfig from oagi.server.config import ServerConfig from oagi.server.main import create_app @@ -181,7 +186,7 @@ def __dir__() -> list[str]: # Lazy imports - Screen manager "ScreenManager", # Lazy imports - Action converters + "PyautoguiActionConvertor", "OagiActionConverter", - "ConverterConfig", "BaseActionConverter", ] diff --git a/src/oagi/converters/__init__.py b/src/oagi/converters/__init__.py index 00d8d99..3261866 100644 --- a/src/oagi/converters/__init__.py +++ b/src/oagi/converters/__init__.py @@ -7,26 +7,26 @@ # ----------------------------------------------------------------------------- """Action converters for VLM support. -This module provides the base class and OAGI implementation for action converters. -Third parties can inherit from BaseActionConverter to create custom converters. +This module provides PyautoguiActionConvertor for converting OAGI actions +to pyautogui command strings, and BaseActionConverter for custom converters. 
Example usage: - from oagi.converters import OagiActionConverter, ConverterConfig + from oagi.converters import PyautoguiActionConvertor - # Configure for 1920x1080 sandbox - config = ConverterConfig(sandbox_width=1920, sandbox_height=1080) - converter = OagiActionConverter(config=config) + import logging + converter = PyautoguiActionConvertor(logger=logging.getLogger(__name__)) # Convert OAGI actions to pyautogui strings - result = converter(actions) # list[str] + result = converter(actions) # list[tuple[str, bool]] # Convert to runtime API steps - for cmd in result: + for cmd, is_last in result: step = converter.action_string_to_step(cmd) # Execute step via runtime API... Creating custom converters: - from oagi.converters import BaseActionConverter, ConverterConfig + from oagi.converters import BaseActionConverter + from oagi.handler.utils import PyautoguiConfig class MyActionConverter(BaseActionConverter[MyAction]): @property @@ -46,11 +46,11 @@ def serialize_actions(self, actions: list[MyAction]) -> list[dict]: ... """ -from .base import BaseActionConverter, ConverterConfig -from .oagi import OagiActionConverter +from .base import BaseActionConverter +from .pyautogui_action_converter import OagiActionConverter, PyautoguiActionConvertor __all__ = [ "BaseActionConverter", - "ConverterConfig", + "PyautoguiActionConvertor", "OagiActionConverter", ] diff --git a/src/oagi/converters/base.py b/src/oagi/converters/base.py index f0f46d2..7c1d58e 100644 --- a/src/oagi/converters/base.py +++ b/src/oagi/converters/base.py @@ -13,12 +13,12 @@ import re from abc import ABC, abstractmethod -from dataclasses import dataclass from typing import Any, Generic, TypeVar from ..handler.capslock_manager import CapsLockManager from ..handler.utils import ( CoordinateScaler, + PyautoguiConfig, normalize_key, parse_hotkey, validate_keys, @@ -27,25 +27,6 @@ T = TypeVar("T") -@dataclass -class ConverterConfig: - """Configuration for action converters. 
- - Matches the configuration options in PyautoguiConfig for consistency. - """ - - sandbox_width: int = 1920 - sandbox_height: int = 1080 - drag_duration: float = 0.5 - scroll_amount: int = 2 - wait_duration: float = 1.0 - hotkey_interval: float = 0.1 - capslock_mode: str = "session" - strict_coordinate_validation: bool = False - """If True, raise ValueError when coordinates are outside valid range. - If False (default), clamp coordinates to valid range (original behavior).""" - - class BaseActionConverter(ABC, Generic[T]): """Abstract base class for action converters. @@ -64,7 +45,7 @@ class BaseActionConverter(ABC, Generic[T]): def __init__( self, *, - config: ConverterConfig | None = None, + config: PyautoguiConfig | None = None, logger: Any | None = None, ): """Initialize the converter. @@ -73,7 +54,7 @@ def __init__( config: Converter configuration. Uses defaults if not provided. logger: Optional logger instance for debug/error logging. """ - self.config = config or ConverterConfig() + self.config = config or PyautoguiConfig() self.logger = logger # Initialize coordinate scaler diff --git a/src/oagi/converters/oagi.py b/src/oagi/converters/oagi.py deleted file mode 100644 index 9dfefa6..0000000 --- a/src/oagi/converters/oagi.py +++ /dev/null @@ -1,198 +0,0 @@ -# ----------------------------------------------------------------------------- -# Copyright (c) OpenAGI Foundation -# All rights reserved. -# -# This file is part of the official API project. -# Licensed under the MIT License. -# ----------------------------------------------------------------------------- -"""OAGI action converter. - -This module provides the OagiActionConverter for converting OAGI actions -to pyautogui command strings for remote execution. 
-""" - -from typing import Any - -from ..handler.utils import ( - parse_click_coords, - parse_drag_coords, - parse_scroll_coords, -) -from ..types import Action, ActionType -from .base import BaseActionConverter - -# OAGI uses normalized 0-1000 coordinate space -OAGI_COORD_SIZE = 1000 - - -class OagiActionConverter(BaseActionConverter[Action]): - """Convert OAGI actions to pyautogui command strings. - - This converter handles: - 1. Coordinate scaling from 0-1000 space to sandbox dimensions (1920x1080) - 2. Action format conversion from OAGI Action format to pyautogui strings - 3. Key name normalization for hotkey combinations - - The output can be converted to runtime API steps via action_string_to_step(). - """ - - @property - def coord_width(self) -> int: - return OAGI_COORD_SIZE - - @property - def coord_height(self) -> int: - return OAGI_COORD_SIZE - - def __call__(self, actions: list[Action]) -> list[str]: - """Convert OAGI actions to list of pyautogui command strings. - - Extends base implementation to handle action count and finish detection. - """ - converted: list[str] = [] - failed: list[tuple[str, str]] = [] - has_terminal = False - - if not actions: - return converted - - for action in actions: - # Check for duplicate finish()/fail() during iteration - is_terminal = action.type in (ActionType.FINISH, ActionType.FAIL) - if is_terminal: - if has_terminal: - raise ValueError( - "Duplicate finish()/fail() detected. " - "Only one finish() or fail() is allowed per action sequence." 
- ) - has_terminal = True - - try: - converted.extend(self._convert_action(action)) - except Exception as e: - action_repr = f"{action.type.value}({action.argument})" - self._log_error(f"Failed to convert action: {action_repr}, error: {e}") - failed.append((action_repr, str(e))) - - if not converted and actions and failed: - raise RuntimeError( - f"All action conversions failed ({len(failed)}/{len(actions)}): {failed}" - ) - return converted - - def _convert_action(self, action: Action) -> list[str]: - """Convert action to list of pyautogui command strings. - - Handles action.count for repeat support. - """ - count = action.count or 1 - single_actions = self._convert_single_action(action) - - # Repeat the actions count times - return single_actions * int(count) - - def _convert_single_action(self, action: Action) -> list[str]: - """Convert a single OAGI action to pyautogui command string(s).""" - action_type = action.type.value - argument = (action.argument or "").strip("()") - - drag_duration = self.config.drag_duration - scroll_amount = self.config.scroll_amount - wait_duration = self.config.wait_duration - hotkey_interval = self.config.hotkey_interval - strict = self.config.strict_coordinate_validation - - if action_type == ActionType.CLICK.value: - x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) - return [f"pyautogui.click(x={x}, y={y})"] - - if action_type == ActionType.LEFT_DOUBLE.value: - x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) - return [f"pyautogui.doubleClick(x={x}, y={y})"] - - if action_type == ActionType.LEFT_TRIPLE.value: - x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) - return [f"pyautogui.tripleClick(x={x}, y={y})"] - - if action_type == ActionType.RIGHT_SINGLE.value: - x, y = parse_click_coords(argument, self._coord_scaler, strict=strict) - return [f"pyautogui.rightClick(x={x}, y={y})"] - - if action_type == ActionType.DRAG.value: - sx, sy, ex, ey = parse_drag_coords( - 
argument, self._coord_scaler, strict=strict - ) - return [ - f"pyautogui.moveTo({sx}, {sy})", - f"pyautogui.dragTo({ex}, {ey}, duration={drag_duration})", - ] - - if action_type == ActionType.HOTKEY.value: - keys = self.parse_hotkey(argument, validate=True) - valid_keys = [k for k in keys if k] - if not valid_keys: - raise ValueError( - f"Invalid hotkey format: '{argument}'. " - "Expected key names like 'ctrl+c', 'alt+tab'" - ) - # Check if this is a caps lock key press - if len(valid_keys) == 1 and valid_keys[0] == "capslock": - if self.caps_manager.should_use_system_capslock(): - return [f"pyautogui.hotkey('capslock', interval={hotkey_interval})"] - else: - self.caps_manager.toggle() - return [] # No pyautogui command for session mode - else: - keys_str = ", ".join(repr(k) for k in valid_keys) - return [f"pyautogui.hotkey({keys_str}, interval={hotkey_interval})"] - - if action_type == ActionType.TYPE.value: - text = argument.strip("\"'") - text = self.caps_manager.transform_text(text) - return [f"pyautogui.typewrite({text!r})"] - - if action_type == ActionType.SCROLL.value: - x, y, direction = parse_scroll_coords( - argument, self._coord_scaler, strict=strict - ) - amount = scroll_amount if direction == "up" else -scroll_amount - return [f"pyautogui.moveTo({x}, {y})", f"pyautogui.scroll({amount})"] - - if action_type == ActionType.WAIT.value: - try: - seconds = float(argument) if argument else wait_duration - except ValueError: - raise ValueError( - f"Invalid wait duration: '{argument}'. " - "Expected numeric value in seconds." - ) - return [f"WAIT({seconds})"] - - if action_type == ActionType.FINISH.value: - self._log_info("Task completion action -> DONE") - return ["DONE"] - - if action_type == ActionType.FAIL.value: - self._log_info("Task infeasible action -> FAIL") - return ["FAIL"] - - if action_type == ActionType.CALL_USER.value: - self._log_info("User intervention requested") - return [] - - raise ValueError( - f"Unknown action type: '{action_type}'. 
" - "Supported: click, left_double, left_triple, right_single, drag, " - "hotkey, type, scroll, wait, finish, fail, call_user" - ) - - def serialize_actions(self, actions: list[Action]) -> list[dict[str, Any]]: - """Serialize OAGI actions for trajectory logging.""" - return [ - { - "type": action.type.value, - "argument": action.argument, - "count": action.count, - } - for action in (actions or []) - ] diff --git a/src/oagi/converters/pyautogui_action_converter.py b/src/oagi/converters/pyautogui_action_converter.py new file mode 100644 index 0000000..8e4d0cd --- /dev/null +++ b/src/oagi/converters/pyautogui_action_converter.py @@ -0,0 +1,544 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) OpenAGI Foundation +# All rights reserved. +# +# This file is part of the official API project. +# Licensed under the MIT License. +# ----------------------------------------------------------------------------- +"""PyAutoGUI action converter. + +This module provides PyautoguiActionConvertor for converting OAGI actions +to pyautogui command strings for remote execution via the sandbox runtime API. +""" + +import logging +import re +from typing import Any + +from ..handler.capslock_manager import CapsLockManager +from ..handler.utils import PYAUTOGUI_VALID_KEYS, PyautoguiConfig, make_type_command +from ..types import ActionType + +# Sandbox configuration constants +DEFAULT_SANDBOX_WIDTH = 1920 +DEFAULT_SANDBOX_HEIGHT = 1080 +MODEL_COORD_WIDTH = 1000 +MODEL_COORD_HEIGHT = 1000 + +# Converter uses scroll_amount=2 (not PyautoguiConfig's platform-dependent default +# of 100 on Linux) because commands execute in the remote sandbox VM. +DEFAULT_CONVERTER_SCROLL_AMOUNT = 2 + + +class PyautoguiActionConvertor: + """Convert OAGI actions to pyautogui command strings. 
+ + This class mirrors the structure of PyautoguiActionHandler but instead of + executing actions directly, it converts them to pyautogui command strings + that can be executed remotely via the runtime API. + + Aligned with PyautoguiActionHandler structure: + - __call__: Iterate actions and call _convert_action for each + - _convert_action: Handle action count and call _convert_single_action + - _convert_single_action: Convert individual action to pyautogui string(s) + - _denormalize_coords: Map model/image coords to sandbox coords + - CapsLockManager: Handle caps lock state for text transformation + + Key differences from PyautoguiActionHandler: + - Returns pyautogui command strings instead of executing them + - Uses custom coordinate scaling for sandbox dimensions + - Integrates with runtime API via action_string_to_step method + + Args: + logger: Logger instance for error and debug logging + config: PyautoguiConfig instance. If not provided, uses default config + with scroll_amount=2 (overriding the platform-dependent default). + """ + + def __init__( + self, + *, + logger: logging.Logger, + config: PyautoguiConfig | None = None, + ) -> None: + self.logger = logger + + # Use provided config or create default with converter-specific scroll_amount + self.pyautogui_config = config or PyautoguiConfig( + scroll_amount=DEFAULT_CONVERTER_SCROLL_AMOUNT, + ) + + self.sandbox_width = self.pyautogui_config.sandbox_width + self.sandbox_height = self.pyautogui_config.sandbox_height + + self.coord_scale_x = self.sandbox_width / MODEL_COORD_WIDTH + self.coord_scale_y = self.sandbox_height / MODEL_COORD_HEIGHT + + # Initialize caps lock manager + self.caps_manager = CapsLockManager(mode=self.pyautogui_config.capslock_mode) + + def __call__(self, oagi_actions: list[Any]) -> list[tuple[str, bool]]: + """Convert OAGI actions to list of (action_string, is_last_of_repeat) tuples. + + Returns: + List of tuples: [(action_string, is_last_of_repeat), ...] 
+ + Raises: + ValueError: If duplicate finish() actions or other format errors detected + RuntimeError: If all action conversions failed + """ + converted: list[tuple[str, bool]] = [] + failed: list[tuple] = [] + has_terminal = False + + if not oagi_actions: + return converted + + for action in oagi_actions: + # Check for duplicate finish()/fail() during iteration + action_type = getattr(action, "type", None) + is_terminal = hasattr(action_type, "value") and action_type.value in ( + ActionType.FINISH.value, + ActionType.FAIL.value, + ) + if is_terminal: + if has_terminal: + raise ValueError( + "Duplicate finish()/fail() detected. " + "Only one finish() or fail() is allowed per action sequence." + ) + has_terminal = True + + try: + converted.extend(self._convert_action(action)) + except Exception as e: + # Extract action details for better error logging + action_arg = getattr(action, "argument", "unknown") + action_repr = f"{action_type.value if hasattr(action_type, 'value') else action_type}({action_arg})" + self.logger.error( + f"Failed to convert action: {action_repr}, error: {e}" + ) + failed.append((action_repr, str(e))) + + if not converted and failed: + raise RuntimeError( + f"All action conversions failed ({len(failed)}/{len(oagi_actions)}): {failed}" + ) + return converted + + def _convert_action(self, action: Any) -> list[tuple[str, bool]]: + """Convert action to list of (action_string, is_last_of_repeat) tuples. + + Args: + action: OAGI action object + + Returns: + List of tuples: [(action_string, is_last_of_repeat), ...] 
+ is_last_of_repeat indicates whether this is the last action in a count>1 sequence + """ + if not hasattr(action, "type") or not hasattr(action, "argument"): + raise ValueError("Action missing required attribute 'type' or 'argument'") + count = getattr(action, "count", None) or 1 + out: list[tuple[str, bool]] = [] + single_actions = self._convert_single_action(action) + + # Repeat the actions count times + for i in range(int(count)): + is_last_repeat = i == int(count) - 1 # True only on last iteration + for j, action_str in enumerate(single_actions): + # Only mark the very last command of the very last repeat as is_last + is_last = is_last_repeat and (j == len(single_actions) - 1) + out.append((action_str, is_last)) + + return out + + def _denormalize_coords(self, x: float, y: float) -> tuple[int, int]: + """Convert normalized coordinates to actual sandbox screen coordinates. + + Args: + x: Normalized x coordinate (must be in range [0, MODEL_COORD_WIDTH]) + y: Normalized y coordinate (must be in range [0, MODEL_COORD_HEIGHT]) + + Returns: + Tuple of (screen_x, screen_y) in valid screen bounds + + Raises: + ValueError: If coordinates are outside the valid model coordinate range [0, 1000] + """ + # Validate input coordinates are within model coordinate range + # Model outputs coordinates normalized between 0 and 1000 + if x < 0 or x > MODEL_COORD_WIDTH: + raise ValueError( + f"x coordinate {x} out of valid range [0, {MODEL_COORD_WIDTH}]. " + f"Coordinates must be normalized between 0 and {MODEL_COORD_WIDTH}." + ) + if y < 0 or y > MODEL_COORD_HEIGHT: + raise ValueError( + f"y coordinate {y} out of valid range [0, {MODEL_COORD_HEIGHT}]. " + f"Coordinates must be normalized between 0 and {MODEL_COORD_HEIGHT}." 
+ ) + + scaled_x = round(x * self.coord_scale_x) + scaled_y = round(y * self.coord_scale_y) + + # Clamp coordinates to ensure valid screen positions (handles edge case at exactly 1000) + scaled_x = max(0, min(scaled_x, self.sandbox_width - 1)) + scaled_y = max(0, min(scaled_y, self.sandbox_height - 1)) + + return scaled_x, scaled_y + + def _parse_click_coords(self, argument: str) -> tuple[int, int]: + """Parse click coordinates from argument string. + + Args: + argument: Coordinate string in format "x, y" + + Returns: + Tuple of denormalized (x, y) coordinates + + Raises: + ValueError: If coordinate format is invalid or contains non-numeric values + """ + # Check for common format errors first + if " and " in argument.lower() or " then " in argument.lower(): + raise ValueError( + f"Invalid click format: '{argument}'. " + f"Cannot combine multiple actions with 'and' or 'then'. " + f"Each action must be separate in the action list." + ) + + parts = argument.split(",") if argument else [] + if len(parts) < 2: + raise ValueError( + f"Invalid click coordinate format: '{argument}'. Expected 'x, y' (comma-separated numeric values)" + ) + try: + x = float(parts[0].strip()) + y = float(parts[1].strip()) + return self._denormalize_coords(x, y) + except (ValueError, IndexError) as e: + raise ValueError( + f"Failed to parse click coords '{argument}': {e}. " + f"Coordinates must be comma-separated numeric values, e.g., 'click(500, 300)'" + ) from e + + def _parse_drag_coords(self, argument: str) -> tuple[int, int, int, int]: + """Parse drag coordinates from argument string. + + Args: + argument: Coordinate string in format "x1, y1, x2, y2" + + Returns: + Tuple of denormalized (x1, y1, x2, y2) coordinates + + Raises: + ValueError: If coordinate format is invalid or contains non-numeric values + """ + # Check for common format errors first + if " and " in argument.lower() or " then " in argument.lower(): + raise ValueError( + f"Invalid drag format: '{argument}'. 
" + f"Cannot combine multiple actions with 'and' or 'then'. " + f"Each action must be separate in the action list." + ) + + parts = argument.split(",") if argument else [] + if len(parts) != 4: + raise ValueError( + f"Invalid drag coordinate format: '{argument}'. " + f"Expected 'x1, y1, x2, y2' (4 comma-separated numeric values)" + ) + try: + sx = float(parts[0].strip()) + sy = float(parts[1].strip()) + ex = float(parts[2].strip()) + ey = float(parts[3].strip()) + sx, sy = self._denormalize_coords(sx, sy) + ex, ey = self._denormalize_coords(ex, ey) + return sx, sy, ex, ey + except (ValueError, IndexError) as e: + raise ValueError( + f"Failed to parse drag coords '{argument}': {e}. " + f"Coordinates must be comma-separated numeric values, e.g., 'drag(100, 200, 300, 400)'" + ) from e + + def _normalize_key(self, key: str) -> str: + """Normalize key names for consistency. + + Maps common aliases to pyautogui-recognized key names. + Handles underscore-separated key names (e.g., page_down -> pagedown). + """ + key = key.strip().lower() + + # Normalize underscore-separated key names to pyautogui format + # This handles common model outputs like page_down, print_screen, etc. 
+ # Synced from oagi.handler.utils.normalize_key + hotkey_variations_mapping = { + "pageup": ["page_up", "pageup", "pgup"], + "pagedown": ["page_down", "pagedown", "pgdn"], + "printscreen": ["print_screen", "printscreen", "prtsc", "prtscr"], + "numlock": ["num_lock", "numlock"], + "scrolllock": ["scroll_lock", "scrolllock"], + "capslock": ["caps_lock", "caps", "capslock"], + } + for normalized, variations in hotkey_variations_mapping.items(): + if key in variations: + return normalized + + # Windows-specific key mappings + if key in ("windows", "super", "meta"): + return "win" # Windows key + + # macOS-specific key mappings + if key == "cmd": + return "command" + + # Control key alias (pyautogui uses 'ctrl', not 'control') + if key == "control": + return "ctrl" + + # Media key aliases + if key == "mute": + return "volumemute" + if key == "play": + return "playpause" + + return key + + def _validate_keys(self, keys: list[str]) -> None: + """Validate that all keys are recognized by pyautogui. 
+ + Args: + keys: List of normalized key names + + Raises: + ValueError: If any key is invalid, with helpful suggestions + """ + invalid_keys = [k for k in keys if k and k not in PYAUTOGUI_VALID_KEYS] + + if invalid_keys: + # Provide helpful suggestions for common mistakes + suggestions = [] + for invalid_key in invalid_keys: + if invalid_key in ("return", "ret"): + suggestions.append(f"'{invalid_key}' → use 'enter' or 'return'") + elif invalid_key in ("delete", "del"): + suggestions.append(f"'{invalid_key}' → use 'delete' or 'del'") + elif invalid_key in ("escape", "esc"): + suggestions.append(f"'{invalid_key}' → use 'escape' or 'esc'") + elif invalid_key.startswith("num") and len(invalid_key) > 3: + suggestions.append( + f"'{invalid_key}' → numpad keys use format 'num0'-'num9'" + ) + else: + suggestions.append(f"'{invalid_key}' is not a valid key name") + + error_msg = "Invalid key name(s) in hotkey: " + ", ".join(suggestions) + error_msg += f"\n\nValid keys include: {', '.join(sorted(list(PYAUTOGUI_VALID_KEYS)[:30]))}... (and more)" + raise ValueError(error_msg) + + def _parse_hotkey(self, args_str: str) -> list[str]: + """Parse hotkey string into list of keys. 
+ + Args: + args_str: Hotkey string (e.g., "ctrl+c", "alt+tab") + + Returns: + List of normalized key names + + Raises: + ValueError: If any key is invalid + """ + # Remove parentheses if present + args_str = args_str.strip("()") + + # Split by '+' or ',' to get individual keys + # This handles both formats: "ctrl+c" and "alt, tab" + if "+" in args_str: + keys = [self._normalize_key(key) for key in args_str.split("+")] + else: + # Split by comma (handles "alt, tab" format from model output) + keys = [self._normalize_key(key) for key in args_str.split(",")] + + # Validate all keys before returning + self._validate_keys(keys) + + return keys + + def _convert_single_action(self, action: Any) -> list[str]: + action_type = action.type.value + # Strip outer parentheses from argument, similar to official handler + argument = (action.argument or "").strip("()") + + drag_duration = self.pyautogui_config.drag_duration + scroll_default = self.pyautogui_config.scroll_amount + wait_default = self.pyautogui_config.wait_duration + + if action_type == ActionType.CLICK.value: + x, y = self._parse_click_coords(argument) + return [f"pyautogui.click(x={x}, y={y})"] + + if action_type == ActionType.LEFT_DOUBLE.value: + x, y = self._parse_click_coords(argument) + return [f"pyautogui.doubleClick(x={x}, y={y})"] + + if action_type == ActionType.LEFT_TRIPLE.value: + x, y = self._parse_click_coords(argument) + return [f"pyautogui.tripleClick(x={x}, y={y})"] + + if action_type == ActionType.RIGHT_SINGLE.value: + x, y = self._parse_click_coords(argument) + return [f"pyautogui.rightClick(x={x}, y={y})"] + + if action_type == ActionType.DRAG.value: + sx, sy, ex, ey = self._parse_drag_coords(argument) + return [ + f"pyautogui.moveTo({sx}, {sy})", + f"pyautogui.dragTo({ex}, {ey}, duration={drag_duration})", + ] + + if action_type == ActionType.HOTKEY.value: + keys = self._parse_hotkey(argument) + # Validate keys are not empty (already validated in _parse_hotkey) + valid_keys = [k for k in keys if 
k] + if not valid_keys: + raise ValueError( + f"Invalid hotkey format: '{argument}'. " + f"Expected key names like 'ctrl+c', 'alt+tab', got empty or invalid keys" + ) + # Check if this is a caps lock key press + if len(valid_keys) == 1 and valid_keys[0] == "capslock": + if self.caps_manager.should_use_system_capslock(): + # System mode: use OS-level caps lock + hotkey_interval = self.pyautogui_config.hotkey_interval + return [f"pyautogui.hotkey('capslock', interval={hotkey_interval})"] + else: + # Session mode: toggle internal state (no actual key press needed in conversion) + self.caps_manager.toggle() + return [] # No pyautogui command needed for session mode + else: + # Regular hotkey combination + keys_str = ", ".join(repr(k) for k in valid_keys) + hotkey_interval = self.pyautogui_config.hotkey_interval + return [f"pyautogui.hotkey({keys_str}, interval={hotkey_interval})"] + + if action_type == ActionType.TYPE.value: + # Remove quotes if present + text = argument.strip("\"'") + + # Apply caps lock transformation if needed + text = self.caps_manager.transform_text(text) + return [make_type_command(text)] + + if action_type == ActionType.SCROLL.value: + parts = [p.strip() for p in argument.split(",")] + if len(parts) != 3: + raise ValueError( + f"Invalid scroll format: '{argument}'. " + f"Expected 'x, y, direction' (3 comma-separated values), got {len(parts)} parts" + ) + try: + x = float(parts[0]) + y = float(parts[1]) + except (ValueError, IndexError) as e: + raise ValueError( + f"Invalid scroll coordinates: '{argument}'. " + f"x and y must be numeric values, e.g., 'scroll(500, 300, up)'" + ) from e + + x, y = self._denormalize_coords(x, y) + direction = parts[2].lower().strip() + + if direction == "up": + amount = scroll_default + elif direction == "down": + amount = -scroll_default + else: + raise ValueError( + f"Invalid scroll direction: '{direction}' in '{argument}'. 
Expected 'up' or 'down'" + ) + + return [f"pyautogui.moveTo({x}, {y})", f"pyautogui.scroll({amount})"] + + if action_type == ActionType.WAIT.value: + try: + seconds = float(argument) if argument else float(wait_default) + except ValueError: + raise ValueError( + f"Invalid wait duration: '{argument}'. Expected numeric value in seconds, e.g., 'wait(2.0)'" + ) from None + return [f"WAIT({seconds})"] + + if action_type == ActionType.FINISH.value: + # Task completion action + self.logger.info("Task completion action -> DONE") + return ["DONE"] + + if action_type == ActionType.FAIL.value: + # Task infeasible action + self.logger.info("Task infeasible action -> FAIL") + return ["FAIL"] + + if action_type == ActionType.CALL_USER.value: + # User intervention requested - not an error, just no-op + self.logger.info("User intervention requested") + return [] + + # Unknown action type - raise error to guide model + raise ValueError( + f"Unknown action type: '{action_type}'. " + f"Supported types: click, left_double, left_triple, right_single, drag, " + f"hotkey, type, scroll, wait, finish, fail, call_user" + ) + + # ------------------------------------------------------------------ + # Public: convert an action string into a runtime step dict + # ------------------------------------------------------------------ + def action_string_to_step(self, action: str) -> dict[str, Any]: + """Convert a single action string into a step for runtime/do API. + + Mirrors previous TaskExecutor._convert_action_to_step behavior. 
+ """ + action_str = str(action).strip() + + if not action_str: + raise ValueError("Empty action string — invalid model output format") + + # Special markers + upper = action_str.upper() + if upper in ["DONE", "FAIL"]: + return {"type": "sleep", "parameters": {"seconds": 0}} + + # WAIT(seconds) + wait_match = re.match( + r"^WAIT\((?P<sec>[0-9]*\.?[0-9]+)\)$", action_str, re.IGNORECASE + ) + if wait_match: + seconds = float(wait_match.group("sec")) + return {"type": "sleep", "parameters": {"seconds": seconds}} + + # pyautogui code path - use direct execution for better performance + # This avoids spawning a new Python process for each action + # PynputController and _smart_paste must also use this path to preserve X11 context + action_lower = action_str.lower() + if ( + "pyautogui" in action_lower + or "pynputcontroller" in action_lower + or "_smart_paste" in action_lower + ): + return { + "type": "pyautogui", + "parameters": { + "code": action_str, + }, + } + + # Default: shell command + return { + "type": "execute", + "parameters": {"command": action_str, "shell": True}, + } + + +# Backward compatibility alias +OagiActionConverter = PyautoguiActionConvertor diff --git a/src/oagi/handler/pyautogui_action_handler.py b/src/oagi/handler/pyautogui_action_handler.py index e767637..b96aa5b 100644 --- a/src/oagi/handler/pyautogui_action_handler.py +++ b/src/oagi/handler/pyautogui_action_handler.py @@ -9,15 +9,12 @@ import sys import time -from pydantic import BaseModel, Field - from oagi.handler.screen_manager import Screen -from ..constants import DEFAULT_STEP_DELAY from ..exceptions import check_optional_dependency from ..types import Action, ActionType, parse_coords, parse_drag_coords, parse_scroll from .capslock_manager import CapsLockManager -from .utils import CoordinateScaler, normalize_key, parse_hotkey +from .utils import CoordinateScaler, PyautoguiConfig, normalize_key, parse_hotkey check_optional_dependency("pyautogui", 
"PyautoguiActionHandler", "desktop") import 
pyautogui # noqa: E402 @@ -28,45 +25,6 @@ from . import _windows -class PyautoguiConfig(BaseModel): - """Configuration for PyautoguiActionHandler.""" - - drag_duration: float = Field( - default=0.5, description="Duration for drag operations in seconds" - ) - scroll_amount: int = Field( - default=2 if sys.platform == "darwin" else 100, - description="Amount to scroll (positive for up, negative for down)", - ) - wait_duration: float = Field( - default=1.0, description="Duration for wait actions in seconds" - ) - action_pause: float = Field( - default=0.1, description="Pause between PyAutoGUI actions in seconds" - ) - hotkey_interval: float = Field( - default=0.1, description="Interval between key presses in hotkey combinations" - ) - capslock_mode: str = Field( - default="session", - description="Caps lock handling mode: 'session' (internal state) or 'system' (OS-level)", - ) - macos_ctrl_to_cmd: bool = Field( - default=True, - description="Replace 'ctrl' with 'command' in hotkey combinations on macOS", - ) - click_pre_delay: float = Field( - default=0.1, - description="Delay in seconds after moving to position before clicking", - ) - post_batch_delay: float = Field( - default=DEFAULT_STEP_DELAY, - ge=0, - description="Delay after executing all actions in a batch (seconds). " - "Allows UI to settle before next screenshot.", - ) - - class PyautoguiActionHandler: """ Handles actions to be executed using PyAutoGUI. 
diff --git a/src/oagi/handler/utils.py b/src/oagi/handler/utils.py index db344fc..23ea55f 100644 --- a/src/oagi/handler/utils.py +++ b/src/oagi/handler/utils.py @@ -13,6 +13,10 @@ import sys +from pydantic import BaseModel, Field + +from ..constants import DEFAULT_STEP_DELAY + # ============================================================================= # Key Normalization Mapping # ============================================================================= @@ -620,3 +624,79 @@ def configure_handler_delay(handler, step_delay: float) -> None: """ if hasattr(handler, "config") and hasattr(handler.config, "post_batch_delay"): handler.config.post_batch_delay = step_delay + + +# ============================================================================= +# PyautoguiConfig +# ============================================================================= + + +class PyautoguiConfig(BaseModel): + """Configuration for PyautoguiActionHandler and PyautoguiActionConvertor.""" + + drag_duration: float = Field( + default=0.5, description="Duration for drag operations in seconds" + ) + scroll_amount: int = Field( + default=2 if sys.platform == "darwin" else 100, + description="Amount to scroll (positive for up, negative for down)", + ) + wait_duration: float = Field( + default=1.0, description="Duration for wait actions in seconds" + ) + action_pause: float = Field( + default=0.1, description="Pause between PyAutoGUI actions in seconds" + ) + hotkey_interval: float = Field( + default=0.1, description="Interval between key presses in hotkey combinations" + ) + capslock_mode: str = Field( + default="session", + description="Caps lock handling mode: 'session' (internal state) or 'system' (OS-level)", + ) + macos_ctrl_to_cmd: bool = Field( + default=True, + description="Replace 'ctrl' with 'command' in hotkey combinations on macOS", + ) + click_pre_delay: float = Field( + default=0.1, + description="Delay in seconds after moving to position before clicking", + ) + 
post_batch_delay: float = Field( + default=DEFAULT_STEP_DELAY, + ge=0, + description="Delay after executing all actions in a batch (seconds). " + "Allows UI to settle before next screenshot.", + ) + sandbox_width: int = Field( + default=1920, description="Target sandbox screen width in pixels" + ) + sandbox_height: int = Field( + default=1080, description="Target sandbox screen height in pixels" + ) + strict_coordinate_validation: bool = Field( + default=False, + description="If True, raise ValueError when coordinates are outside valid range. " + "If False (default), clamp coordinates to valid range.", + ) + + +# ============================================================================= +# Type Command Utilities +# ============================================================================= + +_PYNPUT_CHAR_LIMIT = 200 + + +def make_type_command(text: str) -> str: + """Generate pyautogui code to type *text*. + + Short ASCII without newlines (<=200 chars) -> PynputController (character-by-character). + Long ASCII / Unicode / multi-line -> _smart_paste (clipboard paste, terminal-aware). + """ + if not text: + raise ValueError("Empty text for type command — invalid model output") + has_unicode = any(ord(c) > 127 for c in text) + if not has_unicode and "\n" not in text and len(text) <= _PYNPUT_CHAR_LIMIT: + return f"PynputController().type({text!r})" + return f"_smart_paste({text!r})" diff --git a/tests/test_oagi_action_converter.py b/tests/test_oagi_action_converter.py index 6acafca..7a7ba9d 100644 --- a/tests/test_oagi_action_converter.py +++ b/tests/test_oagi_action_converter.py @@ -6,20 +6,27 @@ # Licensed under the MIT License. 
# ----------------------------------------------------------------------------- +import logging + import pytest -from oagi.converters import ConverterConfig, OagiActionConverter +from oagi.converters import OagiActionConverter, PyautoguiActionConvertor from oagi.types import Action, ActionType @pytest.fixture -def config(): - return ConverterConfig(sandbox_width=1920, sandbox_height=1080) +def converter(): + return PyautoguiActionConvertor(logger=logging.getLogger("test")) -@pytest.fixture -def converter(config): - return OagiActionConverter(config=config) +def _cmds(result: list[tuple[str, bool]]) -> list[str]: + """Extract command strings from converter result tuples.""" + return [cmd for cmd, _ in result] + + +class TestBackwardCompatAlias: + def test_oagi_action_converter_is_alias(self): + assert OagiActionConverter is PyautoguiActionConvertor class TestCoordinateBasedActions: @@ -35,72 +42,103 @@ class TestCoordinateBasedActions: "pyautogui.rightClick(x=1152, y=432)", ), ], + ids=["click", "double-click", "triple-click", "right-click"], ) def test_click_actions(self, converter, action_type, argument, expected_cmd): action = Action(type=action_type, argument=argument, count=1) result = converter([action]) assert len(result) == 1 - assert result[0] == expected_cmd + cmd, is_last = result[0] + assert cmd == expected_cmd + assert is_last is True class TestDragAction: - def test_drag_generates_two_commands(self, converter, config): + def test_drag_generates_two_commands(self, converter): action = Action(type=ActionType.DRAG, argument="100, 100, 500, 300", count=1) result = converter([action]) - assert len(result) == 2 - assert "pyautogui.moveTo(192, 108)" in result[0] - assert ( - f"pyautogui.dragTo(960, 324, duration={config.drag_duration})" in result[1] - ) + cmds = _cmds(result) + assert len(cmds) == 2 + assert "pyautogui.moveTo(192, 108)" in cmds[0] + assert "pyautogui.dragTo(960, 324, duration=0.5)" in cmds[1] + # Only last command should have is_last=True + 
assert result[0][1] is False + assert result[1][1] is True class TestHotkeyAction: - def test_hotkey_conversion(self, converter, config): + def test_hotkey_conversion(self, converter): action = Action(type=ActionType.HOTKEY, argument="ctrl+c", count=1) result = converter([action]) - assert len(result) == 1 - assert ( - f"pyautogui.hotkey('ctrl', 'c', interval={config.hotkey_interval})" - in result[0] - ) + cmds = _cmds(result) + assert len(cmds) == 1 + assert "pyautogui.hotkey('ctrl', 'c', interval=0.1)" in cmds[0] class TestTypeAction: - def test_type_conversion(self, converter): + def test_short_ascii_uses_pynput(self, converter): action = Action(type=ActionType.TYPE, argument="Hello World", count=1) result = converter([action]) - assert len(result) == 1 - assert "pyautogui.typewrite" in result[0] - assert "Hello World" in result[0] + cmds = _cmds(result) + assert len(cmds) == 1 + assert "PynputController().type('Hello World')" == cmds[0] + + def test_unicode_uses_smart_paste(self, converter): + action = Action(type=ActionType.TYPE, argument="你好世界", count=1) + result = converter([action]) + cmds = _cmds(result) + assert len(cmds) == 1 + assert "_smart_paste('你好世界')" == cmds[0] + + def test_multiline_uses_smart_paste(self, converter): + action = Action(type=ActionType.TYPE, argument="line1\nline2", count=1) + result = converter([action]) + cmds = _cmds(result) + assert len(cmds) == 1 + assert "_smart_paste(" in cmds[0] + + def test_long_text_uses_smart_paste(self, converter): + long_text = "a" * 201 + action = Action(type=ActionType.TYPE, argument=long_text, count=1) + result = converter([action]) + cmds = _cmds(result) + assert len(cmds) == 1 + assert "_smart_paste(" in cmds[0] class TestScrollAction: - @pytest.mark.parametrize("direction,expected_amount", [("up", 2), ("down", -2)]) + @pytest.mark.parametrize( + "direction,expected_amount", + [("up", 2), ("down", -2)], + ids=["scroll-up", "scroll-down"], + ) def test_scroll_conversion(self, converter, direction, 
expected_amount): action = Action( type=ActionType.SCROLL, argument=f"500, 300, {direction}", count=1 ) result = converter([action]) - assert len(result) == 2 - assert "pyautogui.moveTo(960, 324)" in result[0] - assert f"pyautogui.scroll({expected_amount})" in result[1] + cmds = _cmds(result) + assert len(cmds) == 2 + assert "pyautogui.moveTo(960, 324)" in cmds[0] + assert f"pyautogui.scroll({expected_amount})" in cmds[1] class TestSpecialActions: - def test_wait_action(self, converter, config): + def test_wait_action(self, converter): action = Action(type=ActionType.WAIT, argument="", count=1) result = converter([action]) - assert f"WAIT({config.wait_duration})" in result[0] + cmds = _cmds(result) + assert "WAIT(1.0)" in cmds[0] def test_finish_action(self, converter): action = Action(type=ActionType.FINISH, argument="", count=1) result = converter([action]) - assert result[0] == "DONE" + assert result[0][0] == "DONE" def test_fail_action(self, converter): action = Action(type=ActionType.FAIL, argument="", count=1) result = converter([action]) - assert result[0] == "FAIL" + assert result[0][0] == "FAIL" def test_duplicate_terminal_actions_raises(self, converter): actions = [ @@ -117,6 +155,16 @@ def test_pyautogui_command(self, converter): assert step["type"] == "pyautogui" assert step["parameters"]["code"] == "pyautogui.click(x=100, y=200)" + def test_pynput_command(self, converter): + step = converter.action_string_to_step("PynputController().type('hello')") + assert step["type"] == "pyautogui" + assert step["parameters"]["code"] == "PynputController().type('hello')" + + def test_smart_paste_command(self, converter): + step = converter.action_string_to_step("_smart_paste('hello')") + assert step["type"] == "pyautogui" + assert step["parameters"]["code"] == "_smart_paste('hello')" + def test_wait_command(self, converter): step = converter.action_string_to_step("WAIT(5)") assert step["type"] == "sleep" @@ -137,22 +185,26 @@ class TestMultipleActions: def 
test_action_count(self, converter): action = Action(type=ActionType.CLICK, argument="500, 300", count=3) result = converter([action]) + cmds = _cmds(result) # Each click generates 1 command, repeated 3 times - assert len(result) == 3 + assert len(cmds) == 3 # All should be the same click command - assert all(cmd == "pyautogui.click(x=960, y=324)" for cmd in result) - + assert all(cmd == "pyautogui.click(x=960, y=324)" for cmd in cmds) + # Only the last should have is_last=True + assert result[0][1] is False + assert result[1][1] is False + assert result[2][1] is True + + def test_drag_count(self, converter): + action = Action(type=ActionType.DRAG, argument="100, 100, 500, 300", count=2) + result = converter([action]) + # Drag generates 2 commands, repeated 2 times = 4 total + assert len(result) == 4 + # is_last only on the very last command + assert [is_last for _, is_last in result] == [False, False, False, True] -class TestStrictCoordinateValidation: - @pytest.fixture - def strict_converter(self): - config = ConverterConfig( - sandbox_width=1920, - sandbox_height=1080, - strict_coordinate_validation=True, - ) - return OagiActionConverter(config=config) +class TestCoordinateValidation: @pytest.mark.parametrize( "argument,match_pattern", [ @@ -161,18 +213,19 @@ def strict_converter(self): ("1050, 500", "x coordinate .* out of valid range"), ("500, 1050", "y coordinate .* out of valid range"), ], + ids=["neg-x", "neg-y", "over-x", "over-y"], ) - def test_strict_mode_rejects_out_of_range( - self, strict_converter, argument, match_pattern - ): + def test_rejects_out_of_range(self, converter, argument, match_pattern): action = Action(type=ActionType.CLICK, argument=argument, count=1) + # Coordinates always validated, wraps in RuntimeError from __call__ with pytest.raises(RuntimeError, match=match_pattern): - strict_converter([action]) + converter([action]) - def test_non_strict_mode_clamps_out_of_range(self, converter): - action = Action(type=ActionType.CLICK, 
argument="1050, 1050", count=1) + def test_boundary_1000_clamps_to_max(self, converter): + """Coordinates at exactly 1000 are valid but clamped to screen edge.""" + action = Action(type=ActionType.CLICK, argument="1000, 1000", count=1) result = converter([action]) - assert "pyautogui.click(x=1919, y=1079)" in result[0] + assert result[0][0] == "pyautogui.click(x=1919, y=1079)" @pytest.mark.parametrize( "action_type,argument", @@ -180,10 +233,9 @@ def test_non_strict_mode_clamps_out_of_range(self, converter): (ActionType.DRAG, "500, 500, 1100, 500"), (ActionType.SCROLL, "1100, 500, up"), ], + ids=["drag-over-range", "scroll-over-range"], ) - def test_strict_mode_for_other_actions( - self, strict_converter, action_type, argument - ): + def test_other_actions_reject_out_of_range(self, converter, action_type, argument): action = Action(type=action_type, argument=argument, count=1) with pytest.raises(RuntimeError, match="x coordinate .* out of valid range"): - strict_converter([action]) + converter([action])