From 3d97158f713e1561939465a319e70eda15b94cf4 Mon Sep 17 00:00:00 2001 From: ibrahim Date: Tue, 8 Apr 2025 19:51:52 +0000 Subject: [PATCH 1/3] Improve command execution error handling and add timeout configuration --- CHANGELOG.md | 12 +++++++ qcmd_cli/commands/handler.py | 9 ++++- qcmd_cli/core/command_generator.py | 57 ++++++++++++++++++++++++++---- requirements.txt | 7 +++- 4 files changed, 76 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84b9fd9..e8b00eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added +- Added configurable command execution timeout (PR #1) +- Added automatic error analysis when commands fail +- Added more detailed dependency specifications + +### Changed +- Improved error handling in command execution +- Enhanced timeout handling with graceful termination +- Better error reporting for common command execution failures + ## [1.0.0] - 2024-04-04 ### Added diff --git a/qcmd_cli/commands/handler.py b/qcmd_cli/commands/handler.py index 8db8975..13d19f6 100644 --- a/qcmd_cli/commands/handler.py +++ b/qcmd_cli/commands/handler.py @@ -43,6 +43,8 @@ def parse_args(): help='Enable auto-correction mode') parser.add_argument('-t', '--temperature', type=float, default=None, help='Set the temperature for generation (0.0-1.0)') + parser.add_argument('--timeout', type=int, default=60, + help='Set the timeout for command execution in seconds (default: 60)') # Utility commands parser.add_argument('--status', action='store_true', @@ -203,7 +205,12 @@ def main(): # Execute if requested, otherwise just display if args.execute: print(f"\nExecuting: {command}\n") - return_code, output = execute_command(command) + return_code, output = execute_command( + command, + analyze_errors=True, + model=args.model, + timeout=args.timeout + ) if return_code == 0: print(f"\nCommand executed successfully.") diff --git a/qcmd_cli/core/command_generator.py b/qcmd_cli/core/command_generator.py index a70584e..78ea768 100644 --- a/qcmd_cli/core/command_generator.py +++ b/qcmd_cli/core/command_generator.py @@ -9,6 +9,7 @@ import sys import subprocess import shlex +import logging from typing import List, Optional, Dict, Tuple, Any from ..ui.display import Colors @@ -322,7 +323,7 @@ def list_models() -> List[str]: print(f"{Colors.YELLOW}Error listing models: {e}{Colors.END}", file=sys.stderr) return [] -def execute_command(command: str, analyze_errors: bool = False, model: str = DEFAULT_MODEL) -> Tuple[int, str]: +def execute_command(command: str, analyze_errors: bool = False, model: str = DEFAULT_MODEL, timeout: int = 60) -> Tuple[int, str]: """ Execute a shell command and capture output. @@ -330,6 +331,7 @@ def execute_command(command: str, analyze_errors: bool = False, model: str = DEF command: The command to execute analyze_errors: Whether to analyze errors if the command fails model: The model to use for error analysis + timeout: Maximum execution time in seconds (default: 60) Returns: Tuple of (return_code, output) @@ -355,9 +357,9 @@ def execute_command(command: str, analyze_errors: bool = False, model: str = DEF text=True ) - # Use a timeout (60 seconds by default) + # Use configurable timeout try: - stdout, stderr = process.communicate(timeout=60) + stdout, stderr = process.communicate(timeout=timeout) return_code = process.returncode # Combine stdout and stderr @@ -367,16 +369,57 @@ def execute_command(command: str, analyze_errors: bool = False, model: str = DEF output += "\n" + stderr else: output = stderr + + # If command failed and analyze_errors is True, get AI analysis + if return_code != 0 and analyze_errors and stderr: + print(f"{Colors.YELLOW}Command failed with return code {return_code}.{Colors.END}") + print(f"{Colors.BLUE}Analyzing error...{Colors.END}") + error_analysis = analyze_error(stderr, command, model) + if error_analysis: + output += f"\n\n{Colors.BOLD}Error Analysis:{Colors.END}\n{error_analysis}" return (return_code, output) except subprocess.TimeoutExpired: - process.kill() - _, _ = process.communicate() - return (1, "Command execution timed out after 60 seconds.") + # Attempt to gracefully terminate the process + print(f"{Colors.RED}Command execution timed out after {timeout} seconds. Terminating...{Colors.END}") + process.terminate() + + try: + # Give the process 5 seconds to terminate gracefully + process.wait(timeout=5) + except subprocess.TimeoutExpired: + # Force kill if it doesn't terminate gracefully + process.kill() + print(f"{Colors.RED}Process had to be forcefully terminated.{Colors.END}") + + # Collect any output that might have been generated before timeout + try: + stdout, stderr = process.communicate(timeout=2) + partial_output = stdout if stdout else "" + if stderr: + partial_output += "\n" + stderr if partial_output else stderr + + timeout_message = f"Command execution timed out after {timeout} seconds." + if partial_output: + timeout_message += f"\n\nPartial output before timeout:\n{partial_output}" + + return (1, timeout_message) + except: + return (1, f"Command execution timed out after {timeout} seconds.") + except FileNotFoundError: + error_msg = f"Error: Command not found or executable could not be run." + print(f"{Colors.RED}{error_msg}{Colors.END}", file=sys.stderr) + return (127, error_msg) + except PermissionError: + error_msg = f"Error: Permission denied when trying to execute command." + print(f"{Colors.RED}{error_msg}{Colors.END}", file=sys.stderr) + return (126, error_msg) except Exception as e: - return (1, f"Error executing command: {e}") + error_msg = f"Error executing command: {str(e)}" + print(f"{Colors.RED}{error_msg}{Colors.END}", file=sys.stderr) + return (1, error_msg) def is_dangerous_command(command: str) -> bool: """ diff --git a/requirements.txt b/requirements.txt index 5663c09..64c8857 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,7 @@ requests>=2.25.0 -configparser>=5.0.0 \ No newline at end of file +configparser>=5.0.0 +colorama>=0.4.4 +pyfiglet>=0.8.0 +psutil>=5.9.0 +tqdm>=4.64.0 +prompt_toolkit>=3.0.0 \ No newline at end of file From cbbe6fcdc7effb2ff2b4930236e7563ca96d2c11 Mon Sep 17 00:00:00 2001 From: ibrahim Date: Tue, 8 Apr 2025 19:55:21 +0000 Subject: [PATCH 2/3] Add unit test for timeout functionality --- tests/test_qcmd.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tests/test_qcmd.py b/tests/test_qcmd.py index 2ae39f7..00900f7 100644 --- a/tests/test_qcmd.py +++ b/tests/test_qcmd.py @@ -8,13 +8,14 @@ import sys import tempfile from unittest.mock import patch, MagicMock +import subprocess # Add parent directory to path so we can import modules sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # Import functions to test try: - from qcmd_cli.core.command_generator import is_dangerous_command + from qcmd_cli.core.command_generator import is_dangerous_command, execute_command from qcmd_cli.config.settings import load_config, save_config, CONFIG_FILE except ImportError: # If running as script @@ -85,5 +86,27 @@ def test_config_save_load(self): self.assertEqual(loaded_config["analyze_errors"], test_config["analyze_errors"]) +class TestQcmdExecutor(unittest.TestCase): + """Test the command execution features of qcmd.""" + + @patch('subprocess.Popen') + def test_command_timeout(self, mock_popen): + """Test that command execution handles timeouts correctly.""" + # Setup mock to simulate TimeoutExpired + process_mock = MagicMock() + process_mock.communicate.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=2) + mock_popen.return_value = process_mock + + # Test with a short timeout + return_code, output = execute_command("sleep 10", timeout=2) + + # Verify results + self.assertEqual(return_code, 1) + self.assertTrue("timed out after 2 seconds" in output) + + # Verify process was terminated + process_mock.terminate.assert_called_once() + + if __name__ == '__main__': unittest.main() \ No newline at end of file From 97fba0f33d35bba51bc8cadb5f14a32e98605e7f Mon Sep 17 00:00:00 2001 From: ibrahim Date: Tue, 8 Apr 2025 20:08:35 +0000 Subject: [PATCH 3/3] Add comprehensive tests and fix dangerous command detection for relative paths --- qcmd_cli/core/command_generator.py | 31 ++++- tests/test_cli_args.py | 121 +++++++++++++++++ tests/test_command_generator.py | 200 +++++++++++++++++++++++++++++ 3 files changed, 347 insertions(+), 5 deletions(-) create mode 100644 tests/test_cli_args.py create mode 100644 tests/test_command_generator.py diff --git a/qcmd_cli/core/command_generator.py b/qcmd_cli/core/command_generator.py index 78ea768..774acf6 100644 --- a/qcmd_cli/core/command_generator.py +++ b/qcmd_cli/core/command_generator.py @@ -21,8 +21,8 @@ # Additional dangerous patterns for improved detection DANGEROUS_PATTERNS = [ - # File system operations - "rm -rf", "rm -r /", "rm -f /", "rmdir /", "shred -uz", + # File system operations (more specific to avoid false positives) + "rm -rf /", "rm -r /", "rm -f /", "rmdir /", "shred -uz", "mkfs", "dd if=/dev/zero", "format", "fdisk", "mkswap", # Disk operations "> /dev/sd", "of=/dev/sd", "dd of=/dev", @@ -437,11 +437,32 @@ def is_dangerous_command(command: str) -> bool: for pattern in DANGEROUS_PATTERNS: if pattern.lower() in command_lower: return True - + # Check for commands that might delete or overwrite system files - if ("rm" in command_lower) and ("/" in command_lower) and not ("./") in command_lower: - return True + if "rm" in command_lower and "/" in command_lower: + # Split the command to examine each part + parts = command_lower.split() + # Find command arguments (skip program name and options) + path_args = [] + for i, part in enumerate(parts): + # Skip the command name and options + if i == 0 or part.startswith("-"): + continue + # This might be a path argument + if "/" in part: + path_args.append(part) + + # Analyze path arguments + for path in path_args: + # Safe if it starts with "./" or "../" (relative paths) + if path.startswith("./") or path.startswith("../"): + continue + # If it contains a slash but isn't a relative path, consider it dangerous + # This catches absolute paths like /home, /etc, etc. + else: + return True + # Check for sudo or doas with potentially risky commands if ("sudo" in command_lower or "doas" in command_lower) and any(risky in command_lower for risky in [ "rm", "mkfs", "dd", "fdisk", "chmod", "chown", "mv" diff --git a/tests/test_cli_args.py b/tests/test_cli_args.py new file mode 100644 index 0000000..67889e3 --- /dev/null +++ b/tests/test_cli_args.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +""" +Tests for the CLI argument handling. +""" + +import unittest +import os +import sys +import argparse +from unittest.mock import patch, MagicMock + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import the module but not the functions directly to avoid conflicts +import qcmd_cli.commands.handler + + +class TestCLIArguments(unittest.TestCase): + """Test the CLI argument handling.""" + + def test_timeout_argument(self): + """Test that the timeout argument is correctly parsed.""" + # Create a new parser for testing to avoid conflicts + parser = argparse.ArgumentParser() + parser.add_argument('prompt', nargs='?', default=None) + parser.add_argument('-e', '--execute', action='store_true') + parser.add_argument('--timeout', type=int, default=60) + + args = parser.parse_args(['--timeout', '30', 'list files']) + self.assertEqual(args.timeout, 30) + self.assertEqual(args.prompt, 'list files') + + def test_default_timeout(self): + """Test that the default timeout is used when not specified.""" + # Create a new parser for testing to avoid conflicts + parser = argparse.ArgumentParser() + parser.add_argument('prompt', nargs='?', default=None) + parser.add_argument('--timeout', type=int, default=60) + + args = parser.parse_args(['list files']) + self.assertEqual(args.timeout, 60) # Default value + self.assertEqual(args.prompt, 'list files') + + def test_execute_with_timeout(self): + """Test that execute and timeout can be used together.""" + # Create a new parser for testing to avoid conflicts + parser = argparse.ArgumentParser() + parser.add_argument('prompt', nargs='?', default=None) + parser.add_argument('-e', '--execute', action='store_true') + parser.add_argument('--timeout', type=int, default=60) + + args = parser.parse_args(['-e', '--timeout', '45', 'list files']) + self.assertTrue(args.execute) + self.assertEqual(args.timeout, 45) + self.assertEqual(args.prompt, 'list files') + + def test_shell_mode(self): + """Test shell mode argument.""" + # Create a new parser for testing to avoid conflicts + parser = argparse.ArgumentParser() + parser.add_argument('prompt', nargs='?', default=None) + parser.add_argument('-s', '--shell', action='store_true') + + args = parser.parse_args(['-s']) + self.assertTrue(args.shell) + self.assertIsNone(args.prompt) + + @patch('qcmd_cli.commands.handler.execute_command') + @patch('qcmd_cli.commands.handler.generate_command') + @patch('qcmd_cli.commands.handler.parse_args') + def test_main_with_execute_and_timeout(self, mock_parse_args, mock_generate, mock_execute): + """Test that main correctly passes timeout to execute_command.""" + # Setup mock args + mock_args = MagicMock() + mock_args.execute = True + mock_args.prompt = "list files" + mock_args.timeout = 15 + mock_args.model = None + mock_args.shell = False + mock_args.auto = False + mock_args.status = False + mock_args.check_updates = False + mock_args.history = False + mock_args.logs = False + mock_args.config = None + mock_parse_args.return_value = mock_args + + # Setup other mocks + mock_generate.return_value = "ls -la" + mock_execute.return_value = (0, "Command output") + + # Run main with mocked dependencies + with patch('builtins.print'): + qcmd_cli.commands.handler.main() + + # Verify execute_command was called with correct timeout + mock_execute.assert_called_once() + args, kwargs = mock_execute.call_args + self.assertEqual(kwargs['timeout'], 15) + + def test_help_includes_timeout(self): + """Test that help output includes timeout parameter.""" + # Create a new parser for testing + parser = argparse.ArgumentParser() + parser.add_argument('--timeout', type=int, default=60, + help='Set the timeout for command execution in seconds (default: 60)') + + # Capture the help output + with patch('sys.stdout') as mock_stdout: + try: + parser.parse_args(['--help']) + except SystemExit: + pass + + # Verify timeout is in help text + self.assertTrue('--timeout' in str(mock_stdout.method_calls)) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_command_generator.py b/tests/test_command_generator.py new file mode 100644 index 0000000..229c9df --- /dev/null +++ b/tests/test_command_generator.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +""" +Tests for the command generator functionality. +""" + +import unittest +import os +import sys +import subprocess +from unittest.mock import patch, MagicMock, call + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import functions to test +try: + from qcmd_cli.core.command_generator import ( + execute_command, is_dangerous_command, + generate_command, analyze_error, fix_command + ) +except ImportError: + print("Could not import qcmd_cli module. Make sure it's in your PYTHONPATH.") + sys.exit(1) + + +class TestCommandGenerator(unittest.TestCase): + """Test the command generator functionality.""" + + @patch('requests.post') + def test_generate_command(self, mock_post): + """Test command generation.""" + # Mock response from Ollama API + mock_response = MagicMock() + mock_response.json.return_value = {"response": "ls -la"} + mock_post.return_value = mock_response + + # Test command generation + result = generate_command("list all files in the current directory") + + # Verify + self.assertEqual(result, "ls -la") + mock_post.assert_called_once() + + @patch('requests.post') + def test_generate_command_with_markdown(self, mock_post): + """Test command generation with markdown formatting.""" + # Test different markdown formatted responses + tests = [ + {"response": "```\nls -la\n```", "expected": "ls -la"}, + {"response": "`ls -la`", "expected": "ls -la"}, + {"response": "```bash\nls -la\n```", "expected": "ls -la"} + ] + + for i, test in enumerate(tests): + mock_response = MagicMock() + mock_response.json.return_value = {"response": test["response"]} + mock_post.return_value = mock_response + + result = generate_command(f"test case {i}") + self.assertEqual(result, test["expected"]) + + @patch('requests.post') + def test_analyze_error(self, mock_post): + """Test error analysis functionality.""" + # Mock response from Ollama API + mock_response = MagicMock() + mock_response.json.return_value = {"response": "This command failed because the file does not exist."} + mock_post.return_value = mock_response + + # Test error analysis + result = analyze_error("No such file or directory", "cat nonexistent.txt") + + # Verify + self.assertEqual(result, "This command failed because the file does not exist.") + mock_post.assert_called_once() + + @patch('subprocess.Popen') + def test_command_timeout_handling(self, mock_popen): + """Test timeout handling during command execution.""" + # Setup mock to simulate TimeoutExpired + process_mock = MagicMock() + process_mock.communicate.side_effect = subprocess.TimeoutExpired(cmd="sleep 100", timeout=5) + mock_popen.return_value = process_mock + + # Test with a custom timeout + return_code, output = execute_command("sleep 100", timeout=5) + + # Verify results + self.assertEqual(return_code, 1, "Should return error code for timeout") + self.assertTrue("timed out after 5 seconds" in output, "Output should mention timeout") + + # Verify process was terminated properly + process_mock.terminate.assert_called_once() + + @patch('subprocess.Popen') + def test_command_execution_success(self, mock_popen): + """Test successful command execution.""" + # Setup mock for successful execution + process_mock = MagicMock() + process_mock.communicate.return_value = ("Command output", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Execute command + return_code, output = execute_command("echo 'test'") + + # Verify results + self.assertEqual(return_code, 0, "Should return success code") + self.assertEqual(output, "Command output", "Should return command output") + + @patch('subprocess.Popen') + def test_command_execution_error(self, mock_popen): + """Test command execution with error.""" + # Setup mock for failed execution + process_mock = MagicMock() + process_mock.communicate.return_value = ("", "Command error") + process_mock.returncode = 1 + mock_popen.return_value = process_mock + + # Execute command + return_code, output = execute_command("invalid_command") + + # Verify results + self.assertEqual(return_code, 1, "Should return error code") + self.assertEqual(output, "Command error", "Should return error output") + + @patch('subprocess.Popen') + def test_command_execution_file_not_found(self, mock_popen): + """Test handling of FileNotFoundError.""" + # Setup mock to raise FileNotFoundError + mock_popen.side_effect = FileNotFoundError("No such file or command") + + # Execute command + return_code, output = execute_command("nonexistent_command") + + # Verify results + self.assertEqual(return_code, 127, "Should return command not found code") + self.assertTrue("Command not found" in output, "Output should indicate command not found") + + @patch('subprocess.Popen') + def test_command_execution_permission_error(self, mock_popen): + """Test handling of PermissionError.""" + # Setup mock to raise PermissionError + mock_popen.side_effect = PermissionError("Permission denied") + + # Execute command + return_code, output = execute_command("no_permission_command") + + # Verify results + self.assertEqual(return_code, 126, "Should return permission denied code") + self.assertTrue("Permission denied" in output, "Output should indicate permission denied") + + def test_dangerous_command_detection(self): + """Test detection of dangerous commands.""" + dangerous_commands = [ + "rm -rf /", + "rm -r /home", + "sudo rm -rf /", + "dd if=/dev/zero of=/dev/sda", + "sudo mkfs.ext4 /dev/sda1", + ":(){:|:&};:", + "chmod -R 777 /" + ] + + safe_commands = [ + "ls -la", + "cd /home/user", + "cat file.txt", + "grep 'pattern' file.txt", + "echo 'hello world'" + ] + + # Test commands with relative paths + relative_path_commands = { + "rm -rf ./test_dir": False, # Should be safe (current directory) + "rm -rf ../test_dir": False, # Should be safe (parent directory) + "rm -rf /test_dir": True, # Should be dangerous (absolute path) + "rm file.txt": False # Should be safe (no path) + } + + # Test dangerous commands + for cmd in dangerous_commands: + self.assertTrue(is_dangerous_command(cmd), f"Should detect '{cmd}' as dangerous") + + # Test safe commands + for cmd in safe_commands: + self.assertFalse(is_dangerous_command(cmd), f"Should not detect '{cmd}' as safe") + + # Test relative path handling + for cmd, should_be_dangerous in relative_path_commands.items(): + if should_be_dangerous: + self.assertTrue(is_dangerous_command(cmd), + f"Should detect '{cmd}' as dangerous") + else: + self.assertFalse(is_dangerous_command(cmd), + f"Should detect '{cmd}' as safe") + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file