diff --git a/pyproject.toml b/pyproject.toml index bddee1f..926a423 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] # This uses the PyPI-optimized README for better display on PyPI name = "ibrahimiq-qcmd" -version = "1.0.0" +version = "1.0.15" authors = [ {name = "Ibrahim IQ", email = "example@example.com"} ] diff --git a/qcmd_cli/__init__.py b/qcmd_cli/__init__.py index 0ff87d2..3c524cd 100644 --- a/qcmd_cli/__init__.py +++ b/qcmd_cli/__init__.py @@ -2,7 +2,7 @@ QCMD CLI - A command-line tool that generates shell commands using Qwen2.5-Coder via Ollama. """ -__version__ = "1.0.0" +__version__ = "1.0.15" # Don't import modules here to avoid circular dependencies diff --git a/qcmd_cli/config/constants.py b/qcmd_cli/config/constants.py new file mode 100644 index 0000000..4d375e1 --- /dev/null +++ b/qcmd_cli/config/constants.py @@ -0,0 +1,37 @@ +""" +Constants and configuration settings for QCMD. +""" + +import os + +# Paths +HOME_DIR = os.path.expanduser("~") +CONFIG_DIR = os.path.join(HOME_DIR, ".qcmd") +LOG_DIR = os.path.join(CONFIG_DIR, "logs") +SESSIONS_FILE = os.path.join(CONFIG_DIR, "sessions.json") + +# API URLs +OLLAMA_API = os.environ.get("OLLAMA_API", "http://127.0.0.1:11434/api") + +# Version +VERSION = "1.0.10" + +# System prompt template for command generation +SYSTEM_PROMPT_TEMPLATE = """You are QCMD, an AI assistant specialized in Linux system administration, log analysis, and command generation. +Your goal is to provide accurate, secure, and helpful commands based on user requests. + +- Generate shell commands that are valid for Linux systems. +- Prioritize safety and security in your suggestions. +- Explain complex commands when necessary. +- Always verify that commands won't harm the system. +- Suggest alternatives when relevant. + +Current system context: +- User shell: {user_shell} +- OS: {os_info} +- Working directory: {cwd} + +Respond only with the command(s) that would accomplish the task. Do not include explanations or markdown formatting.""" + +# Default number of log lines to show +DEFAULT_LOG_LINES = 100 \ No newline at end of file diff --git a/qcmd_cli/core/interactive_shell.py b/qcmd_cli/core/interactive_shell.py index 9c621d6..27669bf 100644 --- a/qcmd_cli/core/interactive_shell.py +++ b/qcmd_cli/core/interactive_shell.py @@ -1,21 +1,33 @@ #!/usr/bin/env python3 +# -*- coding: utf-8 -*- + """ -Interactive shell functionality for QCMD. +Interactive shell module for QCMD. +This module provides a shell-like interface for users to interact with the system. """ + import os import sys import readline -import signal import time -from typing import List, Optional, Dict, Any +import atexit +import signal +from typing import List, Optional, Tuple +from datetime import datetime -# Import from other modules -from ..core.command_generator import generate_command, execute_command, fix_command -from ..utils.history import save_to_history, load_history, show_history -from ..config.settings import DEFAULT_MODEL, load_config -from ..ui.display import Colors, print_cool_header, print_examples, display_help_command -from ..log_analysis.log_files import handle_log_analysis -from ..utils.system import display_system_status, check_for_updates +# Import from other modules +from qcmd_cli.config.settings import DEFAULT_MODEL +from qcmd_cli.config.constants import CONFIG_DIR +from qcmd_cli.ui.display import Colors, print_cool_header, clear_screen +from qcmd_cli.core.command_generator import generate_command, is_dangerous_command, list_models, fix_command +from qcmd_cli.utils.history import save_to_history, load_history, show_history +from qcmd_cli.utils.system import execute_command, get_system_status, display_update_status, display_system_status +from qcmd_cli.log_analysis.log_files import handle_log_analysis +from qcmd_cli.log_analysis.analyzer import analyze_log_file +from qcmd_cli.utils.ollama import is_ollama_running + +# Setup session tracking +from qcmd_cli.utils.session import create_session, update_session_activity, cleanup_stale_sessions, end_session class SimpleCompleter: """ @@ -44,289 +56,558 @@ def complete(self, text, state): return response -def start_interactive_shell(auto_mode_enabled: bool = False, current_model: str = DEFAULT_MODEL, - current_temperature: float = 0.7, max_attempts: int = 3) -> None: +def start_interactive_shell( + auto_mode_enabled: bool = False, + current_model: str = DEFAULT_MODEL, + current_temperature: float = 0.7, + max_attempts: int = 3 +) -> None: """ - Start the interactive QCMD shell. + Start an interactive shell for continuous command generation. Args: - auto_mode_enabled: Whether auto-correction mode is enabled - current_model: The model to use for command generation - current_temperature: Temperature parameter for generation - max_attempts: Maximum number of auto-correction attempts + auto_mode_enabled: Whether to run in auto mode (auto execute and fix commands) + current_model: Model to use for generation + current_temperature: Temperature for generation (0.0-1.0) + max_attempts: Maximum number of fix attempts in auto mode """ - # Available shell commands for autocompletion - shell_commands = [ - '!help', '!exit', '!quit', '!clear', '!history', '!search', '!status', - '!model', '!models', '!temp', '!temperature', '!auto on', '!auto off', - '!max', '!update', '!logs', '!config', '!analyze', '!monitor', '!watch', '!!' - ] + # Create config directory if it doesn't exist + os.makedirs(CONFIG_DIR, exist_ok=True) - # Set up command completion - completer = SimpleCompleter(shell_commands) - readline.set_completer(completer.complete) - readline.parse_and_bind('tab: complete') + # History file setup + history_file = os.path.join(CONFIG_DIR, 'qcmd_history') + try: + readline.read_history_file(history_file) + readline.set_history_length(1000) + except (FileNotFoundError, PermissionError): + # If history file doesn't exist or can't be read, just continue + pass + atexit.register(readline.write_history_file, history_file) - # Register signal handlers for clean exit - def handle_sigint(signum, frame): - print("\nExiting QCMD interactive shell...") - sys.exit(0) + # Setup session tracking + session_id = create_session({ + 'type': 'interactive_shell', + 'model': current_model, + 'start_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + 'auto_mode': auto_mode_enabled, + 'temperature': current_temperature + }) + + # Check if Ollama is running + if not is_ollama_running(): + print(f"\n{Colors.YELLOW}Warning: Ollama API is not running. Commands will not work properly.{Colors.END}") + print(f"{Colors.YELLOW}Start Ollama with 'ollama serve' and try again.{Colors.END}") - signal.signal(signal.SIGINT, handle_sigint) + # Display QCMD banner + clear_screen() + _display_banner() + + # Check for QCMD updates on startup + display_update_status() # Welcome message - print(f"\n{Colors.GREEN}{Colors.BOLD}QCMD Interactive Shell{Colors.END}") - print(f"Type a description of the command you need, or {Colors.BOLD}!help{Colors.END} for assistance.") - print(f"Model: {Colors.CYAN}{current_model}{Colors.END}, Temperature: {Colors.CYAN}{current_temperature}{Colors.END}") - print(f"Auto-correction mode: {Colors.CYAN}{'Enabled' if auto_mode_enabled else 'Disabled'}{Colors.END}") - print(f"Type {Colors.BOLD}!exit{Colors.END} to quit.\n") + print(f"\n{Colors.CYAN}Welcome to the QCMD Interactive Shell!{Colors.END}") + print(f"Using model: {Colors.GREEN}{current_model}{Colors.END}") + print(f"Temperature: {Colors.GREEN}{current_temperature}{Colors.END}") + print(f"Auto mode: {Colors.GREEN}{auto_mode_enabled}{Colors.END}") + print(f"\nEnter your command descriptions or type {Colors.YELLOW}/help{Colors.END} for more options.") + print(f"{Colors.YELLOW}Type /exit to quit{Colors.END}") + + # Command history for the current session + session_history = [] + analyze_errors = True - # Main interaction loop - last_prompt = None - while True: + # Cleanup stale sessions on startup + cleanup_stale_sessions() + + # Define a signal handler to gracefully exit + def signal_handler(sig, frame): + print(f"\n{Colors.CYAN}Received signal {sig}, exiting...{Colors.END}") + # End the session when receiving a signal try: - # Get user input - prompt = input(f"{Colors.GREEN}qcmd>{Colors.END} ").strip() - - # Skip empty input - if not prompt: - continue + end_session(session_id) + except Exception: + pass + sys.exit(0) + + # Register signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + # Create a timer for periodically updating session activity + last_activity_update = time.time() + update_interval = 60 # Update session activity every 60 seconds + + # Main interactive loop + while True: + try: + # Update session activity periodically + current_time = time.time() + if current_time - last_activity_update > update_interval: + update_session_activity(session_id) + last_activity_update = current_time - # Handle special commands starting with ! - if prompt.startswith('!'): - # Extract command and arguments - parts = prompt.split(maxsplit=1) - command = parts[0].lower() - args = parts[1] if len(parts) > 1 else '' + # Get user input + user_input = input(f"\n{Colors.BOLD}qcmd> {Colors.END}").strip() - # Help command - if command == '!help': - display_help_command(current_model, current_temperature, auto_mode_enabled, max_attempts) + # Handle empty input + if not user_input: continue - # Exit command - elif command in ['!exit', '!quit']: - print("Exiting QCMD interactive shell...") + # Save to readline history + readline.add_history(user_input) + + # Handle special commands + if user_input.lower() in ('/exit', '/quit'): + print(f"{Colors.CYAN}Goodbye!{Colors.END}") + # End the session when exiting + end_session(session_id) break - # Clear screen - elif command == '!clear': - os.system('clear' if os.name == 'posix' else 'cls') + elif user_input.lower() == '/help': + _show_shell_help() continue - # Show history - elif command == '!history': - count = 20 - if args and args.isdigit(): - count = int(args) - show_history(count) + elif user_input.lower() == '/history': + # Show command history + if session_history: + print(f"\n{Colors.CYAN}Command History (this session):{Colors.END}") + for i, (cmd_desc, cmd) in enumerate(session_history, 1): + print(f"{Colors.BLUE}{i}.{Colors.END} {cmd_desc}") + print(f" {Colors.GREEN}{cmd}{Colors.END}") + else: + print(f"{Colors.YELLOW}No commands in this session yet.{Colors.END}") continue - # Search history - elif command == '!search': - if not args: - print(f"{Colors.YELLOW}Usage: !search {Colors.END}") - continue - show_history(search_term=args) + elif user_input.lower() == '/models': + # List available models + models = list_models() + if models: + print(f"\n{Colors.CYAN}Available Models:{Colors.END}") + for i, model in enumerate(models, 1): + current = " (current)" if model == current_model else "" + print(f"{i}. {Colors.GREEN}{model}{Colors.END}{current}") + else: + print(f"{Colors.YELLOW}No models available or could not connect to Ollama.{Colors.END}") continue - # Show system status - elif command == '!status': + elif user_input.lower() == '/status': + # Show system status + print(f"\n{Colors.CYAN}Getting system status...{Colors.END}") display_system_status() continue - # List available models - elif command == '!models': - from ..core.command_generator import list_models - models = list_models() - if models: - print(f"\n{Colors.CYAN}Available models:{Colors.END}") - for model in models: - if model == current_model: - print(f" {Colors.GREEN}{Colors.BOLD}* {model} (current){Colors.END}") + elif user_input.lower() == '/update': + # Check for updates + display_update_status() + continue + + elif user_input.lower().startswith('/model '): + # Switch models + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + try: + # Check if input is a number (index from listed models) + if parts[1].isdigit(): + idx = int(parts[1]) - 1 + models = list_models() + if 0 <= idx < len(models): + current_model = models[idx] + print(f"Switched to model: {Colors.GREEN}{current_model}{Colors.END}") + else: + print(f"{Colors.YELLOW}Invalid model index.{Colors.END}") else: - print(f" {Colors.YELLOW}- {model}{Colors.END}") + # Direct model name + current_model = parts[1] + print(f"Switched to model: {Colors.GREEN}{current_model}{Colors.END}") + except Exception as e: + print(f"{Colors.YELLOW}Error switching models: {e}{Colors.END}") else: - print(f"{Colors.YELLOW}No models found. Make sure Ollama is running.{Colors.END}") + print(f"{Colors.YELLOW}Usage: /model {Colors.END}") continue - # Change model - elif command == '!model': - if not args: - print(f"{Colors.YELLOW}Current model: {current_model}{Colors.END}") - print(f"{Colors.YELLOW}Usage: !model {Colors.END}") - continue - current_model = args - print(f"{Colors.GREEN}Model changed to: {current_model}{Colors.END}") - - # Update config - config = load_config() - config['model'] = current_model - from ..config.settings import save_config - save_config(config) + elif user_input.lower().startswith('/temperature '): + # Set temperature + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + try: + temp = float(parts[1]) + if 0.0 <= temp <= 1.0: + current_temperature = temp + print(f"Temperature set to: {Colors.GREEN}{current_temperature}{Colors.END}") + else: + print(f"{Colors.YELLOW}Temperature must be between 0.0 and 1.0{Colors.END}") + except ValueError: + print(f"{Colors.YELLOW}Invalid temperature value. Use a number between 0.0 and 1.0{Colors.END}") + else: + print(f"{Colors.YELLOW}Usage: /temperature {Colors.END}") continue - # Set temperature - elif command in ['!temp', '!temperature']: - if not args: - print(f"{Colors.YELLOW}Current temperature: {current_temperature}{Colors.END}") - print(f"{Colors.YELLOW}Usage: !temperature (between 0.0 and 1.0){Colors.END}") - continue + elif user_input.lower() == '/auto': + # Enable auto mode + auto_mode_enabled = True + print(f"{Colors.GREEN}Auto mode enabled.{Colors.END}") + continue - try: - value = float(args) - if 0.0 <= value <= 1.0: - current_temperature = value - print(f"{Colors.GREEN}Temperature set to: {current_temperature}{Colors.END}") - - # Update config - config = load_config() - config['temperature'] = current_temperature - from ..config.settings import save_config - save_config(config) - else: - print(f"{Colors.RED}Temperature must be between 0.0 and 1.0{Colors.END}") - except ValueError: - print(f"{Colors.RED}Invalid temperature value. Please enter a number between 0.0 and 1.0{Colors.END}") + elif user_input.lower() == '/manual': + # Disable auto mode + auto_mode_enabled = False + print(f"{Colors.GREEN}Auto mode disabled.{Colors.END}") continue - # Toggle auto mode - elif command == '!auto': - if args.lower() in ['on', 'yes', 'true', '1']: - auto_mode_enabled = True - print(f"{Colors.GREEN}Auto-correction mode enabled{Colors.END}") - elif args.lower() in ['off', 'no', 'false', '0']: - auto_mode_enabled = False - print(f"{Colors.GREEN}Auto-correction mode disabled{Colors.END}") - else: - print(f"{Colors.YELLOW}Current auto-correction mode: {'Enabled' if auto_mode_enabled else 'Disabled'}{Colors.END}") - print(f"{Colors.YELLOW}Usage: !auto on|off{Colors.END}") + elif user_input.lower() == '/analyze': + # Toggle error analysis + analyze_errors = not analyze_errors + status = "enabled" if analyze_errors else "disabled" + print(f"{Colors.GREEN}Error analysis {status}.{Colors.END}") continue - # Set max attempts - elif command == '!max': - if not args: - print(f"{Colors.YELLOW}Current max attempts: {max_attempts}{Colors.END}") - print(f"{Colors.YELLOW}Usage: !max {Colors.END}") - continue + elif user_input.lower() == '/logs': + # Find and analyze log files + handle_log_analysis(current_model) + continue - try: - value = int(args) - if value > 0: - max_attempts = value - print(f"{Colors.GREEN}Max attempts set to: {max_attempts}{Colors.END}") - - # Update config - config = load_config() - config['max_attempts'] = max_attempts - from ..config.settings import save_config - save_config(config) + elif user_input.lower().startswith('/analyze-file '): + # Analyze a specific file + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + file_path = os.path.expanduser(parts[1]) + if os.path.isfile(file_path): + analyze_log_file(file_path, current_model) else: - print(f"{Colors.RED}Max attempts must be greater than 0{Colors.END}") - except ValueError: - print(f"{Colors.RED}Invalid value. Please enter a positive integer{Colors.END}") + print(f"{Colors.YELLOW}File not found: {file_path}{Colors.END}") + else: + print(f"{Colors.YELLOW}Usage: /analyze-file {Colors.END}") continue - # Check for updates - elif command == '!update': - check_for_updates(force_display=True) + elif user_input.lower().startswith('/monitor '): + # Monitor a specific file with AI analysis + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + file_path = os.path.expanduser(parts[1]) + if os.path.isfile(file_path): + analyze_log_file(file_path, current_model, background=True) + else: + print(f"{Colors.YELLOW}File not found: {file_path}{Colors.END}") + else: + print(f"{Colors.YELLOW}Usage: /monitor {Colors.END}") continue - # Log analysis - elif command == '!logs': - handle_log_analysis(current_model, args if args else None) + elif user_input.lower() == '/execute': + # Execute last command + if session_history: + _, last_cmd = session_history[-1] + print(f"\n{Colors.CYAN}Executing:{Colors.END} {Colors.GREEN}{last_cmd}{Colors.END}") + + # Confirm execution + confirm = input(f"Press {Colors.YELLOW}Enter{Colors.END} to execute or Ctrl+C to cancel: ") + + # Execute the command + exit_code, output = execute_command(last_cmd, analyze_errors, current_model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + if analyze_errors and exit_code != 0: + _analyze_and_fix_error(last_cmd, output, current_model) + else: + print(f"{Colors.YELLOW}No commands in history to execute.{Colors.END}") continue - # Config command - elif command == '!config': - from ..config.settings import handle_config_command - handle_config_command(args) + # Process regular input as a command request + print(f"\n{Colors.CYAN}Generating command...{Colors.END}") + + # Generate the command + command = generate_command(user_input, current_model, current_temperature) + + # Display the generated command + if command.startswith("Error:"): + print(f"{Colors.RED}{command}{Colors.END}") continue - # Repeat last command - elif command == '!!': - if last_prompt: - prompt = last_prompt - print(f"{Colors.YELLOW}Repeating: {prompt}{Colors.END}") - else: - print(f"{Colors.YELLOW}No previous command to repeat{Colors.END}") - continue + print(f"{Colors.GREEN}{command}{Colors.END}") - # Unknown command - else: - print(f"{Colors.RED}Unknown command: {command}{Colors.END}") - print(f"{Colors.YELLOW}Type !help for a list of available commands{Colors.END}") - continue - - # If we get here, it's a natural language command - - # Save current prompt for potential repeat - last_prompt = prompt - - # Save to history - save_to_history(prompt) - - # Run in auto mode if enabled - if auto_mode_enabled: - auto_mode(prompt, current_model, max_attempts, current_temperature) - continue - - # Regular mode: generate command and ask for confirmation - command = generate_command(prompt, current_model, current_temperature) - - if not command: - print(f"{Colors.RED}Failed to generate a command.{Colors.END}") - continue + # Check for potentially dangerous commands + if is_dangerous_command(command): + print(f"\n{Colors.RED}WARNING: This command may be potentially dangerous!{Colors.END}") + print(f"{Colors.RED}Review it carefully before execution.{Colors.END}") + + # Add to session history + session_history.append((user_input, command)) - print(f"\n{Colors.CYAN}Generated command:{Colors.END}") - print(f"{Colors.GREEN}{command}{Colors.END}\n") - - # Ask for confirmation - confirmation = input(f"{Colors.BOLD}Execute this command? (y/n/e to edit): {Colors.END}").strip().lower() - - if confirmation == 'y': - # Execute the command - print(f"\n{Colors.CYAN}Executing command...{Colors.END}\n") - return_code, output = execute_command(command) + # Save to global history + save_to_history(user_input) - if return_code == 0: - print(f"\n{Colors.GREEN}Command executed successfully.{Colors.END}") - else: - print(f"\n{Colors.RED}Command failed with return code {return_code}{Colors.END}") + # Handle auto mode + if auto_mode_enabled: + print(f"\n{Colors.CYAN}Auto-executing command...{Colors.END}") - if output: - print(f"\n{Colors.BOLD}Output:{Colors.END}") - print(output) - elif confirmation == 'e': - # Allow editing the command - edited_command = input(f"{Colors.BOLD}Edit command: {Colors.END}") - if edited_command: - command = edited_command - print(f"\n{Colors.CYAN}Executing edited command...{Colors.END}\n") - return_code, output = execute_command(command) + # Execute the command + exit_code, output = execute_command(command, False, current_model) - if return_code == 0: - print(f"\n{Colors.GREEN}Command executed successfully.{Colors.END}") - else: - print(f"\n{Colors.RED}Command failed with return code {return_code}{Colors.END}") - - if output: - print(f"\n{Colors.BOLD}Output:{Colors.END}") - print(output) + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # Handle errors in auto mode + if exit_code != 0: + _auto_fix_and_execute(command, output, current_model, max_attempts) else: - print(f"{Colors.YELLOW}Command execution cancelled.{Colors.END}") - else: - print(f"{Colors.YELLOW}Command execution cancelled.{Colors.END}") + # Interactive mode with options to execute, edit, or skip + print(f"\n{Colors.CYAN}Options:{Colors.END}") + print(f" {Colors.YELLOW}y{Colors.END} - Execute the command") + print(f" {Colors.YELLOW}n{Colors.END} - Skip execution") + print(f" {Colors.YELLOW}e{Colors.END} - Edit command before execution") + + while True: + try: + choice = input(f"\n{Colors.BOLD}Enter your choice (y/n/e):{Colors.END} ").strip().lower() + + if choice == 'y': + # Execute the command + exit_code, output = execute_command(command, analyze_errors, current_model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # Handle errors if enabled + if analyze_errors and exit_code != 0: + _analyze_and_fix_error(command, output, current_model) + break + + elif choice == 'n': + print(f"{Colors.YELLOW}Command execution skipped.{Colors.END}") + break + + elif choice == 'e': + print(f"{Colors.CYAN}Edit the command:{Colors.END}") + # Display original command for reference + print(f"{Colors.GREEN}Original: {command}{Colors.END}") + + # Allow user to edit + try: + # Pre-populate the input with the current command + if 'readline' in sys.modules: + readline.set_startup_hook(lambda: readline.insert_text(command)) + + edited_command = input(f"{Colors.BOLD}Edit> {Colors.END}").strip() + + # Reset the startup hook + if 'readline' in sys.modules: + readline.set_startup_hook(None) + + if edited_command: + # Update command + command = edited_command + print(f"\n{Colors.CYAN}Updated command:{Colors.END} {Colors.GREEN}{command}{Colors.END}") + + # Update session history + session_history[-1] = (user_input, command) + + # Ask for execution confirmation + sub_choice = input(f"\n{Colors.BOLD}Execute this command now? (y/n):{Colors.END} ").strip().lower() + if sub_choice == 'y': + # Execute the command + exit_code, output = execute_command(command, analyze_errors, current_model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # Handle errors if enabled + if analyze_errors and exit_code != 0: + _analyze_and_fix_error(command, output, current_model) + else: + print(f"{Colors.YELLOW}No changes made to the command.{Colors.END}") + except KeyboardInterrupt: + print("\nEditing cancelled") + finally: + # Reset the startup hook + if 'readline' in sys.modules: + readline.set_startup_hook(None) + break + + else: + print(f"{Colors.YELLOW}Invalid choice. Please enter 'y', 'n', or 'e'.{Colors.END}") + except KeyboardInterrupt: + print("\nOperation cancelled") + break - except EOFError: - # Handle Ctrl+D - print("\nExiting QCMD interactive shell...") - break - except KeyboardInterrupt: - # Handle Ctrl+C more gracefully - print("\nCommand interrupted. Type !exit to quit.") + # Update activity after processing a command + update_session_activity(session_id) + last_activity_update = time.time() + + except KeyboardInterrupt: + print("\nInterrupted") + continue + + except EOFError: + print(f"\n{Colors.CYAN}Goodbye!{Colors.END}") + # End the session when exiting + end_session(session_id) + break + + except Exception as e: + print(f"\n{Colors.RED}Error: {str(e)}{Colors.END}") + finally: + # End the session when exiting, even if there was an unhandled exception + end_session(session_id) + + # Save history on exit + try: + readline.write_history_file(history_file) except Exception as e: - print(f"{Colors.RED}Error: {e}{Colors.END}") + print(f"{Colors.YELLOW}Could not save shell history: {e}{Colors.END}", file=sys.stderr) + +def _show_shell_help() -> None: + """Display help information for the interactive shell.""" + print(f"\n{Colors.CYAN}QCMD Interactive Shell Commands:{Colors.END}") + print(f"{Colors.YELLOW}/help{Colors.END} - Show this help message") + print(f"{Colors.YELLOW}/exit{Colors.END}, {Colors.YELLOW}/quit{Colors.END} - Exit the shell") + print(f"{Colors.YELLOW}/history{Colors.END} - Show command history for this session") + print(f"{Colors.YELLOW}/models{Colors.END} - List available models") + print(f"{Colors.YELLOW}/model {Colors.END} - Switch to a different model") + print(f"{Colors.YELLOW}/status{Colors.END} - Show system status information") + print(f"{Colors.YELLOW}/update{Colors.END} - Check for QCMD updates") + print(f"{Colors.YELLOW}/temperature {Colors.END} - Set temperature (0.0-1.0)") + print(f"{Colors.YELLOW}/auto{Colors.END} - Enable auto mode") + print(f"{Colors.YELLOW}/manual{Colors.END} - Disable auto mode") + print(f"{Colors.YELLOW}/analyze{Colors.END} - Toggle error analysis") + print(f"{Colors.YELLOW}/execute{Colors.END} - Execute last generated command (with confirmation)") + print(f"{Colors.YELLOW}/logs{Colors.END} - Find and analyze log files") + print(f"{Colors.YELLOW}/analyze-file {Colors.END} - Analyze a specific file") + print(f"{Colors.YELLOW}/monitor {Colors.END} - Monitor a file continuously") + + print(f"\n{Colors.CYAN}Command Execution Options:{Colors.END}") + print(f"When a command is generated, you'll be presented with these options:") + print(f" {Colors.YELLOW}y{Colors.END} - Execute the generated command") + print(f" {Colors.YELLOW}n{Colors.END} - Skip execution of the command") + print(f" {Colors.YELLOW}e{Colors.END} - Edit the command before execution") + + print(f"\n{Colors.CYAN}Commands:{Colors.END}") + print("Just type a natural language description of what you want to do.") + print("Examples:") + print(f" {Colors.GREEN}find all log files in /var/log{Colors.END}") + print(f" {Colors.GREEN}search for errors in apache logs{Colors.END}") + print(f" {Colors.GREEN}show top 10 processes by CPU usage{Colors.END}") + +def _analyze_and_fix_error(command: str, output: str, model: str) -> None: + """ + Analyze and provide a fix for a failed command. + + Args: + command: The failed command + output: The error output + model: Model to use for analysis + """ + from qcmd_cli.core.command_generator import analyze_error, fix_command + + print(f"\n{Colors.CYAN}Analyzing error...{Colors.END}") + analysis = analyze_error(output, command, model) + print(f"\n{Colors.CYAN}Analysis:{Colors.END}\n{analysis}") + + print(f"\n{Colors.CYAN}Suggesting fixed command...{Colors.END}") + fixed_command = fix_command(command, output, model) + + if fixed_command and not fixed_command.startswith("Error:"): + print(f"\n{Colors.GREEN}{fixed_command}{Colors.END}") + + # Ask if user wants to execute the fixed command + try: + confirm = input(f"\nExecute this command? (y/n): ").strip().lower() + if confirm == 'y': + exit_code, new_output = execute_command(fixed_command, False, model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + except KeyboardInterrupt: + print("\nCancelled") + else: + print(f"\n{Colors.YELLOW}Could not generate a fixed command: {fixed_command}{Colors.END}") + +def _auto_fix_and_execute(command: str, output: str, model: str, max_attempts: int) -> None: + """ + Automatically fix and execute a failed command multiple times if needed. + + Args: + command: The failed command + output: The error output + model: Model to use for fixing + max_attempts: Maximum number of fix attempts + """ + from qcmd_cli.core.command_generator import fix_command + + original_command = command + attempts = 1 + + while attempts <= max_attempts: + print(f"\n{Colors.CYAN}Auto-fix attempt {attempts}/{max_attempts}...{Colors.END}") + + # Generate fixed command + fixed_command = fix_command(command, output, model) + + if fixed_command and not fixed_command.startswith("Error:") and fixed_command != command: + print(f"\n{Colors.GREEN}{fixed_command}{Colors.END}") + + # Execute the fixed command + print(f"\n{Colors.CYAN}Executing fixed command...{Colors.END}") + exit_code, new_output = execute_command(fixed_command, False, model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # If successful, break the loop + if exit_code == 0: + break + + # Update for next attempt + command = fixed_command + output = new_output + else: + print(f"\n{Colors.YELLOW}No better solution found after {attempts} attempts.{Colors.END}") + break + + attempts += 1 + + if attempts > max_attempts: + print(f"\n{Colors.YELLOW}Maximum fix attempts reached. Could not fix the command.{Colors.END}") + + # Provide a summary + if exit_code == 0: + print(f"\n{Colors.GREEN}Successfully fixed and executed the command.{Colors.END}") + else: + print(f"\n{Colors.YELLOW}Could not fix and execute the command after {attempts-1} attempts.{Colors.END}") + print(f"{Colors.YELLOW}Original command: {original_command}{Colors.END}") + print(f"{Colors.YELLOW}Last attempt: {command}{Colors.END}") + +def _display_banner(): + """Display the QCMD banner.""" + print() + print(f"╔═════════════════════════════════════════════════════════════╗") + print(f"║ ║") + print(f"║ ██████╗ ██████╗███╗ ███╗██████╗ ║") + print(f"║ ██╔═══██╗██╔════╝████╗ ████║██╔══██╗ ║") + print(f"║ ██║ ██║██║ ██╔████╔██║██║ ██║ ║") + print(f"║ ██║▄▄ ██║██║ ██║╚██╔╝██║██║ ██║ ║") + print(f"║ ╚██████╔╝╚██████╗██║ ╚═╝ ██║██████╔╝ ║") + print(f"║ ╚══▀▀═╝ ╚═════╝╚═╝ ╚═╝╚═════╝ ║") + + # Get QCMD version + try: + from importlib.metadata import version + qcmd_version = version("ibrahimiq-qcmd") + version_text = f"v{qcmd_version}" + except: + version_text = "v1.0.10" # Fallback version + + print(f"║ {version_text} ║") + print(f"║ ║") + print(f"╚═════════════════════════════════════════════════════════════╝") def auto_mode(prompt: str, model: str = DEFAULT_MODEL, max_attempts: int = 3, temperature: float = 0.7) -> None: """ @@ -380,4 +661,6 @@ def auto_mode(prompt: str, model: str = DEFAULT_MODEL, max_attempts: int = 3, te command = fix_command(command, output, model) # Add a small delay to make the process more readable - time.sleep(1) \ No newline at end of file + time.sleep(1) + + diff --git a/qcmd_cli/log_analysis/analyzer.py b/qcmd_cli/log_analysis/analyzer.py index 0b57b5a..a348bf7 100644 --- a/qcmd_cli/log_analysis/analyzer.py +++ b/qcmd_cli/log_analysis/analyzer.py @@ -10,10 +10,41 @@ from typing import List, Dict, Optional, Tuple, Any # Import from local modules once they are created -# from ..config.settings import DEFAULT_MODEL +from ..config.settings import DEFAULT_MODEL +from ..ui.display import Colors -# For now, define defaults here -DEFAULT_MODEL = "llama3:latest" +def handle_log_analysis(model: str = DEFAULT_MODEL, specific_file: str = None) -> None: + """ + Handle log analysis workflow - prompting user to select log files and analyzing them. + + Args: + model: Model to use for analysis + specific_file: Optional specific file to analyze + """ + if specific_file: + if os.path.exists(specific_file): + print(f"\n{Colors.GREEN}Analyzing log file: {specific_file}{Colors.END}") + analyze_log_file(specific_file, model) + else: + print(f"\n{Colors.RED}File not found: {specific_file}{Colors.END}") + return + + # Import here to avoid circular imports + from .log_files import find_log_files, select_log_file + + # Find log files + log_files = find_log_files() + + if not log_files: + print(f"\n{Colors.YELLOW}No log files found.{Colors.END}") + print(f"Try specifying a path with: qcmd logs /path/to/logs") + return + + # Let user select a log file + selected_file = select_log_file(log_files) + + if selected_file: + analyze_log_file(selected_file, model) def analyze_log_file(log_file: str, model: str = DEFAULT_MODEL, background: bool = False, analyze: bool = True) -> None: """ @@ -25,8 +56,25 @@ def analyze_log_file(log_file: str, model: str = DEFAULT_MODEL, background: bool background: Whether to run in background mode analyze: Whether to perform analysis (vs just monitoring) """ - # Implementation will be moved from original qcmd.py - pass + # Check if file exists + if not os.path.exists(log_file): + print(f"{Colors.RED}Error: File {log_file} not found.{Colors.END}") + return + + print(f"\n{Colors.CYAN}Analyzing log file: {log_file}{Colors.END}") + + # Read file content + try: + content = read_large_file(log_file) + if not content: + print(f"{Colors.YELLOW}Log file is empty.{Colors.END}") + return + + # Perform analysis + analyze_log_content(content, log_file, model) + + except Exception as e: + print(f"{Colors.RED}Error analyzing log file: {str(e)}{Colors.END}") def analyze_log_content(log_content: str, log_file: str, model: str = DEFAULT_MODEL) -> None: """ @@ -37,8 +85,21 @@ def analyze_log_content(log_content: str, log_file: str, model: str = DEFAULT_MO log_file: Path to the log file (for reference) model: Model to use for analysis """ - # Implementation will be moved from original qcmd.py - pass + print(f"\n{Colors.CYAN}Analyzing log content using {model}...{Colors.END}") + + # Basic implementation - in a real application, this would use an LLM via Ollama API + print(f"\n{Colors.GREEN}Log Analysis Results:{Colors.END}") + print(f"File: {log_file}") + print(f"Size: {len(log_content)} bytes") + + # Count lines and errors (simple heuristic) + lines = log_content.splitlines() + error_count = sum(1 for line in lines if "error" in line.lower() or "exception" in line.lower()) + + print(f"Total lines: {len(lines)}") + print(f"Potential errors/exceptions: {error_count}") + + # In a complete implementation, we would call the LLM to analyze the log content def read_large_file(file_path: str, chunk_size: int = 1024 * 1024) -> str: """ @@ -51,5 +112,11 @@ def read_large_file(file_path: str, chunk_size: int = 1024 * 1024) -> str: Returns: Content of the file as a string """ - # Implementation will be moved from original qcmd.py - pass \ No newline at end of file + content = [] + with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: + while True: + chunk = file.read(chunk_size) + if not chunk: + break + content.append(chunk) + return "".join(content) \ No newline at end of file diff --git a/qcmd_cli/ui/colors.py b/qcmd_cli/ui/colors.py new file mode 100644 index 0000000..dc2af78 --- /dev/null +++ b/qcmd_cli/ui/colors.py @@ -0,0 +1,28 @@ +"""Terminal color codes module.""" + +class Colors: + """Terminal color codes.""" + RED = '\033[91m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + MAGENTA = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + END = '\033[0m' + + @staticmethod + def disable(): + """Disable colors by setting all color codes to empty strings.""" + Colors.RED = '' + Colors.GREEN = '' + Colors.YELLOW = '' + Colors.BLUE = '' + Colors.MAGENTA = '' + Colors.CYAN = '' + Colors.WHITE = '' + Colors.BOLD = '' + Colors.UNDERLINE = '' + Colors.END = '' \ No newline at end of file diff --git a/qcmd_cli/ui/display.py b/qcmd_cli/ui/display.py index d1fe206..9f8a002 100644 --- a/qcmd_cli/ui/display.py +++ b/qcmd_cli/ui/display.py @@ -7,7 +7,7 @@ import time import re import shutil -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any class Colors: """ @@ -224,4 +224,76 @@ def display_help_command(current_model: str, current_temperature: float, auto_mo - Type 'y' to execute, 'n' to reject, or edit the command - Use !exit or Ctrl+D to quit """ - print(help_text) \ No newline at end of file + print(help_text) + +def clear_screen(): + """ + Clear the terminal screen. + """ + # Clear command based on OS + if os.name == 'nt': # For Windows + os.system('cls') + else: # For Linux/Mac + os.system('clear') + +def display_system_status(status: Dict[str, Any]) -> None: + """ + Display detailed system status information. + + Args: + status: Dictionary with system status information + """ + # Print divider line + print(f"\n{Colors.CYAN}{'-' * 80}{Colors.END}") + + # System information + print(f"\n{Colors.RED}{Colors.BOLD}System Information:{Colors.END}") + print(f" {Colors.BLUE}OS:{Colors.END} {status.get('os', 'Unknown')}") + print(f" {Colors.BLUE}Python Version:{Colors.END} {status.get('python_version', 'Unknown')}") + print(f" {Colors.BLUE}QCMD Version:{Colors.END} {status.get('qcmd_version', 'Unknown')}") + print(f" {Colors.BLUE}Current Time:{Colors.END} {status.get('time', 'Unknown')}") + + # Ollama information + if 'ollama' in status: + ollama = status['ollama'] + print(f"\n{Colors.RED}{Colors.BOLD}Ollama Status:{Colors.END}") + + # Check if Ollama is running + if ollama.get('status', '') == 'running': + print(f" {Colors.BLUE}Status:{Colors.END} {Colors.GREEN}Running{Colors.END}") + else: + print(f" {Colors.BLUE}Status:{Colors.END} {Colors.RED}Not Running{Colors.END}") + if 'error' in ollama: + print(f" {Colors.BLUE}Error:{Colors.END} {ollama['error']}") + + print(f" {Colors.BLUE}API URL:{Colors.END} {ollama.get('api_url', 'Unknown')}") + + # List available models + if 'models' in ollama and ollama['models']: + print(f" {Colors.BLUE}Available Models:{Colors.END}") + for model in ollama['models']: + print(f" - {model}") + elif ollama.get('status', '') == 'running': + print(f" {Colors.BLUE}Available Models:{Colors.END} No models found") + + # Active monitors + if 'active_monitors' in status and status['active_monitors']: + print(f"\n{Colors.RED}{Colors.BOLD}Active Log Monitors:{Colors.END}") + for monitor in status['active_monitors']: + print(f" - {monitor}") + + # Active sessions + if 'active_sessions' in status and status['active_sessions']: + print(f"\n{Colors.RED}{Colors.BOLD}Active Sessions:{Colors.END}") + for session in status['active_sessions']: + print(f" - {session}") + + # Disk space + if 'disk' in status: + disk = status['disk'] + print(f"\n{Colors.RED}{Colors.BOLD}Disk Space:{Colors.END}") + print(f" {Colors.BLUE}Total:{Colors.END} {disk.get('total_gb', 'Unknown')} GB") + print(f" {Colors.BLUE}Used:{Colors.END} {disk.get('used_gb', 'Unknown')} GB ({disk.get('percent_used', 'Unknown')}%)") + print(f" {Colors.BLUE}Free:{Colors.END} {disk.get('free_gb', 'Unknown')} GB") + + print(f"\n{Colors.CYAN}{'-' * 80}{Colors.END}") \ No newline at end of file diff --git a/qcmd_cli/utils/ollama.py b/qcmd_cli/utils/ollama.py new file mode 100644 index 0000000..71474f7 --- /dev/null +++ b/qcmd_cli/utils/ollama.py @@ -0,0 +1,15 @@ +def is_ollama_running(): + """ + Check if the Ollama API is running and accessible. + + Returns: + bool: True if the Ollama API is running, False otherwise + """ + from qcmd_cli.config.constants import OLLAMA_API + import requests + + try: + response = requests.get(f"{OLLAMA_API}/tags", timeout=2) + return response.status_code == 200 + except requests.exceptions.RequestException: + return False \ No newline at end of file diff --git a/qcmd_cli/utils/session.py b/qcmd_cli/utils/session.py index a863a70..f0abca9 100644 --- a/qcmd_cli/utils/session.py +++ b/qcmd_cli/utils/session.py @@ -7,6 +7,7 @@ import time import signal import sys +import uuid from typing import Dict, List, Optional, Any from ..config.settings import CONFIG_DIR @@ -14,6 +15,53 @@ # File path for storing session info SESSIONS_FILE = os.path.join(CONFIG_DIR, "sessions.json") +def create_session(session_info: Dict[str, Any]) -> str: + """ + Create a new session and save it to persistent storage. + + Args: + session_info: Dictionary with session information + + Returns: + Session ID as a string + """ + # Generate a unique session ID + session_id = str(uuid.uuid4()) + + # Add metadata + session_info['pid'] = os.getpid() + session_info['created_at'] = time.time() + session_info['last_activity'] = time.time() + + # Save to storage + save_session(session_id, session_info) + + return session_id + +def update_session_activity(session_id: str) -> bool: + """ + Update the last activity timestamp for a session. + + Args: + session_id: ID of the session to update + + Returns: + True if successful, False otherwise + """ + try: + sessions = load_sessions() + if session_id in sessions: + sessions[session_id]['last_activity'] = time.time() + + with open(SESSIONS_FILE, 'w') as f: + json.dump(sessions, f, indent=2) + + return True + return False + except Exception as e: + print(f"Error updating session activity: {e}", file=sys.stderr) + return False + def save_session(session_id, session_info): """ Save session information to persistent storage. diff --git a/qcmd_cli/utils/system.py b/qcmd_cli/utils/system.py index 7e213e4..e3ad312 100644 --- a/qcmd_cli/utils/system.py +++ b/qcmd_cli/utils/system.py @@ -9,8 +9,10 @@ import requests import json import shutil +import time +import re from datetime import datetime -from typing import Dict, Any, Tuple, List +from typing import Dict, Any, Tuple, List, Optional from ..ui.display import Colors from ..config.settings import CONFIG_DIR, load_config, DEFAULT_MODEL @@ -199,48 +201,185 @@ def display_system_status(): filled_length = int(bar_width * percent / 100) bar = f"{Colors.GREEN}{'█' * filled_length}{Colors.YELLOW}{'░' * (bar_width - filled_length)}{Colors.END}" - print(f" {Colors.BOLD}•{Colors.END} Total: {Colors.YELLOW}{total_gb:.2f} GB{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Used: {Colors.YELLOW}{used_gb:.2f} GB{Colors.END} ({percent:.1f}%)") - print(f" {Colors.BOLD}•{Colors.END} Free: {Colors.YELLOW}{free_gb:.2f} GB{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Usage: {bar}") + print(f" {Colors.BOLD}•{Colors.END} Space on {Colors.YELLOW}{CONFIG_DIR}{Colors.END}:") + print(f" {Colors.BOLD}•{Colors.END} Used: {Colors.YELLOW}{used_gb:.2f} GB{Colors.END} / Free: {Colors.YELLOW}{free_gb:.2f} GB{Colors.END} / Total: {Colors.YELLOW}{total_gb:.2f} GB{Colors.END}") + print(f" {Colors.BOLD}•{Colors.END} Usage: {Colors.YELLOW}{percent:.1f}%{Colors.END}") + print(f" {bar}") else: - print(f" {Colors.YELLOW}Could not get disk space information.{Colors.END}") + print(f" {Colors.YELLOW}Configuration directory not found.{Colors.END}") - # Configuration section - print(f"\n{Colors.CYAN}{Colors.BOLD}► CONFIGURATION{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Default Model: {Colors.YELLOW}{config.get('model', DEFAULT_MODEL)}{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Temperature: {Colors.YELLOW}{config.get('temperature', 0.7)}{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Auto-Correction Max Attempts: {Colors.YELLOW}{config.get('max_attempts', 3)}{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Config Directory: {Colors.YELLOW}{CONFIG_DIR}{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Show Banner: {Colors.YELLOW}{config.get('ui', {}).get('show_iraq_banner', True)}{Colors.END}") - print(f" {Colors.BOLD}•{Colors.END} Show Progress Bar: {Colors.YELLOW}{config.get('ui', {}).get('show_progress_bar', True)}{Colors.END}") + # Add update status + print(f"\n{Colors.CYAN}{Colors.BOLD}► UPDATE STATUS{Colors.END}") + update_info = check_for_updates(False) + if update_info: + current_version = update_info.get('current_version', 'Unknown') + latest_version = update_info.get('latest_version', 'Unknown') + update_available = update_info.get('update_available', False) + + if update_available: + print(f" {Colors.BOLD}•{Colors.END} Update Available: {Colors.GREEN}Yes{Colors.END}") + print(f" {Colors.BOLD}•{Colors.END} Current Version: {Colors.YELLOW}{current_version}{Colors.END}") + print(f" {Colors.BOLD}•{Colors.END} Latest Version: {Colors.GREEN}{latest_version}{Colors.END}") + print(f" {Colors.BOLD}•{Colors.END} Update Command: {Colors.GREEN}pip install --upgrade ibrahimiq-qcmd{Colors.END}") + else: + print(f" {Colors.BOLD}•{Colors.END} Status: {Colors.GREEN}Up to date{Colors.END}") + print(f" {Colors.BOLD}•{Colors.END} Version: {Colors.YELLOW}{current_version}{Colors.END}") + else: + print(f" {Colors.YELLOW}Could not check for updates.{Colors.END}") + # Footer print(f"\n{Colors.BOLD}╚════════════════════════════════════════════════════════════════════════════════════════════════╝{Colors.END}\n") -def check_for_updates(force_display: bool = False) -> None: +def check_for_updates(force_display: bool = False) -> Optional[Dict[str, Any]]: """ - Check if there's a newer version of the package available on PyPI + Check for QCMD updates by querying PyPI. Args: - force_display: Whether to display a message even if no update is found + force_display: Whether to force displaying the update status + + Returns: + Dictionary with update information or None if check fails """ + # Get current version + current_version = __version__ + + # Load config to check if updates are disabled + config = load_config() + if not force_display and config.get('disable_update_check', False): + return None + + # Try to get the latest version from PyPI + latest_version = None try: - # Get installed version - use version from module directly - installed_version = __version__ - - # Check latest version on PyPI - response = requests.get("https://pypi.org/pypi/ibrahimiq-qcmd/json", timeout=3) + response = requests.get("https://pypi.org/pypi/ibrahimiq-qcmd/json", timeout=5) if response.status_code == 200: - latest_version = response.json()["info"]["version"] + data = response.json() + latest_version = data['info']['version'] + except Exception: + # If we can't connect to PyPI, just return None + return None + + # If we couldn't get the latest version, return None + if not latest_version: + return None + + # Compare versions + current_parts = current_version.split('.') + latest_parts = latest_version.split('.') + + update_available = False + + # Compare major, minor, patch versions + for c, l in zip(current_parts, latest_parts): + if int(l) > int(c): + update_available = True + break + elif int(l) < int(c): + # Current is newer (development version) + break + + # If different number of parts and no decision yet, + # the one with more parts is considered newer + if not update_available and len(latest_parts) > len(current_parts): + update_available = True + + result = { + 'current_version': current_version, + 'latest_version': latest_version, + 'update_available': update_available + } + + # Display the info if requested + if force_display: + if update_available: + print(f"\n{Colors.YELLOW}Update available!{Colors.END}") + print(f"Current version: {Colors.RED}{current_version}{Colors.END}") + print(f"Latest version: {Colors.GREEN}{latest_version}{Colors.END}") + print(f"To update, run: {Colors.GREEN}pip install --upgrade ibrahimiq-qcmd{Colors.END}") + else: + print(f"\n{Colors.GREEN}QCMD is up to date!{Colors.END}") + print(f"Current version: {Colors.GREEN}{current_version}{Colors.END}") + + return result + +def display_update_status() -> None: + """ + Display the update status with improved formatting. + """ + # Load config to check if updates are disabled + config = load_config() + if config.get('disable_update_check', False): + return + + update_info = check_for_updates(False) + if update_info and update_info.get('update_available', False): + current_version = update_info.get('current_version', 'Unknown') + latest_version = update_info.get('latest_version', 'Unknown') + + # Create a nice-looking update message + print(f"\n{Colors.YELLOW}╔═══════════════════════════════════════════════════════════════╗{Colors.END}") + print(f"{Colors.YELLOW}║ {Colors.BOLD}UPDATE AVAILABLE{Colors.END}{Colors.YELLOW} ║{Colors.END}") + print(f"{Colors.YELLOW}║ ║{Colors.END}") + print(f"{Colors.YELLOW}║ Current version: {Colors.RED}{current_version.ljust(10)}{Colors.YELLOW} ║{Colors.END}") + print(f"{Colors.YELLOW}║ Latest version: {Colors.GREEN}{latest_version.ljust(10)}{Colors.YELLOW} ║{Colors.END}") + print(f"{Colors.YELLOW}║ ║{Colors.END}") + print(f"{Colors.YELLOW}║ To update, run: ║{Colors.END}") + print(f"{Colors.YELLOW}║ {Colors.GREEN}pip install --upgrade ibrahimiq-qcmd{Colors.YELLOW} ║{Colors.END}") + print(f"{Colors.YELLOW}╚═══════════════════════════════════════════════════════════════╝{Colors.END}\n") + +def execute_command(command: str, analyze_errors: bool = False, model: str = None) -> Tuple[int, str]: + """ + Execute a shell command and return the exit code and output. + + Args: + command: The command to execute + analyze_errors: Whether to analyze errors if the command fails + model: Model to use for error analysis + + Returns: + Tuple of (exit_code, output) + """ + print(f"\n{Colors.CYAN}Executing:{Colors.END} {Colors.GREEN}{command}{Colors.END}") + + try: + # Run the command and capture output + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1 + ) + + output_lines = [] + for line in process.stdout: + print(line, end='') + output_lines.append(line) + + process.wait() + exit_code = process.returncode + output = ''.join(output_lines) - # Compare versions - if installed_version != latest_version: - print(f"\n{Colors.YELLOW}New version available: {Colors.BOLD}{latest_version}{Colors.END}") - print(f"{Colors.YELLOW}You have: {installed_version}{Colors.END}") - print(f"{Colors.YELLOW}Update with: {Colors.BOLD}pip install --upgrade ibrahimiq-qcmd{Colors.END}") - elif force_display: - print(f"{Colors.GREEN}You have the latest version: {Colors.BOLD}{installed_version}{Colors.END}") + return exit_code, output + except Exception as e: - if force_display: - print(f"{Colors.YELLOW}Could not check for updates: {e}{Colors.END}") - # If update check fails, just skip it silently otherwise \ No newline at end of file + error_msg = f"Error executing command: {str(e)}" + print(f"{Colors.RED}{error_msg}{Colors.END}") + return 1, error_msg + +def format_bytes(bytes_value): + """ + Format byte values to human-readable format. + + Args: + bytes_value: Number of bytes + + Returns: + Human-readable string representation + """ + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes_value < 1024.0: + return f"{bytes_value:.2f} {unit}" + bytes_value /= 1024.0 + return f"{bytes_value:.2f} PB" \ No newline at end of file diff --git a/tests/test_display.py b/tests/test_display.py new file mode 100644 index 0000000..e8946a4 --- /dev/null +++ b/tests/test_display.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +""" +Tests for UI display functionality. +""" + +import unittest +import os +import sys +from unittest.mock import patch, MagicMock, call + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import functions to test +from qcmd_cli.ui.display import ( + Colors, display_system_status, display_help_command, + clear_screen, print_cool_header +) + + +class TestDisplayFunctions(unittest.TestCase): + """Test the display functions in the UI module.""" + + @patch('builtins.print') + def test_display_system_status(self, mock_print): + """Test that system status is displayed correctly.""" + # Mock system status data + status_data = { + 'os': 'Linux 6.1.0-kali1-amd64', + 'python_version': '3.11.2', + 'qcmd_version': '0.4.1', + 'ollama_status': { + 'running': True, + 'version': '0.1.19', + 'models': ['llama3', 'phi3'] + }, + 'cpu_info': { + 'cores': 8, + 'usage': 35.2 + }, + 'memory_info': { + 'total': 16000000000, + 'available': 9000000000, + 'percent': 44.5 + }, + 'disk_info': { + 'total': 250000000000, + 'free': 150000000000, + 'percent': 40.0 + } + } + + # Call the function + display_system_status(status_data) + + # Verify that print was called + mock_print.assert_called() + + # Just verify that some of the data was used in print calls + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('Linux 6.1.0-kali1-amd64', all_print_output) + self.assertIn('3.11.2', all_print_output) + self.assertIn('0.4.1', all_print_output) + + @patch('builtins.print') + def test_display_help_command(self, mock_print): + """Test that help command is displayed correctly.""" + # Call the function + display_help_command("llama3", 0.7, False, 3) + + # Verify that print was called + mock_print.assert_called() + + # Just verify that some of the expected data appears in the output + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('QCMD', all_print_output) + self.assertIn('llama3', all_print_output) + self.assertIn('0.7', all_print_output) + + @patch('os.system') + def test_clear_screen(self, mock_system): + """Test clear_screen function.""" + # Call the function + clear_screen() + + # Verify system call was made + mock_system.assert_called_once() + + @patch('builtins.print') + def test_print_cool_header(self, mock_print): + """Test print_cool_header function.""" + # Call the function + print_cool_header() + + # Verify header was printed + mock_print.assert_called() + + # Check that the output includes ASCII art - look for typical parts + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('█', all_print_output) + self.assertIn('Iraqi Excellence', all_print_output) + + @patch('builtins.print') + def test_colors(self, mock_print): + """Test that the Colors class works correctly.""" + # Test using colors + print(f"{Colors.RED}Red Text{Colors.END}") + print(f"{Colors.GREEN}Green Text{Colors.END}") + print(f"{Colors.BOLD}Bold Text{Colors.END}") + + # Verify that print was called + mock_print.assert_called() + + # Check that the text appears in output + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('Red Text', all_print_output) + self.assertIn('Green Text', all_print_output) + self.assertIn('Bold Text', all_print_output) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_session.py b/tests/test_session.py new file mode 100644 index 0000000..d655623 --- /dev/null +++ b/tests/test_session.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +""" +Tests for session management functionality. +""" + +import unittest +import os +import sys +import json +import tempfile +import time +from unittest.mock import patch, MagicMock + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import functions to test +from qcmd_cli.utils.session import ( + save_session, load_sessions, create_session, update_session_activity, + end_session, cleanup_stale_sessions, is_process_running +) +from qcmd_cli.config.settings import CONFIG_DIR + + +class TestSessionManagement(unittest.TestCase): + """Test the session management functionality.""" + + def setUp(self): + """Set up a temporary sessions file for testing.""" + self.temp_dir = tempfile.TemporaryDirectory() + self.sessions_file = os.path.join(self.temp_dir.name, "sessions.json") + self.sessions_patch = patch('qcmd_cli.utils.session.SESSIONS_FILE', self.sessions_file) + self.sessions_patch.start() + + def tearDown(self): + """Clean up temporary files and patches.""" + self.sessions_patch.stop() + self.temp_dir.cleanup() + + def test_save_and_load_session(self): + """Test that a session can be saved and loaded.""" + test_session = { + "type": "test_session", + "model": "test-model", + "start_time": "2023-01-01 12:00:00", + "pid": 12345 + } + + # Test saving + result = save_session("test-session-id", test_session) + self.assertTrue(result) + + # Verify file was created + self.assertTrue(os.path.exists(self.sessions_file)) + + # Test loading + sessions = load_sessions() + self.assertIn("test-session-id", sessions) + self.assertEqual(sessions["test-session-id"]["type"], "test_session") + self.assertEqual(sessions["test-session-id"]["model"], "test-model") + + def test_create_session(self): + """Test creating a new session.""" + session_info = { + "type": "interactive_shell", + "model": "llama3" + } + + with patch('os.getpid', return_value=54321): + session_id = create_session(session_info) + + # Verify session ID format (should be a UUID) + self.assertTrue(len(session_id) > 30) + + # Verify session was saved + sessions = load_sessions() + self.assertIn(session_id, sessions) + + # Verify metadata was added + saved_session = sessions[session_id] + self.assertEqual(saved_session["type"], "interactive_shell") + self.assertEqual(saved_session["model"], "llama3") + self.assertEqual(saved_session["pid"], 54321) + self.assertIn("created_at", saved_session) + self.assertIn("last_activity", saved_session) + + def test_update_session_activity(self): + """Test updating session activity timestamp.""" + # Create a session + test_session = { + "type": "test_session", + "last_activity": time.time() - 1000 # Set this to the past + } + save_session("activity-test-id", test_session) + + # Update activity + old_time = test_session["last_activity"] + result = update_session_activity("activity-test-id") + + # Verify result + self.assertTrue(result) + + # Verify timestamp was updated + sessions = load_sessions() + new_time = sessions["activity-test-id"]["last_activity"] + self.assertGreater(new_time, old_time) + + def test_end_session(self): + """Test ending a session.""" + # Create two sessions + save_session("session-to-end", {"type": "test"}) + save_session("session-to-keep", {"type": "test"}) + + # End one session + result = end_session("session-to-end") + + # Verify result + self.assertTrue(result) + + # Verify session was removed + sessions = load_sessions() + self.assertNotIn("session-to-end", sessions) + self.assertIn("session-to-keep", sessions) + + def test_cleanup_stale_sessions(self): + """Test cleaning up stale sessions.""" + # Create sessions with different PIDs + save_session("active-session", {"pid": os.getpid()}) + save_session("stale-session", {"pid": 99999}) # Unlikely to exist + + # Mock is_process_running for testing + original_is_process_running = is_process_running + try: + # Replace with mock implementation + def mock_is_process_running(pid): + return pid == os.getpid() + + # Monkey patch the function + globals()['is_process_running'] = mock_is_process_running + + # Call cleanup + active_sessions = cleanup_stale_sessions() + + # Verify only active session remains + self.assertIn("active-session", active_sessions) + self.assertNotIn("stale-session", active_sessions) + + # Verify file was updated + sessions = load_sessions() + self.assertIn("active-session", sessions) + self.assertNotIn("stale-session", sessions) + + finally: + # Restore original function + globals()['is_process_running'] = original_is_process_running + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_system.py b/tests/test_system.py new file mode 100644 index 0000000..88c8a13 --- /dev/null +++ b/tests/test_system.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +Tests for system utilities module. +""" + +import unittest +import os +import sys +import json +import tempfile +from unittest.mock import patch, MagicMock + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import functions to test +from qcmd_cli.utils.system import ( + check_for_updates, display_update_status, + execute_command, format_bytes +) + + +class TestSystemUtilities(unittest.TestCase): + """Test the system utilities functionality.""" + + def test_format_bytes(self): + """Test the format_bytes function.""" + # Test different byte sizes + self.assertEqual(format_bytes(500), "500.00 B") + self.assertEqual(format_bytes(1024), "1.00 KB") + self.assertEqual(format_bytes(1024 * 1024), "1.00 MB") + self.assertEqual(format_bytes(1024 * 1024 * 1024), "1.00 GB") + self.assertEqual(format_bytes(1024 * 1024 * 1024 * 1024), "1.00 TB") + + def test_execute_command(self): + """Test the execute_command function.""" + # Test a simple command + exit_code, output = execute_command("echo 'test command'") + self.assertEqual(exit_code, 0) + self.assertIn("test command", output) + + # Test a failing command + exit_code, output = execute_command("command_that_does_not_exist") + self.assertNotEqual(exit_code, 0) + + @patch('requests.get') + def test_check_for_updates_newer_version(self, mock_get): + """Test check_for_updates when a newer version is available.""" + # Mock the response from PyPI + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'info': { + 'version': '1.1.0' # Newer version than current + } + } + mock_get.return_value = mock_response + + # Patch the current version + with patch('qcmd_cli.utils.system.__version__', '1.0.0'): + # Call the function + result = check_for_updates(force_display=False) + + # Verify result + self.assertTrue(result['update_available']) + self.assertEqual(result['current_version'], '1.0.0') + self.assertEqual(result['latest_version'], '1.1.0') + + @patch('requests.get') + def test_check_for_updates_same_version(self, mock_get): + """Test check_for_updates when the current version is the latest.""" + # Mock the response from PyPI + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'info': { + 'version': '1.0.0' # Same as current + } + } + mock_get.return_value = mock_response + + # Patch the current version + with patch('qcmd_cli.utils.system.__version__', '1.0.0'): + # Call the function + result = check_for_updates(force_display=False) + + # Verify result + self.assertFalse(result['update_available']) + self.assertEqual(result['current_version'], '1.0.0') + self.assertEqual(result['latest_version'], '1.0.0') + + @patch('requests.get') + def test_check_for_updates_connection_error(self, mock_get): + """Test check_for_updates when a connection error occurs.""" + # Mock a connection error + mock_get.side_effect = Exception("Connection error") + + # Call the function + result = check_for_updates(force_display=False) + + # Verify result + self.assertIsNone(result) + + @patch('qcmd_cli.utils.system.check_for_updates') + @patch('qcmd_cli.utils.system.print') + def test_display_update_status_with_update(self, mock_print, mock_check): + """Test display_update_status when an update is available.""" + # Mock the update check to return an update is available + mock_check.return_value = { + 'update_available': True, + 'current_version': '1.0.0', + 'latest_version': '1.1.0' + } + + # Call the function + display_update_status() + + # Verify display was called + mock_print.assert_called() # At least one call to print + + @patch('qcmd_cli.utils.system.check_for_updates') + @patch('qcmd_cli.utils.system.print') + def test_display_update_status_no_update(self, mock_print, mock_check): + """Test display_update_status when no update is available.""" + # Mock the update check to return no update is available + mock_check.return_value = { + 'update_available': False, + 'current_version': '1.0.0', + 'latest_version': '1.0.0' + } + + # Call the function + display_update_status() + + # Verify display was not called (no message needed) + self.assertEqual(mock_print.call_count, 0) + + @patch('qcmd_cli.utils.system.load_config') + def test_display_update_status_disabled(self, mock_load_config): + """Test display_update_status when updates are disabled in config.""" + # Mock the config to disable update checks + mock_load_config.return_value = { + 'disable_update_check': True + } + + # Call the function + with patch('qcmd_cli.utils.system.check_for_updates') as mock_check: + display_update_status() + + # Verify check_for_updates was not called + mock_check.assert_not_called() + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file