diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..07ded28 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,22 @@ +name: CI + +on: [push, pull_request] + +jobs: + test: + strategy: + matrix: + os: [ubuntu-latest, windows-latest] + python-version: ["3.9", "3.11", "3.12"] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - run: pip install -e ".[dev]" + - run: ruff check keyguard/ tests/ + - run: black --check keyguard/ tests/ + - run: pytest tests/ -v --tb=short + env: + DISPLAY: "" diff --git a/README.md b/README.md index 5aafae9..66fccfd 100644 --- a/README.md +++ b/README.md @@ -1,95 +1,148 @@ -### 🚀 **README.md** +# KeyGuard – Secure Password Manager -# 🔒 KeyGuard – Secure Password Manager +![license](https://img.shields.io/badge/license-Apache%202.0-blue.svg) ![python](https://img.shields.io/badge/python-3.9%2B-blue) -![license](https://img.shields.io/badge/license-Apache%202.0-blue.svg) ![python](https://img.shields.io/badge/python-3.8%2B-blue) - -KeyGuard is a cross-platform, highly secure desktop application designed for managing and safeguarding your passwords. Built with Python's robust cryptography and security best practices, KeyGuard provides seamless encryption, memory protection, and advanced zeroization techniques. +KeyGuard is a cross-platform desktop password manager built with Python. +It uses Argon2id key derivation, ChaCha20-Poly1305 AEAD encryption, and secure memory handling to protect your passwords. --- -## ✨ Key Features +## Features -* **Cryptographically Strong Encryption** – Uses Argon2id and AES-GCM to securely encrypt your data. -* **Secure Memory Handling** – Implements zeroization and obfuscation techniques to ensure passwords and keys aren't exposed in memory. -* **Master Password Management** – Allows secure changing of the master password, automatically re-encrypting the vault. -* **Detailed Password Viewer** – Password masking by default with secure toggling visibility. -* **Interactive Menu** – User-friendly interface with built-in password strength analysis. -* **Portable Executable** – Easily build and distribute as a single-file binary via PyInstaller. +- **Argon2id + ChaCha20-Poly1305** — industry-standard KDF and AEAD +- **Self-descriptive vault header** — KDF parameters stored in the vault (v4 format) +- **Configurable KDF profiles** — `compat` (64 MiB), `balanced` (256 MiB), `high` (512 MiB) +- **Secure memory** — mlock/VirtualLock, multi-pass wipe, key obfuscation +- **Cross-platform** — Windows + Linux (XDG-compliant directories via `platformdirs`) +- **Auto-migration** — v3 vaults and legacy `~/.keyguard3` directories migrate automatically +- **Clipboard safety** — auto-clears clipboard after 15 seconds +- **GUI** — Tkinter/ttkbootstrap with vault viewer, search, drag-and-drop reorder --- -## 📦 Getting Started +## Getting Started ### Requirements -* Python 3.8 or higher ([download](https://www.python.org/downloads/)) -* Dependencies: `ttkbootstrap`, `cryptography`, `argon2-cffi`, `psutil` +- Python 3.9+ +- Dependencies: `argon2-cffi`, `cryptography`, `psutil`, `ttkbootstrap`, `platformdirs` + +### Install ```bash -# Clone repository -git clone [https://github.com/Crypt-Guard/KeyGuard.git] +git clone https://github.com/Crypt-Guard/KeyGuard.git cd KeyGuard -# Create a virtual environment (optional but recommended) +# (Optional) virtual environment python -m venv .venv source .venv/bin/activate # Linux/macOS -.\.venv\Scripts\activate # Windows +# .venv\Scripts\activate # Windows -# Install dependencies -pip install -r requirements.txt +# Install +pip install -e ".[dev]" ``` -### Running KeyGuard +### Run ```bash -python KeyGuard/KeyGuard.py +python -m keyguard +# or, after pip install: +keyguard ``` -### Building Standalone Executable +### Run Tests ```bash -pyinstaller --onefile --noconsole --icon=assets/key.ico KeyGuard/KeyGuard.py +pytest tests/ -v ``` -Executable will be available at `dist/KeyGuard.exe`. +### Lint / Format ---- +```bash +pip install ruff black +ruff check keyguard/ tests/ +black --check keyguard/ tests/ +``` -## 🛡️ Security & Privacy +--- -KeyGuard never transmits or exposes your passwords online. All sensitive information is securely encrypted, stored locally, and managed entirely offline. +## Project Structure -| File | Purpose | Encrypted? | -| --------------------------- | ------------------------ | ---------------------------------- | -| `.keyguard/vault.kgv` | Encrypted password vault | ✅ AES-GCM | -| `.keyguard/logKeyGuard.log` | Application error log | ❌ Plain text (no passwords logged) | +``` +keyguard/ + __init__.py # version, dependency check + main.py # entrypoint + config.py # Config, KDF profiles, config.ini I/O + paths.py # cross-platform dirs, legacy migration + logging_setup.py # secure logging + crypto/ + engine.py # CryptoEngine, PasswordGenerator + formats.py # VaultHeaderV3/V4, constants + storage/ + backend.py # atomic writes, backup, locking + vault/ + models.py # VaultEntry + manager.py # VaultManager, v3→v4 migration + ui/ + dialogs.py # SecurePasswordDialog + app.py # KeyGuardApp + views.py # UI builders + util/ + memory.py # SecureMemory, KeyObfuscator, TimedExposure + rate_limit.py # RateLimiter + platform_harden.py # OS hardening (no debug detection) +tests/ + test_crypto.py + test_formats.py + test_storage.py + test_vault.py + test_migration.py + test_memory.py + test_password_gen.py + test_rate_limit.py +``` --- -## 🤝 Contributing +## Data Directories + +| OS | Location | +|---------|-------------------------------------------| +| Linux | `~/.local/share/KeyGuard/` | +| Windows | `%LOCALAPPDATA%\CryptGuard\KeyGuard\` | +| macOS | `~/Library/Application Support/KeyGuard/` | + +Legacy `~/.keyguard3` directories are auto-migrated on first run. -Contributions are welcome! Please follow these steps: +--- + +## Vault Migration (v3 → v4) -1. Fork the repository. -2. Create your feature branch (`git checkout -b feature/your-feature`). -3. Commit your changes (`git commit -m "feat: describe your feature"`). -4. Push your changes (`git push origin feature/your-feature`). -5. Open a pull request. +When KeyGuard opens a v3 vault (`KG3` magic): -All pull requests must pass pre-commit hooks (`black`, `flake8`, `isort`) and include unit tests when applicable. +1. Decrypts using config.ini KDF params as fallback +2. Creates a timestamped backup (`.v3backup-`) +3. Re-saves in v4 format with KDF parameters embedded in the header +4. Future opens use the self-descriptive v4 header (no external config dependency) --- -## 📜 License +## Security & Privacy + +- All data stored locally, never transmitted +- No debugger detection or kill switches (removed in v4.0) +- OS hardening: DEP enforcement (Windows), core dump disable (Linux), DLL restriction +- Secrets never logged; log rotation with restricted permissions -Licensed under [Apache 2.0 License](LICENSE). +| File | Purpose | Encrypted | +|------------------------|------------------------|-----------| +| `vault.kg3` | Password vault | ChaCha20-Poly1305 | +| `vault.kg3.backup` | Automatic backup | ChaCha20-Poly1305 | +| `keyguard.log` | Application log | No (no secrets) | +| `config.ini` | KDF calibration result | No | --- -## 🙏 Acknowledgments +## License -* [Python Cryptography](https://cryptography.io/) -* [Tkinter](https://docs.python.org/3/library/tkinter.html) -* [ttkbootstrap](https://github.com/israel-dryer/ttkbootstrap) -* [PyInstaller](https://www.pyinstaller.org/) +[Apache 2.0](LICENSE) diff --git a/keyguard/__init__.py b/keyguard/__init__.py new file mode 100644 index 0000000..5274252 --- /dev/null +++ b/keyguard/__init__.py @@ -0,0 +1,17 @@ +"""KeyGuard - Secure Password Manager.""" + +__version__ = "4.0.0" +__all__ = ["__version__"] + + +def check_dependencies(): + """Halt with a clear message if a critical dependency is missing.""" + import importlib.util + import sys + + required = ["psutil", "ttkbootstrap", "cryptography", "argon2", "platformdirs"] + missing = [pkg for pkg in required if importlib.util.find_spec(pkg) is None] + if missing: + print("ERROR: Missing dependencies ->", ", ".join(missing)) + print("Install with: pip install " + " ".join(missing)) + sys.exit(1) diff --git a/keyguard/__main__.py b/keyguard/__main__.py new file mode 100644 index 0000000..61c6b5a --- /dev/null +++ b/keyguard/__main__.py @@ -0,0 +1,5 @@ +"""Allow running as ``python -m keyguard``.""" + +from keyguard.main import main + +main() diff --git a/keyguard/config.py b/keyguard/config.py new file mode 100644 index 0000000..6b2f423 --- /dev/null +++ b/keyguard/config.py @@ -0,0 +1,216 @@ +"""Centralised configuration, KDF profiles, and config.ini I/O.""" + +from __future__ import annotations + +import configparser +import logging +import multiprocessing +import os +import secrets +import string +import tempfile +import time +from pathlib import Path + +import psutil + +logger = logging.getLogger("keyguard.config") + + +# ============================================================================ +# KDF profiles (compat / balanced / high) +# ============================================================================ +KDF_PROFILES = { + "compat": { + "time_cost": 3, + "memory_cost": 65_536, # 64 MiB + "parallelism": 2, + }, + "balanced": { + "time_cost": 4, + "memory_cost": 262_144, # 256 MiB + "parallelism": min(4, multiprocessing.cpu_count() or 2), + }, + "high": { + "time_cost": 6, + "memory_cost": 524_288, # 512 MiB + "parallelism": min(8, multiprocessing.cpu_count() or 2), + }, +} + +# Security floor: never go below the compat profile +_KDF_FLOOR = KDF_PROFILES["compat"] + + +# ============================================================================ +# Character-set constants (password generation) +# ============================================================================ +CHARSETS = { + "numbers": string.digits, + "letters": string.ascii_letters, + "alphanumeric": string.ascii_letters + string.digits, + "full": string.ascii_letters + string.digits + "!@#$%^&*()_+-=[]{}|;:,.<>?", +} +OPT_TO_KEY = {1: "numbers", 2: "letters", 3: "alphanumeric", 4: "full"} +MIN_TOTAL_BITS = 64 +MIN_CLASS_BITS = 2 + + +# ============================================================================ +# Config class +# ============================================================================ +class Config: + """Centralised settings.""" + + # UI + AUTO_HIDE_DELAY = 10_000 # ms + MIN_MASTER_PASSWORD_LENGTH = 12 + DEFAULT_PASSWORD_LENGTH = 20 + MIN_GENERATED_PASSWORD_LENGTH = 4 + MAX_GENERATED_PASSWORD_LENGTH = 128 + CLIPBOARD_TIMEOUT = 15 # seconds + + # Security + MAX_VAULT_SIZE = 10 * 1024 * 1024 # 10 MB + SESSION_TIMEOUT = 300 # seconds + + # Performance + ENTROPY_CACHE_SIZE = 100 + + # ------------------------------------------------------------------ + # KDF helpers + # ------------------------------------------------------------------ + @staticmethod + def get_kdf_params(data_dir: Path | None = None) -> dict: + """Read KDF params from config.ini, enforcing a security floor.""" + if data_dir is None: + from keyguard.paths import get_data_dir + + data_dir = get_data_dir() + + config_path = data_dir / "config.ini" + try: + if config_path.exists(): + cfg = configparser.ConfigParser() + cfg.read(config_path) + pars = { + "time_cost": cfg.getint( + "kdf", "time_cost", fallback=_KDF_FLOOR["time_cost"] + ), + "memory_cost": cfg.getint( + "kdf", "memory_cost", fallback=_KDF_FLOOR["memory_cost"] + ), + "parallelism": cfg.getint( + "kdf", "parallelism", fallback=_KDF_FLOOR["parallelism"] + ), + } + # Enforce security floor (compat profile) + pars["memory_cost"] = max(pars["memory_cost"], _KDF_FLOOR["memory_cost"]) + pars["time_cost"] = max(pars["time_cost"], _KDF_FLOOR["time_cost"]) + pars["parallelism"] = max(pars["parallelism"], 2) + return pars + except Exception: + pass + return dict(_KDF_FLOOR) + + @staticmethod + def calibrate_kdf(data_dir: Path, target_ms: int = 1000) -> None: + """Select the highest KDF profile the hardware supports.""" + import argon2 + import argon2.low_level as low + + ram_total = psutil.virtual_memory().total + ram_cap = ram_total * 3 // 4 + cores = multiprocessing.cpu_count() or 2 + + salt = secrets.token_bytes(16) + pw = b"benchmark" + + best_profile = "compat" + best_params = dict(KDF_PROFILES["compat"]) + + for name in ("compat", "balanced", "high"): + profile = KDF_PROFILES[name] + mem_bytes = profile["memory_cost"] * 1024 + if mem_bytes > ram_cap: + logger.info("Skipping profile '%s': exceeds RAM cap", name) + continue + + par = min(profile["parallelism"], cores) + try: + t0 = time.perf_counter() + low.hash_secret_raw( + pw, + salt, + time_cost=profile["time_cost"], + memory_cost=profile["memory_cost"], + parallelism=par, + hash_len=32, + type=argon2.Type.ID, + ) + dt = (time.perf_counter() - t0) * 1_000 + best_profile = name + best_params = { + "time_cost": profile["time_cost"], + "memory_cost": profile["memory_cost"], + "parallelism": par, + } + logger.info( + "Profile '%s' OK: t=%d m=%d KiB p=%d (%.0f ms)", + name, + profile["time_cost"], + profile["memory_cost"], + par, + dt, + ) + except (MemoryError, OSError): + logger.warning("Profile '%s' failed (not enough RAM)", name) + break + + # Atomic write of config.ini + _write_config(data_dir, best_params) + logger.info("KDF calibrated: selected profile '%s'", best_profile) + + @staticmethod + def config_exists(data_dir: Path) -> bool: + return (data_dir / "config.ini").exists() + + +# ============================================================================ +# Atomic config writer +# ============================================================================ +def _write_config(data_dir: Path, kdf_params: dict) -> None: + data_dir.mkdir(parents=True, exist_ok=True) + if os.name != "nt": + try: + os.chmod(data_dir, 0o700) + except OSError: + pass + + config_path = data_dir / "config.ini" + cfg = configparser.ConfigParser() + cfg["kdf"] = { + "time_cost": str(kdf_params["time_cost"]), + "memory_cost": str(kdf_params["memory_cost"]), + "parallelism": str(kdf_params["parallelism"]), + } + + fd = tempfile.NamedTemporaryFile( + mode="w", dir=data_dir, prefix="cfg_tmp_", suffix=".ini", delete=False + ) + try: + cfg.write(fd) + fd.flush() + os.fsync(fd.fileno()) + fd.close() + tmp = Path(fd.name) + if os.name != "nt": + os.chmod(tmp, 0o600) + tmp.replace(config_path) + except BaseException: + fd.close() + try: + Path(fd.name).unlink(missing_ok=True) + except OSError: + pass + raise diff --git a/keyguard/crypto/__init__.py b/keyguard/crypto/__init__.py new file mode 100644 index 0000000..e2ebbaf --- /dev/null +++ b/keyguard/crypto/__init__.py @@ -0,0 +1,20 @@ +"""KeyGuard cryptographic modules.""" + +from keyguard.crypto.engine import CryptoEngine, PasswordGenerator +from keyguard.crypto.formats import ( + MAGIC_V3, + MAGIC_V4, + VaultHeaderV3, + VaultHeaderV4, + parse_vault_header, +) + +__all__ = [ + "CryptoEngine", + "PasswordGenerator", + "MAGIC_V3", + "MAGIC_V4", + "VaultHeaderV3", + "VaultHeaderV4", + "parse_vault_header", +] diff --git a/keyguard/crypto/engine.py b/keyguard/crypto/engine.py new file mode 100644 index 0000000..017c311 --- /dev/null +++ b/keyguard/crypto/engine.py @@ -0,0 +1,234 @@ +"""CryptoEngine (KDF, AEAD, HMAC) and PasswordGenerator.""" + +from __future__ import annotations + +import hashlib +import hmac as hmac_mod +import logging +import math +import secrets +import string +from typing import Tuple + +import argon2 +import argon2.low_level +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.hazmat.primitives.kdf.hkdf import HKDF + +from keyguard.crypto.formats import KEY_SIZE, NONCE_SIZE +from keyguard.util.memory import SecureMemory + +logger = logging.getLogger("keyguard.crypto") + + +# ============================================================================ +# CryptoEngine +# ============================================================================ +class CryptoEngine: + """Argon2id KDF + ChaCha20-Poly1305 AEAD + HMAC-SHA256.""" + + def __init__(self, kdf_params: dict | None = None): + if kdf_params is None: + from keyguard.config import Config + + kdf_params = Config.get_kdf_params() + + self.time_cost = kdf_params["time_cost"] + self.memory_cost = kdf_params["memory_cost"] + self.parallelism = kdf_params["parallelism"] + + logger.info( + "CryptoEngine: Argon2id(t=%d, m=%d KiB, p=%d)", + self.time_cost, + self.memory_cost, + self.parallelism, + ) + + # ------------------------------------------------------------------ + # HKDF info constants for different vault versions + HKDF_INFO_V3 = b"KeyGuard-3.0.1 key-split" + HKDF_INFO_V4 = b"KeyGuard-4.0 key-split" + + def derive_keys( + self, + password: SecureMemory, + salt: bytes, + hkdf_info: bytes | None = None, + ) -> Tuple[bytes, bytes]: + if len(password) == 0: + raise ValueError("Empty password") + + if hkdf_info is None: + hkdf_info = self.HKDF_INFO_V4 + + try: + master_key = argon2.low_level.hash_secret_raw( + password.get_bytes(), + salt, + time_cost=self.time_cost, + memory_cost=self.memory_cost, + parallelism=self.parallelism, + hash_len=KEY_SIZE, + type=argon2.Type.ID, + ) + + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=KEY_SIZE * 2, + salt=None, + info=hkdf_info, + ) + expanded = hkdf.derive(master_key) + + enc_key = expanded[:KEY_SIZE] + hmac_key = expanded[KEY_SIZE:] + return enc_key, hmac_key + + except MemoryError: + raise RuntimeError( + f"Not enough RAM for KDF ({self.memory_cost // 1024} MiB required). " + "Try a lower KDF profile." + ) + finally: + if "master_key" in locals(): + ba = bytearray(master_key) + for i in range(len(ba)): + ba[i] = 0 + if "expanded" in locals(): + ba2 = bytearray(expanded) + for i in range(len(ba2)): + ba2[i] = 0 + + # ------------------------------------------------------------------ + def encrypt_data( + self, key: bytes, plaintext: bytes, associated_data: bytes = b"" + ) -> Tuple[bytes, bytes]: + cipher = ChaCha20Poly1305(key) + nonce = secrets.token_bytes(NONCE_SIZE) + ciphertext = cipher.encrypt(nonce, plaintext, associated_data) + return nonce, ciphertext + + def decrypt_data( + self, key: bytes, nonce: bytes, ciphertext: bytes, associated_data: bytes = b"" + ) -> bytes: + cipher = ChaCha20Poly1305(key) + return cipher.decrypt(nonce, ciphertext, associated_data) + + # ------------------------------------------------------------------ + def compute_hmac(self, key: bytes, data: bytes) -> bytes: + h = hmac_mod.new(key, data, hashlib.sha256) + return h.digest() + + def verify_hmac(self, key: bytes, data: bytes, expected: bytes) -> bool: + actual = self.compute_hmac(key, data) + return hmac_mod.compare_digest(actual, expected) + + @staticmethod + def constant_time_compare(a: bytes, b: bytes) -> bool: + return hmac_mod.compare_digest(a, b) + + +# ============================================================================ +# PasswordGenerator +# ============================================================================ +class PasswordGenerator: + """Secure random password generation with quality checks.""" + + _entropy_cache: dict = {} + + @staticmethod + def generate(length: int, charset: str) -> str: + if length < 1: + raise ValueError("Length must be at least 1") + if not charset: + raise ValueError("Empty charset") + + charset = "".join(sorted(set(charset))) + + while True: + password = "".join(secrets.choice(charset) for _ in range(length)) + if PasswordGenerator._check_quality(password, charset): + return password + + @staticmethod + def _check_quality(password: str, charset: str) -> bool: + if PasswordGenerator._has_patterns(password): + return False + + if len(password) >= 8: + char_types = { + "lower": string.ascii_lowercase, + "upper": string.ascii_uppercase, + "digit": string.digits, + "special": string.punctuation, + } + present_types = [] + for type_name, type_chars in char_types.items(): + if any(c in type_chars for c in charset): + if any(c in type_chars for c in password): + present_types.append(type_name) + + available_types = sum( + 1 for _, chars in char_types.items() if any(c in chars for c in charset) + ) + if available_types >= 2 and len(present_types) < 2: + return False + + return True + + @staticmethod + def _has_patterns(password: str) -> bool: + pwd_lower = password.lower() + sequences = [ + "qwerty", + "asdfgh", + "zxcvbn", + "123456", + "654321", + "qwertyuiop", + "asdfghjkl", + "zxcvbnm", + ] + for seq in sequences: + if seq in pwd_lower or seq[::-1] in pwd_lower: + return True + + for i in range(len(password) - 2): + if password[i] == password[i + 1] == password[i + 2]: + return True + + for i in range(len(password) - 2): + chars = password[i : i + 3] + if chars.isdigit(): + nums = [int(c) for c in chars] + if nums[1] == nums[0] + 1 and nums[2] == nums[1] + 1: + return True + if nums[1] == nums[0] - 1 and nums[2] == nums[1] - 1: + return True + if chars.isalpha(): + ords = [ord(c.lower()) for c in chars] + if ords[1] == ords[0] + 1 and ords[2] == ords[1] + 1: + return True + if ords[1] == ords[0] - 1 and ords[2] == ords[1] - 1: + return True + + return False + + @staticmethod + def calculate_entropy(password: str, charset: str) -> float: + if not password or not charset: + return 0.0 + + cache_key = (len(password), len(set(charset))) + if cache_key in PasswordGenerator._entropy_cache: + return PasswordGenerator._entropy_cache[cache_key] + + charset_size = len(set(charset)) + entropy = len(password) * math.log2(charset_size) + + if len(PasswordGenerator._entropy_cache) > 100: + PasswordGenerator._entropy_cache.clear() + + PasswordGenerator._entropy_cache[cache_key] = entropy + return entropy diff --git a/keyguard/crypto/formats.py b/keyguard/crypto/formats.py new file mode 100644 index 0000000..682dd32 --- /dev/null +++ b/keyguard/crypto/formats.py @@ -0,0 +1,183 @@ +"""Vault header formats (v3 legacy, v4 current), protocol constants, and migration helpers.""" + +from __future__ import annotations + +import struct +from dataclasses import dataclass +from typing import Union + +# ============================================================================ +# Protocol constants +# ============================================================================ +MAGIC_V3 = b"KG3" +MAGIC_V4 = b"KG4" +MAGIC_LEN = 3 + +SALT_SIZE = 32 # 256 bits +NONCE_SIZE = 12 # 96 bits (ChaCha20-Poly1305) +KEY_SIZE = 32 # 256 bits +HMAC_SIZE = 32 # 256 bits + +# -- v3 header layout ------------------------------------------------------- +# version(2) + counter(2) + salt(32) + created(8) + modified(8) = 52 bytes +HEADER_V3_FMT = ">HH32sQd" +HEADER_V3_SIZE = struct.calcsize(HEADER_V3_FMT) # 52 +PROTOCOL_VERSION_V3 = 3 + +# -- v4 header layout ------------------------------------------------------- +# version(2) + counter(2) + salt(32) + created(8) + modified(8) +# + kdf_algo(1) + kdf_ver(1) + kdf_time(4) + kdf_mem(4) + kdf_par(1) +# + kdf_hashlen(1) + reserved(2) = 69 bytes +HEADER_V4_FMT = ">HH32sQdBBIIBBH" +HEADER_V4_SIZE = struct.calcsize(HEADER_V4_FMT) # 69 +PROTOCOL_VERSION_V4 = 4 + +# KDF algorithm IDs +KDF_ARGON2ID = 0 +KDF_VERSION_19 = 0x13 # Argon2 v19 (current) + + +# ============================================================================ +# VaultHeaderV3 (legacy, read-only) +# ============================================================================ +@dataclass +class VaultHeaderV3: + version: int + counter: int + salt: bytes + created: float + modified: float + hmac: bytes + + def to_bytes(self) -> bytes: + data = struct.pack( + HEADER_V3_FMT, + self.version, + self.counter, + self.salt, + int(self.created), + self.modified, + ) + return data + self.hmac + + @classmethod + def from_bytes(cls, data: bytes) -> VaultHeaderV3: + if len(data) < HEADER_V3_SIZE + HMAC_SIZE: + raise ValueError("Invalid v3 header") + version, counter, salt, created, modified = struct.unpack( + HEADER_V3_FMT, data[:HEADER_V3_SIZE] + ) + hmac_val = data[HEADER_V3_SIZE : HEADER_V3_SIZE + HMAC_SIZE] + return cls( + version=version, + counter=counter, + salt=salt, + created=float(created), + modified=modified, + hmac=hmac_val, + ) + + +# ============================================================================ +# VaultHeaderV4 (current, self-descriptive) +# ============================================================================ +@dataclass +class VaultHeaderV4: + version: int + counter: int + salt: bytes + created: float + modified: float + kdf_algorithm: int # KDF_ARGON2ID + kdf_version: int # KDF_VERSION_19 + kdf_time_cost: int + kdf_memory_cost: int # KiB + kdf_parallelism: int + kdf_hash_len: int # 32 + reserved: int # 0 + hmac: bytes + + def to_bytes(self) -> bytes: + data = struct.pack( + HEADER_V4_FMT, + self.version, + self.counter, + self.salt, + int(self.created), + self.modified, + self.kdf_algorithm, + self.kdf_version, + self.kdf_time_cost, + self.kdf_memory_cost, + self.kdf_parallelism, + self.kdf_hash_len, + self.reserved, + ) + return data + self.hmac + + @classmethod + def from_bytes(cls, data: bytes) -> VaultHeaderV4: + if len(data) < HEADER_V4_SIZE + HMAC_SIZE: + raise ValueError("Invalid v4 header") + ( + version, + counter, + salt, + created, + modified, + kdf_algo, + kdf_ver, + kdf_time, + kdf_mem, + kdf_par, + kdf_hashlen, + reserved, + ) = struct.unpack(HEADER_V4_FMT, data[:HEADER_V4_SIZE]) + hmac_val = data[HEADER_V4_SIZE : HEADER_V4_SIZE + HMAC_SIZE] + return cls( + version=version, + counter=counter, + salt=salt, + created=float(created), + modified=modified, + kdf_algorithm=kdf_algo, + kdf_version=kdf_ver, + kdf_time_cost=kdf_time, + kdf_memory_cost=kdf_mem, + kdf_parallelism=kdf_par, + kdf_hash_len=kdf_hashlen, + reserved=reserved, + hmac=hmac_val, + ) + + def get_kdf_params(self) -> dict: + """Extract KDF parameters from header for key derivation.""" + return { + "time_cost": self.kdf_time_cost, + "memory_cost": self.kdf_memory_cost, + "parallelism": self.kdf_parallelism, + } + + +# ============================================================================ +# Factory / detection +# ============================================================================ +def parse_vault_header(data: bytes) -> Union[VaultHeaderV3, VaultHeaderV4]: + """Parse the vault header from raw bytes (including the 3-byte magic).""" + if len(data) < MAGIC_LEN: + raise ValueError("Data too short to be a vault") + + magic = data[:MAGIC_LEN] + payload = data[MAGIC_LEN:] + + if magic == MAGIC_V3: + return VaultHeaderV3.from_bytes(payload) + elif magic == MAGIC_V4: + return VaultHeaderV4.from_bytes(payload) + else: + raise ValueError(f"Unrecognised vault magic: {magic!r}") + + +def is_legacy_vault(data: bytes) -> bool: + """Quick check whether data starts with the v3 magic.""" + return data[:MAGIC_LEN] == MAGIC_V3 diff --git a/keyguard/logging_setup.py b/keyguard/logging_setup.py new file mode 100644 index 0000000..b12e5a4 --- /dev/null +++ b/keyguard/logging_setup.py @@ -0,0 +1,60 @@ +"""Secure logging setup — no secrets in logs, rotation, OS-appropriate dir.""" + +from __future__ import annotations + +import logging +import logging.handlers +import os +import platform +from pathlib import Path + + +class SecureFormatter(logging.Formatter): + """Formatter that sanitises potentially sensitive arguments.""" + + def format(self, record): + if hasattr(record, "args") and record.args: + safe = [] + for arg in record.args: + if isinstance(arg, (bytes, bytearray)): + safe.append(f"<{len(arg)} bytes>") + elif isinstance(arg, str) and len(arg) > 50: + safe.append(f"<{len(arg)} chars>") + else: + safe.append(arg) + record.args = tuple(safe) + return super().format(record) + + +def setup_secure_logging(log_dir: Path) -> logging.Logger: + """Configure the *keyguard* logger with rotation and safe formatting.""" + log_dir.mkdir(parents=True, exist_ok=True) + if platform.system() != "Windows": + try: + os.chmod(log_dir, 0o700) + except OSError: + pass + + log_file = log_dir / "keyguard.log" + + formatter = SecureFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + + handler = logging.handlers.RotatingFileHandler( + log_file, maxBytes=10 * 1024 * 1024, backupCount=3, encoding="utf-8" + ) + handler.setFormatter(formatter) + + root_logger = logging.getLogger("keyguard") + root_logger.setLevel(logging.INFO) + # avoid duplicate handlers on repeated calls + if not root_logger.handlers: + root_logger.addHandler(handler) + root_logger.propagate = False + + try: + if platform.system() != "Windows": + os.chmod(log_file, 0o600) + except OSError: + pass + + return root_logger diff --git a/keyguard/main.py b/keyguard/main.py new file mode 100644 index 0000000..68ce2e6 --- /dev/null +++ b/keyguard/main.py @@ -0,0 +1,104 @@ +"""KeyGuard entrypoint.""" + +from __future__ import annotations + +import logging +import os +import sys + +logger = logging.getLogger("keyguard") + + +def main(): + """Application entry point.""" + # 1. Check dependencies + from keyguard import check_dependencies + + check_dependencies() + + # 2. Check for display (Linux headless detection) + if sys.platform.startswith("linux"): + if not os.environ.get("DISPLAY") and not os.environ.get("WAYLAND_DISPLAY"): + print( + "ERROR: No display found ($DISPLAY / $WAYLAND_DISPLAY not set).\n" + "KeyGuard requires a graphical environment.", + file=sys.stderr, + ) + sys.exit(1) + + # 3. Resolve data directory (with migration) + from keyguard.paths import get_data_dir, migrate_legacy_directory + + data_dir = get_data_dir() + data_dir.mkdir(parents=True, exist_ok=True) + + migrate_legacy_directory(data_dir) + + # 4. Initialise logging + from keyguard.logging_setup import setup_secure_logging + + setup_secure_logging(data_dir) + + # 5. Platform hardening + from keyguard.util.platform_harden import ( + apply_platform_hardening, + validate_system_requirements, + ) + + try: + validate_system_requirements() + except SystemError as exc: + print(f"ERROR: {exc}", file=sys.stderr) + sys.exit(1) + + apply_platform_hardening() + + # 6. KDF calibration on first run + from keyguard.config import Config + from keyguard.paths import get_vault_path + + if not Config.config_exists(data_dir): + logger.info("First run — calibrating KDF...") + try: + Config.calibrate_kdf(data_dir) + except RuntimeError as exc: + logger.error("KDF calibration failed: %s", exc) + # Import tk lazily to show error dialog + try: + import tkinter as tk + from tkinter import messagebox as mb + + root = tk.Tk() + root.withdraw() + mb.showerror( + "System Error", + f"Could not calibrate the security system:\n\n{exc}\n\n" + "KeyGuard cannot run on this hardware.", + ) + root.destroy() + except Exception: + pass + sys.exit(1) + + # 7. Launch GUI + try: + from keyguard.ui.app import KeyGuardApp + + vault_path = get_vault_path(data_dir) + app = KeyGuardApp(vault_path=vault_path, data_dir=data_dir) + app.mainloop() + except KeyboardInterrupt: + logger.info("Application interrupted by user") + except Exception as exc: + logger.critical("Critical error: %s", exc) + raise + finally: + try: + if "app" in locals() and hasattr(app, "vault"): + app.vault.close() + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/keyguard/paths.py b/keyguard/paths.py new file mode 100644 index 0000000..f9da691 --- /dev/null +++ b/keyguard/paths.py @@ -0,0 +1,104 @@ +"""Cross-platform directory resolution and legacy migration.""" + +from __future__ import annotations + +import logging +import os +import platform +import shutil +import time +from pathlib import Path + +logger = logging.getLogger("keyguard.paths") + +_APP_NAME = "KeyGuard" +_APP_AUTHOR = "CryptGuard" + + +def get_data_dir() -> Path: + """Return the platform-appropriate data directory (XDG on Linux).""" + try: + import platformdirs + + return Path(platformdirs.user_data_dir(_APP_NAME, _APP_AUTHOR)) + except ImportError: + # Fallback without platformdirs + return _fallback_data_dir() + + +def _fallback_data_dir() -> Path: + system = platform.system() + if system == "Windows": + base = os.environ.get("LOCALAPPDATA", Path.home() / "AppData" / "Local") + return Path(base) / _APP_AUTHOR / _APP_NAME + elif system == "Darwin": + return Path.home() / "Library" / "Application Support" / _APP_NAME + else: + xdg = os.environ.get("XDG_DATA_HOME", str(Path.home() / ".local" / "share")) + return Path(xdg) / _APP_NAME + + +def get_legacy_dir() -> Path: + """Return the old v3 data directory path.""" + return Path.home() / ".keyguard3" + + +def migrate_legacy_directory(data_dir: Path) -> bool: + """Migrate from ~/.keyguard3 to the new platform directory. + + Returns True if migration was performed. + """ + legacy = get_legacy_dir() + if not legacy.is_dir(): + return False + + vault_in_new = data_dir / "vault.kg3" + if vault_in_new.exists(): + logger.info("New data directory already has a vault; skipping migration") + return False + + logger.info("Migrating from %s to %s", legacy, data_dir) + try: + data_dir.mkdir(parents=True, exist_ok=True) + if platform.system() != "Windows": + os.chmod(data_dir, 0o700) + + for item in legacy.iterdir(): + if item.is_file(): + dest = data_dir / item.name + shutil.copy2(item, dest) + if platform.system() != "Windows": + os.chmod(dest, 0o600) + logger.debug("Copied %s -> %s", item, dest) + + # Rename legacy dir as breadcrumb + ts = int(time.time()) + renamed = legacy.with_name(f".keyguard3.migrated-{ts}") + legacy.rename(renamed) + logger.info("Legacy directory renamed to %s", renamed) + return True + + except Exception as exc: + logger.error("Migration failed: %s", exc) + return False + + +# -- path helpers ----------------------------------------------------------- +def get_vault_path(data_dir: Path) -> Path: + return data_dir / "vault.kg3" + + +def get_backup_path(data_dir: Path) -> Path: + return data_dir / "vault.kg3.backup" + + +def get_lock_path(data_dir: Path) -> Path: + return data_dir / "vault.kg3.lock" + + +def get_log_path(data_dir: Path) -> Path: + return data_dir / "keyguard.log" + + +def get_config_path(data_dir: Path) -> Path: + return data_dir / "config.ini" diff --git a/keyguard/storage/__init__.py b/keyguard/storage/__init__.py new file mode 100644 index 0000000..4ba3e82 --- /dev/null +++ b/keyguard/storage/__init__.py @@ -0,0 +1,5 @@ +"""KeyGuard storage modules.""" + +from keyguard.storage.backend import StorageBackend + +__all__ = ["StorageBackend"] diff --git a/keyguard/storage/backend.py b/keyguard/storage/backend.py new file mode 100644 index 0000000..cac7469 --- /dev/null +++ b/keyguard/storage/backend.py @@ -0,0 +1,233 @@ +"""StorageBackend — atomic writes, backup/restore, file locking, permissions.""" + +from __future__ import annotations + +import logging +import os +import platform +import re +import shutil +import tempfile +import time +from pathlib import Path + +from keyguard.config import Config +from keyguard.crypto.formats import ( + HMAC_SIZE, + MAGIC_LEN, + MAGIC_V3, + MAGIC_V4, + HEADER_V3_SIZE, + HEADER_V4_SIZE, + VaultHeaderV3, + VaultHeaderV4, + PROTOCOL_VERSION_V3, + PROTOCOL_VERSION_V4, +) + +logger = logging.getLogger("keyguard.storage") + + +class StorageBackend: + """Vault file I/O with atomic writes, backup, and cross-platform locking.""" + + def __init__(self, vault_path: Path): + self.vault_path = vault_path + self.backup_path = vault_path.parent / (vault_path.name + ".backup") + self.lock_path = vault_path.parent / (vault_path.name + ".lock") + self._lock_file = None + + # Ensure directory exists + self.vault_path.parent.mkdir(parents=True, exist_ok=True) + if platform.system() != "Windows": + try: + os.chmod(self.vault_path.parent, 0o700) + except OSError: + pass + + self._acquire_lock() + + # -- locking ------------------------------------------------------------ + def _acquire_lock(self) -> None: + try: + self.lock_path.touch(mode=0o600, exist_ok=True) + self._lock_file = open(self.lock_path, "r+b") + if platform.system() != "Windows": + import fcntl + + fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except (IOError, OSError) as exc: + if self._lock_file is not None: + try: + self._lock_file.close() + except OSError: + pass + self._lock_file = None + raise RuntimeError("Vault is already in use by another process") from exc + + def _release_lock(self) -> None: + if self._lock_file: + try: + self._lock_file.close() + except OSError: + pass + finally: + self._lock_file = None + try: + self.lock_path.unlink() + except OSError: + pass + + # -- read / write ------------------------------------------------------- + def write_atomic(self, data: bytes) -> None: + # 1. Back up current file + if self.vault_path.exists(): + shutil.copy2(self.vault_path, self.backup_path) + self._secure_permissions(self.backup_path) + + # 2. Write to temp file with restricted permissions via umask + old_umask = None + try: + if os.name != "nt": + old_umask = os.umask(0o077) + with tempfile.NamedTemporaryFile( + mode="wb", + dir=self.vault_path.parent, + prefix="kg_tmp_", + suffix=".dat", + delete=False, + ) as tmp: + tmp.write(data) + tmp.flush() + os.fsync(tmp.fileno()) + temp_path = Path(tmp.name) + finally: + if old_umask is not None: + os.umask(old_umask) + + # 3. Secure permissions on temp file + self._secure_permissions(temp_path) + + # 4. Atomic rename + temp_path.replace(self.vault_path) + self._secure_permissions(self.vault_path) + + # 5. Cleanup orphaned temps + self._cleanup_temp_files() + logger.info("Vault saved successfully") + + def read(self) -> bytes: + if not self.vault_path.exists(): + raise FileNotFoundError("Vault not found") + + size = self.vault_path.stat().st_size + if size > Config.MAX_VAULT_SIZE: + raise ValueError(f"Vault too large: {size} bytes (max {Config.MAX_VAULT_SIZE})") + + # Fix open permissions + if platform.system() != "Windows": + st = self.vault_path.stat() + if st.st_mode & 0o077: + logger.warning("Vault permissions too open, fixing...") + os.chmod(self.vault_path, 0o600) + + return self.vault_path.read_bytes() + + def exists(self) -> bool: + return self.vault_path.exists() + + # -- backup / restore --------------------------------------------------- + def restore_backup(self) -> bool: + if self.verify_backup_integrity(): + shutil.copy2(self.backup_path, self.vault_path) + logger.info("Vault restored from backup") + return True + return False + + def verify_backup_integrity(self) -> bool: + if not self.backup_path.exists(): + return False + try: + data = self.backup_path.read_bytes() + magic = data[:MAGIC_LEN] + if magic == MAGIC_V3: + if len(data) < MAGIC_LEN + HEADER_V3_SIZE + HMAC_SIZE: + return False + hdr = VaultHeaderV3.from_bytes(data[MAGIC_LEN:]) + return hdr.version == PROTOCOL_VERSION_V3 + elif magic == MAGIC_V4: + if len(data) < MAGIC_LEN + HEADER_V4_SIZE + HMAC_SIZE: + return False + hdr = VaultHeaderV4.from_bytes(data[MAGIC_LEN:]) + return hdr.version == PROTOCOL_VERSION_V4 + return False + except Exception as exc: + logger.error("Backup corrupted: %s", exc) + return False + + def cleanup_old_backups(self) -> None: + """Remove backups encrypted with old keys (after password change).""" + try: + if self.backup_path.exists(): + old_backup = self.backup_path.parent / ( + self.backup_path.name + f".old-{int(time.time())}" + ) + self.backup_path.rename(old_backup) + + cutoff = time.time() - (7 * 24 * 3600) + for old in self.vault_path.parent.glob("*.backup.old-*"): + try: + m = re.search(r"\.old-(\d+)$", old.name) + if m: + ts = int(m.group(1)) + if ts < cutoff: + old.unlink() + logger.debug("Expired backup removed: %s", old) + except (ValueError, OSError): + pass + except Exception as exc: + logger.warning("Backup cleanup error: %s", exc) + + # -- permissions -------------------------------------------------------- + def _secure_permissions(self, path: Path) -> None: + try: + if platform.system() == "Windows": + try: + import win32security + import win32api + import ntsecuritycon as nsec + + user_name = win32api.GetUserName() + user_sid, _, _ = win32security.LookupAccountName(None, user_name) + dacl = win32security.ACL() + dacl.AddAccessAllowedAce( + win32security.ACL_REVISION, + nsec.FILE_GENERIC_READ | nsec.FILE_GENERIC_WRITE | nsec.DELETE, + user_sid, + ) + sd = win32security.SECURITY_DESCRIPTOR() + sd.SetSecurityDescriptorDacl(1, dacl, 0) + win32security.SetFileSecurity( + str(path), win32security.DACL_SECURITY_INFORMATION, sd + ) + except ImportError: + os.chmod(path, 0o600) + else: + os.chmod(path, 0o600) + except Exception as exc: + logger.warning("Error setting permissions on %s: %s", path, exc) + + def _cleanup_temp_files(self) -> None: + try: + for tmp in self.vault_path.parent.glob("kg_tmp_*"): + try: + if time.time() - tmp.stat().st_mtime > 3600: + tmp.unlink() + except Exception: + pass + except Exception: + pass + + # -- lifecycle ---------------------------------------------------------- + def __del__(self): + self._release_lock() diff --git a/keyguard/ui/__init__.py b/keyguard/ui/__init__.py new file mode 100644 index 0000000..bc8e559 --- /dev/null +++ b/keyguard/ui/__init__.py @@ -0,0 +1 @@ +"""KeyGuard UI modules.""" diff --git a/keyguard/ui/app.py b/keyguard/ui/app.py new file mode 100644 index 0000000..319d8d9 --- /dev/null +++ b/keyguard/ui/app.py @@ -0,0 +1,241 @@ +"""KeyGuardApp — main GUI application (Tkinter / ttkbootstrap).""" + +from __future__ import annotations + +import hashlib +import logging +import string +import sys +import tkinter as tk +from pathlib import Path +from tkinter import messagebox as mb +from tkinter import simpledialog as sd +from typing import Optional + +import ttkbootstrap as ttk +from ttkbootstrap.constants import INFO + +from keyguard.config import CHARSETS, MIN_TOTAL_BITS, OPT_TO_KEY, Config +from keyguard.crypto.engine import CryptoEngine, PasswordGenerator +from keyguard.storage.backend import StorageBackend +from keyguard.ui.dialogs import SecurePasswordDialog +from keyguard.ui.views import build_main_view, build_vault_viewer +from keyguard.util.memory import PasswordTimeout, SecureMemory +from keyguard.vault.manager import VaultManager + +logger = logging.getLogger("keyguard.ui") + + +class KeyGuardApp(ttk.Window): + """Main GUI application, coupled to the security core.""" + + def __init__(self, vault_path: Path, data_dir: Path): + super().__init__(themename="superhero") + + self._master_pw: Optional[SecureMemory] = None + self.title("KeyGuard 4.0") + self.geometry("580x480") + self.resizable(False, False) + + # -- backend -- + self.storage = StorageBackend(vault_path) + self.crypto = CryptoEngine(Config.get_kdf_params(data_dir)) + self.vault = VaultManager(self.storage, self.crypto) + self.password_gen = PasswordGenerator() + + # -- master password -- + pw_mem = SecurePasswordDialog.ask( + self, title="Senha-mestra", prompt="Digite a senha do vault:" + ) + if pw_mem is None or len(pw_mem) == 0: + self.destroy() + return + self._master_pw = pw_mem + + try: + if self.storage.exists(): + try: + self.vault.open(self._master_pw) + except Exception as exc: + mb.showerror("Erro", f"Senha incorreta ou vault corrompido:\n{exc}") + self.destroy() + return + else: + if not mb.askyesno("Novo vault", "Nenhum vault encontrado. Criar novo?"): + self.destroy() + return + try: + self.vault.create_new(self._master_pw) + except ValueError as exc: + mb.showerror("Erro", f"Erro ao criar vault:\n{exc}") + self.destroy() + return + except Exception: + if self._master_pw: + self._master_pw.clear() + raise + + # -- password timeout -- + self._pw_timeout = PasswordTimeout(self._master_pw, timeout=Config.SESSION_TIMEOUT) + reset = self._pw_timeout.reset + self.bind_all("", lambda _: reset()) + self.bind_all("", lambda _: reset()) + + # -- UI -- + self._build_menu() + build_main_view(self) + + # ------------------------------------------------------------------ menu + def _build_menu(self): + menubar = tk.Menu(self) + m = tk.Menu(menubar, tearoff=0) + m.add_command(label="Trocar Senha Mestra", command=self._change_master) + m.add_command(label="Atualizar Todas as Senhas", command=self._update_all_passwords) + menubar.add_cascade(label="Menu", menu=m) + self.config(menu=menubar) + + # ----------------------------------------------------------- change master + def _change_master(self): + old = sd.askstring("Trocar Senha", "Senha atual:", show="*", parent=self) + if old is None: + return + new = sd.askstring("Trocar Senha", "Nova senha-mestra:", show="*", parent=self) + if new is None: + return + conf = sd.askstring("Trocar Senha", "Confirme a nova senha-mestra:", show="*", parent=self) + if conf is None or new != conf: + mb.showerror("Erro", "Confirmação não confere", parent=self) + return + try: + self.vault.change_password(SecureMemory(old), SecureMemory(new)) + mb.showinfo("Sucesso", "Senha-mestra alterada", parent=self) + except Exception as exc: + mb.showerror("Erro", str(exc), parent=self) + + # -------------------------------------------------------- bulk update + def _update_all_passwords(self): + if not self.vault.entries: + mb.showinfo("Aviso", "O vault está vazio.", parent=self) + return + msg = ( + f"Serão geradas novas senhas para {len(self.vault.entries)} entradas.\n\n" + "As senhas antigas serão PERMANENTEMENTE substituídas " + "e um backup será criado.\n\nDeseja continuar?" + ) + if not mb.askyesno("Confirmar Atualização", msg, icon="warning", parent=self): + return + if not mb.askyesno( + "Última Confirmação", + "Tem certeza? Esta ação não pode ser desfeita!", + icon="warning", + parent=self, + ): + return + + try: + prog = ttk.Toplevel(self) + prog.title("Atualizando") + prog.geometry("280x110") + prog.resizable(False, False) + ttk.Label(prog, text="Gerando novas senhas...").pack(pady=10) + bar = ttk.Progressbar(prog, mode="indeterminate", length=220, bootstyle=INFO) + bar.pack(pady=10) + bar.start(10) + prog.update() + + total = self.vault.update_all_passwords(self.password_gen) + prog.destroy() + mb.showinfo("Concluído", f"{total} senhas atualizadas com sucesso.", parent=self) + except Exception as exc: + if "prog" in locals(): + prog.destroy() + mb.showerror( + "Erro", + f"Falha ao atualizar senhas:\n{exc}\n\n" + "O vault foi restaurado ao estado anterior.", + parent=self, + ) + try: + self.vault.close() + self.vault = VaultManager(self.storage, self.crypto) + self.vault.open(self._master_pw) + except Exception as reload_exc: + logger.critical("Error reloading vault: %s", reload_exc) + + # -------------------------------------------------------- callbacks + def _make_pwd(self) -> str: + try: + length = int(self.spin.get()) + length = max(Config.MIN_GENERATED_PASSWORD_LENGTH, min(128, length)) + if int(self.spin.get()) != length: + self.spin.set(str(length)) + except ValueError: + length = 16 + self.spin.set(str(length)) + charset = CHARSETS[OPT_TO_KEY[self.opt.get()]] + return self.password_gen.generate(length, charset) + + def _on_generate(self, *_): + pwd = self._make_pwd() + keyset = OPT_TO_KEY[self.opt.get()] + charset = CHARSETS[keyset] + bits = PasswordGenerator.calculate_entropy(pwd, charset) + self.var_pwd.set(pwd) + self.bar["value"] = min(bits, 120) + + msg = "Entropia: %.1f bits" % bits + if bits < MIN_TOTAL_BITS: + msg += " (fraco)" + classes = { + "lower": any(c in string.ascii_lowercase for c in pwd), + "upper": any(c in string.ascii_uppercase for c in pwd), + "digit": any(c in string.digits for c in pwd), + "symbol": any(c in string.punctuation for c in pwd), + } + if sum(classes.values()) < 2: + msg += " (classe fraca)" + self.lbl.config(text=msg) + + if self.flag_save.get(): + name = self.ent_app.get().strip() or "Sem_nome" + try: + self.vault.add_entry(name, pwd) + except ValueError: + self.vault.update_entry(name, password=pwd) + + def _on_copy(self, *_): + s = self.var_pwd.get() + if s: + self.clipboard_clear() + self.clipboard_append(s) + # Auto-clear clipboard after timeout + self.after(Config.CLIPBOARD_TIMEOUT * 1000, self.clipboard_clear) + + def _on_clear(self, *_): + self.clipboard_clear() + self.var_pwd.set("") + self.bar["value"] = 0 + self.lbl.config(text="Entropia / força") + if self.chk_eye.instate(["selected"]): + self.chk_eye.state(["!selected"]) + self.ent_pwd.config(show="•") + + def _vault_view(self): + build_vault_viewer(self) + + # -------------------------------------------------------- cleanup + def destroy(self): + try: + if hasattr(self, "_pw_timeout"): + self._pw_timeout.cancel() + self.clipboard_clear() + if hasattr(self, "var_pwd"): + self.var_pwd.set("") + if hasattr(self, "vault"): + self.vault.close() + if hasattr(self, "_master_pw") and self._master_pw is not None: + self._master_pw.clear() + except Exception as exc: + logger.error("Error during cleanup: %s", exc) + finally: + super().destroy() diff --git a/keyguard/ui/dialogs.py b/keyguard/ui/dialogs.py new file mode 100644 index 0000000..d12a89d --- /dev/null +++ b/keyguard/ui/dialogs.py @@ -0,0 +1,50 @@ +"""Secure password dialogs for KeyGuard.""" + +from __future__ import annotations + +import tkinter as tk +from tkinter import messagebox as mb + +import ttkbootstrap as ttk + +from keyguard.util.memory import SecureMemory + + +class SecurePasswordDialog: + """Modal dialog that returns a SecureMemory object (never a plain string).""" + + @staticmethod + def ask(parent, title="Password", prompt="Enter the password:"): + dlg = tk.Toplevel(parent) + dlg.title(title) + dlg.grab_set() + + var = tk.StringVar() + ttk.Label(dlg, text=prompt).pack(padx=20, pady=10) + ent = ttk.Entry(dlg, textvariable=var, show="*", width=30) + ent.pack(padx=20, pady=5) + ent.focus() + + res = {"pw": None} + + def _ok(): + raw = var.get() + if not raw: + mb.showerror("Error", "Password must not be empty.", parent=dlg) + return + res["pw"] = SecureMemory(raw) + var.set("") + dlg.destroy() + + def _cancel(): + var.set("") + dlg.destroy() + + btns = ttk.Frame(dlg) + btns.pack(pady=10) + ttk.Button(btns, text="OK", command=_ok).pack(side="left", padx=5) + ttk.Button(btns, text="Cancel", command=_cancel).pack(side="left", padx=5) + ent.bind("", lambda *_: _ok()) + dlg.bind("", lambda *_: _cancel()) + dlg.wait_window() + return res["pw"] diff --git a/keyguard/ui/views.py b/keyguard/ui/views.py new file mode 100644 index 0000000..e24f549 --- /dev/null +++ b/keyguard/ui/views.py @@ -0,0 +1,288 @@ +"""UI construction helpers — separated from app logic for maintainability.""" + +from __future__ import annotations + +import string +import tkinter as tk + +import ttkbootstrap as ttk +from ttkbootstrap.constants import DANGER, INFO, PRIMARY, SUCCESS +from ttkbootstrap.tooltip import ToolTip + +from keyguard.config import CHARSETS, MIN_TOTAL_BITS, OPT_TO_KEY, Config + + +def build_main_view(app) -> None: + """Build the main password-generator UI inside *app*.""" + container = ttk.Frame(app) + container.place(relx=0.5, rely=0.5, anchor="c") + + # ----- parameters frame ----- + frm = ttk.LabelFrame(container, text="Parâmetros") + frm.grid(row=0, column=0, sticky="n") + frm.columnconfigure(1, weight=1) + + ttk.Label(frm, text="Comprimento:").grid(row=0, column=0, sticky="e", padx=6, pady=4) + + def validate_length(value): + if value == "": + return True + try: + num = int(value) + return 1 <= num <= Config.MAX_GENERATED_PASSWORD_LENGTH + except ValueError: + return False + + vcmd = (app.register(validate_length), "%P") + app.spin = ttk.Spinbox( + frm, + from_=Config.MIN_GENERATED_PASSWORD_LENGTH, + to=Config.MAX_GENERATED_PASSWORD_LENGTH, + width=6, + bootstyle=PRIMARY, + validate="key", + validatecommand=vcmd, + ) + app.spin.set(16) + app.spin.grid(row=0, column=1, sticky="w", padx=(2, 8), pady=4) + ToolTip( + app.spin, + text=f"Password length ({Config.MIN_GENERATED_PASSWORD_LENGTH}" + f"-{Config.MAX_GENERATED_PASSWORD_LENGTH})", + ) + + app.opt = ttk.IntVar(value=4) + labels = ("Números", "Letras", "Letras+Números", "Todos") + for i, txt in enumerate(labels, 1): + r = ttk.Radiobutton(frm, text=txt, value=i, variable=app.opt) + r.grid(row=i, column=0 if i % 2 else 1, sticky="w", padx=8, pady=2) + + app.flag_save = ttk.BooleanVar() + ttk.Checkbutton(frm, text="Salvar no vault", variable=app.flag_save).grid( + row=5, column=0, columnspan=2, sticky="w", padx=8, pady=(6, 2) + ) + + ttk.Label(frm, text="Aplicação:").grid(row=6, column=0, sticky="e", padx=6) + app.ent_app = ttk.Entry(frm, width=24) + app.ent_app.grid(row=6, column=1, sticky="w", padx=(2, 8), pady=4) + + # ----- output frame ----- + out = ttk.Frame(container) + out.grid(row=1, column=0, pady=12, sticky="ew") + out.columnconfigure(0, weight=1) + + app.var_pwd = ttk.StringVar() + app.ent_pwd = ttk.Entry( + out, + textvariable=app.var_pwd, + font=("Consolas", 14), + state="readonly", + width=38, + show="•", + ) + app.ent_pwd.grid(row=0, column=0, sticky="ew", ipadx=6, ipady=4) + + app.chk_eye = ttk.Checkbutton( + out, + text="👁", + style="toolbutton", + command=lambda: app.ent_pwd.config( + show="" if app.chk_eye.instate(["selected"]) else "•" + ), + ) + app.chk_eye.grid(row=0, column=1, padx=4) + + app.bar = ttk.Progressbar(out, maximum=120, length=400, bootstyle=SUCCESS) + app.bar.grid(row=1, column=0, columnspan=2, pady=6) + app.lbl = ttk.Label(out, text="Entropia / força") + app.lbl.grid(row=2, column=0, columnspan=2) + + # ----- buttons ----- + btn = ttk.Frame(container) + btn.grid(row=2, column=0, pady=6) + ttk.Button(btn, text="Gerar", bootstyle=PRIMARY, command=app._on_generate).pack( + side="left", padx=6 + ) + ttk.Button(btn, text="Copiar", command=app._on_copy).pack(side="left", padx=6) + ttk.Button(btn, text="Limpar", command=app._on_clear).pack(side="left", padx=6) + ttk.Button(btn, text="Vault", command=app._vault_view).pack(side="left", padx=6) + ttk.Button(btn, text="Sair", bootstyle=DANGER, command=app.destroy).pack( + side="left", padx=6 + ) + + # -- shortcuts + app.bind_all("", lambda *_: app._on_generate()) + app.bind_all("", lambda *_: app._on_copy()) + app.bind_all("", lambda *_: app._on_clear()) + app.bind_all("", lambda *_: app.destroy()) + + +def build_vault_viewer(app) -> None: + """Build the vault viewer window.""" + top = ttk.Toplevel(app) + top.title("Vault") + top.geometry("380x350") + + # -- search + sf = ttk.Frame(top) + sf.pack(fill=tk.X, padx=5, pady=5) + ttk.Label(sf, text="Buscar:").pack(side=tk.LEFT, padx=5) + search_var = ttk.StringVar() + ttk.Entry(sf, textvariable=search_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5) + + # -- treeview + tree = ttk.Treeview(top, columns=("app", "pwd"), show="headings") + tree.heading("app", text="Aplicação") + tree.heading("pwd", text="Senha") + tree.column("pwd", width=120, anchor="center") + tree.pack(fill=tk.BOTH, expand=True) + + def filter_entries(*_args): + query = search_var.get().lower() + tree.delete(*tree.get_children()) + for name in app.vault.list_entries(): + if query in name.lower(): + tree.insert("", tk.END, iid=name, values=(name, "••••••••")) + + search_var.trace_add("write", lambda *_: filter_entries()) + filter_entries() + + # -- bindings + tree.bind("", lambda _: _detail(app, tree)) + tree.bind("", lambda _: _detail(app, tree)) + tree.bind("", lambda _: _delete_sel(app, tree)) + tree.bind("", lambda _: _copy_sel(app, tree)) + top.bind("", lambda _: top.destroy()) + + # -- drag & drop reorder + tree._drag = {"item": None, "moved": False} + + def _press(e): + tree._drag["item"] = tree.identify_row(e.y) + tree._drag["moved"] = False + + def _motion(e): + iid = tree._drag["item"] + if not iid: + return + target = tree.identify_row(e.y) + if not target or target == iid: + return + idx = tree.index(target) + tree.move(iid, "", idx) + tree._drag["moved"] = True + + def _release(_e): + if not tree._drag["moved"]: + return + new_order = [tree.item(iid, "values")[0] for iid in tree.get_children()] + _persist_order(app, new_order) + + tree.bind("", _press) + tree.bind("", _motion) + tree.bind("", _release) + + # -- buttons + bar = ttk.Frame(top) + bar.pack(pady=6) + ttk.Button(bar, text="Ver detalhes", command=lambda: _detail(app, tree)).pack( + side="left", padx=6 + ) + ttk.Button(bar, text="Copiar", command=lambda: _copy_sel(app, tree)).pack( + side="left", padx=6 + ) + ttk.Button( + bar, text="Excluir", bootstyle=DANGER, command=lambda: _delete_sel(app, tree) + ).pack(side="left", padx=6) + + +# --------------------------------------------------------------------------- +# Helper functions used by vault viewer +# --------------------------------------------------------------------------- +def _detail(app, tree) -> None: + from tkinter import messagebox as mb + + sel = tree.selection() + if not sel: + return + name = sel[0] + entry = app.vault.entries.get(name) + if not entry: + return + pwd = entry.get_password() + show = pwd[: min(16, len(pwd))] + mask = "•" * len(show) + + dlg = ttk.Toplevel(app) + dlg.title(name) + dlg.grab_set() + + ttk.Label(dlg, text=f"Aplicação: {name}", font=("Segoe UI", 11, "bold")).pack( + pady=(12, 4) + ) + frame = ttk.Frame(dlg) + frame.pack(padx=12, pady=4, fill="x") + lbl = ttk.Label(frame, text=mask, font=("Consolas", 12)) + lbl.pack(side=tk.LEFT, fill="x", expand=True) + var_eye = ttk.IntVar(value=0) + ttk.Checkbutton( + frame, + text="👁", + style="toolbutton", + variable=var_eye, + command=lambda: lbl.config(text=show if var_eye.get() else mask), + ).pack(side=tk.LEFT, padx=6) + ttk.Button( + dlg, + text="Copiar", + command=lambda: (app.clipboard_clear(), app.clipboard_append(pwd), dlg.destroy()), + ).pack(pady=8) + + def auto_hide(): + try: + if dlg and dlg.winfo_exists(): + var_eye.set(0) + lbl.config(text=mask) + except (tk.TclError, AttributeError): + pass + + dlg.after(Config.AUTO_HIDE_DELAY, auto_hide) + + +def _copy_sel(app, tree) -> None: + sel = tree.selection() + if sel: + entry = app.vault.entries.get(sel[0]) + if entry: + app.clipboard_clear() + app.clipboard_append(entry.get_password()) + # schedule clipboard clear + app.after(Config.CLIPBOARD_TIMEOUT * 1000, app.clipboard_clear) + + +def _delete_sel(app, tree) -> None: + from tkinter import messagebox as mb + + sel = tree.selection() + if not sel: + return + name = sel[0] + if not mb.askyesno( + "Confirmar", f"Remover '{name}' do vault?", parent=tree.winfo_toplevel() + ): + return + try: + app.vault.delete_entry(name) + tree.delete(name) + except ValueError as exc: + mb.showerror("Erro", f"Erro ao excluir: {exc}", parent=tree.winfo_toplevel()) + except Exception as exc: + mb.showerror("Erro", f"Erro inesperado: {exc}", parent=tree.winfo_toplevel()) + + +def _persist_order(app, new_order: list) -> None: + if set(new_order) != set(app.vault.entries.keys()): + return + app.vault.entry_order = new_order.copy() + app.vault._modified = True + app.vault._save() diff --git a/keyguard/util/__init__.py b/keyguard/util/__init__.py new file mode 100644 index 0000000..5869c58 --- /dev/null +++ b/keyguard/util/__init__.py @@ -0,0 +1 @@ +"""KeyGuard utility modules.""" diff --git a/keyguard/util/memory.py b/keyguard/util/memory.py new file mode 100644 index 0000000..52a0e93 --- /dev/null +++ b/keyguard/util/memory.py @@ -0,0 +1,289 @@ +"""Secure memory management: SecureMemory, FragmentedSecret, KeyObfuscator, +TimedExposure, PasswordTimeout.""" + +from __future__ import annotations + +import ctypes +import logging +import platform +import secrets +import threading +import time +from typing import Optional, Union + +logger = logging.getLogger("keyguard.memory") + + +# --------------------------------------------------------------------------- +# SecureMemory +# --------------------------------------------------------------------------- +class SecureMemory: + """Manages a bytearray in locked (non-swappable) memory with multi-pass wipe.""" + + def __init__(self, data: Union[bytes, bytearray, str]): + if isinstance(data, str): + data = data.encode("utf-8") + self._size = len(data) + self._data = bytearray(data) + self._locked = False + self._protect_memory() + + # -- memory protection -------------------------------------------------- + def _protect_memory(self) -> None: + if self._size == 0: + return + try: + address = ctypes.addressof(ctypes.c_char.from_buffer(self._data)) + if platform.system() == "Windows": + kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + if kernel32.VirtualLock( + ctypes.c_void_p(address), ctypes.c_size_t(self._size) + ): + self._locked = True + handle = kernel32.GetCurrentProcess() + kernel32.SetProcessWorkingSetSize(handle, -1, -1) + else: + libc = ctypes.CDLL(None) + if ( + libc.mlock(ctypes.c_void_p(address), ctypes.c_size_t(self._size)) + == 0 + ): + self._locked = True + except Exception as exc: + logger.debug("Memory protection unavailable: %s", exc) + + # -- public API --------------------------------------------------------- + def get_bytes(self) -> bytes: + if not self._data: + raise ValueError("Memory already cleared") + return bytes(self._data) + + def clear(self) -> None: + if not self._data: + return + try: + patterns = [ + bytes([0xFF] * self._size), + bytes([0x00] * self._size), + bytes([0x55] * self._size), + bytes([0xAA] * self._size), + secrets.token_bytes(self._size), + secrets.token_bytes(self._size), + bytes([0x00] * self._size), + ] + for pat in patterns: + self._data[:] = pat + + if self._locked: + try: + address = ctypes.addressof(ctypes.c_char.from_buffer(self._data)) + if platform.system() == "Windows": + k32 = ctypes.WinDLL("kernel32", use_last_error=True) + k32.VirtualUnlock( + ctypes.c_void_p(address), ctypes.c_size_t(self._size) + ) + else: + libc = ctypes.CDLL(None) + libc.munlock( + ctypes.c_void_p(address), ctypes.c_size_t(self._size) + ) + except Exception: + pass + finally: + self._data = bytearray() + self._size = 0 + self._locked = False + + def __len__(self) -> int: + return self._size + + def __del__(self): + self.clear() + + @property + def is_protected(self) -> bool: + return self._locked + + +# --------------------------------------------------------------------------- +# PasswordTimeout (bug-fix: added threading.Lock) +# --------------------------------------------------------------------------- +class PasswordTimeout: + """Wipes a SecureMemory after *timeout* seconds of inactivity.""" + + def __init__(self, secure_memory: SecureMemory, timeout: int = 300): + self._mem = secure_memory + self._timeout = timeout + self._timer: Optional[threading.Timer] = None + self._destroyed = False + self._lock = threading.Lock() + self.reset() + + def _wipe(self): + with self._lock: + if not self._destroyed: + self._mem.clear() + self._destroyed = True + logger.info("Master password destroyed by timeout") + + def reset(self): + with self._lock: + if self._destroyed: + return + if self._timer: + self._timer.cancel() + self._timer = threading.Timer(self._timeout, self._wipe) + self._timer.daemon = True + self._timer.start() + + def cancel(self): + with self._lock: + if self._timer: + self._timer.cancel() + if not self._destroyed: + self._mem.clear() + self._destroyed = True + + +# --------------------------------------------------------------------------- +# FragmentedSecret +# --------------------------------------------------------------------------- +class FragmentedSecret: + """Splits a secret into N XOR-masked fragments.""" + + def __init__(self, data: Union[bytes, bytearray, str], parts: int = 3): + b = data.encode() if isinstance(data, str) else bytes(data) + ln = len(b) + masks = [secrets.token_bytes(ln) for _ in range(parts - 1)] + last = bytearray(b) + for m in masks: + for i in range(ln): + last[i] ^= m[i] + self._parts = [SecureMemory(m) for m in masks] + [SecureMemory(last)] + + def reconstruct(self) -> SecureMemory: + ln = len(self._parts[-1]) + res = bytearray(self._parts[-1].get_bytes()) + for p in self._parts[:-1]: + blk = p.get_bytes() + for i in range(ln): + res[i] ^= blk[i] + return SecureMemory(res) + + def clear(self): + for p in self._parts: + p.clear() + self._parts = [] + + +# --------------------------------------------------------------------------- +# KeyObfuscator (bug-fix: added threading.Lock) +# --------------------------------------------------------------------------- +class KeyObfuscator: + """Keeps derived key obfuscated; reveal only via TimedExposure.""" + + def __init__(self, key: SecureMemory): + self._key = key + self._mask: Optional[SecureMemory] = None + self._frags: Optional[FragmentedSecret] = None + self._obfuscated = False + self._lock = threading.Lock() + + def obfuscate(self): + with self._lock: + if self._key is None and self._obfuscated: + return + if self._obfuscated: + plain_sm = self._deobfuscate_unlocked() + if self._mask: + self._mask.clear() + self._mask = None + if self._frags: + self._frags.clear() + self._frags = None + self._key = plain_sm + self._obfuscated = False + + if self._key is None or len(self._key) == 0: + return + + kb = self._key.get_bytes() + mask_b = secrets.token_bytes(len(kb)) + masked = bytearray(a ^ b for a, b in zip(kb, mask_b)) + self._mask = SecureMemory(mask_b) + self._frags = FragmentedSecret(masked, 3) + self._key.clear() + self._obfuscated = True + + def deobfuscate(self) -> SecureMemory: + with self._lock: + return self._deobfuscate_unlocked() + + def _deobfuscate_unlocked(self) -> SecureMemory: + if not self._obfuscated: + return self._key + masked_sb = self._frags.reconstruct() + mask = self._mask.get_bytes() + plain = bytearray(a ^ b for a, b in zip(masked_sb.get_bytes(), mask)) + masked_sb.clear() + return SecureMemory(plain) + + def clear(self): + with self._lock: + if self._mask: + self._mask.clear() + if self._frags: + self._frags.clear() + if self._key: + self._key.clear() + self._obfuscated = False + + +# --------------------------------------------------------------------------- +# TimedExposure (bug-fix: added threading.Lock) +# --------------------------------------------------------------------------- +class TimedExposure: + """Context manager that keeps a key in the clear only briefly.""" + + def __init__(self, ko: KeyObfuscator, timeout: float = 0.5): + self.ko = ko + self.timeout = timeout + self._plain: Optional[SecureMemory] = None + self._timer: Optional[threading.Timer] = None + self._lock = threading.Lock() + + def __enter__(self) -> SecureMemory: + with self._lock: + self._cancel_timer_unlocked() + self._plain = self.ko.deobfuscate() + return self._plain + + def _re_mask(self): + with self._lock: + if self._plain: + self._plain.clear() + self._plain = None + try: + self.ko.obfuscate() + except (ValueError, AttributeError): + pass + + def __exit__(self, exc_type, exc, tb): + try: + self._re_mask() + finally: + with self._lock: + self._cancel_timer_unlocked() + if self.timeout > 0 and exc_type is None: + self._timer = threading.Timer(self.timeout, self._re_mask) + self._timer.daemon = True + self._timer.start() + + def _cancel_timer_unlocked(self): + if self._timer and self._timer.is_alive(): + self._timer.cancel() + self._timer = None + + def cancel_timer(self): + with self._lock: + self._cancel_timer_unlocked() diff --git a/keyguard/util/platform_harden.py b/keyguard/util/platform_harden.py new file mode 100644 index 0000000..610275c --- /dev/null +++ b/keyguard/util/platform_harden.py @@ -0,0 +1,156 @@ +"""Platform hardening (non-debug) and SecurityWarning. + +All debugger-detection code has been intentionally removed. +Only useful OS-level hardening remains (DEP, DLL restriction, core dump disable). +""" + +from __future__ import annotations + +import ctypes +import logging +import multiprocessing +import platform +import time +import warnings +from typing import Dict + +logger = logging.getLogger("keyguard.harden") + + +# --------------------------------------------------------------------------- +# SecurityWarning +# --------------------------------------------------------------------------- +class SecurityWarning(UserWarning): + """Categorised security warning with auto-logging.""" + + _warning_counts: Dict[str, int] = { + "memory_protection": 0, + "process_protection": 0, + "crypto_fallback": 0, + "file_permissions": 0, + "other": 0, + } + + def __init__( + self, + message: str, + category: str = "other", + severity: str = "medium", + recommendation: str | None = None, + ): + super().__init__(message) + self.category = category + self.severity = severity + self.recommendation = recommendation + self.timestamp = time.time() + + if category in self._warning_counts: + self._warning_counts[category] += 1 + else: + self._warning_counts["other"] += 1 + + self._auto_log() + + def _auto_log(self): + msg = f"[{self.severity.upper()}] {self.category}: {self}" + if self.recommendation: + msg += f" | Recommendation: {self.recommendation}" + level = { + "critical": logging.CRITICAL, + "high": logging.ERROR, + "medium": logging.WARNING, + }.get(self.severity, logging.INFO) + logger.log(level, msg) + + @classmethod + def get_security_metrics(cls) -> Dict[str, int]: + return cls._warning_counts.copy() + + @classmethod + def reset_metrics(cls): + for key in cls._warning_counts: + cls._warning_counts[key] = 0 + + def __str__(self) -> str: + base = super().__str__() + return f"{base} [{self.category}]" + + +# --------------------------------------------------------------------------- +# Convenience warning helpers +# --------------------------------------------------------------------------- +def warn_memory_protection(message: str, severity: str = "medium"): + warnings.warn(SecurityWarning(message, "memory_protection", severity)) + + +def warn_process_protection(message: str, severity: str = "medium"): + warnings.warn(SecurityWarning(message, "process_protection", severity)) + + +def warn_crypto_fallback(message: str, severity: str = "medium"): + warnings.warn(SecurityWarning(message, "crypto_fallback", severity)) + + +def warn_file_permissions(message: str, severity: str = "medium"): + warnings.warn(SecurityWarning(message, "file_permissions", severity)) + + +# --------------------------------------------------------------------------- +# Platform hardening (NO debugger detection) +# --------------------------------------------------------------------------- +def apply_platform_hardening() -> None: + """Apply OS-level hardening without any debugger detection.""" + system = platform.system() + if system == "Windows": + _harden_windows() + elif system in ("Linux", "Darwin"): + _harden_unix() + + +def _harden_windows() -> None: + try: + kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + + # DEP (Data Execution Prevention) + DEP_ENABLE = 0x00000001 + if hasattr(kernel32, "SetProcessDEPPolicy"): + result = kernel32.SetProcessDEPPolicy(DEP_ENABLE) + if result == 0: + error = ctypes.get_last_error() + if error not in (5, 50, 87): + logger.warning("Failed to enable DEP (error %d)", error) + + # DLL injection protection + if hasattr(kernel32, "SetDllDirectoryW"): + kernel32.SetDllDirectoryW("") + logger.debug("DLL directory restricted to system") + + except Exception as exc: + logger.error("Error applying Windows protections: %s", exc) + warn_process_protection( + "Some process protections could not be applied", severity="high" + ) + + +def _harden_unix() -> None: + try: + import resource + + resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) + logger.debug("Core dumps disabled") + except Exception as exc: + logger.error("Error applying Unix protections: %s", exc) + warn_process_protection(f"Error applying Unix protections: {exc}", severity="medium") + + +# --------------------------------------------------------------------------- +# System requirements validation +# --------------------------------------------------------------------------- +def validate_system_requirements() -> None: + import psutil + + avail = psutil.virtual_memory().available / (1024**3) + if avail < 0.5: + raise SystemError(f"Insufficient RAM: {avail:.1f} GB free (minimum 0.5 GB).") + if (multiprocessing.cpu_count() or 1) < 2: + warnings.warn("Only 1 CPU core – performance may be low.", RuntimeWarning) diff --git a/keyguard/util/rate_limit.py b/keyguard/util/rate_limit.py new file mode 100644 index 0000000..21d990e --- /dev/null +++ b/keyguard/util/rate_limit.py @@ -0,0 +1,50 @@ +"""Rate limiter with exponential backoff for brute-force protection.""" + +from __future__ import annotations + +import logging +import time + +logger = logging.getLogger("keyguard.rate_limit") + +# Defaults (can be overridden by Config at import time) +MAX_LOGIN_ATTEMPTS = 5 +LOGIN_DELAY_BASE = 2 # seconds + + +class RateLimiter: + """Exponential-backoff rate limiter against brute force.""" + + def __init__( + self, + max_attempts: int = MAX_LOGIN_ATTEMPTS, + delay_base: int = LOGIN_DELAY_BASE, + ): + self._max_attempts = max_attempts + self._delay_base = delay_base + self.attempts = 0 + self.last_attempt: float = 0 + + def check(self): + now = time.time() + if self.attempts > 0: + required_delay = self._delay_base**self.attempts + elapsed = now - self.last_attempt + if elapsed < required_delay: + wait_time = required_delay - elapsed + logger.warning("Rate limiting: waiting %.1fs", wait_time) + time.sleep(wait_time) + + if self.attempts >= self._max_attempts: + logger.error("Maximum of %d attempts exceeded", self._max_attempts) + raise ValueError( + f"Exceeded the limit of {self._max_attempts} attempts. " + "Wait before trying again." + ) + + self.attempts += 1 + self.last_attempt = time.time() + + def reset(self): + self.attempts = 0 + self.last_attempt = 0 diff --git a/keyguard/vault/__init__.py b/keyguard/vault/__init__.py new file mode 100644 index 0000000..0a8dc8a --- /dev/null +++ b/keyguard/vault/__init__.py @@ -0,0 +1,6 @@ +"""KeyGuard vault modules.""" + +from keyguard.vault.manager import VaultManager +from keyguard.vault.models import VaultEntry + +__all__ = ["VaultManager", "VaultEntry"] diff --git a/keyguard/vault/manager.py b/keyguard/vault/manager.py new file mode 100644 index 0000000..894c1db --- /dev/null +++ b/keyguard/vault/manager.py @@ -0,0 +1,470 @@ +"""VaultManager — CRUD, open/save, and v3-to-v4 migration.""" + +from __future__ import annotations + +import json +import logging +import secrets +import shutil +import struct +import time +from typing import Dict, List, Optional + +from keyguard.config import Config +from keyguard.crypto.engine import CryptoEngine +from keyguard.crypto.formats import ( + HEADER_V3_FMT, + HEADER_V3_SIZE, + HEADER_V4_FMT, + HEADER_V4_SIZE, + HMAC_SIZE, + KDF_ARGON2ID, + KDF_VERSION_19, + KEY_SIZE, + MAGIC_LEN, + MAGIC_V3, + MAGIC_V4, + NONCE_SIZE, + PROTOCOL_VERSION_V3, + PROTOCOL_VERSION_V4, + SALT_SIZE, + VaultHeaderV3, + VaultHeaderV4, + is_legacy_vault, + parse_vault_header, +) +from keyguard.storage.backend import StorageBackend +from keyguard.util.memory import KeyObfuscator, SecureMemory, TimedExposure +from keyguard.util.rate_limit import RateLimiter +from keyguard.vault.models import VaultEntry + +logger = logging.getLogger("keyguard.vault") + + +class VaultManager: + """High-level vault operations.""" + + def __init__(self, storage: StorageBackend, crypto: CryptoEngine): + self.storage = storage + self.crypto = crypto + self.entries: Dict[str, VaultEntry] = {} + self.entry_order: List[str] = [] + self.header: Optional[VaultHeaderV4] = None + self._enc_ko: Optional[KeyObfuscator] = None + self._hmac_ko: Optional[KeyObfuscator] = None + self._modified = False + self.rate_limiter = RateLimiter() + + # ------------------------------------------------------------------ + # Create + # ------------------------------------------------------------------ + def create_new(self, password: SecureMemory) -> None: + pwd_str = password.get_bytes().decode("utf-8") + if len(pwd_str) < Config.MIN_MASTER_PASSWORD_LENGTH: + raise ValueError( + f"Master password must be at least " + f"{Config.MIN_MASTER_PASSWORD_LENGTH} characters" + ) + + has_upper = any(c.isupper() for c in pwd_str) + has_lower = any(c.islower() for c in pwd_str) + has_digit = any(c.isdigit() for c in pwd_str) + has_special = any(not c.isalnum() for c in pwd_str) + if sum([has_upper, has_lower, has_digit, has_special]) < 3: + raise ValueError( + "Master password must contain at least 3 character types " + "(upper, lower, digit, symbol)" + ) + + salt = secrets.token_bytes(SALT_SIZE) + try: + enc_key, hmac_key = self.crypto.derive_keys(password, salt) + self._enc_ko = KeyObfuscator(SecureMemory(enc_key)) + self._enc_ko.obfuscate() + self._hmac_ko = KeyObfuscator(SecureMemory(hmac_key)) + self._hmac_ko.obfuscate() + + self.header = VaultHeaderV4( + version=PROTOCOL_VERSION_V4, + counter=0, + salt=salt, + created=time.time(), + modified=time.time(), + kdf_algorithm=KDF_ARGON2ID, + kdf_version=KDF_VERSION_19, + kdf_time_cost=self.crypto.time_cost, + kdf_memory_cost=self.crypto.memory_cost, + kdf_parallelism=self.crypto.parallelism, + kdf_hash_len=KEY_SIZE, + reserved=0, + hmac=b"\x00" * HMAC_SIZE, + ) + self._save() + logger.info("New vault created") + finally: + if "enc_key" in locals(): + ba = bytearray(enc_key) + for i in range(len(ba)): + ba[i] = 0 + if "hmac_key" in locals(): + ba2 = bytearray(hmac_key) + for i in range(len(ba2)): + ba2[i] = 0 + + # ------------------------------------------------------------------ + # Open (supports v3 legacy + v4) + # ------------------------------------------------------------------ + def open(self, password: SecureMemory) -> None: + self.rate_limiter.check() + try: + data = self.storage.read() + + if len(data) < MAGIC_LEN: + raise ValueError("File is not a valid vault") + + magic = data[:MAGIC_LEN] + if magic == MAGIC_V3: + self._open_v3(data, password) + elif magic == MAGIC_V4: + self._open_v4(data, password) + else: + raise ValueError("Unrecognised vault format") + + self.rate_limiter.reset() + logger.info("Vault opened — %d entries", len(self.entries)) + + except (ValueError, KeyError, TypeError, json.JSONDecodeError): + raise + except (IOError, OSError): + raise + except Exception: + raise + + def _open_v3(self, data: bytes, password: SecureMemory) -> None: + """Open a legacy v3 vault, then migrate to v4.""" + hdr = VaultHeaderV3.from_bytes(data[MAGIC_LEN:]) + if hdr.version != PROTOCOL_VERSION_V3: + raise ValueError(f"Unsupported v3 vault version: {hdr.version}") + + # Use current engine params as fallback, but with v3 HKDF info + enc_key, hmac_key = self.crypto.derive_keys( + password, hdr.salt, hkdf_info=CryptoEngine.HKDF_INFO_V3 + ) + try: + self._enc_ko = KeyObfuscator(SecureMemory(enc_key)) + self._enc_ko.obfuscate() + self._hmac_ko = KeyObfuscator(SecureMemory(hmac_key)) + self._hmac_ko.obfuscate() + + # Verify HMAC + with TimedExposure(self._hmac_ko) as hk: + header_hmac = self.crypto.compute_hmac( + hk.get_bytes(), data[: MAGIC_LEN + HEADER_V3_SIZE] + ) + if not CryptoEngine.constant_time_compare(header_hmac, hdr.hmac): + self._clear_keys() + raise ValueError("Invalid header HMAC — wrong password or corrupted vault") + + # Decrypt + encrypted = data[MAGIC_LEN + HEADER_V3_SIZE + HMAC_SIZE :] + self._decrypt_entries(encrypted, data[: MAGIC_LEN + HEADER_V3_SIZE + HMAC_SIZE]) + + # Migrate to v4: build v4 header with current KDF params + self.header = VaultHeaderV4( + version=PROTOCOL_VERSION_V4, + counter=hdr.counter, + salt=hdr.salt, + created=hdr.created, + modified=hdr.modified, + kdf_algorithm=KDF_ARGON2ID, + kdf_version=KDF_VERSION_19, + kdf_time_cost=self.crypto.time_cost, + kdf_memory_cost=self.crypto.memory_cost, + kdf_parallelism=self.crypto.parallelism, + kdf_hash_len=KEY_SIZE, + reserved=0, + hmac=b"\x00" * HMAC_SIZE, + ) + + # Re-derive keys with v4 HKDF info for the new vault format + enc_key_v4, hmac_key_v4 = self.crypto.derive_keys( + password, hdr.salt, hkdf_info=CryptoEngine.HKDF_INFO_V4 + ) + self._enc_ko.clear() + self._hmac_ko.clear() + self._enc_ko = KeyObfuscator(SecureMemory(enc_key_v4)) + self._enc_ko.obfuscate() + self._hmac_ko = KeyObfuscator(SecureMemory(hmac_key_v4)) + self._hmac_ko.obfuscate() + + # Backup original v3 file before overwriting + v3_backup = self.storage.vault_path.parent / ( + self.storage.vault_path.name + f".v3backup-{int(time.time())}" + ) + shutil.copy2(self.storage.vault_path, v3_backup) + logger.info("v3 vault backed up to %s", v3_backup) + + # Re-save as v4 + self._save() + logger.info("Vault migrated from v3 to v4") + + finally: + ba = bytearray(enc_key) + for i in range(len(ba)): + ba[i] = 0 + ba2 = bytearray(hmac_key) + for i in range(len(ba2)): + ba2[i] = 0 + if "enc_key_v4" in locals(): + ba3 = bytearray(enc_key_v4) + for i in range(len(ba3)): + ba3[i] = 0 + if "hmac_key_v4" in locals(): + ba4 = bytearray(hmac_key_v4) + for i in range(len(ba4)): + ba4[i] = 0 + + def _open_v4(self, data: bytes, password: SecureMemory) -> None: + """Open a v4 vault (self-descriptive KDF params in header).""" + hdr = VaultHeaderV4.from_bytes(data[MAGIC_LEN:]) + if hdr.version != PROTOCOL_VERSION_V4: + raise ValueError(f"Unsupported v4 vault version: {hdr.version}") + + # Use KDF params from the header itself + kdf_params = hdr.get_kdf_params() + engine = CryptoEngine(kdf_params) + enc_key, hmac_key = engine.derive_keys(password, hdr.salt) + + try: + self._enc_ko = KeyObfuscator(SecureMemory(enc_key)) + self._enc_ko.obfuscate() + self._hmac_ko = KeyObfuscator(SecureMemory(hmac_key)) + self._hmac_ko.obfuscate() + + # Verify HMAC + with TimedExposure(self._hmac_ko) as hk: + header_hmac = engine.compute_hmac( + hk.get_bytes(), data[: MAGIC_LEN + HEADER_V4_SIZE] + ) + if not CryptoEngine.constant_time_compare(header_hmac, hdr.hmac): + self._clear_keys() + raise ValueError("Invalid header HMAC — wrong password or corrupted vault") + + # Decrypt + encrypted = data[MAGIC_LEN + HEADER_V4_SIZE + HMAC_SIZE :] + self._decrypt_entries(encrypted, data[: MAGIC_LEN + HEADER_V4_SIZE + HMAC_SIZE]) + + self.header = hdr + # Keep the header's engine for future saves + self.crypto = engine + + finally: + ba = bytearray(enc_key) + for i in range(len(ba)): + ba[i] = 0 + ba2 = bytearray(hmac_key) + for i in range(len(ba2)): + ba2[i] = 0 + + def _decrypt_entries(self, encrypted: bytes, ad: bytes) -> None: + """Shared decryption logic for both v3 and v4.""" + if not encrypted: + self.entries = {} + self.entry_order = [] + return + + nonce = encrypted[:NONCE_SIZE] + ciphertext = encrypted[NONCE_SIZE:] + + with TimedExposure(self._enc_ko) as ek: + plaintext = self.crypto.decrypt_data(ek.get_bytes(), nonce, ciphertext, ad) + + try: + vault_data = json.loads(plaintext.decode("utf-8")) + + if isinstance(vault_data, dict) and "entries" in vault_data: + entries_data = vault_data["entries"] + self.entry_order = vault_data.get("order", []) + else: + entries_data = vault_data + self.entry_order = [] + + self.entries = { + name: VaultEntry.from_dict(entry) for name, entry in entries_data.items() + } + + if not self.entry_order: + self.entry_order = sorted(self.entries.keys()) + finally: + pt_ba = bytearray(plaintext) + for i in range(len(pt_ba)): + pt_ba[i] = 0 + + # ------------------------------------------------------------------ + # Save (always v4) + # ------------------------------------------------------------------ + def _save(self) -> None: + vault_data = { + "entries": {n: e.to_dict() for n, e in self.entries.items()}, + "order": self.entry_order, + } + plaintext = json.dumps(vault_data, indent=2).encode("utf-8") + + try: + self.header.counter += 1 + self.header.modified = time.time() + + header_bytes = struct.pack( + HEADER_V4_FMT, + self.header.version, + self.header.counter, + self.header.salt, + int(self.header.created), + self.header.modified, + self.header.kdf_algorithm, + self.header.kdf_version, + self.header.kdf_time_cost, + self.header.kdf_memory_cost, + self.header.kdf_parallelism, + self.header.kdf_hash_len, + self.header.reserved, + ) + + with TimedExposure(self._hmac_ko) as hk: + self.header.hmac = self.crypto.compute_hmac( + hk.get_bytes(), MAGIC_V4 + header_bytes + ) + + ad = MAGIC_V4 + header_bytes + self.header.hmac + + with TimedExposure(self._enc_ko) as ek: + nonce, ciphertext = self.crypto.encrypt_data(ek.get_bytes(), plaintext, ad) + + blob = MAGIC_V4 + header_bytes + self.header.hmac + nonce + ciphertext + self.storage.write_atomic(blob) + self._modified = False + finally: + if "plaintext" in locals(): + pt_ba = bytearray(plaintext) + for i in range(len(pt_ba)): + pt_ba[i] = 0 + + # ------------------------------------------------------------------ + # CRUD + # ------------------------------------------------------------------ + def add_entry(self, name: str, password: str, metadata: Optional[Dict] = None) -> None: + if name in self.entries: + raise ValueError(f"Entry '{name}' already exists") + self.entries[name] = VaultEntry(name, password, metadata) + if name not in self.entry_order: + self.entry_order.append(name) + self._modified = True + self._save() + + def update_entry( + self, name: str, password: str | None = None, metadata: Optional[Dict] = None + ) -> None: + if name not in self.entries: + raise ValueError(f"Entry '{name}' not found") + entry = self.entries[name] + if password is not None: + entry.set_password(password) + if metadata is not None: + entry.metadata = metadata + entry.modified = time.time() + self._modified = True + self._save() + + def delete_entry(self, name: str) -> None: + if name not in self.entries: + raise ValueError(f"Entry '{name}' not found") + self.entries[name]._pw_ko.clear() + del self.entries[name] + if name in self.entry_order: + self.entry_order.remove(name) + self._modified = True + self._save() + + def list_entries(self) -> List[str]: + """Return ordered entry names without mutating internal state.""" + ordered = [n for n in self.entry_order if n in self.entries] + remaining = [n for n in self.entries if n not in self.entry_order] + return ordered + sorted(remaining) + + def update_all_passwords(self, password_gen) -> int: + if not self.entries: + return 0 + from keyguard.config import CHARSETS + + count = 0 + for entry in self.entries.values(): + new_pw = password_gen.generate(20, CHARSETS["full"]) + entry.set_password(new_pw) + count += 1 + self._modified = True + self._save() + logger.info("%d passwords bulk-updated", count) + return count + + # ------------------------------------------------------------------ + # Password change + # ------------------------------------------------------------------ + def change_password(self, old_password: SecureMemory, new_password: SecureMemory) -> None: + temp_enc_key, _ = self.crypto.derive_keys(old_password, self.header.salt) + try: + with TimedExposure(self._enc_ko) as ek: + if not CryptoEngine.constant_time_compare(temp_enc_key, ek.get_bytes()): + raise ValueError("Current password is incorrect") + finally: + ba = bytearray(temp_enc_key) + for i in range(len(ba)): + ba[i] = 0 + + new_salt = secrets.token_bytes(SALT_SIZE) + enc_key, hmac_key = self.crypto.derive_keys(new_password, new_salt) + try: + self._enc_ko.clear() + self._hmac_ko.clear() + self._enc_ko = KeyObfuscator(SecureMemory(enc_key)) + self._enc_ko.obfuscate() + self._hmac_ko = KeyObfuscator(SecureMemory(hmac_key)) + self._hmac_ko.obfuscate() + + self.header.salt = new_salt + self.header.kdf_time_cost = self.crypto.time_cost + self.header.kdf_memory_cost = self.crypto.memory_cost + self.header.kdf_parallelism = self.crypto.parallelism + + self._save() + self.storage.cleanup_old_backups() + logger.info("Master password changed") + finally: + ba = bytearray(enc_key) + for i in range(len(ba)): + ba[i] = 0 + ba2 = bytearray(hmac_key) + for i in range(len(ba2)): + ba2[i] = 0 + + # ------------------------------------------------------------------ + # Close / cleanup + # ------------------------------------------------------------------ + def close(self) -> None: + try: + for entry in self.entries.values(): + entry._pw_ko.clear() + self._clear_keys() + except Exception as exc: + logger.error("Error closing vault: %s", exc) + finally: + self.entries.clear() + self.entry_order.clear() + self._enc_ko = None + self._hmac_ko = None + self._modified = False + + def _clear_keys(self) -> None: + if self._enc_ko: + self._enc_ko.clear() + if self._hmac_ko: + self._hmac_ko.clear() diff --git a/keyguard/vault/models.py b/keyguard/vault/models.py new file mode 100644 index 0000000..c02e520 --- /dev/null +++ b/keyguard/vault/models.py @@ -0,0 +1,50 @@ +"""VaultEntry — single password entry with obfuscated in-memory storage.""" + +from __future__ import annotations + +import time +from typing import Dict, Optional + +from keyguard.util.memory import KeyObfuscator, SecureMemory, TimedExposure + + +class VaultEntry: + """A vault entry whose password is always kept in obfuscated SecureMemory.""" + + def __init__(self, name: str, password: str, metadata: Optional[Dict] = None): + self.name = name + self.metadata = metadata or {} + self.created = time.time() + self.modified = time.time() + + sm = SecureMemory(password.encode()) + self._pw_ko = KeyObfuscator(sm) + self._pw_ko.obfuscate() + + def get_password(self) -> str: + with TimedExposure(self._pw_ko) as sm: + return sm.get_bytes().decode() + + def set_password(self, new_pwd: str) -> None: + sm_new = SecureMemory(new_pwd.encode()) + if self._pw_ko: + self._pw_ko.clear() + self._pw_ko = KeyObfuscator(sm_new) + self._pw_ko.obfuscate() + self.modified = time.time() + + def to_dict(self) -> Dict: + return { + "name": self.name, + "password": self.get_password(), + "metadata": self.metadata, + "created": self.created, + "modified": self.modified, + } + + @classmethod + def from_dict(cls, data: Dict) -> VaultEntry: + entry = cls(data["name"], data["password"], data.get("metadata", {})) + entry.created = data.get("created", time.time()) + entry.modified = data.get("modified", time.time()) + return entry diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..678d632 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,40 @@ +[build-system] +requires = ["setuptools>=64"] +build-backend = "setuptools.backends._legacy:_Backend" + +[project] +name = "keyguard" +version = "4.0.0" +description = "Secure Password Manager" +requires-python = ">=3.9" +license = "Apache-2.0" +dependencies = [ + "argon2-cffi", + "cryptography", + "psutil", + "ttkbootstrap", + "platformdirs", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0", + "pytest-cov", + "ruff", + "black", +] + +[project.scripts] +keyguard = "keyguard.main:main" + +[tool.ruff] +line-length = 100 +target-version = "py39" +select = ["E", "F", "W", "I", "B"] + +[tool.black] +line-length = 100 +target-version = ["py39"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/requirements.txt b/requirements.txt index a85782c..928bc6a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ argon2-cffi cryptography psutil ttkbootstrap +platformdirs diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..0c7dd46 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,125 @@ +"""Shared test fixtures.""" + +from __future__ import annotations + +import secrets +import struct +import json +import time +import tempfile +from pathlib import Path + +import pytest + +from keyguard.crypto.formats import ( + HEADER_V3_FMT, + HEADER_V3_SIZE, + HMAC_SIZE, + KEY_SIZE, + MAGIC_V3, + NONCE_SIZE, + PROTOCOL_VERSION_V3, + SALT_SIZE, +) + + +@pytest.fixture +def tmp_dir(tmp_path): + """A temporary directory for vault files.""" + return tmp_path + + +@pytest.fixture +def sample_password(): + """A valid master password that meets complexity requirements.""" + return "MyStr0ng!Pass#99" + + +@pytest.fixture +def weak_password(): + """A password that does NOT meet complexity requirements.""" + return "weak" + + +@pytest.fixture +def vault_dir(tmp_dir): + """A directory with proper structure for vault operations.""" + vault_dir = tmp_dir / "vault_test" + vault_dir.mkdir() + return vault_dir + + +def build_v3_vault(password: str, entries: dict, vault_path: Path) -> None: + """Helper: create a v3-format vault file for migration tests. + + *entries* is ``{name: password_str}``. + """ + import argon2 + import argon2.low_level + import hashlib + import hmac as hmac_mod + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + + salt = secrets.token_bytes(SALT_SIZE) + now = time.time() + + # Default KDF params (compat profile for speed in tests) + time_cost = 3 + memory_cost = 65_536 + parallelism = 2 + + master_key = argon2.low_level.hash_secret_raw( + password.encode(), + salt, + time_cost=time_cost, + memory_cost=memory_cost, + parallelism=parallelism, + hash_len=KEY_SIZE, + type=argon2.Type.ID, + ) + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=KEY_SIZE * 2, + salt=None, + info=b"KeyGuard-3.0.1 key-split", + ) + expanded = hkdf.derive(master_key) + enc_key = expanded[:KEY_SIZE] + hmac_key = expanded[KEY_SIZE:] + + header_bytes = struct.pack( + HEADER_V3_FMT, + PROTOCOL_VERSION_V3, + 1, # counter + salt, + int(now), + now, + ) + + header_hmac = hmac_mod.new(hmac_key, MAGIC_V3 + header_bytes, hashlib.sha256).digest() + + vault_data = { + "entries": { + name: { + "name": name, + "password": pwd, + "metadata": {}, + "created": now, + "modified": now, + } + for name, pwd in entries.items() + }, + "order": list(entries.keys()), + } + plaintext = json.dumps(vault_data).encode("utf-8") + + ad = MAGIC_V3 + header_bytes + header_hmac + cipher = ChaCha20Poly1305(enc_key) + nonce = secrets.token_bytes(NONCE_SIZE) + ciphertext = cipher.encrypt(nonce, plaintext, ad) + + blob = MAGIC_V3 + header_bytes + header_hmac + nonce + ciphertext + vault_path.parent.mkdir(parents=True, exist_ok=True) + vault_path.write_bytes(blob) diff --git a/tests/test_crypto.py b/tests/test_crypto.py new file mode 100644 index 0000000..76868dc --- /dev/null +++ b/tests/test_crypto.py @@ -0,0 +1,96 @@ +"""Tests for CryptoEngine — KDF roundtrip, encrypt/decrypt.""" + +from __future__ import annotations + +import secrets + +import pytest + +from keyguard.crypto.engine import CryptoEngine +from keyguard.crypto.formats import KEY_SIZE, SALT_SIZE +from keyguard.util.memory import SecureMemory + + +@pytest.fixture +def engine(): + # Use compat profile for fast tests + return CryptoEngine({"time_cost": 3, "memory_cost": 65_536, "parallelism": 2}) + + +class TestDeriveKeys: + def test_roundtrip(self, engine): + pw = SecureMemory("TestP@ssw0rd!") + salt = secrets.token_bytes(SALT_SIZE) + enc_key, hmac_key = engine.derive_keys(pw, salt) + assert len(enc_key) == KEY_SIZE + assert len(hmac_key) == KEY_SIZE + assert enc_key != hmac_key + + def test_deterministic(self, engine): + pw = SecureMemory("TestP@ssw0rd!") + salt = secrets.token_bytes(SALT_SIZE) + k1 = engine.derive_keys(SecureMemory("TestP@ssw0rd!"), salt) + k2 = engine.derive_keys(SecureMemory("TestP@ssw0rd!"), salt) + assert k1[0] == k2[0] + assert k1[1] == k2[1] + + def test_different_salt_gives_different_keys(self, engine): + pw = SecureMemory("TestP@ssw0rd!") + k1 = engine.derive_keys(SecureMemory("TestP@ssw0rd!"), secrets.token_bytes(SALT_SIZE)) + k2 = engine.derive_keys(SecureMemory("TestP@ssw0rd!"), secrets.token_bytes(SALT_SIZE)) + assert k1[0] != k2[0] + + def test_empty_password_raises(self, engine): + with pytest.raises(ValueError, match="Empty password"): + engine.derive_keys(SecureMemory(b""), secrets.token_bytes(SALT_SIZE)) + + +class TestEncryptDecrypt: + def test_roundtrip(self, engine): + key = secrets.token_bytes(KEY_SIZE) + plaintext = b"hello world secrets" + nonce, ct = engine.encrypt_data(key, plaintext) + result = engine.decrypt_data(key, nonce, ct) + assert result == plaintext + + def test_with_ad(self, engine): + key = secrets.token_bytes(KEY_SIZE) + plaintext = b"data" + ad = b"associated" + nonce, ct = engine.encrypt_data(key, plaintext, ad) + assert engine.decrypt_data(key, nonce, ct, ad) == plaintext + + def test_tampered_ciphertext_fails(self, engine): + from cryptography.exceptions import InvalidTag + + key = secrets.token_bytes(KEY_SIZE) + nonce, ct = engine.encrypt_data(key, b"secret") + tampered = bytearray(ct) + tampered[0] ^= 0xFF + with pytest.raises(InvalidTag): + engine.decrypt_data(key, nonce, bytes(tampered)) + + def test_wrong_key_fails(self, engine): + from cryptography.exceptions import InvalidTag + + key1 = secrets.token_bytes(KEY_SIZE) + key2 = secrets.token_bytes(KEY_SIZE) + nonce, ct = engine.encrypt_data(key1, b"secret") + with pytest.raises(InvalidTag): + engine.decrypt_data(key2, nonce, ct) + + +class TestHMAC: + def test_verify(self, engine): + key = secrets.token_bytes(KEY_SIZE) + data = b"message" + mac = engine.compute_hmac(key, data) + assert engine.verify_hmac(key, data, mac) + + def test_wrong_mac_fails(self, engine): + key = secrets.token_bytes(KEY_SIZE) + data = b"message" + mac = engine.compute_hmac(key, data) + bad = bytearray(mac) + bad[0] ^= 0xFF + assert not engine.verify_hmac(key, data, bytes(bad)) diff --git a/tests/test_formats.py b/tests/test_formats.py new file mode 100644 index 0000000..6e4f7ef --- /dev/null +++ b/tests/test_formats.py @@ -0,0 +1,142 @@ +"""Tests for vault header serialisation (v3 and v4).""" + +from __future__ import annotations + +import secrets +import struct +import time + +import pytest + +from keyguard.crypto.formats import ( + HEADER_V3_FMT, + HEADER_V3_SIZE, + HEADER_V4_FMT, + HEADER_V4_SIZE, + HMAC_SIZE, + KDF_ARGON2ID, + KDF_VERSION_19, + MAGIC_V3, + MAGIC_V4, + PROTOCOL_VERSION_V3, + PROTOCOL_VERSION_V4, + SALT_SIZE, + VaultHeaderV3, + VaultHeaderV4, + is_legacy_vault, + parse_vault_header, +) + + +class TestVaultHeaderV3: + def _make(self): + return VaultHeaderV3( + version=PROTOCOL_VERSION_V3, + counter=5, + salt=secrets.token_bytes(SALT_SIZE), + created=time.time(), + modified=time.time(), + hmac=secrets.token_bytes(HMAC_SIZE), + ) + + def test_roundtrip(self): + hdr = self._make() + raw = hdr.to_bytes() + assert len(raw) == HEADER_V3_SIZE + HMAC_SIZE + hdr2 = VaultHeaderV3.from_bytes(raw) + assert hdr2.version == hdr.version + assert hdr2.counter == hdr.counter + assert hdr2.salt == hdr.salt + assert hdr2.hmac == hdr.hmac + + def test_too_short_raises(self): + with pytest.raises(ValueError): + VaultHeaderV3.from_bytes(b"\x00" * 10) + + +class TestVaultHeaderV4: + def _make(self): + return VaultHeaderV4( + version=PROTOCOL_VERSION_V4, + counter=1, + salt=secrets.token_bytes(SALT_SIZE), + created=time.time(), + modified=time.time(), + kdf_algorithm=KDF_ARGON2ID, + kdf_version=KDF_VERSION_19, + kdf_time_cost=3, + kdf_memory_cost=65_536, + kdf_parallelism=2, + kdf_hash_len=32, + reserved=0, + hmac=secrets.token_bytes(HMAC_SIZE), + ) + + def test_roundtrip(self): + hdr = self._make() + raw = hdr.to_bytes() + assert len(raw) == HEADER_V4_SIZE + HMAC_SIZE + hdr2 = VaultHeaderV4.from_bytes(raw) + assert hdr2.version == PROTOCOL_VERSION_V4 + assert hdr2.kdf_time_cost == 3 + assert hdr2.kdf_memory_cost == 65_536 + assert hdr2.kdf_parallelism == 2 + assert hdr2.kdf_hash_len == 32 + + def test_get_kdf_params(self): + hdr = self._make() + params = hdr.get_kdf_params() + assert params["time_cost"] == 3 + assert params["memory_cost"] == 65_536 + assert params["parallelism"] == 2 + + def test_too_short_raises(self): + with pytest.raises(ValueError): + VaultHeaderV4.from_bytes(b"\x00" * 10) + + +class TestParseVaultHeader: + def test_detect_v3(self): + hdr = VaultHeaderV3( + version=PROTOCOL_VERSION_V3, + counter=0, + salt=b"\x00" * SALT_SIZE, + created=0, + modified=0, + hmac=b"\x00" * HMAC_SIZE, + ) + raw = MAGIC_V3 + hdr.to_bytes() + parsed = parse_vault_header(raw) + assert isinstance(parsed, VaultHeaderV3) + + def test_detect_v4(self): + hdr = VaultHeaderV4( + version=PROTOCOL_VERSION_V4, + counter=0, + salt=b"\x00" * SALT_SIZE, + created=0, + modified=0, + kdf_algorithm=0, + kdf_version=0x13, + kdf_time_cost=3, + kdf_memory_cost=65536, + kdf_parallelism=2, + kdf_hash_len=32, + reserved=0, + hmac=b"\x00" * HMAC_SIZE, + ) + raw = MAGIC_V4 + hdr.to_bytes() + parsed = parse_vault_header(raw) + assert isinstance(parsed, VaultHeaderV4) + + def test_unknown_magic_raises(self): + with pytest.raises(ValueError, match="Unrecognised"): + parse_vault_header(b"XXX" + b"\x00" * 200) + + +class TestIsLegacyVault: + def test_v3_is_legacy(self): + assert is_legacy_vault(MAGIC_V3 + b"\x00" * 200) + + def test_v4_is_not_legacy(self): + assert not is_legacy_vault(MAGIC_V4 + b"\x00" * 200) diff --git a/tests/test_memory.py b/tests/test_memory.py new file mode 100644 index 0000000..e626f93 --- /dev/null +++ b/tests/test_memory.py @@ -0,0 +1,97 @@ +"""Tests for SecureMemory, KeyObfuscator, and related utilities.""" + +from __future__ import annotations + +import threading +import time + +import pytest + +from keyguard.util.memory import ( + FragmentedSecret, + KeyObfuscator, + PasswordTimeout, + SecureMemory, + TimedExposure, +) + + +class TestSecureMemory: + def test_store_and_retrieve(self): + sm = SecureMemory(b"secret") + assert sm.get_bytes() == b"secret" + assert len(sm) == 6 + + def test_clear(self): + sm = SecureMemory(b"secret") + sm.clear() + assert len(sm) == 0 + with pytest.raises(ValueError): + sm.get_bytes() + + def test_from_string(self): + sm = SecureMemory("hello") + assert sm.get_bytes() == b"hello" + + def test_double_clear_safe(self): + sm = SecureMemory(b"x") + sm.clear() + sm.clear() # Should not raise + + +class TestFragmentedSecret: + def test_reconstruct(self): + fs = FragmentedSecret(b"my secret data", parts=3) + sm = fs.reconstruct() + assert sm.get_bytes() == b"my secret data" + sm.clear() + + def test_clear(self): + fs = FragmentedSecret(b"data") + fs.clear() + assert fs._parts == [] + + +class TestKeyObfuscator: + def test_obfuscate_deobfuscate(self): + key = SecureMemory(b"a" * 32) + ko = KeyObfuscator(key) + ko.obfuscate() + recovered = ko.deobfuscate() + assert recovered.get_bytes() == b"a" * 32 + recovered.clear() + ko.clear() + + def test_double_obfuscate(self): + key = SecureMemory(b"b" * 32) + ko = KeyObfuscator(key) + ko.obfuscate() + ko.obfuscate() # Re-obfuscate should work + recovered = ko.deobfuscate() + assert recovered.get_bytes() == b"b" * 32 + recovered.clear() + ko.clear() + + +class TestTimedExposure: + def test_context_manager(self): + key = SecureMemory(b"c" * 32) + ko = KeyObfuscator(key) + ko.obfuscate() + with TimedExposure(ko, timeout=5.0) as sm: + assert sm.get_bytes() == b"c" * 32 + ko.clear() + + +class TestPasswordTimeout: + def test_wipe_on_cancel(self): + sm = SecureMemory(b"password") + pt = PasswordTimeout(sm, timeout=9999) + pt.cancel() + assert len(sm) == 0 + + def test_timeout_wipes(self): + sm = SecureMemory(b"password") + pt = PasswordTimeout(sm, timeout=0.1) + time.sleep(0.4) + assert len(sm) == 0 diff --git a/tests/test_migration.py b/tests/test_migration.py new file mode 100644 index 0000000..685c443 --- /dev/null +++ b/tests/test_migration.py @@ -0,0 +1,79 @@ +"""Tests for v3→v4 vault migration.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from keyguard.crypto.engine import CryptoEngine +from keyguard.crypto.formats import MAGIC_V4, is_legacy_vault +from keyguard.storage.backend import StorageBackend +from keyguard.util.memory import SecureMemory +from keyguard.vault.manager import VaultManager +from tests.conftest import build_v3_vault + +COMPAT_KDF = {"time_cost": 3, "memory_cost": 65_536, "parallelism": 2} +PASSWORD = "MyStr0ng!Pass#99" + + +class TestV3ToV4Migration: + def test_open_v3_migrates_to_v4(self, tmp_path): + vault_path = tmp_path / "mig" / "vault.kg3" + build_v3_vault(PASSWORD, {"github": "secret123!"}, vault_path) + assert is_legacy_vault(vault_path.read_bytes()) + + storage = StorageBackend(vault_path) + crypto = CryptoEngine(COMPAT_KDF) + vm = VaultManager(storage, crypto) + vm.open(SecureMemory(PASSWORD)) + + # Entry should be intact + assert "github" in vm.entries + assert vm.entries["github"].get_password() == "secret123!" + + # File should now be v4 + assert vault_path.read_bytes()[:3] == MAGIC_V4 + vm.close() + + def test_v3_backup_created(self, tmp_path): + vault_path = tmp_path / "mig2" / "vault.kg3" + build_v3_vault(PASSWORD, {"test": "pw!"}, vault_path) + + storage = StorageBackend(vault_path) + crypto = CryptoEngine(COMPAT_KDF) + vm = VaultManager(storage, crypto) + vm.open(SecureMemory(PASSWORD)) + + # Check v3 backup was created + backups = list(vault_path.parent.glob("*.v3backup-*")) + assert len(backups) == 1 + assert is_legacy_vault(backups[0].read_bytes()) + vm.close() + + def test_migrated_vault_reopens(self, tmp_path): + vault_path = tmp_path / "mig3" / "vault.kg3" + build_v3_vault( + PASSWORD, + {"a": "pass_a!", "b": "pass_b!"}, + vault_path, + ) + + storage = StorageBackend(vault_path) + crypto = CryptoEngine(COMPAT_KDF) + vm = VaultManager(storage, crypto) + vm.open(SecureMemory(PASSWORD)) + vm.close() + # Release lock so we can reopen + storage._release_lock() + + # Reopen the now-v4 vault + storage2 = StorageBackend(vault_path) + crypto2 = CryptoEngine(COMPAT_KDF) + vm2 = VaultManager(storage2, crypto2) + vm2.open(SecureMemory(PASSWORD)) + assert set(vm2.entries.keys()) == {"a", "b"} + assert vm2.entries["a"].get_password() == "pass_a!" + assert vm2.entries["b"].get_password() == "pass_b!" + vm2.close() + storage2._release_lock() diff --git a/tests/test_password_gen.py b/tests/test_password_gen.py new file mode 100644 index 0000000..eaf7038 --- /dev/null +++ b/tests/test_password_gen.py @@ -0,0 +1,56 @@ +"""Tests for PasswordGenerator.""" + +from __future__ import annotations + +import string + +import pytest + +from keyguard.crypto.engine import PasswordGenerator + + +class TestGenerate: + def test_correct_length(self): + pw = PasswordGenerator.generate(20, string.ascii_letters + string.digits) + assert len(pw) == 20 + + def test_only_charset_chars(self): + charset = string.digits + pw = PasswordGenerator.generate(50, charset) + assert all(c in charset for c in pw) + + def test_empty_charset_raises(self): + with pytest.raises(ValueError, match="Empty charset"): + PasswordGenerator.generate(10, "") + + def test_zero_length_raises(self): + with pytest.raises(ValueError, match="at least 1"): + PasswordGenerator.generate(0, string.ascii_letters) + + +class TestEntropy: + def test_positive_entropy(self): + e = PasswordGenerator.calculate_entropy("abc", string.ascii_lowercase) + assert e > 0 + + def test_longer_is_more_entropy(self): + e1 = PasswordGenerator.calculate_entropy("abc", string.ascii_lowercase) + e2 = PasswordGenerator.calculate_entropy("abcdef", string.ascii_lowercase) + assert e2 > e1 + + def test_empty_returns_zero(self): + assert PasswordGenerator.calculate_entropy("", string.ascii_letters) == 0.0 + + +class TestPatterns: + def test_keyboard_sequences_detected(self): + assert PasswordGenerator._has_patterns("qwerty123") + + def test_triple_repeat_detected(self): + assert PasswordGenerator._has_patterns("xxaaabbcc") + + def test_numeric_sequence_detected(self): + assert PasswordGenerator._has_patterns("xx123yy") + + def test_clean_password_passes(self): + assert not PasswordGenerator._has_patterns("Kx9!mP2@") diff --git a/tests/test_rate_limit.py b/tests/test_rate_limit.py new file mode 100644 index 0000000..333184e --- /dev/null +++ b/tests/test_rate_limit.py @@ -0,0 +1,29 @@ +"""Tests for RateLimiter.""" + +from __future__ import annotations + +import pytest + +from keyguard.util.rate_limit import RateLimiter + + +class TestRateLimiter: + def test_allows_first_attempt(self): + rl = RateLimiter(max_attempts=5, delay_base=1) + rl.check() # Should not raise + assert rl.attempts == 1 + + def test_exceeds_max_attempts(self): + rl = RateLimiter(max_attempts=2, delay_base=1) + rl.check() + rl.check() + with pytest.raises(ValueError, match="Exceeded"): + rl.check() + + def test_reset(self): + rl = RateLimiter(max_attempts=2, delay_base=1) + rl.check() + rl.check() + rl.reset() + assert rl.attempts == 0 + rl.check() # Should work again diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..c22c168 --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,98 @@ +"""Tests for StorageBackend — atomic write, backup, restore, permissions.""" + +from __future__ import annotations + +import os +import platform +from pathlib import Path + +import pytest + +from keyguard.crypto.formats import ( + HMAC_SIZE, + MAGIC_V4, + HEADER_V4_SIZE, + PROTOCOL_VERSION_V4, + SALT_SIZE, + VaultHeaderV4, + KDF_ARGON2ID, + KDF_VERSION_19, +) +from keyguard.storage.backend import StorageBackend + + +@pytest.fixture +def backend(tmp_path): + vault_path = tmp_path / "test_vault" / "vault.kg3" + return StorageBackend(vault_path) + + +class TestAtomicWrite: + def test_write_and_read(self, backend): + data = b"hello world" + backend.write_atomic(data) + assert backend.read() == data + + def test_creates_backup_on_overwrite(self, backend): + backend.write_atomic(b"first") + backend.write_atomic(b"second") + assert backend.backup_path.exists() + assert backend.backup_path.read_bytes() == b"first" + + def test_exists(self, backend): + assert not backend.exists() + backend.write_atomic(b"data") + assert backend.exists() + + def test_permissions_unix(self, backend): + if platform.system() == "Windows": + pytest.skip("Unix-only test") + backend.write_atomic(b"data") + mode = oct(backend.vault_path.stat().st_mode & 0o777) + assert mode == "0o600" + + +class TestBackup: + def _write_valid_v4(self, backend): + """Write a minimal valid v4 vault blob.""" + import secrets + import struct + from keyguard.crypto.formats import HEADER_V4_FMT + + salt = secrets.token_bytes(SALT_SIZE) + hdr_bytes = struct.pack( + HEADER_V4_FMT, + PROTOCOL_VERSION_V4, + 1, + salt, + 0, + 0.0, + KDF_ARGON2ID, + KDF_VERSION_19, + 3, + 65536, + 2, + 32, + 0, + ) + hmac_val = b"\x00" * HMAC_SIZE + blob = MAGIC_V4 + hdr_bytes + hmac_val + b"\x00" * 20 + backend.write_atomic(blob) + return blob + + def test_restore_backup(self, backend): + blob = self._write_valid_v4(backend) + # Overwrite vault + backend.write_atomic(b"corrupted") + assert backend.verify_backup_integrity() + assert backend.restore_backup() + assert backend.read() == blob + + def test_no_backup_returns_false(self, backend): + assert not backend.verify_backup_integrity() + assert not backend.restore_backup() + + def test_corrupted_backup_rejected(self, backend): + backend.write_atomic(b"data") + # backup now has b"data" which is not a valid vault + assert not backend.verify_backup_integrity() diff --git a/tests/test_vault.py b/tests/test_vault.py new file mode 100644 index 0000000..d7887cf --- /dev/null +++ b/tests/test_vault.py @@ -0,0 +1,134 @@ +"""Tests for VaultManager — roundtrip, tamper, CRUD, entry order.""" + +from __future__ import annotations + +import json +import secrets +from pathlib import Path + +import pytest + +from keyguard.crypto.engine import CryptoEngine +from keyguard.crypto.formats import HMAC_SIZE, KEY_SIZE, MAGIC_V4, SALT_SIZE +from keyguard.storage.backend import StorageBackend +from keyguard.util.memory import SecureMemory +from keyguard.vault.manager import VaultManager + + +COMPAT_KDF = {"time_cost": 3, "memory_cost": 65_536, "parallelism": 2} +GOOD_PASSWORD = "MyStr0ng!Pass#99" + + +@pytest.fixture +def vault_env(tmp_path): + """Return (storage, crypto, vault_path).""" + vault_path = tmp_path / "vdata" / "vault.kg3" + storage = StorageBackend(vault_path) + crypto = CryptoEngine(COMPAT_KDF) + return storage, crypto, vault_path + + +class TestRoundtrip: + def test_create_save_open_validate(self, vault_env): + storage, crypto, _ = vault_env + pw = SecureMemory(GOOD_PASSWORD) + + vm = VaultManager(storage, crypto) + vm.create_new(pw) + vm.add_entry("github", "gh_secret_123!") + vm.add_entry("gitlab", "gl_secret_456!") + vm.close() + + # Re-open + vm2 = VaultManager(storage, crypto) + vm2.open(SecureMemory(GOOD_PASSWORD)) + assert set(vm2.entries.keys()) == {"github", "gitlab"} + assert vm2.entries["github"].get_password() == "gh_secret_123!" + assert vm2.entries["gitlab"].get_password() == "gl_secret_456!" + vm2.close() + + def test_wrong_password_fails(self, vault_env): + storage, crypto, _ = vault_env + vm = VaultManager(storage, crypto) + vm.create_new(SecureMemory(GOOD_PASSWORD)) + vm.close() + + vm2 = VaultManager(storage, crypto) + with pytest.raises(ValueError): + vm2.open(SecureMemory("WrongP@ss123!")) + + +class TestTamper: + def test_altered_bytes_fails(self, vault_env): + storage, crypto, vault_path = vault_env + vm = VaultManager(storage, crypto) + vm.create_new(SecureMemory(GOOD_PASSWORD)) + vm.add_entry("test", "password123!") + vm.close() + + # Tamper with the vault file + data = bytearray(vault_path.read_bytes()) + # Flip a byte in the ciphertext area (well past the header) + idx = min(len(data) - 1, 150) + data[idx] ^= 0xFF + vault_path.write_bytes(bytes(data)) + + vm2 = VaultManager(storage, crypto) + with pytest.raises((ValueError, Exception)): + vm2.open(SecureMemory(GOOD_PASSWORD)) + + +class TestCRUD: + def test_add_and_delete(self, vault_env): + storage, crypto, _ = vault_env + vm = VaultManager(storage, crypto) + vm.create_new(SecureMemory(GOOD_PASSWORD)) + vm.add_entry("a", "pass_a!") + vm.add_entry("b", "pass_b!") + assert "a" in vm.entries + vm.delete_entry("a") + assert "a" not in vm.entries + assert "b" in vm.entries + vm.close() + + def test_update_entry(self, vault_env): + storage, crypto, _ = vault_env + vm = VaultManager(storage, crypto) + vm.create_new(SecureMemory(GOOD_PASSWORD)) + vm.add_entry("x", "old_pass!") + vm.update_entry("x", password="new_pass!") + assert vm.entries["x"].get_password() == "new_pass!" + vm.close() + + def test_duplicate_add_raises(self, vault_env): + storage, crypto, _ = vault_env + vm = VaultManager(storage, crypto) + vm.create_new(SecureMemory(GOOD_PASSWORD)) + vm.add_entry("dup", "pass!") + with pytest.raises(ValueError, match="already exists"): + vm.add_entry("dup", "pass2!") + vm.close() + + +class TestEntryOrder: + def test_list_entries_preserves_order(self, vault_env): + storage, crypto, _ = vault_env + vm = VaultManager(storage, crypto) + vm.create_new(SecureMemory(GOOD_PASSWORD)) + vm.add_entry("c", "p1!") + vm.add_entry("a", "p2!") + vm.add_entry("b", "p3!") + assert vm.list_entries() == ["c", "a", "b"] + vm.close() + + def test_list_entries_does_not_mutate(self, vault_env): + storage, crypto, _ = vault_env + vm = VaultManager(storage, crypto) + vm.create_new(SecureMemory(GOOD_PASSWORD)) + vm.add_entry("x", "p!") + order_before = vm.entry_order.copy() + # Call list_entries many times + for _ in range(10): + vm.list_entries() + assert vm.entry_order == order_before + vm.close()