diff --git a/app.py b/app.py
index a1e2d51..26ddc3b 100644
--- a/app.py
+++ b/app.py
@@ -120,6 +120,12 @@ def __init__(self, id: str, vector: List[float], payload: Dict[str, Any]):
     CONSOLIDATION_RUN_LABEL,
     CONSOLIDATION_CONTROL_NODE_ID,
     CONSOLIDATION_TASK_FIELDS,
+    # Memory protection configuration
+    CONSOLIDATION_DELETE_THRESHOLD,
+    CONSOLIDATION_ARCHIVE_THRESHOLD,
+    CONSOLIDATION_GRACE_PERIOD_DAYS,
+    CONSOLIDATION_IMPORTANCE_PROTECTION_THRESHOLD,
+    CONSOLIDATION_PROTECTED_TYPES,
     ENRICHMENT_MAX_ATTEMPTS,
     ENRICHMENT_SIMILARITY_LIMIT,
     ENRICHMENT_SIMILARITY_THRESHOLD,
@@ -1465,7 +1471,15 @@ def _persist_consolidation_run(graph: Any, result: Dict[str, Any]) -> None:
 
 
 def _build_scheduler_from_graph(graph: Any) -> Optional[ConsolidationScheduler]:
     vector_store = get_qdrant_client()
-    consolidator = MemoryConsolidator(graph, vector_store)
+    consolidator = MemoryConsolidator(
+        graph,
+        vector_store,
+        delete_threshold=CONSOLIDATION_DELETE_THRESHOLD,
+        archive_threshold=CONSOLIDATION_ARCHIVE_THRESHOLD,
+        grace_period_days=CONSOLIDATION_GRACE_PERIOD_DAYS,
+        importance_protection_threshold=CONSOLIDATION_IMPORTANCE_PROTECTION_THRESHOLD,
+        protected_types=CONSOLIDATION_PROTECTED_TYPES,
+    )
     scheduler = ConsolidationScheduler(consolidator)
     _apply_scheduler_overrides(scheduler)
@@ -2655,7 +2669,15 @@ def consolidate_memories() -> Any:
     try:
         vector_store = get_qdrant_client()
-        consolidator = MemoryConsolidator(graph, vector_store)
+        consolidator = MemoryConsolidator(
+            graph,
+            vector_store,
+            delete_threshold=CONSOLIDATION_DELETE_THRESHOLD,
+            archive_threshold=CONSOLIDATION_ARCHIVE_THRESHOLD,
+            grace_period_days=CONSOLIDATION_GRACE_PERIOD_DAYS,
+            importance_protection_threshold=CONSOLIDATION_IMPORTANCE_PROTECTION_THRESHOLD,
+            protected_types=CONSOLIDATION_PROTECTED_TYPES,
+        )
 
         results = consolidator.consolidate(mode=mode, dry_run=dry_run)
 
         if not dry_run:
diff --git a/automem/config.py b/automem/config.py
index 2e5e4e0..18b9597 100644
--- a/automem/config.py
+++ b/automem/config.py
@@ -27,6 +27,19 @@
     float(_DECAY_THRESHOLD_RAW) if _DECAY_THRESHOLD_RAW else None
 )
 CONSOLIDATION_HISTORY_LIMIT = int(os.getenv("CONSOLIDATION_HISTORY_LIMIT", "20"))
+
+# Memory protection configuration (prevents accidental data loss)
+CONSOLIDATION_DELETE_THRESHOLD = float(os.getenv("CONSOLIDATION_DELETE_THRESHOLD", "0.05"))
+CONSOLIDATION_ARCHIVE_THRESHOLD = float(os.getenv("CONSOLIDATION_ARCHIVE_THRESHOLD", "0.2"))
+CONSOLIDATION_GRACE_PERIOD_DAYS = int(os.getenv("CONSOLIDATION_GRACE_PERIOD_DAYS", "90"))
+CONSOLIDATION_IMPORTANCE_PROTECTION_THRESHOLD = float(
+    os.getenv("CONSOLIDATION_IMPORTANCE_PROTECTION_THRESHOLD", "0.5")
+)
+_PROTECTED_TYPES_RAW = os.getenv("CONSOLIDATION_PROTECTED_TYPES", "Decision,Insight").strip()
+CONSOLIDATION_PROTECTED_TYPES = (
+    frozenset(t.strip() for t in _PROTECTED_TYPES_RAW.split(",") if t.strip())
+    if _PROTECTED_TYPES_RAW else frozenset()
+)
 CONSOLIDATION_CONTROL_LABEL = "ConsolidationControl"
 CONSOLIDATION_RUN_LABEL = "ConsolidationRun"
 CONSOLIDATION_CONTROL_NODE_ID = os.getenv("CONSOLIDATION_CONTROL_NODE_ID", "global")
diff --git a/consolidation.py b/consolidation.py
index ddeadfd..3261174 100644
--- a/consolidation.py
+++ b/consolidation.py
@@ -104,9 +104,27 @@ class MemoryConsolidator:
     - Sleep consolidates memories by strengthening important connections
     - Dreams create novel associations between disparate memories
     - Forgetting is controlled and serves learning
+
+    Protection Features:
+    - Grace period: New/restored memories are protected for configurable days
+    - Importance threshold: High-importance memories are never deleted
+    - Protected flag: Explicitly protected memories are preserved
+    - Type protection: Critical memory types (Decision, Insight) are preserved
     """
 
-    def __init__(self, graph: GraphLike, vector_store: Optional[VectorStoreProtocol] = None):
+    # Memory types that should never be automatically deleted
+    PROTECTED_TYPES = frozenset({'Decision', 'Insight'})
+
+    def __init__(
+        self,
+        graph: GraphLike,
+        vector_store: Optional[VectorStoreProtocol] = None,
+        delete_threshold: float = 0.05,
+        archive_threshold: float = 0.2,
+        grace_period_days: int = 90,
+        importance_protection_threshold: float = 0.5,
+        protected_types: Optional[Set[str]] = None,
+    ):
         self.graph = graph
         self.vector_store = vector_store
         self._graph_id = id(graph)  # Unique ID for cache invalidation
@@ -120,9 +138,14 @@ def __init__(self, graph: GraphLike, vector_store: Optional[VectorStoreProtocol]
         self.min_cluster_size = 3
         self.similarity_threshold = 0.75
 
-        # Forgetting thresholds
-        self.archive_threshold = 0.2  # Archive below this relevance
-        self.delete_threshold = 0.05  # Delete below this (very old, unused)
+        # Forgetting thresholds (now configurable)
+        self.archive_threshold = archive_threshold
+        self.delete_threshold = delete_threshold
+
+        # Protection parameters (prevent data loss)
+        self.grace_period_days = grace_period_days
+        self.importance_protection_threshold = importance_protection_threshold
+        self.protected_types = protected_types or self.PROTECTED_TYPES
 
     def _query_graph(self, query: str, params: Optional[Dict[str, Any]] = None) -> List[Sequence[Any]]:
         """Execute graph query and return the raw result set from FalkorDB."""
@@ -163,6 +186,48 @@ def _get_relationship_count(self, memory_id: str) -> int:
             logger.exception("Failed to get relationship count for %s", memory_id)
             return 0
 
+    def _should_protect_memory(
+        self,
+        memory: Dict[str, Any],
+        current_time: datetime
+    ) -> tuple[bool, str]:
+        """
+        Determine if a memory should be protected from deletion.
+
+        Multi-layer protection prevents accidental data loss:
+        1. Explicit protection flag (protected=true)
+        2. High importance score (>= threshold)
+        3. Grace period for new/restored memories
+        4. Critical memory types (Decision, Insight)
+
+        Returns:
+            tuple: (should_protect: bool, reason: str)
+        """
+        # Check explicit protection flag
+        if memory.get('protected'):
+            return True, f"explicitly protected: {memory.get('protected_reason', 'no reason')}"
+
+        # Check importance threshold
+        importance = memory.get('importance', 0) or 0
+        if importance >= self.importance_protection_threshold:
+            return True, f"high importance ({importance:.2f} >= {self.importance_protection_threshold})"
+
+        # Check grace period
+        timestamp_str = memory.get('timestamp')
+        if timestamp_str:
+            created_at = _parse_iso_datetime(timestamp_str)
+            if created_at:
+                age = current_time - created_at
+                if age.days < self.grace_period_days:
+                    return True, f"within grace period ({age.days} < {self.grace_period_days} days)"
+
+        # Check memory type
+        memory_type = memory.get('type')
+        if memory_type and memory_type in self.protected_types:
+            return True, f"protected type: {memory_type}"
+
+        return False, ""
+
     def calculate_relevance_score(
         self,
         memory: Dict[str, Any],
@@ -423,24 +488,32 @@ def apply_controlled_forgetting(
         dry_run: bool = True
     ) -> Dict[str, Any]:
         """
-        Archive or delete low-relevance memories.
+        Archive or delete low-relevance memories with multi-layer protection.
+
+        Protection prevents deletion of:
+        - Explicitly protected memories (protected=true)
+        - High-importance memories (importance >= threshold)
+        - Recent memories within grace period
+        - Critical memory types (Decision, Insight)
 
-        Returns statistics about what was archived/deleted.
+        Returns statistics about what was archived/deleted/protected.
         """
         stats = {
             'examined': 0,
             'archived': [],
             'deleted': [],
-            'preserved': 0
+            'preserved': 0,
+            'protected': [],  # Track memories saved by protection
         }
 
-        # Get all memories with scores
+        # Get all memories with scores and protection fields
         all_memories_query = """
         MATCH (m:Memory)
         RETURN m.id as id, m.content as content,
                m.relevance_score as score, m.timestamp as timestamp,
                m.type as type, m.importance as importance,
-               m.last_accessed as last_accessed
+               m.last_accessed as last_accessed,
+               m.protected as protected, m.protected_reason as protected_reason
         """
 
         result = self._query_graph(all_memories_query, {})
@@ -449,7 +522,8 @@ def apply_controlled_forgetting(
         for row in result:
             stats['examined'] += 1
 
-            # Result row order: id, content, score, timestamp, type, importance, last_accessed
+            # Result row order: id, content, score, timestamp, type, importance,
+            # last_accessed, protected, protected_reason
             memory = {
                 'id': row[0],
                 'content': row[1],
@@ -457,14 +531,33 @@ def apply_controlled_forgetting(
                 'timestamp': row[3],
                 'type': row[4],
                 'importance': row[5],
-                'last_accessed': row[6] if len(row) > 6 else None
+                'last_accessed': row[6] if len(row) > 6 else None,
+                'protected': row[7] if len(row) > 7 else None,
+                'protected_reason': row[8] if len(row) > 8 else None,
             }
 
             # Calculate current relevance
             relevance = self.calculate_relevance_score(memory, current_time)
 
+            # Check multi-layer protection before deletion
+            should_protect, protect_reason = self._should_protect_memory(memory, current_time)
+
             # Determine fate
             if relevance < self.delete_threshold:
+                # Would normally delete, but check protection first
+                if should_protect:
+                    stats['protected'].append({
+                        'id': memory['id'],
+                        'content_preview': memory['content'][:50] if memory['content'] else '',
+                        'relevance': relevance,
+                        'type': memory.get('type', 'Memory'),
+                        'protection_reason': protect_reason,
+                    })
+                    logger.debug(
+                        "Protected memory %s from deletion: %s (relevance=%.3f)",
+                        memory['id'], protect_reason, relevance
+                    )
+                    continue  # Skip to next memory
                 stats['deleted'].append({
                     'id': memory['id'],
                     'content_preview': memory['content'][:50],
@@ -530,6 +623,18 @@ def apply_controlled_forgetting(
                     "score": relevance
                 })
 
+        # Log protection summary
+        if stats['protected']:
+            logger.info(
+                "Memory protection activated: %d memories saved from deletion "
+                "(examined=%d, deleted=%d, archived=%d, preserved=%d)",
+                len(stats['protected']),
+                stats['examined'],
+                len(stats['deleted']),
+                len(stats['archived']),
+                stats['preserved'],
+            )
+
         return stats
 
     def consolidate(
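The five new knobs in automem/config.py are read from the environment once, at module import time, so a deployment can tune the protection behaviour without touching code. A minimal sketch of overriding them follows; the variable names and defaults are the ones added above, while setting them via os.environ before the first import and the extra "Milestone" type are illustrative assumptions, not part of the patch.

# Sketch: overriding the protection settings for one process/deployment.
# The env var names and defaults come from automem/config.py above; they must be
# set before automem.config is first imported, because the module reads them at import time.
import os

os.environ["CONSOLIDATION_DELETE_THRESHOLD"] = "0.02"                 # default "0.05"
os.environ["CONSOLIDATION_ARCHIVE_THRESHOLD"] = "0.15"                # default "0.2"
os.environ["CONSOLIDATION_GRACE_PERIOD_DAYS"] = "120"                 # default "90"
os.environ["CONSOLIDATION_IMPORTANCE_PROTECTION_THRESHOLD"] = "0.6"   # default "0.5"
# "Milestone" is a hypothetical extra type used only for illustration.
os.environ["CONSOLIDATION_PROTECTED_TYPES"] = "Decision,Insight,Milestone"

from automem.config import CONSOLIDATION_GRACE_PERIOD_DAYS, CONSOLIDATION_PROTECTED_TYPES

assert CONSOLIDATION_GRACE_PERIOD_DAYS == 120
assert CONSOLIDATION_PROTECTED_TYPES == frozenset({"Decision", "Insight", "Milestone"})

Both call sites in app.py (the scheduler builder and the consolidate_memories code path) pass these values straight into MemoryConsolidator, so the environment is the single place the thresholds need to change.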
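The protection decision itself is isolated in _should_protect_memory, so it can be exercised without running a full consolidation pass. A rough sketch, with the assumptions called out in the comments: the dummy graph/vector-store handles rely on the constructor shown above only storing its arguments, and the sample memory dicts are invented for illustration.

# Sketch: checking the protection layers directly (a private method, called here only for illustration).
from datetime import datetime, timezone

from consolidation import MemoryConsolidator

# Assumption: the __init__ in this diff only stores its arguments, so dummy handles are
# enough for an offline check; real callers (app.py) pass a FalkorDB graph and a Qdrant client.
consolidator = MemoryConsolidator(
    graph=None,
    vector_store=None,
    grace_period_days=90,
    importance_protection_threshold=0.5,
    protected_types={"Decision", "Insight"},
)

now = datetime.now(timezone.utc)
old_decision = {"id": "m1", "type": "Decision", "importance": 0.1}        # saved by type protection
important_note = {"id": "m2", "type": "Note", "importance": 0.9}          # saved by importance
pinned_note = {"id": "m3", "type": "Note", "importance": 0.1,
               "protected": True, "protected_reason": "pinned by user"}   # saved by explicit flag

for memory in (old_decision, important_note, pinned_note):
    protect, reason = consolidator._should_protect_memory(memory, now)
    print(memory["id"], protect, reason)

# A memory whose timestamp parses to less than grace_period_days ago would be caught by the
# grace-period layer; anything older, unimportant, unpinned, and of a non-protected type
# falls through every layer and returns (False, "").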
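Nothing is silently kept, either: every rescued memory is reported in the new stats['protected'] list returned by apply_controlled_forgetting, and summarised once via logger.info. The entry keys below are exactly the ones appended in the diff; the values are made up for illustration.

# Shape of one stats['protected'] entry, as built in apply_controlled_forgetting above
# (values are illustrative, not real data).
protected_entry = {
    "id": "mem-1234",
    "content_preview": "Decided to keep FalkorDB as the primary graph st",  # first 50 chars of content
    "relevance": 0.03,                  # fell below delete_threshold, which is why protection was consulted
    "type": "Decision",
    "protection_reason": "protected type: Decision",
}

Since apply_controlled_forgetting defaults to dry_run=True, these entries can be reviewed before any destructive run.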