#!/usr/bin/env python3
"""
Knowledge Graph Maintainer

Maintains Knowledge Graph health through:
- Automatic deduplication (merge similar entities)
- Index optimization
- Pruning outdated information
- Relation strengthening
"""

import sqlite3
import time
from pathlib import Path
from typing import Dict, List, Tuple


class KGMaintainer:
    """Maintain Knowledge Graph health."""

    KG_DB_PATHS = [
        '/etc/luz-knowledge/research.db',
        '/etc/luz-knowledge/projects.db',
        '/etc/luz-knowledge/users.db',
        '/etc/luz-knowledge/sysadmin.db',
    ]

    def __init__(self):
        """Initialize KG maintainer."""

    def find_duplicate_entities(self, db_path: str,
                                similarity_threshold: float = 0.8) -> List[Tuple]:
        """
        Find potentially duplicate entities in the KG.

        Args:
            db_path: Path to KG database
            similarity_threshold: Similarity score threshold (0-1)

        Returns:
            List of (entity1_id, entity2_id, similarity_score) tuples
        """
        duplicates = []

        if not Path(db_path).exists():
            return duplicates

        try:
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()

                # Get all entities
                cursor.execute("SELECT id, name FROM entities")
                entities = cursor.fetchall()

                # Compare every pair of names for similarity (O(n^2) in the
                # number of entities)
                for i, (id1, name1) in enumerate(entities):
                    for id2, name2 in entities[i + 1:]:
                        similarity = self._string_similarity(name1, name2)
                        if similarity >= similarity_threshold:
                            duplicates.append((id1, id2, similarity))
        except Exception as e:
            print(f"Error finding duplicates in {db_path}: {e}")

        return duplicates

    def merge_duplicate_entities(self, db_path: str, entity1_id: str,
                                 entity2_id: str, dry_run: bool = True) -> Dict:
        """
        Merge two duplicate entities.

        Args:
            db_path: Path to KG database
            entity1_id: First entity ID (keep this)
            entity2_id: Second entity ID (delete this, merge into first)
            dry_run: If True, preview only

        Returns:
            Dict with merge result
        """
        result = {
            'entity1_id': entity1_id,
            'entity2_id': entity2_id,
            'status': 'pending',
            'actions': [],
            'dry_run': dry_run
        }

        if not Path(db_path).exists():
            result['status'] = 'error'
            result['actions'].append('Database not found')
            return result

        try:
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()

                # 1. Copy entity2's observations over to entity1
                cursor.execute(
                    "SELECT content FROM observations WHERE entity_id = ?",
                    (entity2_id,)
                )
                obs2 = cursor.fetchall()

                for (obs,) in obs2:
                    result['actions'].append(f"Merge observation from {entity2_id}")
                    if not dry_run:
                        cursor.execute(
                            "INSERT INTO observations (entity_id, content) "
                            "VALUES (?, ?)",
                            (entity1_id, obs)
                        )

                # 2. Repoint relations that reference entity2 at entity1,
                #    in both directions, so deleting entity2 orphans nothing
                cursor.execute(
                    "SELECT id, relation_type FROM relations WHERE to_entity_id = ?",
                    (entity2_id,)
                )
                for rel_id, rel_type in cursor.fetchall():
                    result['actions'].append(
                        f"Update relation {rel_type} to point to {entity1_id}"
                    )
                    if not dry_run:
                        cursor.execute(
                            "UPDATE relations SET to_entity_id = ? WHERE id = ?",
                            (entity1_id, rel_id)
                        )

                cursor.execute(
                    "SELECT id, relation_type FROM relations WHERE from_entity_id = ?",
                    (entity2_id,)
                )
                for rel_id, rel_type in cursor.fetchall():
                    result['actions'].append(
                        f"Update relation {rel_type} to originate from {entity1_id}"
                    )
                    if not dry_run:
                        cursor.execute(
                            "UPDATE relations SET from_entity_id = ? WHERE id = ?",
                            (entity1_id, rel_id)
                        )

                # 3. Delete entity2 and its (already copied) observations
                result['actions'].append(f"Delete duplicate entity {entity2_id}")
                if not dry_run:
                    cursor.execute("DELETE FROM observations WHERE entity_id = ?",
                                   (entity2_id,))
                    cursor.execute("DELETE FROM entities WHERE id = ?",
                                   (entity2_id,))
                    conn.commit()

                result['status'] = 'success'
        except Exception as e:
            result['status'] = 'error'
            result['actions'].append(f"Error: {e}")

        return result
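    # The SQL in this module implies a minimal schema along these lines.
    # Column names are taken from the queries above; the types and
    # constraints are assumptions, not a description of the production
    # databases:
    #
    #   CREATE TABLE entities     (id TEXT PRIMARY KEY, name TEXT);
    #   CREATE TABLE observations (entity_id TEXT, content TEXT);
    #   CREATE TABLE relations    (id INTEGER PRIMARY KEY,
    #                              from_entity_id TEXT,
    #                              to_entity_id TEXT,
    #                              relation_type TEXT);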
    def optimize_indexes(self, db_path: str, dry_run: bool = True) -> Dict:
        """
        Optimize database indexes.

        Args:
            db_path: Path to KG database
            dry_run: If True, preview only

        Returns:
            Dict with optimization result
        """
        result = {
            'database': db_path,
            'status': 'pending',
            'actions': [],
            'dry_run': dry_run
        }

        if not Path(db_path).exists():
            result['status'] = 'not_found'
            return result

        try:
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()

                # VACUUM to reclaim space and defragment storage
                result['actions'].append("Run VACUUM to optimize storage")
                if not dry_run:
                    cursor.execute("VACUUM")

                # ANALYZE to refresh the query planner's statistics
                result['actions'].append("Run ANALYZE to update query statistics")
                if not dry_run:
                    cursor.execute("ANALYZE")

                # Rebuild FTS5 indexes via FTS5's special 'rebuild' command
                result['actions'].append("Rebuild FTS5 indexes")
                if not dry_run:
                    try:
                        cursor.execute(
                            "INSERT INTO entities_fts(entities_fts) "
                            "VALUES('rebuild')"
                        )
                    except sqlite3.OperationalError:
                        # FTS5 table might not exist
                        pass

                if not dry_run:
                    conn.commit()

                result['status'] = 'success'
        except Exception as e:
            result['status'] = 'error'
            result['actions'].append(f"Error: {e}")

        return result

    def prune_outdated_information(self, db_path: str, age_days: int = 365,
                                   dry_run: bool = True) -> Dict:
        """
        Prune outdated entities (optional, use with caution).

        Args:
            db_path: Path to KG database
            age_days: Remove entities older than N days
            dry_run: If True, preview only

        Returns:
            Dict with pruning result
        """
        result = {
            'database': db_path,
            'pruned_count': 0,
            'status': 'pending',
            'actions': [],
            'dry_run': dry_run
        }

        if not Path(db_path).exists():
            result['status'] = 'not_found'
            return result

        # DON'T actually prune without explicit approval
        result['actions'].append(
            f"[REQUIRES APPROVAL] Would prune entities older than {age_days} days"
        )
        result['status'] = 'requires_approval'
        return result

    def strengthen_relations(self, db_path: str, dry_run: bool = True) -> Dict:
        """
        Strengthen entity relations by consolidating duplicates.

        Args:
            db_path: Path to KG database
            dry_run: If True, preview only

        Returns:
            Dict with relation strengthening result
        """
        result = {
            'database': db_path,
            'status': 'pending',
            'actions': [],
            'dry_run': dry_run,
            'relations_strengthened': 0
        }

        if not Path(db_path).exists():
            result['status'] = 'not_found'
            return result

        try:
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()

                # Find duplicate relations: same endpoints, same type
                cursor.execute("""
                    SELECT from_entity_id, to_entity_id, relation_type,
                           COUNT(*) AS count
                    FROM relations
                    GROUP BY from_entity_id, to_entity_id, relation_type
                    HAVING count > 1
                """)
                duplicates = cursor.fetchall()

                for from_id, to_id, rel_type, count in duplicates:
                    result['actions'].append(
                        f"Consolidate {count} duplicate relations: {rel_type}"
                    )
                    result['relations_strengthened'] += 1

                    if not dry_run:
                        # Keep the row with the lowest id, delete the rest
                        cursor.execute("""
                            DELETE FROM relations
                            WHERE from_entity_id = ?
                              AND to_entity_id = ?
                              AND relation_type = ?
                              AND id != (
                                  SELECT MIN(id) FROM relations
                                  WHERE from_entity_id = ?
                                    AND to_entity_id = ?
                                    AND relation_type = ?
                              )
                        """, (from_id, to_id, rel_type, from_id, to_id, rel_type))

                if not dry_run:
                    conn.commit()

                result['status'] = 'success'
        except Exception as e:
            result['status'] = 'error'
            result['actions'].append(f"Error: {e}")

        return result
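    # Optional alternative to the character-set overlap used by
    # _string_similarity below: difflib's SequenceMatcher tracks edit
    # similarity more closely, using only the standard library. Offered as
    # an unwired sketch; nothing in this module calls it, and swapping it
    # in would change which entity pairs cross the merge thresholds.
    @staticmethod
    def _sequence_similarity(s1: str, s2: str) -> float:
        """Normalized similarity in [0, 1] via difflib's SequenceMatcher."""
        from difflib import SequenceMatcher
        return SequenceMatcher(None, s1.lower().strip(), s2.lower().strip()).ratio()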
    def run_full_kg_maintenance(self, dry_run: bool = True) -> Dict:
        """
        Run comprehensive KG maintenance across all databases.

        Args:
            dry_run: If True, preview only

        Returns:
            Dict with maintenance summary
        """
        maintenance_result = {
            'timestamp': time.time(),
            'dry_run': dry_run,
            'databases_processed': 0,
            'duplicates_found': 0,
            'duplicates_merged': 0,
            'indexes_optimized': 0,
            'relations_strengthened': 0,
            'actions': []
        }

        for db_path in self.KG_DB_PATHS:
            if not Path(db_path).exists():
                continue

            maintenance_result['databases_processed'] += 1

            # Find duplicates
            duplicates = self.find_duplicate_entities(
                db_path, similarity_threshold=0.85
            )
            maintenance_result['duplicates_found'] += len(duplicates)

            # Merge duplicates cautiously: at most 5 per database, and only
            # pairs with very high similarity
            for entity1_id, entity2_id, similarity in duplicates[:5]:
                if similarity > 0.95:
                    result = self.merge_duplicate_entities(
                        db_path, entity1_id, entity2_id, dry_run=dry_run
                    )
                    if result['status'] == 'success':
                        maintenance_result['duplicates_merged'] += 1

            # Optimize indexes
            opt_result = self.optimize_indexes(db_path, dry_run=dry_run)
            if opt_result['status'] == 'success':
                maintenance_result['indexes_optimized'] += 1

            # Strengthen relations
            rel_result = self.strengthen_relations(db_path, dry_run=dry_run)
            if rel_result['status'] == 'success':
                maintenance_result['relations_strengthened'] += \
                    rel_result['relations_strengthened']

        return maintenance_result

    def _string_similarity(self, s1: str, s2: str) -> float:
        """Calculate string similarity (0-1)."""
        if s1 == s2:
            return 1.0

        # Normalize strings
        s1 = s1.lower().strip()
        s2 = s2.lower().strip()

        if s1 == s2:
            return 1.0

        # Treat a substring match as near-identical
        if s1 in s2 or s2 in s1:
            return 0.9

        if max(len(s1), len(s2)) == 0:
            return 1.0

        # Cheap stand-in for edit distance: Jaccard overlap of the two
        # character sets
        set1 = set(s1)
        set2 = set(s2)
        return len(set1 & set2) / max(len(set1 | set2), 1)


if __name__ == '__main__':
    maintainer = KGMaintainer()

    print("=" * 70)
    print("KG MAINTENANCE DRY RUN")
    print("=" * 70)

    result = maintainer.run_full_kg_maintenance(dry_run=True)

    print(f"\nDatabases processed: {result['databases_processed']}")
    print(f"Duplicates found: {result['duplicates_found']}")
    print(f"Would merge: {result['duplicates_merged']}")
    print(f"Indexes to optimize: {result['indexes_optimized']}")
    print(f"Relations to strengthen: {result['relations_strengthened']}")
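
# Optional smoke test, never invoked by this module: exercises a dry-run
# merge against a throwaway database. The schema here mirrors the columns
# the maintainer queries and is an assumption, not the production layout.
def _smoke_test_merge(tmp_dir: str = '/tmp') -> Dict:
    import os
    db = os.path.join(tmp_dir, 'kg_smoke_test.db')
    with sqlite3.connect(db) as conn:
        conn.executescript("""
            DROP TABLE IF EXISTS entities;
            DROP TABLE IF EXISTS observations;
            DROP TABLE IF EXISTS relations;
            CREATE TABLE entities (id TEXT PRIMARY KEY, name TEXT);
            CREATE TABLE observations (entity_id TEXT, content TEXT);
            CREATE TABLE relations (id INTEGER PRIMARY KEY,
                                    from_entity_id TEXT,
                                    to_entity_id TEXT,
                                    relation_type TEXT);
            INSERT INTO entities VALUES ('e1', 'SQLite'), ('e2', 'sqlite');
            INSERT INTO observations VALUES ('e2', 'embedded database');
            INSERT INTO relations (from_entity_id, to_entity_id, relation_type)
                VALUES ('e3', 'e2', 'depends_on');
        """)
    # dry_run=True: the returned 'actions' list previews the observation
    # copy, relation repointing, and entity deletion without writing anything
    return KGMaintainer().merge_duplicate_entities(db, 'e1', 'e2', dry_run=True)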