From fbad322911c101cba9886969e2049a88bc88e4a2 Mon Sep 17 00:00:00 2001 From: ibrahim Date: Thu, 10 Apr 2025 20:36:14 +0000 Subject: [PATCH 1/4] Add update checker and interactive command execution enhancements --- qcmd_cli/config/constants.py | 37 ++ qcmd_cli/core/interactive_shell.py | 585 +++++++++++++++++++++++++++++ qcmd_cli/ui/colors.py | 28 ++ qcmd_cli/ui/display.py | 382 +++++++++++++++++++ qcmd_cli/utils/ollama.py | 15 + qcmd_cli/utils/system.py | 461 +++++++++++++++++++++++ 6 files changed, 1508 insertions(+) create mode 100644 qcmd_cli/config/constants.py create mode 100644 qcmd_cli/core/interactive_shell.py create mode 100644 qcmd_cli/ui/colors.py create mode 100644 qcmd_cli/ui/display.py create mode 100644 qcmd_cli/utils/ollama.py create mode 100644 qcmd_cli/utils/system.py diff --git a/qcmd_cli/config/constants.py b/qcmd_cli/config/constants.py new file mode 100644 index 0000000..4d375e1 --- /dev/null +++ b/qcmd_cli/config/constants.py @@ -0,0 +1,37 @@ +""" +Constants and configuration settings for QCMD. +""" + +import os + +# Paths +HOME_DIR = os.path.expanduser("~") +CONFIG_DIR = os.path.join(HOME_DIR, ".qcmd") +LOG_DIR = os.path.join(CONFIG_DIR, "logs") +SESSIONS_FILE = os.path.join(CONFIG_DIR, "sessions.json") + +# API URLs +OLLAMA_API = os.environ.get("OLLAMA_API", "http://127.0.0.1:11434/api") + +# Version +VERSION = "1.0.10" + +# System prompt template for command generation +SYSTEM_PROMPT_TEMPLATE = """You are QCMD, an AI assistant specialized in Linux system administration, log analysis, and command generation. +Your goal is to provide accurate, secure, and helpful commands based on user requests. + +- Generate shell commands that are valid for Linux systems. +- Prioritize safety and security in your suggestions. +- Explain complex commands when necessary. +- Always verify that commands won't harm the system. +- Suggest alternatives when relevant. + +Current system context: +- User shell: {user_shell} +- OS: {os_info} +- Working directory: {cwd} + +Respond only with the command(s) that would accomplish the task. Do not include explanations or markdown formatting.""" + +# Default number of log lines to show +DEFAULT_LOG_LINES = 100 \ No newline at end of file diff --git a/qcmd_cli/core/interactive_shell.py b/qcmd_cli/core/interactive_shell.py new file mode 100644 index 0000000..9ec2b58 --- /dev/null +++ b/qcmd_cli/core/interactive_shell.py @@ -0,0 +1,585 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Interactive shell module for QCMD. +This module provides a shell-like interface for users to interact with the system. +""" + +import os +import sys +import readline +import time +import atexit +import signal +from typing import List, Optional, Tuple +from datetime import datetime + +# Import from other modules +from qcmd_cli.config.settings import DEFAULT_MODEL +from qcmd_cli.config.constants import CONFIG_DIR +from qcmd_cli.ui.display import Colors, print_cool_header, clear_screen +from qcmd_cli.core.command_generator import generate_command, is_dangerous_command, list_models, fix_command +from qcmd_cli.utils.history import save_to_history, load_history, show_history +from qcmd_cli.utils.system import execute_command, get_system_status, display_update_status +from qcmd_cli.log_analysis.analyzer import handle_log_analysis, analyze_log_file +from qcmd_cli.ui.display import display_system_status +from qcmd_cli.utils.ollama import is_ollama_running + +# Setup session tracking +from qcmd_cli.utils.session import create_session, update_session_activity, cleanup_stale_sessions + +def start_interactive_shell( + auto_mode_enabled: bool = False, + current_model: str = DEFAULT_MODEL, + current_temperature: float = 0.7, + max_attempts: int = 3 +) -> None: + """ + Start an interactive shell for continuous command generation. + + Args: + auto_mode_enabled: Whether to run in auto mode (auto execute and fix commands) + current_model: Model to use for generation + current_temperature: Temperature for generation (0.0-1.0) + max_attempts: Maximum number of fix attempts in auto mode + """ + # Create config directory if it doesn't exist + os.makedirs(CONFIG_DIR, exist_ok=True) + + # History file setup + history_file = os.path.join(CONFIG_DIR, 'qcmd_history') + try: + readline.read_history_file(history_file) + readline.set_history_length(1000) + except (FileNotFoundError, PermissionError): + # If history file doesn't exist or can't be read, just continue + pass + atexit.register(readline.write_history_file, history_file) + + # Setup session tracking + session_id = create_session({ + 'type': 'interactive_shell', + 'model': current_model, + 'start_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + 'auto_mode': auto_mode_enabled, + 'temperature': current_temperature + }) + + # Check if Ollama is running + if not is_ollama_running(): + print(f"\n{Colors.YELLOW}Warning: Ollama API is not running. Commands will not work properly.{Colors.END}") + print(f"{Colors.YELLOW}Start Ollama with 'ollama serve' and try again.{Colors.END}") + + # Display QCMD banner + clear_screen() + _display_banner() + + # Check for QCMD updates on startup + display_update_status() + + # Welcome message + print(f"\n{Colors.CYAN}Welcome to the QCMD Interactive Shell!{Colors.END}") + print(f"Using model: {Colors.GREEN}{current_model}{Colors.END}") + print(f"Temperature: {Colors.GREEN}{current_temperature}{Colors.END}") + print(f"Auto mode: {Colors.GREEN}{auto_mode_enabled}{Colors.END}") + print(f"\nEnter your command descriptions or type {Colors.YELLOW}/help{Colors.END} for more options.") + print(f"{Colors.YELLOW}Type /exit to quit{Colors.END}") + + # Command history for the current session + session_history = [] + analyze_errors = True + + # Cleanup stale sessions on startup + cleanup_stale_sessions() + + # Define a signal handler to gracefully exit + def signal_handler(sig, frame): + print(f"\n{Colors.CYAN}Received signal {sig}, exiting...{Colors.END}") + # End the session when receiving a signal + from qcmd_cli.utils.session import end_session + try: + end_session(session_id) + except Exception: + pass + sys.exit(0) + + # Register signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + # Create a timer for periodically updating session activity + last_activity_update = time.time() + update_interval = 60 # Update session activity every 60 seconds + + # Main interactive loop + while True: + try: + # Update session activity periodically + current_time = time.time() + if current_time - last_activity_update > update_interval: + update_session_activity(session_id) + last_activity_update = current_time + + # Get user input + user_input = input(f"\n{Colors.BOLD}qcmd> {Colors.END}").strip() + + # Handle empty input + if not user_input: + continue + + # Save to readline history + readline.add_history(user_input) + + # Handle special commands + if user_input.lower() in ('/exit', '/quit'): + print(f"{Colors.CYAN}Goodbye!{Colors.END}") + # End the session when exiting + end_session(session_id) + break + + elif user_input.lower() == '/help': + _show_shell_help() + continue + + elif user_input.lower() == '/history': + # Show command history + if session_history: + print(f"\n{Colors.CYAN}Command History (this session):{Colors.END}") + for i, (cmd_desc, cmd) in enumerate(session_history, 1): + print(f"{Colors.BLUE}{i}.{Colors.END} {cmd_desc}") + print(f" {Colors.GREEN}{cmd}{Colors.END}") + else: + print(f"{Colors.YELLOW}No commands in this session yet.{Colors.END}") + continue + + elif user_input.lower() == '/models': + # List available models + models = list_models() + if models: + print(f"\n{Colors.CYAN}Available Models:{Colors.END}") + for i, model in enumerate(models, 1): + current = " (current)" if model == current_model else "" + print(f"{i}. {Colors.GREEN}{model}{Colors.END}{current}") + else: + print(f"{Colors.YELLOW}No models available or could not connect to Ollama.{Colors.END}") + continue + + elif user_input.lower() == '/status': + # Show system status + print(f"\n{Colors.CYAN}Getting system status...{Colors.END}") + status = get_system_status() + display_system_status(status) + continue + + elif user_input.lower() == '/update': + # Check for updates + display_update_status() + continue + + elif user_input.lower().startswith('/model '): + # Switch models + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + try: + # Check if input is a number (index from listed models) + if parts[1].isdigit(): + idx = int(parts[1]) - 1 + models = list_models() + if 0 <= idx < len(models): + current_model = models[idx] + print(f"Switched to model: {Colors.GREEN}{current_model}{Colors.END}") + else: + print(f"{Colors.YELLOW}Invalid model index.{Colors.END}") + else: + # Direct model name + current_model = parts[1] + print(f"Switched to model: {Colors.GREEN}{current_model}{Colors.END}") + except Exception as e: + print(f"{Colors.YELLOW}Error switching models: {e}{Colors.END}") + else: + print(f"{Colors.YELLOW}Usage: /model {Colors.END}") + continue + + elif user_input.lower().startswith('/temperature '): + # Set temperature + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + try: + temp = float(parts[1]) + if 0.0 <= temp <= 1.0: + current_temperature = temp + print(f"Temperature set to: {Colors.GREEN}{current_temperature}{Colors.END}") + else: + print(f"{Colors.YELLOW}Temperature must be between 0.0 and 1.0{Colors.END}") + except ValueError: + print(f"{Colors.YELLOW}Invalid temperature value. Use a number between 0.0 and 1.0{Colors.END}") + else: + print(f"{Colors.YELLOW}Usage: /temperature {Colors.END}") + continue + + elif user_input.lower() == '/auto': + # Enable auto mode + auto_mode_enabled = True + print(f"{Colors.GREEN}Auto mode enabled.{Colors.END}") + continue + + elif user_input.lower() == '/manual': + # Disable auto mode + auto_mode_enabled = False + print(f"{Colors.GREEN}Auto mode disabled.{Colors.END}") + continue + + elif user_input.lower() == '/analyze': + # Toggle error analysis + analyze_errors = not analyze_errors + status = "enabled" if analyze_errors else "disabled" + print(f"{Colors.GREEN}Error analysis {status}.{Colors.END}") + continue + + elif user_input.lower() == '/logs': + # Find and analyze log files + handle_log_analysis(current_model) + continue + + elif user_input.lower().startswith('/analyze-file '): + # Analyze a specific file + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + file_path = os.path.expanduser(parts[1]) + if os.path.isfile(file_path): + analyze_log_file(file_path, current_model) + else: + print(f"{Colors.YELLOW}File not found: {file_path}{Colors.END}") + else: + print(f"{Colors.YELLOW}Usage: /analyze-file {Colors.END}") + continue + + elif user_input.lower().startswith('/monitor '): + # Monitor a specific file with AI analysis + parts = user_input.split(maxsplit=1) + if len(parts) == 2: + file_path = os.path.expanduser(parts[1]) + if os.path.isfile(file_path): + analyze_log_file(file_path, current_model, background=True) + else: + print(f"{Colors.YELLOW}File not found: {file_path}{Colors.END}") + else: + print(f"{Colors.YELLOW}Usage: /monitor {Colors.END}") + continue + + elif user_input.lower() == '/execute': + # Execute last command + if session_history: + _, last_cmd = session_history[-1] + print(f"\n{Colors.CYAN}Executing:{Colors.END} {Colors.GREEN}{last_cmd}{Colors.END}") + + # Confirm execution + confirm = input(f"Press {Colors.YELLOW}Enter{Colors.END} to execute or Ctrl+C to cancel: ") + + # Execute the command + exit_code, output = execute_command(last_cmd, analyze_errors, current_model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + if analyze_errors and exit_code != 0: + _analyze_and_fix_error(last_cmd, output, current_model) + else: + print(f"{Colors.YELLOW}No commands in history to execute.{Colors.END}") + continue + + # Process regular input as a command request + print(f"\n{Colors.CYAN}Generating command...{Colors.END}") + + # Generate the command + command = generate_command(user_input, current_model, current_temperature) + + # Display the generated command + if command.startswith("Error:"): + print(f"{Colors.RED}{command}{Colors.END}") + continue + + print(f"{Colors.GREEN}{command}{Colors.END}") + + # Check for potentially dangerous commands + if is_dangerous_command(command): + print(f"\n{Colors.RED}WARNING: This command may be potentially dangerous!{Colors.END}") + print(f"{Colors.RED}Review it carefully before execution.{Colors.END}") + + # Add to session history + session_history.append((user_input, command)) + + # Save to global history + save_to_history(user_input) + + # Handle auto mode + if auto_mode_enabled: + print(f"\n{Colors.CYAN}Auto-executing command...{Colors.END}") + + # Execute the command + exit_code, output = execute_command(command, False, current_model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # Handle errors in auto mode + if exit_code != 0: + _auto_fix_and_execute(command, output, current_model, max_attempts) + else: + # Interactive mode with options to execute, edit, or skip + print(f"\n{Colors.CYAN}Options:{Colors.END}") + print(f" {Colors.YELLOW}y{Colors.END} - Execute the command") + print(f" {Colors.YELLOW}n{Colors.END} - Skip execution") + print(f" {Colors.YELLOW}e{Colors.END} - Edit command before execution") + + while True: + try: + choice = input(f"\n{Colors.BOLD}Enter your choice (y/n/e):{Colors.END} ").strip().lower() + + if choice == 'y': + # Execute the command + exit_code, output = execute_command(command, analyze_errors, current_model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # Handle errors if enabled + if analyze_errors and exit_code != 0: + _analyze_and_fix_error(command, output, current_model) + break + + elif choice == 'n': + print(f"{Colors.YELLOW}Command execution skipped.{Colors.END}") + break + + elif choice == 'e': + print(f"{Colors.CYAN}Edit the command:{Colors.END}") + # Display original command for reference + print(f"{Colors.GREEN}Original: {command}{Colors.END}") + + # Allow user to edit + try: + # Pre-populate the input with the current command + if 'readline' in sys.modules: + readline.set_startup_hook(lambda: readline.insert_text(command)) + + edited_command = input(f"{Colors.BOLD}Edit> {Colors.END}").strip() + + # Reset the startup hook + if 'readline' in sys.modules: + readline.set_startup_hook(None) + + if edited_command: + # Update command + command = edited_command + print(f"\n{Colors.CYAN}Updated command:{Colors.END} {Colors.GREEN}{command}{Colors.END}") + + # Update session history + session_history[-1] = (user_input, command) + + # Ask for execution confirmation + sub_choice = input(f"\n{Colors.BOLD}Execute this command now? (y/n):{Colors.END} ").strip().lower() + if sub_choice == 'y': + # Execute the command + exit_code, output = execute_command(command, analyze_errors, current_model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # Handle errors if enabled + if analyze_errors and exit_code != 0: + _analyze_and_fix_error(command, output, current_model) + else: + print(f"{Colors.YELLOW}No changes made to the command.{Colors.END}") + except KeyboardInterrupt: + print("\nEditing cancelled") + finally: + # Reset the startup hook + if 'readline' in sys.modules: + readline.set_startup_hook(None) + break + + else: + print(f"{Colors.YELLOW}Invalid choice. Please enter 'y', 'n', or 'e'.{Colors.END}") + except KeyboardInterrupt: + print("\nOperation cancelled") + break + + # Update activity after processing a command + update_session_activity(session_id) + last_activity_update = time.time() + + except KeyboardInterrupt: + print("\nInterrupted") + continue + + except EOFError: + print(f"\n{Colors.CYAN}Goodbye!{Colors.END}") + # End the session when exiting + end_session(session_id) + break + + except Exception as e: + print(f"\n{Colors.RED}Error: {str(e)}{Colors.END}") + finally: + # End the session when exiting, even if there was an unhandled exception + end_session(session_id) + + # Save history on exit + try: + readline.write_history_file(history_file) + except Exception as e: + print(f"{Colors.YELLOW}Could not save shell history: {e}{Colors.END}", file=sys.stderr) + +def _show_shell_help() -> None: + """Display help information for the interactive shell.""" + print(f"\n{Colors.CYAN}QCMD Interactive Shell Commands:{Colors.END}") + print(f"{Colors.YELLOW}/help{Colors.END} - Show this help message") + print(f"{Colors.YELLOW}/exit{Colors.END}, {Colors.YELLOW}/quit{Colors.END} - Exit the shell") + print(f"{Colors.YELLOW}/history{Colors.END} - Show command history for this session") + print(f"{Colors.YELLOW}/models{Colors.END} - List available models") + print(f"{Colors.YELLOW}/model {Colors.END} - Switch to a different model") + print(f"{Colors.YELLOW}/status{Colors.END} - Show system status information") + print(f"{Colors.YELLOW}/update{Colors.END} - Check for QCMD updates") + print(f"{Colors.YELLOW}/temperature {Colors.END} - Set temperature (0.0-1.0)") + print(f"{Colors.YELLOW}/auto{Colors.END} - Enable auto mode") + print(f"{Colors.YELLOW}/manual{Colors.END} - Disable auto mode") + print(f"{Colors.YELLOW}/analyze{Colors.END} - Toggle error analysis") + print(f"{Colors.YELLOW}/execute{Colors.END} - Execute last generated command (with confirmation)") + print(f"{Colors.YELLOW}/logs{Colors.END} - Find and analyze log files") + print(f"{Colors.YELLOW}/analyze-file {Colors.END} - Analyze a specific file") + print(f"{Colors.YELLOW}/monitor {Colors.END} - Monitor a file continuously") + + print(f"\n{Colors.CYAN}Command Execution Options:{Colors.END}") + print(f"When a command is generated, you'll be presented with these options:") + print(f" {Colors.YELLOW}y{Colors.END} - Execute the generated command") + print(f" {Colors.YELLOW}n{Colors.END} - Skip execution of the command") + print(f" {Colors.YELLOW}e{Colors.END} - Edit the command before execution") + + print(f"\n{Colors.CYAN}Commands:{Colors.END}") + print("Just type a natural language description of what you want to do.") + print("Examples:") + print(f" {Colors.GREEN}find all log files in /var/log{Colors.END}") + print(f" {Colors.GREEN}search for errors in apache logs{Colors.END}") + print(f" {Colors.GREEN}show top 10 processes by CPU usage{Colors.END}") + +def _analyze_and_fix_error(command: str, output: str, model: str) -> None: + """ + Analyze and provide a fix for a failed command. + + Args: + command: The failed command + output: The error output + model: Model to use for analysis + """ + from qcmd_cli.core.command_generator import analyze_error, fix_command + + print(f"\n{Colors.CYAN}Analyzing error...{Colors.END}") + analysis = analyze_error(output, command, model) + print(f"\n{Colors.CYAN}Analysis:{Colors.END}\n{analysis}") + + print(f"\n{Colors.CYAN}Suggesting fixed command...{Colors.END}") + fixed_command = fix_command(command, output, model) + + if fixed_command and not fixed_command.startswith("Error:"): + print(f"\n{Colors.GREEN}{fixed_command}{Colors.END}") + + # Ask if user wants to execute the fixed command + try: + confirm = input(f"\nExecute this command? (y/n): ").strip().lower() + if confirm == 'y': + exit_code, new_output = execute_command(fixed_command, False, model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + except KeyboardInterrupt: + print("\nCancelled") + else: + print(f"\n{Colors.YELLOW}Could not generate a fixed command: {fixed_command}{Colors.END}") + +def _auto_fix_and_execute(command: str, output: str, model: str, max_attempts: int) -> None: + """ + Automatically fix and execute a failed command multiple times if needed. + + Args: + command: The failed command + output: The error output + model: Model to use for fixing + max_attempts: Maximum number of fix attempts + """ + from qcmd_cli.core.command_generator import fix_command + + original_command = command + attempts = 1 + + while attempts <= max_attempts: + print(f"\n{Colors.CYAN}Auto-fix attempt {attempts}/{max_attempts}...{Colors.END}") + + # Generate fixed command + fixed_command = fix_command(command, output, model) + + if fixed_command and not fixed_command.startswith("Error:") and fixed_command != command: + print(f"\n{Colors.GREEN}{fixed_command}{Colors.END}") + + # Execute the fixed command + print(f"\n{Colors.CYAN}Executing fixed command...{Colors.END}") + exit_code, new_output = execute_command(fixed_command, False, model) + + # Display results + status = f"{Colors.GREEN}Success{Colors.END}" if exit_code == 0 else f"{Colors.RED}Failed (exit code: {exit_code}){Colors.END}" + print(f"\n{Colors.CYAN}Status:{Colors.END} {status}") + + # If successful, break the loop + if exit_code == 0: + break + + # Update for next attempt + command = fixed_command + output = new_output + else: + print(f"\n{Colors.YELLOW}No better solution found after {attempts} attempts.{Colors.END}") + break + + attempts += 1 + + if attempts > max_attempts: + print(f"\n{Colors.YELLOW}Maximum fix attempts reached. Could not fix the command.{Colors.END}") + + # Provide a summary + if exit_code == 0: + print(f"\n{Colors.GREEN}Successfully fixed and executed the command.{Colors.END}") + else: + print(f"\n{Colors.YELLOW}Could not fix and execute the command after {attempts-1} attempts.{Colors.END}") + print(f"{Colors.YELLOW}Original command: {original_command}{Colors.END}") + print(f"{Colors.YELLOW}Last attempt: {command}{Colors.END}") + +def _display_banner(): + """Display the QCMD banner.""" + print() + print(f"╔═════════════════════════════════════════════════════════════╗") + print(f"║ ║") + print(f"║ ██████╗ ██████╗███╗ ███╗██████╗ ║") + print(f"║ ██╔═══██╗██╔════╝████╗ ████║██╔══██╗ ║") + print(f"║ ██║ ██║██║ ██╔████╔██║██║ ██║ ║") + print(f"║ ██║▄▄ ██║██║ ██║╚██╔╝██║██║ ██║ ║") + print(f"║ ╚██████╔╝╚██████╗██║ ╚═╝ ██║██████╔╝ ║") + print(f"║ ╚══▀▀═╝ ╚═════╝╚═╝ ╚═╝╚═════╝ ║") + + # Get QCMD version + try: + from importlib.metadata import version + qcmd_version = version("ibrahimiq-qcmd") + version_text = f"v{qcmd_version}" + except: + version_text = "v1.0.10" # Fallback version + + print(f"║ {version_text} ║") + print(f"║ ║") + print(f"╚═════════════════════════════════════════════════════════════╝") \ No newline at end of file diff --git a/qcmd_cli/ui/colors.py b/qcmd_cli/ui/colors.py new file mode 100644 index 0000000..dc2af78 --- /dev/null +++ b/qcmd_cli/ui/colors.py @@ -0,0 +1,28 @@ +"""Terminal color codes module.""" + +class Colors: + """Terminal color codes.""" + RED = '\033[91m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + MAGENTA = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + END = '\033[0m' + + @staticmethod + def disable(): + """Disable colors by setting all color codes to empty strings.""" + Colors.RED = '' + Colors.GREEN = '' + Colors.YELLOW = '' + Colors.BLUE = '' + Colors.MAGENTA = '' + Colors.CYAN = '' + Colors.WHITE = '' + Colors.BOLD = '' + Colors.UNDERLINE = '' + Colors.END = '' \ No newline at end of file diff --git a/qcmd_cli/ui/display.py b/qcmd_cli/ui/display.py new file mode 100644 index 0000000..c8008e0 --- /dev/null +++ b/qcmd_cli/ui/display.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python3 +""" +UI display module for formatting terminal output in QCMD. +""" + +import os +import sys +import textwrap +import time +from typing import List, Dict, Any +from datetime import datetime + +# Import Colors from dedicated module +from qcmd_cli.ui.colors import Colors + +def print_cool_header(): + """Print a cool header for the application.""" + from qcmd_cli import __version__ + + # Get terminal width + try: + term_width = os.get_terminal_size().columns + except (AttributeError, OSError): + term_width = 80 + + # Adjust based on available width + if term_width < 60: + # Simplified header for narrow terminals + print(f"\n{Colors.CYAN}===== QCMD v{__version__} ====={Colors.END}") + return + + header = f""" +{Colors.CYAN}╔═════════════════════════════════════════════════════════════╗ +║ ║ +║ {Colors.BOLD} ██████╗ ██████╗███╗ ███╗██████╗ {Colors.CYAN} ║ +║ {Colors.BOLD} ██╔═══██╗██╔════╝████╗ ████║██╔══██╗ {Colors.CYAN} ║ +║ {Colors.BOLD} ██║ ██║██║ ██╔████╔██║██║ ██║ {Colors.CYAN} ║ +║ {Colors.BOLD} ██║▄▄ ██║██║ ██║╚██╔╝██║██║ ██║ {Colors.CYAN} ║ +║ {Colors.BOLD} ╚██████╔╝╚██████╗██║ ╚═╝ ██║██████╔╝ {Colors.CYAN} ║ +║ {Colors.BOLD} ╚══▀▀═╝ ╚═════╝╚═╝ ╚═╝╚═════╝ {Colors.CYAN} ║ +║ v{__version__} ║ +║ ║ +╚═════════════════════════════════════════════════════════════╝{Colors.END}""" + + print(header) + +def format_command_output(output: str, compact: bool = False) -> str: + """ + Format command output for display. + + Args: + output: Command output to format + compact: Whether to use compact mode (shorter output) + + Returns: + Formatted output + """ + # Truncate long output in compact mode + if compact and len(output) > 2000: + first_part = output[:1000] + last_part = output[-1000:] + return f"{first_part}\n\n[...output truncated...]\n\n{last_part}" + + return output + +def format_command_for_display(command: str) -> str: + """ + Format a command for display with proper coloring. + + Args: + command: Command to format + + Returns: + Formatted command for display + """ + return f"{Colors.GREEN}{command}{Colors.END}" + +def print_command_analysis(analysis: str): + """ + Print command analysis with nice formatting. + + Args: + analysis: Analysis text to print + """ + try: + term_width = os.get_terminal_size().columns - 4 + except (AttributeError, OSError): + term_width = 76 + + print(f"\n{Colors.CYAN}Analysis:{Colors.END}") + + # Format with word wrap + wrapped_text = textwrap.fill(analysis, width=term_width) + + # Print with left padding + for line in wrapped_text.split('\n'): + print(f" {line}") + +def display_log_files(log_files: List[str], favorite_logs: List[str] = None) -> None: + """ + Display a list of log files for selection. + + Args: + log_files: List of log file paths + favorite_logs: List of favorite log file paths + """ + if not log_files: + print(f"{Colors.YELLOW}No log files found.{Colors.END}") + return + + favorite_logs = favorite_logs or [] + + print(f"\n{Colors.CYAN}Found {len(log_files)} log files:{Colors.END}") + + # Group logs by directory + logs_by_dir = {} + for log_file in log_files: + directory = os.path.dirname(log_file) + if directory not in logs_by_dir: + logs_by_dir[directory] = [] + logs_by_dir[directory].append(log_file) + + # Display grouped logs + index = 1 + all_indices = {} + + for directory, logs in sorted(logs_by_dir.items()): + print(f"\n{Colors.BLUE}{directory}/{Colors.END}") + + for log_file in sorted(logs): + # Mark favorites + star = f"{Colors.YELLOW}★{Colors.END} " if log_file in favorite_logs else " " + + # Format the filename + filename = os.path.basename(log_file) + + # Try to show file size + try: + size = os.path.getsize(log_file) + if size < 1024: + size_str = f"{size} B" + elif size < 1024 * 1024: + size_str = f"{size / 1024:.1f} KB" + elif size < 1024 * 1024 * 1024: + size_str = f"{size / (1024 * 1024):.1f} MB" + else: + size_str = f"{size / (1024 * 1024 * 1024):.1f} GB" + + # Last modified time + mtime = os.path.getmtime(log_file) + mtime_str = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M') + + # Print the log entry + print(f"{star}{Colors.GREEN}{index:3d}{Colors.END}. {filename} ({size_str}, {mtime_str})") + except (OSError, IOError): + # If we can't access file details + print(f"{star}{Colors.GREEN}{index:3d}{Colors.END}. {filename}") + + all_indices[index] = log_file + index += 1 + + return all_indices + +def display_system_status(status: Dict[str, Any]) -> None: + """ + Display detailed system status information. + + Args: + status: Dictionary with system status information + """ + # Print divider line + print(f"\n{Colors.CYAN}{'-' * 80}{Colors.END}") + + # System information + print(f"\n{Colors.RED}{Colors.BOLD}System Information:{Colors.END}") + print(f" {Colors.BLUE}OS:{Colors.END} {status.get('os', 'Unknown')}") + print(f" {Colors.BLUE}Kernel:{Colors.END} {status.get('kernel', 'Unknown')}") + print(f" {Colors.BLUE}Python Version:{Colors.END} {status.get('python_version', 'Unknown')}") + print(f" {Colors.BLUE}QCMD Version:{Colors.END} {status.get('qcmd_version', 'Unknown')}") + print(f" {Colors.BLUE}Hostname:{Colors.END} {status.get('hostname', 'Unknown')}") + print(f" {Colors.BLUE}Current Time:{Colors.END} {status.get('current_time', 'Unknown')}") + print(f" {Colors.BLUE}Uptime:{Colors.END} {status.get('uptime', 'Unknown')}") + + # CPU information + print(f"\n{Colors.RED}{Colors.BOLD}CPU:{Colors.END}") + print(f" {Colors.BLUE}Load Average:{Colors.END} {status.get('load_avg', 'Unknown')}") + print(f" {Colors.BLUE}CPU Usage:{Colors.END} {status.get('cpu_percent', 'Unknown')}%") + + # Memory information + print(f"\n{Colors.RED}{Colors.BOLD}Memory:{Colors.END}") + print(f" {Colors.BLUE}Total:{Colors.END} {status.get('mem_total', 'Unknown')}") + print(f" {Colors.BLUE}Used:{Colors.END} {status.get('mem_used', 'Unknown')} ({status.get('mem_percent', 'Unknown')}%)") + print(f" {Colors.BLUE}Free:{Colors.END} {status.get('mem_free', 'Unknown')}") + + # Disk information + print(f"\n{Colors.RED}{Colors.BOLD}Disk Usage:{Colors.END}") + if 'disks' in status: + for disk in status['disks']: + print(f" {Colors.BLUE}{disk['mount']}:{Colors.END} {disk['used']}/{disk['total']} ({disk['percent']}%)") + else: + print(f" {Colors.YELLOW}Disk information not available{Colors.END}") + + # Log directory space + if 'log_dir_space' in status: + log_space = status['log_dir_space'] + if 'error' not in log_space: + print(f"\n{Colors.RED}{Colors.BOLD}Disk Space (Log Directory):{Colors.END}") + print(f" {Colors.BLUE}Path:{Colors.END} {log_space.get('path', 'Unknown')}") + print(f" {Colors.BLUE}Total:{Colors.END} {log_space.get('total', 'Unknown')}") + print(f" {Colors.BLUE}Used:{Colors.END} {log_space.get('used', 'Unknown')} ({log_space.get('percent', 'Unknown')}%)") + print(f" {Colors.BLUE}Free:{Colors.END} {log_space.get('free', 'Unknown')}") + + # Ollama status + if 'ollama' in status: + ollama = status['ollama'] + print(f"\n{Colors.RED}{Colors.BOLD}Ollama Status:{Colors.END}") + + # Check if Ollama is running + if ollama.get('running', False): + print(f" {Colors.BLUE}Status:{Colors.END} {Colors.GREEN}Running{Colors.END}") + else: + print(f" {Colors.BLUE}Status:{Colors.END} {Colors.RED}Not Running{Colors.END}") + if 'error' in ollama: + print(f" {Colors.BLUE}Error:{Colors.END} {ollama['error']}") + + print(f" {Colors.BLUE}API URL:{Colors.END} {ollama.get('api_url', 'Unknown')}") + + # List available models + if 'models' in ollama and ollama['models']: + print(f" {Colors.BLUE}Available Models:{Colors.END}") + for model in ollama['models']: + print(f" - {model}") + elif ollama.get('running', False): + print(f" {Colors.BLUE}Available Models:{Colors.END} No models found") + + # QCMD Processes + print(f"\n{Colors.RED}{Colors.BOLD}QCMD Processes:{Colors.END}") + if 'qcmd_processes' in status: + if isinstance(status['qcmd_processes'], list): + if status['qcmd_processes']: + print(f" {Colors.BLUE}Active Processes:{Colors.END} {len(status['qcmd_processes'])}") + for i, proc in enumerate(status['qcmd_processes'], 1): + print(f" {i}. {Colors.GREEN}PID:{Colors.END} {proc.get('pid', 'Unknown')}") + print(f" {Colors.BLUE}Type:{Colors.END} {proc.get('type', 'Unknown')}") + print(f" {Colors.BLUE}Started:{Colors.END} {proc.get('start_time', 'Unknown')}") + print(f" {Colors.BLUE}Status:{Colors.END} {proc.get('status', 'Unknown')}") + if 'command' in proc: + cmd = proc['command'] + # Truncate command if too long + if len(cmd) > 70: + cmd = cmd[:67] + "..." + print(f" {Colors.BLUE}Command:{Colors.END} {cmd}") + print() + else: + print(f" {Colors.YELLOW}No active QCMD processes found{Colors.END}") + else: + print(f" {Colors.YELLOW}{status['qcmd_processes']}{Colors.END}") + + # Active Log Monitors + print(f"\n{Colors.RED}{Colors.BOLD}Active Log Monitors:{Colors.END}") + if 'active_monitors' in status and status['active_monitors']: + for i, monitor in enumerate(status['active_monitors'], 1): + print(f" {i}. {Colors.GREEN}{monitor.get('log_file', 'Unknown')}{Colors.END}") + print(f" {Colors.BLUE}PID:{Colors.END} {monitor.get('pid', 'Unknown')}") + print(f" {Colors.BLUE}Status:{Colors.END} {monitor.get('status', 'Running')}") + if 'start_time' in monitor: + print(f" {Colors.BLUE}Started:{Colors.END} {monitor['start_time']}") + print() + else: + print(f" {Colors.YELLOW}No active log monitors{Colors.END}") + + # Active Sessions + print(f"\n{Colors.RED}{Colors.BOLD}Active Sessions:{Colors.END}") + if 'active_sessions' in status and status['active_sessions']: + for i, session in enumerate(status['active_sessions'], 1): + session_id = session.get('session_id', 'Unknown')[:8] # Show first 8 chars of UUID + print(f" {i}. {Colors.GREEN}ID:{Colors.END} {session_id}...") + print(f" {Colors.BLUE}Type:{Colors.END} {session.get('type', 'Unknown')}") + print(f" {Colors.BLUE}PID:{Colors.END} {session.get('pid', 'Unknown')}") + + # Format timestamp nicely if it exists + if 'created_at' in session: + created_time = datetime.fromtimestamp(session['created_at']).strftime('%Y-%m-%d %H:%M:%S') + print(f" {Colors.BLUE}Created:{Colors.END} {created_time}") + elif 'start_time' in session: + print(f" {Colors.BLUE}Started:{Colors.END} {session['start_time']}") + + print() + else: + print(f" {Colors.YELLOW}No active sessions{Colors.END}") + + # Network information + if 'network' in status and status['network']: + print(f"\n{Colors.RED}{Colors.BOLD}Network:{Colors.END}") + for nic, stats in status['network'].items(): + print(f" {Colors.BLUE}{nic}:{Colors.END} {stats}") + + # Top processes + if 'top_processes' in status and status['top_processes']: + print(f"\n{Colors.RED}{Colors.BOLD}Top Processes:{Colors.END}") + print(f" {Colors.BLUE}{'PID':<8}{'USER':<12}{'CPU%':<8}{'MEM%':<8}COMMAND{Colors.END}") + for proc in status['top_processes']: + print(f" {proc.get('pid', 'N/A'):<8}{proc.get('user', 'N/A'):<12}" + f"{proc.get('cpu', 'N/A'):<8}{proc.get('mem', 'N/A'):<8}{proc.get('command', 'N/A')}") + + # Print divider line + print(f"\n{Colors.CYAN}{'-' * 80}{Colors.END}\n") + +def print_examples(): + """Print detailed usage examples.""" + examples = """ +{CYAN}Basic Command Generation{END} +{GREEN}qcmd "list all files in the current directory"{END} +{GREEN}qcmd "find large log files"{END} + +{CYAN}Auto-Execute Commands{END} +{GREEN}qcmd -e "check disk space usage"{END} +{GREEN}qcmd --execute "show current directory"{END} + +{CYAN}Using Different Models{END} +{GREEN}qcmd -m llama2:7b "restart the nginx service"{END} +{GREEN}qcmd --model deepseek-coder "create a backup of config files"{END} + +{CYAN}Adjusting Creativity{END} +{GREEN}qcmd -t 0.7 "find all JPG images"{END} +{GREEN}qcmd --temperature 0.9 "monitor network traffic"{END} + +{CYAN}AI Error Analysis{END} +{GREEN}qcmd --analyze "find files larger than 1GB"{END} +{GREEN}qcmd -a -m llama2:7b "create a tar archive of logs"{END} + +{CYAN}Auto Mode (Auto-Execute with Error Fixing){END} +{GREEN}qcmd --auto "find Python files modified today"{END} +{GREEN}qcmd -A "search logs for errors"{END} +{GREEN}qcmd -A -m llama2:7b "get system information"{END} + +{CYAN}Log Analysis{END} +{GREEN}qcmd --logs{END} +{GREEN}qcmd --logs -m llama2:7b{END} +{GREEN}qcmd --all-logs{END} +{GREEN}qcmd --analyze-file /var/log/syslog{END} +{GREEN}qcmd --monitor /var/log/auth.log{END} +""".format( + CYAN=Colors.CYAN, + GREEN=Colors.GREEN, + END=Colors.END + ) + + print(examples) + +def clear_screen(): + """Clear the terminal screen.""" + if os.name == 'nt': # For Windows + os.system('cls') + else: # For Unix/Linux/Mac + os.system('clear') + +# Human readable size formatter +def get_human_readable_size(size_bytes): + """ + Convert bytes to human readable format (KB, MB, GB). + + Args: + size_bytes: Size in bytes + + Returns: + str: Human readable size string + """ + if size_bytes == 0: + return "0 B" + + size_names = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") + i = 0 + while size_bytes >= 1024 and i < len(size_names) - 1: + size_bytes /= 1024 + i += 1 + + # Return formatted size + if i == 0: + return f"{size_bytes:.0f} {size_names[i]}" + else: + return f"{size_bytes:.1f} {size_names[i]}" \ No newline at end of file diff --git a/qcmd_cli/utils/ollama.py b/qcmd_cli/utils/ollama.py new file mode 100644 index 0000000..71474f7 --- /dev/null +++ b/qcmd_cli/utils/ollama.py @@ -0,0 +1,15 @@ +def is_ollama_running(): + """ + Check if the Ollama API is running and accessible. + + Returns: + bool: True if the Ollama API is running, False otherwise + """ + from qcmd_cli.config.constants import OLLAMA_API + import requests + + try: + response = requests.get(f"{OLLAMA_API}/tags", timeout=2) + return response.status_code == 200 + except requests.exceptions.RequestException: + return False \ No newline at end of file diff --git a/qcmd_cli/utils/system.py b/qcmd_cli/utils/system.py new file mode 100644 index 0000000..d29f877 --- /dev/null +++ b/qcmd_cli/utils/system.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python3 +""" +Utility module for system-related functions in QCMD. +""" + +import os +import sys +import subprocess +import platform +import shutil +import time +import re +import tempfile +from datetime import datetime +from typing import Dict, List, Tuple, Any, Optional +import requests +from importlib.metadata import version as get_version + +from qcmd_cli.ui.display import Colors +from qcmd_cli.core.command_generator import analyze_error +from qcmd_cli.config.settings import OLLAMA_API, REQUEST_TIMEOUT + +try: + # For Python 3.8+ + try: + qcmd_version = get_version("ibrahimiq-qcmd") + except Exception: + # Fallback to package version + from qcmd_cli import __version__ + qcmd_version = __version__ +except ImportError: + # Fallback for older Python versions + try: + import pkg_resources + qcmd_version = pkg_resources.get_distribution("ibrahimiq-qcmd").version + except Exception: + # Fallback to package version + from qcmd_cli import __version__ + qcmd_version = __version__ + +def execute_command(command: str, analyze_errors: bool = False, model: str = None) -> Tuple[int, str]: + """ + Execute a shell command and return the exit code and output. + + Args: + command: The command to execute + analyze_errors: Whether to analyze errors if the command fails + model: Model to use for error analysis + + Returns: + Tuple of (exit_code, output) + """ + print(f"\n{Colors.CYAN}Executing:{Colors.END} {Colors.GREEN}{command}{Colors.END}") + + # Create a temporary file for command output + with tempfile.NamedTemporaryFile(delete=False, mode='w+b') as temp_file: + temp_file_path = temp_file.name + + try: + # Run the command and capture output + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1 + ) + + output_lines = [] + for line in process.stdout: + print(line, end='') + output_lines.append(line) + + process.wait() + exit_code = process.returncode + output = ''.join(output_lines) + + # Also write output to temp file for potential later analysis + with open(temp_file_path, 'w', encoding='utf-8') as f: + f.write(output) + + # If the command failed and analyze_errors is enabled + if exit_code != 0 and analyze_errors: + print(f"\n{Colors.YELLOW}Command failed with exit code {exit_code}.{Colors.END}") + print(f"{Colors.CYAN}Analyzing error...{Colors.END}") + + # Use the analyze_error function to analyze the error + analysis = analyze_error(output, command, model) + print(f"\n{Colors.CYAN}Analysis:{Colors.END}\n{analysis}") + + return exit_code, output + + except Exception as e: + error_msg = f"Error executing command: {str(e)}" + print(f"{Colors.RED}{error_msg}{Colors.END}") + return 1, error_msg + + finally: + # Clean up the temporary file + try: + os.unlink(temp_file_path) + except: + pass + +def get_system_status() -> Dict[str, Any]: + """ + Get detailed system status information. + + Returns: + Dictionary containing system status information + """ + status = {} + + # Basic system info + status['os'] = platform.system() + status['kernel'] = platform.release() + status['python_version'] = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + status['qcmd_version'] = qcmd_version + status['current_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # Get hostname + try: + status['hostname'] = platform.node() + except: + status['hostname'] = "Unknown" + + # Get uptime + try: + if platform.system() == 'Linux': + with open('/proc/uptime', 'r') as f: + uptime_seconds = float(f.readline().split()[0]) + + # Format uptime + days = int(uptime_seconds / 86400) + hours = int((uptime_seconds % 86400) / 3600) + minutes = int((uptime_seconds % 3600) / 60) + + uptime_str = "" + if days > 0: + uptime_str += f"{days} day{'s' if days != 1 else ''}, " + uptime_str += f"{hours}h {minutes}m" + + status['uptime'] = uptime_str + + elif platform.system() == 'Darwin': # macOS + # Use uptime command on macOS + uptime_output = subprocess.check_output(['uptime']).decode('utf-8') + status['uptime'] = uptime_output.strip() + + else: + status['uptime'] = "Unknown" + + except Exception as e: + status['uptime'] = f"Error getting uptime: {e}" + + # Check Ollama status + status['ollama'] = {} + try: + response = requests.get(f"{OLLAMA_API}/tags", timeout=REQUEST_TIMEOUT) + status['ollama']['running'] = True + status['ollama']['api_url'] = OLLAMA_API + + # Get available models + if response.status_code == 200: + result = response.json() + if 'models' in result: + status['ollama']['models'] = [model['name'] for model in result['models']] + else: + status['ollama']['models'] = [] + else: + status['ollama']['error'] = f"Status code: {response.status_code}" + status['ollama']['models'] = [] + + except requests.exceptions.RequestException as e: + status['ollama']['running'] = False + status['ollama']['error'] = str(e) + status['ollama']['models'] = [] + + # Get CPU load average + try: + if platform.system() in ['Linux', 'Darwin']: + load1, load5, load15 = os.getloadavg() + status['load_avg'] = f"{load1:.2f}, {load5:.2f}, {load15:.2f}" + else: + status['load_avg'] = "Not available on this platform" + except Exception as e: + status['load_avg'] = f"Error getting load average: {e}" + + # Get CPU usage (requires psutil for accurate measurement) + try: + import psutil + status['cpu_percent'] = f"{psutil.cpu_percent(interval=1):.1f}" + + # Memory info + memory = psutil.virtual_memory() + status['mem_total'] = format_bytes(memory.total) + status['mem_used'] = format_bytes(memory.used) + status['mem_free'] = format_bytes(memory.available) + status['mem_percent'] = f"{memory.percent:.1f}" + + # Disk info + status['disks'] = [] + for part in psutil.disk_partitions(all=False): + if os.name == 'nt' and ('cdrom' in part.opts or part.fstype == ''): + # Skip CD-ROM drives on Windows + continue + + usage = psutil.disk_usage(part.mountpoint) + status['disks'].append({ + 'device': part.device, + 'mount': part.mountpoint, + 'fstype': part.fstype, + 'total': format_bytes(usage.total), + 'used': format_bytes(usage.used), + 'free': format_bytes(usage.free), + 'percent': f"{usage.percent:.1f}" + }) + + # Get log directory disk space + log_dir = os.path.expanduser("~/.qcmd") + if os.path.exists(log_dir): + try: + usage = psutil.disk_usage(log_dir) + status['log_dir_space'] = { + 'path': log_dir, + 'total': format_bytes(usage.total), + 'used': format_bytes(usage.used), + 'free': format_bytes(usage.free), + 'percent': f"{usage.percent:.1f}" + } + except Exception as e: + status['log_dir_space'] = {'error': str(e)} + + # Network info + status['network'] = {} + net_io = psutil.net_io_counters(pernic=True) + for nic, io in net_io.items(): + status['network'][nic] = f"↑{format_bytes(io.bytes_sent)} ↓{format_bytes(io.bytes_recv)}" + + # Top processes + status['top_processes'] = [] + for proc in sorted(psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_percent']), + key=lambda p: p.info['cpu_percent'] or 0, + reverse=True)[:5]: + try: + status['top_processes'].append({ + 'pid': proc.info['pid'], + 'user': proc.info.get('username', '')[:10], + 'cpu': f"{proc.info.get('cpu_percent', 0):.1f}", + 'mem': f"{proc.info.get('memory_percent', 0):.1f}", + 'command': proc.info.get('name', '') + }) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + + # Find all qcmd processes + status['qcmd_processes'] = [] + for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'create_time', 'status']): + try: + # Look for qcmd processes in different ways + if any(x for x in proc.info['cmdline'] if 'qcmd' in x.lower()): + proc_info = { + 'pid': proc.info['pid'], + 'start_time': datetime.fromtimestamp(proc.info['create_time']).strftime('%Y-%m-%d %H:%M:%S'), + 'status': proc.info['status'], + 'command': ' '.join(proc.info['cmdline']), + 'type': 'unknown' + } + + # Try to determine type of qcmd process + cmdline = ' '.join(proc.info['cmdline']) + if '-s' in cmdline or '--shell' in cmdline: + proc_info['type'] = 'interactive_shell' + elif '--monitor' in cmdline: + proc_info['type'] = 'log_monitor' + elif any(x in cmdline for x in ['--analyze-file', '--logs']): + proc_info['type'] = 'log_analyzer' + + status['qcmd_processes'].append(proc_info) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + + except ImportError: + # Fallback for systems without psutil + status['cpu_percent'] = "psutil not available" + status['mem_total'] = "psutil not available" + status['mem_used'] = "psutil not available" + status['mem_free'] = "psutil not available" + status['mem_percent'] = "N/A" + + # Also add a message about qcmd processes + status['qcmd_processes'] = "psutil not available - cannot detect running processes" + + # Get active sessions from session.py + from qcmd_cli.utils.session import get_active_sessions + sessions = get_active_sessions() + + # Ensure sessions are properly assigned to status + status['active_sessions'] = sessions + + # As a fallback, also scan running processes for QCMD + if 'qcmd_processes' in status and isinstance(status['qcmd_processes'], list) and status['qcmd_processes']: + # If we have processes but no sessions, create session entries from process info + if not sessions: + fallback_sessions = [] + for proc in status['qcmd_processes']: + if proc.get('type') == 'interactive_shell': + fallback_sessions.append({ + 'session_id': f"proc-{proc.get('pid', '0')}", + 'type': proc.get('type', 'unknown'), + 'pid': proc.get('pid', 0), + 'start_time': proc.get('start_time', 'Unknown') + }) + if fallback_sessions: + status['active_sessions'] = fallback_sessions + + # Get active monitors + from qcmd_cli.log_analysis.monitor import get_active_monitors + monitors = get_active_monitors() + status['active_monitors'] = monitors + + return status + +def format_bytes(bytes_value: int) -> str: + """ + Format bytes value to human-readable string. + + Args: + bytes_value: Number of bytes + + Returns: + Formatted string (e.g., "4.2 GB") + """ + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes_value < 1024 or unit == 'TB': + return f"{bytes_value:.1f} {unit}" + bytes_value /= 1024 + +def is_process_running(pid: int) -> bool: + """ + Check if a process is running. + + Args: + pid: Process ID + + Returns: + True if process is running, False otherwise + """ + try: + # Send signal 0 to the process - doesn't kill it, just checks if it exists + os.kill(pid, 0) + return True + except OSError: + return False + except Exception: + return False + +def which(command: str) -> Optional[str]: + """ + Find the path to an executable. + + Args: + command: Command name + + Returns: + Path to the executable, or None if not found + """ + return shutil.which(command) + +def check_for_updates(): + """ + Check if there is a newer version of QCMD available on PyPI. + + Returns: + dict: A dictionary containing update information: + - 'current_version': The currently installed version + - 'latest_version': The latest version available on PyPI + - 'update_available': Boolean indicating if an update is available + - 'update_url': URL to the package on PyPI + - 'error': Error message if any issue occurred during check + """ + result = { + 'current_version': None, + 'latest_version': None, + 'update_available': False, + 'update_url': 'https://pypi.org/project/ibrahimiq-qcmd/', + 'error': None + } + + try: + # Get current version + try: + current_version = get_version('ibrahimiq-qcmd') + result['current_version'] = current_version + except Exception as e: + result['error'] = f"Failed to get current version: {str(e)}" + return result + + # Get latest version from PyPI + try: + pypi_url = 'https://pypi.org/pypi/ibrahimiq-qcmd/json' + response = requests.get(pypi_url, timeout=5) + if response.status_code == 200: + data = response.json() + latest_version = data['info']['version'] + result['latest_version'] = latest_version + + # Compare versions + # Convert versions to tuples of integers for proper comparison + def parse_version(version_str): + return tuple(map(int, re.findall(r'\d+', version_str))) + + current_tuple = parse_version(current_version) + latest_tuple = parse_version(latest_version) + + result['update_available'] = latest_tuple > current_tuple + else: + result['error'] = f"Failed to fetch PyPI data: HTTP {response.status_code}" + except Exception as e: + result['error'] = f"Failed to check for updates: {str(e)}" + + except Exception as e: + result['error'] = f"Unexpected error during update check: {str(e)}" + + return result + +def display_update_status(): + """ + Display information about available updates in a user-friendly format. + + Returns: + bool: True if updates are available, False otherwise + """ + from qcmd_cli.ui.colors import Colors # Import locally to avoid circular imports + + update_info = check_for_updates() + + if update_info['error']: + print(f"{Colors.YELLOW}Could not check for updates: {update_info['error']}{Colors.END}") + return False + + if update_info['update_available']: + print(f"\n{Colors.GREEN}┌─ QCMD Update Available ────────────────────────────────────┐{Colors.END}") + print(f"{Colors.GREEN}│{Colors.END} Current version: {Colors.YELLOW}{update_info['current_version']}{Colors.END}") + print(f"{Colors.GREEN}│{Colors.END} Latest version: {Colors.CYAN}{update_info['latest_version']}{Colors.END}") + print(f"{Colors.GREEN}│{Colors.END}") + print(f"{Colors.GREEN}│{Colors.END} To update, run: {Colors.CYAN}pip install --upgrade ibrahimiq-qcmd{Colors.END}") + print(f"{Colors.GREEN}└────────────────────────────────────────────────────────────┘{Colors.END}") + return True + + return False + +# For testing only +if __name__ == "__main__": + status = get_system_status() + for key, value in status.items(): + if isinstance(value, (list, dict)): + print(f"{key}: {len(value)} items") + else: + print(f"{key}: {value}") \ No newline at end of file From 4e310d10e051714fdee14ba2a564b50663e82254 Mon Sep 17 00:00:00 2001 From: ibrahim Date: Fri, 11 Apr 2025 12:31:02 +0000 Subject: [PATCH 2/4] Implemented session management, update checker, test coverage --- qcmd_cli/core/interactive_shell.py | 91 ++++++++++++++++- qcmd_cli/log_analysis/analyzer.py | 85 +++++++++++++-- qcmd_cli/ui/display.py | 76 +++++++++++++- qcmd_cli/utils/session.py | 48 +++++++++ qcmd_cli/utils/system.py | 6 +- tests/test_display.py | 122 ++++++++++++++++++++++ tests/test_session.py | 159 +++++++++++++++++++++++++++++ tests/test_system.py | 155 ++++++++++++++++++++++++++++ 8 files changed, 724 insertions(+), 18 deletions(-) create mode 100644 tests/test_display.py create mode 100644 tests/test_session.py create mode 100644 tests/test_system.py diff --git a/qcmd_cli/core/interactive_shell.py b/qcmd_cli/core/interactive_shell.py index 9ec2b58..0c1d513 100644 --- a/qcmd_cli/core/interactive_shell.py +++ b/qcmd_cli/core/interactive_shell.py @@ -22,12 +22,40 @@ from qcmd_cli.core.command_generator import generate_command, is_dangerous_command, list_models, fix_command from qcmd_cli.utils.history import save_to_history, load_history, show_history from qcmd_cli.utils.system import execute_command, get_system_status, display_update_status -from qcmd_cli.log_analysis.analyzer import handle_log_analysis, analyze_log_file +from qcmd_cli.log_analysis.log_files import handle_log_analysis +from qcmd_cli.log_analysis.analyzer import analyze_log_file from qcmd_cli.ui.display import display_system_status from qcmd_cli.utils.ollama import is_ollama_running # Setup session tracking -from qcmd_cli.utils.session import create_session, update_session_activity, cleanup_stale_sessions +from qcmd_cli.utils.session import create_session, update_session_activity, cleanup_stale_sessions, end_session + +class SimpleCompleter: + """ + Simple command completion for the interactive shell. + """ + def __init__(self, options): + self.options = options + + def complete(self, text, state): + """ + Return state'th completion starting with text. + """ + response = None + if state == 0: + # This is the first time for this text, so build a match list + if text: + self.matches = [s for s in self.options if s and s.startswith(text)] + else: + self.matches = self.options[:] + + # Return the state'th item from the match list, if we have that many + try: + response = self.matches[state] + except IndexError: + return None + + return response def start_interactive_shell( auto_mode_enabled: bool = False, @@ -97,7 +125,6 @@ def start_interactive_shell( def signal_handler(sig, frame): print(f"\n{Colors.CYAN}Received signal {sig}, exiting...{Colors.END}") # End the session when receiving a signal - from qcmd_cli.utils.session import end_session try: end_session(session_id) except Exception: @@ -582,4 +609,60 @@ def _display_banner(): print(f"║ {version_text} ║") print(f"║ ║") - print(f"╚═════════════════════════════════════════════════════════════╝") \ No newline at end of file + print(f"╚═════════════════════════════════════════════════════════════╝") + +def auto_mode(prompt: str, model: str = DEFAULT_MODEL, max_attempts: int = 3, temperature: float = 0.7) -> None: + """ + Run in auto-correction mode, automatically fixing errors. + + Args: + prompt: The natural language prompt + model: The model to use + max_attempts: Maximum number of correction attempts + temperature: Temperature parameter for generation + """ + print(f"{Colors.CYAN}Generating command in auto-correction mode...{Colors.END}") + + # Generate initial command + command = generate_command(prompt, model, temperature) + + if not command: + print(f"{Colors.RED}Failed to generate a command.{Colors.END}") + return + + for attempt in range(1, max_attempts + 1): + if attempt > 1: + print(f"\n{Colors.CYAN}Attempt {attempt}/{max_attempts}:{Colors.END}") + + print(f"\n{Colors.CYAN}Generated command:{Colors.END}") + print(f"{Colors.GREEN}{command}{Colors.END}\n") + + # Execute the command + print(f"{Colors.CYAN}Executing command...{Colors.END}\n") + return_code, output = execute_command(command) + + if return_code == 0: + print(f"\n{Colors.GREEN}Command executed successfully.{Colors.END}") + if output: + print(f"\n{Colors.BOLD}Output:{Colors.END}") + print(output) + return + else: + print(f"\n{Colors.RED}Command failed with return code {return_code}{Colors.END}") + if output: + print(f"\n{Colors.BOLD}Error output:{Colors.END}") + print(output) + + # Don't attempt to fix if we've reached max attempts + if attempt >= max_attempts: + print(f"\n{Colors.YELLOW}Maximum correction attempts reached. Giving up.{Colors.END}") + return + + # Try to fix the command + print(f"\n{Colors.CYAN}Attempting to fix the command...{Colors.END}") + command = fix_command(command, output, model) + + # Add a small delay to make the process more readable + time.sleep(1) + + diff --git a/qcmd_cli/log_analysis/analyzer.py b/qcmd_cli/log_analysis/analyzer.py index 0b57b5a..a348bf7 100644 --- a/qcmd_cli/log_analysis/analyzer.py +++ b/qcmd_cli/log_analysis/analyzer.py @@ -10,10 +10,41 @@ from typing import List, Dict, Optional, Tuple, Any # Import from local modules once they are created -# from ..config.settings import DEFAULT_MODEL +from ..config.settings import DEFAULT_MODEL +from ..ui.display import Colors -# For now, define defaults here -DEFAULT_MODEL = "llama3:latest" +def handle_log_analysis(model: str = DEFAULT_MODEL, specific_file: str = None) -> None: + """ + Handle log analysis workflow - prompting user to select log files and analyzing them. + + Args: + model: Model to use for analysis + specific_file: Optional specific file to analyze + """ + if specific_file: + if os.path.exists(specific_file): + print(f"\n{Colors.GREEN}Analyzing log file: {specific_file}{Colors.END}") + analyze_log_file(specific_file, model) + else: + print(f"\n{Colors.RED}File not found: {specific_file}{Colors.END}") + return + + # Import here to avoid circular imports + from .log_files import find_log_files, select_log_file + + # Find log files + log_files = find_log_files() + + if not log_files: + print(f"\n{Colors.YELLOW}No log files found.{Colors.END}") + print(f"Try specifying a path with: qcmd logs /path/to/logs") + return + + # Let user select a log file + selected_file = select_log_file(log_files) + + if selected_file: + analyze_log_file(selected_file, model) def analyze_log_file(log_file: str, model: str = DEFAULT_MODEL, background: bool = False, analyze: bool = True) -> None: """ @@ -25,8 +56,25 @@ def analyze_log_file(log_file: str, model: str = DEFAULT_MODEL, background: bool background: Whether to run in background mode analyze: Whether to perform analysis (vs just monitoring) """ - # Implementation will be moved from original qcmd.py - pass + # Check if file exists + if not os.path.exists(log_file): + print(f"{Colors.RED}Error: File {log_file} not found.{Colors.END}") + return + + print(f"\n{Colors.CYAN}Analyzing log file: {log_file}{Colors.END}") + + # Read file content + try: + content = read_large_file(log_file) + if not content: + print(f"{Colors.YELLOW}Log file is empty.{Colors.END}") + return + + # Perform analysis + analyze_log_content(content, log_file, model) + + except Exception as e: + print(f"{Colors.RED}Error analyzing log file: {str(e)}{Colors.END}") def analyze_log_content(log_content: str, log_file: str, model: str = DEFAULT_MODEL) -> None: """ @@ -37,8 +85,21 @@ def analyze_log_content(log_content: str, log_file: str, model: str = DEFAULT_MO log_file: Path to the log file (for reference) model: Model to use for analysis """ - # Implementation will be moved from original qcmd.py - pass + print(f"\n{Colors.CYAN}Analyzing log content using {model}...{Colors.END}") + + # Basic implementation - in a real application, this would use an LLM via Ollama API + print(f"\n{Colors.GREEN}Log Analysis Results:{Colors.END}") + print(f"File: {log_file}") + print(f"Size: {len(log_content)} bytes") + + # Count lines and errors (simple heuristic) + lines = log_content.splitlines() + error_count = sum(1 for line in lines if "error" in line.lower() or "exception" in line.lower()) + + print(f"Total lines: {len(lines)}") + print(f"Potential errors/exceptions: {error_count}") + + # In a complete implementation, we would call the LLM to analyze the log content def read_large_file(file_path: str, chunk_size: int = 1024 * 1024) -> str: """ @@ -51,5 +112,11 @@ def read_large_file(file_path: str, chunk_size: int = 1024 * 1024) -> str: Returns: Content of the file as a string """ - # Implementation will be moved from original qcmd.py - pass \ No newline at end of file + content = [] + with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: + while True: + chunk = file.read(chunk_size) + if not chunk: + break + content.append(chunk) + return "".join(content) \ No newline at end of file diff --git a/qcmd_cli/ui/display.py b/qcmd_cli/ui/display.py index d1fe206..9f8a002 100644 --- a/qcmd_cli/ui/display.py +++ b/qcmd_cli/ui/display.py @@ -7,7 +7,7 @@ import time import re import shutil -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any class Colors: """ @@ -224,4 +224,76 @@ def display_help_command(current_model: str, current_temperature: float, auto_mo - Type 'y' to execute, 'n' to reject, or edit the command - Use !exit or Ctrl+D to quit """ - print(help_text) \ No newline at end of file + print(help_text) + +def clear_screen(): + """ + Clear the terminal screen. + """ + # Clear command based on OS + if os.name == 'nt': # For Windows + os.system('cls') + else: # For Linux/Mac + os.system('clear') + +def display_system_status(status: Dict[str, Any]) -> None: + """ + Display detailed system status information. + + Args: + status: Dictionary with system status information + """ + # Print divider line + print(f"\n{Colors.CYAN}{'-' * 80}{Colors.END}") + + # System information + print(f"\n{Colors.RED}{Colors.BOLD}System Information:{Colors.END}") + print(f" {Colors.BLUE}OS:{Colors.END} {status.get('os', 'Unknown')}") + print(f" {Colors.BLUE}Python Version:{Colors.END} {status.get('python_version', 'Unknown')}") + print(f" {Colors.BLUE}QCMD Version:{Colors.END} {status.get('qcmd_version', 'Unknown')}") + print(f" {Colors.BLUE}Current Time:{Colors.END} {status.get('time', 'Unknown')}") + + # Ollama information + if 'ollama' in status: + ollama = status['ollama'] + print(f"\n{Colors.RED}{Colors.BOLD}Ollama Status:{Colors.END}") + + # Check if Ollama is running + if ollama.get('status', '') == 'running': + print(f" {Colors.BLUE}Status:{Colors.END} {Colors.GREEN}Running{Colors.END}") + else: + print(f" {Colors.BLUE}Status:{Colors.END} {Colors.RED}Not Running{Colors.END}") + if 'error' in ollama: + print(f" {Colors.BLUE}Error:{Colors.END} {ollama['error']}") + + print(f" {Colors.BLUE}API URL:{Colors.END} {ollama.get('api_url', 'Unknown')}") + + # List available models + if 'models' in ollama and ollama['models']: + print(f" {Colors.BLUE}Available Models:{Colors.END}") + for model in ollama['models']: + print(f" - {model}") + elif ollama.get('status', '') == 'running': + print(f" {Colors.BLUE}Available Models:{Colors.END} No models found") + + # Active monitors + if 'active_monitors' in status and status['active_monitors']: + print(f"\n{Colors.RED}{Colors.BOLD}Active Log Monitors:{Colors.END}") + for monitor in status['active_monitors']: + print(f" - {monitor}") + + # Active sessions + if 'active_sessions' in status and status['active_sessions']: + print(f"\n{Colors.RED}{Colors.BOLD}Active Sessions:{Colors.END}") + for session in status['active_sessions']: + print(f" - {session}") + + # Disk space + if 'disk' in status: + disk = status['disk'] + print(f"\n{Colors.RED}{Colors.BOLD}Disk Space:{Colors.END}") + print(f" {Colors.BLUE}Total:{Colors.END} {disk.get('total_gb', 'Unknown')} GB") + print(f" {Colors.BLUE}Used:{Colors.END} {disk.get('used_gb', 'Unknown')} GB ({disk.get('percent_used', 'Unknown')}%)") + print(f" {Colors.BLUE}Free:{Colors.END} {disk.get('free_gb', 'Unknown')} GB") + + print(f"\n{Colors.CYAN}{'-' * 80}{Colors.END}") \ No newline at end of file diff --git a/qcmd_cli/utils/session.py b/qcmd_cli/utils/session.py index a863a70..f0abca9 100644 --- a/qcmd_cli/utils/session.py +++ b/qcmd_cli/utils/session.py @@ -7,6 +7,7 @@ import time import signal import sys +import uuid from typing import Dict, List, Optional, Any from ..config.settings import CONFIG_DIR @@ -14,6 +15,53 @@ # File path for storing session info SESSIONS_FILE = os.path.join(CONFIG_DIR, "sessions.json") +def create_session(session_info: Dict[str, Any]) -> str: + """ + Create a new session and save it to persistent storage. + + Args: + session_info: Dictionary with session information + + Returns: + Session ID as a string + """ + # Generate a unique session ID + session_id = str(uuid.uuid4()) + + # Add metadata + session_info['pid'] = os.getpid() + session_info['created_at'] = time.time() + session_info['last_activity'] = time.time() + + # Save to storage + save_session(session_id, session_info) + + return session_id + +def update_session_activity(session_id: str) -> bool: + """ + Update the last activity timestamp for a session. + + Args: + session_id: ID of the session to update + + Returns: + True if successful, False otherwise + """ + try: + sessions = load_sessions() + if session_id in sessions: + sessions[session_id]['last_activity'] = time.time() + + with open(SESSIONS_FILE, 'w') as f: + json.dump(sessions, f, indent=2) + + return True + return False + except Exception as e: + print(f"Error updating session activity: {e}", file=sys.stderr) + return False + def save_session(session_id, session_info): """ Save session information to persistent storage. diff --git a/qcmd_cli/utils/system.py b/qcmd_cli/utils/system.py index b4ed1f7..e3ad312 100644 --- a/qcmd_cli/utils/system.py +++ b/qcmd_cli/utils/system.py @@ -340,7 +340,7 @@ def execute_command(command: str, analyze_errors: bool = False, model: str = Non Tuple of (exit_code, output) """ print(f"\n{Colors.CYAN}Executing:{Colors.END} {Colors.GREEN}{command}{Colors.END}") - + try: # Run the command and capture output process = subprocess.Popen( @@ -360,14 +360,14 @@ def execute_command(command: str, analyze_errors: bool = False, model: str = Non process.wait() exit_code = process.returncode output = ''.join(output_lines) - + return exit_code, output except Exception as e: error_msg = f"Error executing command: {str(e)}" print(f"{Colors.RED}{error_msg}{Colors.END}") return 1, error_msg - + def format_bytes(bytes_value): """ Format byte values to human-readable format. diff --git a/tests/test_display.py b/tests/test_display.py new file mode 100644 index 0000000..e8946a4 --- /dev/null +++ b/tests/test_display.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +""" +Tests for UI display functionality. +""" + +import unittest +import os +import sys +from unittest.mock import patch, MagicMock, call + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import functions to test +from qcmd_cli.ui.display import ( + Colors, display_system_status, display_help_command, + clear_screen, print_cool_header +) + + +class TestDisplayFunctions(unittest.TestCase): + """Test the display functions in the UI module.""" + + @patch('builtins.print') + def test_display_system_status(self, mock_print): + """Test that system status is displayed correctly.""" + # Mock system status data + status_data = { + 'os': 'Linux 6.1.0-kali1-amd64', + 'python_version': '3.11.2', + 'qcmd_version': '0.4.1', + 'ollama_status': { + 'running': True, + 'version': '0.1.19', + 'models': ['llama3', 'phi3'] + }, + 'cpu_info': { + 'cores': 8, + 'usage': 35.2 + }, + 'memory_info': { + 'total': 16000000000, + 'available': 9000000000, + 'percent': 44.5 + }, + 'disk_info': { + 'total': 250000000000, + 'free': 150000000000, + 'percent': 40.0 + } + } + + # Call the function + display_system_status(status_data) + + # Verify that print was called + mock_print.assert_called() + + # Just verify that some of the data was used in print calls + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('Linux 6.1.0-kali1-amd64', all_print_output) + self.assertIn('3.11.2', all_print_output) + self.assertIn('0.4.1', all_print_output) + + @patch('builtins.print') + def test_display_help_command(self, mock_print): + """Test that help command is displayed correctly.""" + # Call the function + display_help_command("llama3", 0.7, False, 3) + + # Verify that print was called + mock_print.assert_called() + + # Just verify that some of the expected data appears in the output + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('QCMD', all_print_output) + self.assertIn('llama3', all_print_output) + self.assertIn('0.7', all_print_output) + + @patch('os.system') + def test_clear_screen(self, mock_system): + """Test clear_screen function.""" + # Call the function + clear_screen() + + # Verify system call was made + mock_system.assert_called_once() + + @patch('builtins.print') + def test_print_cool_header(self, mock_print): + """Test print_cool_header function.""" + # Call the function + print_cool_header() + + # Verify header was printed + mock_print.assert_called() + + # Check that the output includes ASCII art - look for typical parts + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('█', all_print_output) + self.assertIn('Iraqi Excellence', all_print_output) + + @patch('builtins.print') + def test_colors(self, mock_print): + """Test that the Colors class works correctly.""" + # Test using colors + print(f"{Colors.RED}Red Text{Colors.END}") + print(f"{Colors.GREEN}Green Text{Colors.END}") + print(f"{Colors.BOLD}Bold Text{Colors.END}") + + # Verify that print was called + mock_print.assert_called() + + # Check that the text appears in output + all_print_output = ''.join(str(call) for call in mock_print.call_args_list) + self.assertIn('Red Text', all_print_output) + self.assertIn('Green Text', all_print_output) + self.assertIn('Bold Text', all_print_output) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_session.py b/tests/test_session.py new file mode 100644 index 0000000..d655623 --- /dev/null +++ b/tests/test_session.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +""" +Tests for session management functionality. +""" + +import unittest +import os +import sys +import json +import tempfile +import time +from unittest.mock import patch, MagicMock + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import functions to test +from qcmd_cli.utils.session import ( + save_session, load_sessions, create_session, update_session_activity, + end_session, cleanup_stale_sessions, is_process_running +) +from qcmd_cli.config.settings import CONFIG_DIR + + +class TestSessionManagement(unittest.TestCase): + """Test the session management functionality.""" + + def setUp(self): + """Set up a temporary sessions file for testing.""" + self.temp_dir = tempfile.TemporaryDirectory() + self.sessions_file = os.path.join(self.temp_dir.name, "sessions.json") + self.sessions_patch = patch('qcmd_cli.utils.session.SESSIONS_FILE', self.sessions_file) + self.sessions_patch.start() + + def tearDown(self): + """Clean up temporary files and patches.""" + self.sessions_patch.stop() + self.temp_dir.cleanup() + + def test_save_and_load_session(self): + """Test that a session can be saved and loaded.""" + test_session = { + "type": "test_session", + "model": "test-model", + "start_time": "2023-01-01 12:00:00", + "pid": 12345 + } + + # Test saving + result = save_session("test-session-id", test_session) + self.assertTrue(result) + + # Verify file was created + self.assertTrue(os.path.exists(self.sessions_file)) + + # Test loading + sessions = load_sessions() + self.assertIn("test-session-id", sessions) + self.assertEqual(sessions["test-session-id"]["type"], "test_session") + self.assertEqual(sessions["test-session-id"]["model"], "test-model") + + def test_create_session(self): + """Test creating a new session.""" + session_info = { + "type": "interactive_shell", + "model": "llama3" + } + + with patch('os.getpid', return_value=54321): + session_id = create_session(session_info) + + # Verify session ID format (should be a UUID) + self.assertTrue(len(session_id) > 30) + + # Verify session was saved + sessions = load_sessions() + self.assertIn(session_id, sessions) + + # Verify metadata was added + saved_session = sessions[session_id] + self.assertEqual(saved_session["type"], "interactive_shell") + self.assertEqual(saved_session["model"], "llama3") + self.assertEqual(saved_session["pid"], 54321) + self.assertIn("created_at", saved_session) + self.assertIn("last_activity", saved_session) + + def test_update_session_activity(self): + """Test updating session activity timestamp.""" + # Create a session + test_session = { + "type": "test_session", + "last_activity": time.time() - 1000 # Set this to the past + } + save_session("activity-test-id", test_session) + + # Update activity + old_time = test_session["last_activity"] + result = update_session_activity("activity-test-id") + + # Verify result + self.assertTrue(result) + + # Verify timestamp was updated + sessions = load_sessions() + new_time = sessions["activity-test-id"]["last_activity"] + self.assertGreater(new_time, old_time) + + def test_end_session(self): + """Test ending a session.""" + # Create two sessions + save_session("session-to-end", {"type": "test"}) + save_session("session-to-keep", {"type": "test"}) + + # End one session + result = end_session("session-to-end") + + # Verify result + self.assertTrue(result) + + # Verify session was removed + sessions = load_sessions() + self.assertNotIn("session-to-end", sessions) + self.assertIn("session-to-keep", sessions) + + def test_cleanup_stale_sessions(self): + """Test cleaning up stale sessions.""" + # Create sessions with different PIDs + save_session("active-session", {"pid": os.getpid()}) + save_session("stale-session", {"pid": 99999}) # Unlikely to exist + + # Mock is_process_running for testing + original_is_process_running = is_process_running + try: + # Replace with mock implementation + def mock_is_process_running(pid): + return pid == os.getpid() + + # Monkey patch the function + globals()['is_process_running'] = mock_is_process_running + + # Call cleanup + active_sessions = cleanup_stale_sessions() + + # Verify only active session remains + self.assertIn("active-session", active_sessions) + self.assertNotIn("stale-session", active_sessions) + + # Verify file was updated + sessions = load_sessions() + self.assertIn("active-session", sessions) + self.assertNotIn("stale-session", sessions) + + finally: + # Restore original function + globals()['is_process_running'] = original_is_process_running + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_system.py b/tests/test_system.py new file mode 100644 index 0000000..88c8a13 --- /dev/null +++ b/tests/test_system.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +Tests for system utilities module. +""" + +import unittest +import os +import sys +import json +import tempfile +from unittest.mock import patch, MagicMock + +# Add parent directory to path so we can import modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import functions to test +from qcmd_cli.utils.system import ( + check_for_updates, display_update_status, + execute_command, format_bytes +) + + +class TestSystemUtilities(unittest.TestCase): + """Test the system utilities functionality.""" + + def test_format_bytes(self): + """Test the format_bytes function.""" + # Test different byte sizes + self.assertEqual(format_bytes(500), "500.00 B") + self.assertEqual(format_bytes(1024), "1.00 KB") + self.assertEqual(format_bytes(1024 * 1024), "1.00 MB") + self.assertEqual(format_bytes(1024 * 1024 * 1024), "1.00 GB") + self.assertEqual(format_bytes(1024 * 1024 * 1024 * 1024), "1.00 TB") + + def test_execute_command(self): + """Test the execute_command function.""" + # Test a simple command + exit_code, output = execute_command("echo 'test command'") + self.assertEqual(exit_code, 0) + self.assertIn("test command", output) + + # Test a failing command + exit_code, output = execute_command("command_that_does_not_exist") + self.assertNotEqual(exit_code, 0) + + @patch('requests.get') + def test_check_for_updates_newer_version(self, mock_get): + """Test check_for_updates when a newer version is available.""" + # Mock the response from PyPI + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'info': { + 'version': '1.1.0' # Newer version than current + } + } + mock_get.return_value = mock_response + + # Patch the current version + with patch('qcmd_cli.utils.system.__version__', '1.0.0'): + # Call the function + result = check_for_updates(force_display=False) + + # Verify result + self.assertTrue(result['update_available']) + self.assertEqual(result['current_version'], '1.0.0') + self.assertEqual(result['latest_version'], '1.1.0') + + @patch('requests.get') + def test_check_for_updates_same_version(self, mock_get): + """Test check_for_updates when the current version is the latest.""" + # Mock the response from PyPI + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'info': { + 'version': '1.0.0' # Same as current + } + } + mock_get.return_value = mock_response + + # Patch the current version + with patch('qcmd_cli.utils.system.__version__', '1.0.0'): + # Call the function + result = check_for_updates(force_display=False) + + # Verify result + self.assertFalse(result['update_available']) + self.assertEqual(result['current_version'], '1.0.0') + self.assertEqual(result['latest_version'], '1.0.0') + + @patch('requests.get') + def test_check_for_updates_connection_error(self, mock_get): + """Test check_for_updates when a connection error occurs.""" + # Mock a connection error + mock_get.side_effect = Exception("Connection error") + + # Call the function + result = check_for_updates(force_display=False) + + # Verify result + self.assertIsNone(result) + + @patch('qcmd_cli.utils.system.check_for_updates') + @patch('qcmd_cli.utils.system.print') + def test_display_update_status_with_update(self, mock_print, mock_check): + """Test display_update_status when an update is available.""" + # Mock the update check to return an update is available + mock_check.return_value = { + 'update_available': True, + 'current_version': '1.0.0', + 'latest_version': '1.1.0' + } + + # Call the function + display_update_status() + + # Verify display was called + mock_print.assert_called() # At least one call to print + + @patch('qcmd_cli.utils.system.check_for_updates') + @patch('qcmd_cli.utils.system.print') + def test_display_update_status_no_update(self, mock_print, mock_check): + """Test display_update_status when no update is available.""" + # Mock the update check to return no update is available + mock_check.return_value = { + 'update_available': False, + 'current_version': '1.0.0', + 'latest_version': '1.0.0' + } + + # Call the function + display_update_status() + + # Verify display was not called (no message needed) + self.assertEqual(mock_print.call_count, 0) + + @patch('qcmd_cli.utils.system.load_config') + def test_display_update_status_disabled(self, mock_load_config): + """Test display_update_status when updates are disabled in config.""" + # Mock the config to disable update checks + mock_load_config.return_value = { + 'disable_update_check': True + } + + # Call the function + with patch('qcmd_cli.utils.system.check_for_updates') as mock_check: + display_update_status() + + # Verify check_for_updates was not called + mock_check.assert_not_called() + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From b1c666757af087315e456df3aecaaf34c9a043ff Mon Sep 17 00:00:00 2001 From: ibrahim Date: Fri, 11 Apr 2025 13:10:03 +0000 Subject: [PATCH 3/4] Fix: Make status display consistent between CLI and interactive shell --- qcmd_cli/core/interactive_shell.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/qcmd_cli/core/interactive_shell.py b/qcmd_cli/core/interactive_shell.py index 0c1d513..27669bf 100644 --- a/qcmd_cli/core/interactive_shell.py +++ b/qcmd_cli/core/interactive_shell.py @@ -21,10 +21,9 @@ from qcmd_cli.ui.display import Colors, print_cool_header, clear_screen from qcmd_cli.core.command_generator import generate_command, is_dangerous_command, list_models, fix_command from qcmd_cli.utils.history import save_to_history, load_history, show_history -from qcmd_cli.utils.system import execute_command, get_system_status, display_update_status +from qcmd_cli.utils.system import execute_command, get_system_status, display_update_status, display_system_status from qcmd_cli.log_analysis.log_files import handle_log_analysis from qcmd_cli.log_analysis.analyzer import analyze_log_file -from qcmd_cli.ui.display import display_system_status from qcmd_cli.utils.ollama import is_ollama_running # Setup session tracking @@ -196,8 +195,7 @@ def signal_handler(sig, frame): elif user_input.lower() == '/status': # Show system status print(f"\n{Colors.CYAN}Getting system status...{Colors.END}") - status = get_system_status() - display_system_status(status) + display_system_status() continue elif user_input.lower() == '/update': From b6e5666392664be0a8fc319bf6a79ba44866eae4 Mon Sep 17 00:00:00 2001 From: ibrahim Date: Fri, 11 Apr 2025 13:20:34 +0000 Subject: [PATCH 4/4] Bump version to 1.0.15 --- pyproject.toml | 2 +- qcmd_cli/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bddee1f..926a423 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] # This uses the PyPI-optimized README for better display on PyPI name = "ibrahimiq-qcmd" -version = "1.0.0" +version = "1.0.15" authors = [ {name = "Ibrahim IQ", email = "example@example.com"} ] diff --git a/qcmd_cli/__init__.py b/qcmd_cli/__init__.py index 0ff87d2..3c524cd 100644 --- a/qcmd_cli/__init__.py +++ b/qcmd_cli/__init__.py @@ -2,7 +2,7 @@ QCMD CLI - A command-line tool that generates shell commands using Qwen2.5-Coder via Ollama. """ -__version__ = "1.0.0" +__version__ = "1.0.15" # Don't import modules here to avoid circular dependencies