Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- Added configurable command execution timeout (PR #1)
- Added automatic error analysis when commands fail
- Added more detailed dependency specifications

### Changed
- Improved error handling in command execution
- Enhanced timeout handling with graceful termination
- Better error reporting for common command execution failures

## [1.0.0] - 2024-04-04

### Added
Expand Down
9 changes: 8 additions & 1 deletion qcmd_cli/commands/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def parse_args():
help='Enable auto-correction mode')
parser.add_argument('-t', '--temperature', type=float, default=None,
help='Set the temperature for generation (0.0-1.0)')
parser.add_argument('--timeout', type=int, default=60,
help='Set the timeout for command execution in seconds (default: 60)')

# Utility commands
parser.add_argument('--status', action='store_true',
Expand Down Expand Up @@ -203,7 +205,12 @@ def main():
# Execute if requested, otherwise just display
if args.execute:
print(f"\nExecuting: {command}\n")
return_code, output = execute_command(command)
return_code, output = execute_command(
command,
analyze_errors=True,
model=args.model,
timeout=args.timeout
)

if return_code == 0:
print(f"\nCommand executed successfully.")
Expand Down
88 changes: 76 additions & 12 deletions qcmd_cli/core/command_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import sys
import subprocess
import shlex
import logging
from typing import List, Optional, Dict, Tuple, Any

from ..ui.display import Colors
Expand All @@ -20,8 +21,8 @@

# Additional dangerous patterns for improved detection
DANGEROUS_PATTERNS = [
# File system operations
"rm -rf", "rm -r /", "rm -f /", "rmdir /", "shred -uz",
# File system operations (more specific to avoid false positives)
"rm -rf /", "rm -r /", "rm -f /", "rmdir /", "shred -uz",
"mkfs", "dd if=/dev/zero", "format", "fdisk", "mkswap",
# Disk operations
"> /dev/sd", "of=/dev/sd", "dd of=/dev",
Expand Down Expand Up @@ -322,14 +323,15 @@ def list_models() -> List[str]:
print(f"{Colors.YELLOW}Error listing models: {e}{Colors.END}", file=sys.stderr)
return []

def execute_command(command: str, analyze_errors: bool = False, model: str = DEFAULT_MODEL) -> Tuple[int, str]:
def execute_command(command: str, analyze_errors: bool = False, model: str = DEFAULT_MODEL, timeout: int = 60) -> Tuple[int, str]:
"""
Execute a shell command and capture output.

Args:
command: The command to execute
analyze_errors: Whether to analyze errors if the command fails
model: The model to use for error analysis
timeout: Maximum execution time in seconds (default: 60)

Returns:
Tuple of (return_code, output)
Expand All @@ -355,9 +357,9 @@ def execute_command(command: str, analyze_errors: bool = False, model: str = DEF
text=True
)

# Use a timeout (60 seconds by default)
# Use configurable timeout
try:
stdout, stderr = process.communicate(timeout=60)
stdout, stderr = process.communicate(timeout=timeout)
return_code = process.returncode

# Combine stdout and stderr
Expand All @@ -367,16 +369,57 @@ def execute_command(command: str, analyze_errors: bool = False, model: str = DEF
output += "\n" + stderr
else:
output = stderr

# If command failed and analyze_errors is True, get AI analysis
if return_code != 0 and analyze_errors and stderr:
print(f"{Colors.YELLOW}Command failed with return code {return_code}.{Colors.END}")
print(f"{Colors.BLUE}Analyzing error...{Colors.END}")
error_analysis = analyze_error(stderr, command, model)
if error_analysis:
output += f"\n\n{Colors.BOLD}Error Analysis:{Colors.END}\n{error_analysis}"

return (return_code, output)

except subprocess.TimeoutExpired:
process.kill()
_, _ = process.communicate()
return (1, "Command execution timed out after 60 seconds.")
# Attempt to gracefully terminate the process
print(f"{Colors.RED}Command execution timed out after {timeout} seconds. Terminating...{Colors.END}")
process.terminate()

try:
# Give the process 5 seconds to terminate gracefully
process.wait(timeout=5)
except subprocess.TimeoutExpired:
# Force kill if it doesn't terminate gracefully
process.kill()
print(f"{Colors.RED}Process had to be forcefully terminated.{Colors.END}")

# Collect any output that might have been generated before timeout
try:
stdout, stderr = process.communicate(timeout=2)
partial_output = stdout if stdout else ""
if stderr:
partial_output += "\n" + stderr if partial_output else stderr

timeout_message = f"Command execution timed out after {timeout} seconds."
if partial_output:
timeout_message += f"\n\nPartial output before timeout:\n{partial_output}"

return (1, timeout_message)
except:
return (1, f"Command execution timed out after {timeout} seconds.")

except FileNotFoundError:
error_msg = f"Error: Command not found or executable could not be run."
print(f"{Colors.RED}{error_msg}{Colors.END}", file=sys.stderr)
return (127, error_msg)
except PermissionError:
error_msg = f"Error: Permission denied when trying to execute command."
print(f"{Colors.RED}{error_msg}{Colors.END}", file=sys.stderr)
return (126, error_msg)
except Exception as e:
return (1, f"Error executing command: {e}")
error_msg = f"Error executing command: {str(e)}"
print(f"{Colors.RED}{error_msg}{Colors.END}", file=sys.stderr)
return (1, error_msg)

def is_dangerous_command(command: str) -> bool:
"""
Expand All @@ -394,11 +437,32 @@ def is_dangerous_command(command: str) -> bool:
for pattern in DANGEROUS_PATTERNS:
if pattern.lower() in command_lower:
return True

# Check for commands that might delete or overwrite system files
if ("rm" in command_lower) and ("/" in command_lower) and not ("./") in command_lower:
return True
if "rm" in command_lower and "/" in command_lower:
# Split the command to examine each part
parts = command_lower.split()

# Find command arguments (skip program name and options)
path_args = []
for i, part in enumerate(parts):
# Skip the command name and options
if i == 0 or part.startswith("-"):
continue
# This might be a path argument
if "/" in part:
path_args.append(part)

# Analyze path arguments
for path in path_args:
# Safe if it starts with "./" or "../" (relative paths)
if path.startswith("./") or path.startswith("../"):
continue
# If it contains a slash but isn't a relative path, consider it dangerous
# This catches absolute paths like /home, /etc, etc.
else:
return True

# Check for sudo or doas with potentially risky commands
if ("sudo" in command_lower or "doas" in command_lower) and any(risky in command_lower for risky in [
"rm", "mkfs", "dd", "fdisk", "chmod", "chown", "mv"
Expand Down
7 changes: 6 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
requests>=2.25.0
configparser>=5.0.0
configparser>=5.0.0
colorama>=0.4.4
pyfiglet>=0.8.0
psutil>=5.9.0
tqdm>=4.64.0
prompt_toolkit>=3.0.0
121 changes: 121 additions & 0 deletions tests/test_cli_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Tests for the CLI argument handling.
"""

import unittest
import os
import sys
import argparse
from unittest.mock import patch, MagicMock

# Add parent directory to path so we can import modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Import the module but not the functions directly to avoid conflicts
import qcmd_cli.commands.handler


class TestCLIArguments(unittest.TestCase):
"""Test the CLI argument handling."""

def test_timeout_argument(self):
"""Test that the timeout argument is correctly parsed."""
# Create a new parser for testing to avoid conflicts
parser = argparse.ArgumentParser()
parser.add_argument('prompt', nargs='?', default=None)
parser.add_argument('-e', '--execute', action='store_true')
parser.add_argument('--timeout', type=int, default=60)

args = parser.parse_args(['--timeout', '30', 'list files'])
self.assertEqual(args.timeout, 30)
self.assertEqual(args.prompt, 'list files')

def test_default_timeout(self):
"""Test that the default timeout is used when not specified."""
# Create a new parser for testing to avoid conflicts
parser = argparse.ArgumentParser()
parser.add_argument('prompt', nargs='?', default=None)
parser.add_argument('--timeout', type=int, default=60)

args = parser.parse_args(['list files'])
self.assertEqual(args.timeout, 60) # Default value
self.assertEqual(args.prompt, 'list files')

def test_execute_with_timeout(self):
"""Test that execute and timeout can be used together."""
# Create a new parser for testing to avoid conflicts
parser = argparse.ArgumentParser()
parser.add_argument('prompt', nargs='?', default=None)
parser.add_argument('-e', '--execute', action='store_true')
parser.add_argument('--timeout', type=int, default=60)

args = parser.parse_args(['-e', '--timeout', '45', 'list files'])
self.assertTrue(args.execute)
self.assertEqual(args.timeout, 45)
self.assertEqual(args.prompt, 'list files')

def test_shell_mode(self):
"""Test shell mode argument."""
# Create a new parser for testing to avoid conflicts
parser = argparse.ArgumentParser()
parser.add_argument('prompt', nargs='?', default=None)
parser.add_argument('-s', '--shell', action='store_true')

args = parser.parse_args(['-s'])
self.assertTrue(args.shell)
self.assertIsNone(args.prompt)

@patch('qcmd_cli.commands.handler.execute_command')
@patch('qcmd_cli.commands.handler.generate_command')
@patch('qcmd_cli.commands.handler.parse_args')
def test_main_with_execute_and_timeout(self, mock_parse_args, mock_generate, mock_execute):
"""Test that main correctly passes timeout to execute_command."""
# Setup mock args
mock_args = MagicMock()
mock_args.execute = True
mock_args.prompt = "list files"
mock_args.timeout = 15
mock_args.model = None
mock_args.shell = False
mock_args.auto = False
mock_args.status = False
mock_args.check_updates = False
mock_args.history = False
mock_args.logs = False
mock_args.config = None
mock_parse_args.return_value = mock_args

# Setup other mocks
mock_generate.return_value = "ls -la"
mock_execute.return_value = (0, "Command output")

# Run main with mocked dependencies
with patch('builtins.print'):
qcmd_cli.commands.handler.main()

# Verify execute_command was called with correct timeout
mock_execute.assert_called_once()
args, kwargs = mock_execute.call_args
self.assertEqual(kwargs['timeout'], 15)

def test_help_includes_timeout(self):
"""Test that help output includes timeout parameter."""
# Create a new parser for testing
parser = argparse.ArgumentParser()
parser.add_argument('--timeout', type=int, default=60,
help='Set the timeout for command execution in seconds (default: 60)')

# Capture the help output
with patch('sys.stdout') as mock_stdout:
try:
parser.parse_args(['--help'])
except SystemExit:
pass

# Verify timeout is in help text
self.assertTrue('--timeout' in str(mock_stdout.method_calls))


if __name__ == '__main__':
unittest.main()
Loading