luzia/lib/kg_maintainer.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Adds DockerTmuxController class for robust tmux session management (usage sketched below)
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00
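
The DockerTmuxController class itself is not part of this file; what follows is a minimal usage sketch assuming only the method names listed in the commit message above. Constructor arguments, patterns, and timeouts are illustrative guesses, not the real signatures:

# Hypothetical driver -- everything beyond the method names is an assumption.
ctl = DockerTmuxController(container="cockpit", session="agent")
ctl.send_keys("make test", delay_enter=0.5)  # send_keys() with configurable delay_enter
ctl.wait_for_prompt(r"\$\s*$", timeout=120)  # pattern-based completion detection
print(ctl.capture_pane())                    # capture_pane() for output retrieval
ctl.wait_for_idle()                          # content-hash-based idle detection
ctl.wait_for_shell_prompt()                  # shell prompt detection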


#!/usr/bin/env python3
"""
Knowledge Graph Maintainer
Maintains Knowledge Graph health through:
- Automatic deduplication (merge similar entities)
- Index optimization
- Pruning outdated information
- Relation strengthening
"""
import sqlite3
import time
from pathlib import Path
from typing import Dict, List, Tuple
class KGMaintainer:
"""Maintain Knowledge Graph health."""
KG_DB_PATHS = [
'/etc/luz-knowledge/research.db',
'/etc/luz-knowledge/projects.db',
'/etc/luz-knowledge/users.db',
'/etc/luz-knowledge/sysadmin.db',
]
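    # Assumed minimal schema, inferred from the queries below (the actual
    # DDL is not in this file):
    #   entities(id, name, ...)
    #   observations(entity_id, content)
    #   relations(id, from_entity_id, to_entity_id, relation_type)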
def __init__(self):
"""Initialize KG maintainer."""
pass
def find_duplicate_entities(self, db_path: str, similarity_threshold: float = 0.8) -> List[Tuple]:
"""
Find potentially duplicate entities in KG.
Args:
db_path: Path to KG database
similarity_threshold: Similarity score threshold (0-1)
Returns:
List of (entity1_id, entity2_id, similarity_score) tuples
"""
duplicates = []
if not Path(db_path).exists():
return duplicates
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# Get all entities
cursor.execute("SELECT id, name FROM entities")
entities = cursor.fetchall()
# Compare names for similarity
for i, (id1, name1) in enumerate(entities):
for id2, name2 in entities[i+1:]:
similarity = self._string_similarity(name1, name2)
if similarity >= similarity_threshold:
duplicates.append((id1, id2, similarity))
except Exception as e:
print(f"Error finding duplicates in {db_path}: {e}")
return duplicates
def merge_duplicate_entities(self, db_path: str, entity1_id: str, entity2_id: str,
dry_run: bool = True) -> Dict:
"""
Merge two duplicate entities.
Args:
db_path: Path to KG database
entity1_id: First entity ID (keep this)
entity2_id: Second entity ID (delete this, merge into first)
dry_run: If True, preview only
Returns:
Dict with merge result
"""
result = {
'entity1_id': entity1_id,
'entity2_id': entity2_id,
'status': 'pending',
'actions': [],
'dry_run': dry_run
}
if not Path(db_path).exists():
result['status'] = 'error'
result['actions'].append('Database not found')
return result
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# 1. Merge observations
cursor.execute(
"SELECT content FROM observations WHERE entity_id = ?",
(entity2_id,)
)
obs2 = cursor.fetchall()
for (obs,) in obs2:
result['actions'].append(f"Merge observation from {entity2_id}")
if not dry_run:
cursor.execute(
"INSERT INTO observations (entity_id, content) VALUES (?, ?)",
(entity1_id, obs)
)
                # 2. Repoint relations referencing entity2 to entity1
                #    (both directions -- otherwise relations *from* entity2
                #    would be orphaned when entity2 is deleted below)
                for column, verb in (('to_entity_id', 'point to'),
                                     ('from_entity_id', 'start from')):
                    cursor.execute(
                        f"SELECT id, relation_type FROM relations WHERE {column} = ?",
                        (entity2_id,)
                    )
                    for rel_id, rel_type in cursor.fetchall():
                        result['actions'].append(
                            f"Update relation {rel_type} to {verb} {entity1_id}"
                        )
                        if not dry_run:
                            cursor.execute(
                                f"UPDATE relations SET {column} = ? WHERE id = ?",
                                (entity1_id, rel_id)
                            )
# 3. Delete entity2
result['actions'].append(f"Delete duplicate entity {entity2_id}")
if not dry_run:
cursor.execute("DELETE FROM observations WHERE entity_id = ?", (entity2_id,))
cursor.execute("DELETE FROM entities WHERE id = ?", (entity2_id,))
conn.commit()
result['status'] = 'success'
except Exception as e:
result['status'] = 'error'
result['actions'].append(f"Error: {e}")
return result
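    # Shape of the returned dict for a successful dry run (entity IDs
    # hypothetical):
    #   {'entity1_id': 'e1', 'entity2_id': 'e2', 'status': 'success',
    #    'actions': ['Merge observation from e2', ...,
    #                'Delete duplicate entity e2'],
    #    'dry_run': True}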
def optimize_indexes(self, db_path: str, dry_run: bool = True) -> Dict:
"""
Optimize database indexes.
Args:
db_path: Path to KG database
dry_run: If True, preview only
Returns:
Dict with optimization result
"""
result = {
'database': db_path,
'status': 'pending',
'actions': [],
'dry_run': dry_run
}
if not Path(db_path).exists():
result['status'] = 'not_found'
return result
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# VACUUM to optimize storage
result['actions'].append("Run VACUUM to optimize storage")
if not dry_run:
cursor.execute("VACUUM")
# ANALYZE to update statistics
result['actions'].append("Run ANALYZE to update query statistics")
if not dry_run:
cursor.execute("ANALYZE")
                # Rebuild FTS5 indexes (the 'rebuild' special command is
                # issued via INSERT, not SELECT)
                result['actions'].append("Rebuild FTS5 indexes")
                if not dry_run:
                    try:
                        cursor.execute(
                            "INSERT INTO entities_fts(entities_fts) VALUES('rebuild')"
                        )
                    except sqlite3.OperationalError:
                        # FTS5 table might not exist
                        pass
if not dry_run:
conn.commit()
result['status'] = 'success'
except Exception as e:
result['status'] = 'error'
result['actions'].append(f"Error: {e}")
return result
def prune_outdated_information(self, db_path: str, age_days: int = 365,
dry_run: bool = True) -> Dict:
"""
Prune outdated entities (optional, with caution).
Args:
db_path: Path to KG database
age_days: Remove entities older than N days
dry_run: If True, preview only
Returns:
Dict with pruning result
"""
result = {
'database': db_path,
'pruned_count': 0,
'status': 'pending',
'actions': [],
'dry_run': dry_run
}
if not Path(db_path).exists():
result['status'] = 'not_found'
return result
# DON'T actually prune without explicit approval
result['actions'].append(f"[REQUIRES APPROVAL] Would prune entities older than {age_days} days")
result['status'] = 'requires_approval'
return result
def strengthen_relations(self, db_path: str, dry_run: bool = True) -> Dict:
"""
Strengthen entity relations by consolidating duplicates.
Args:
db_path: Path to KG database
dry_run: If True, preview only
Returns:
Dict with relation strengthening result
"""
result = {
'database': db_path,
'actions': [],
'dry_run': dry_run,
'relations_strengthened': 0
}
if not Path(db_path).exists():
result['status'] = 'not_found'
return result
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# Find and consolidate duplicate relations
cursor.execute("""
SELECT from_entity_id, to_entity_id, relation_type, COUNT(*) as count
FROM relations
GROUP BY from_entity_id, to_entity_id, relation_type
HAVING count > 1
""")
duplicates = cursor.fetchall()
for from_id, to_id, rel_type, count in duplicates:
result['actions'].append(
f"Consolidate {count} duplicate relations: {rel_type}"
)
result['relations_strengthened'] += 1
if not dry_run:
                        # Keep the lowest-id relation, delete the rest
                        # (deterministic, unlike an unordered LIMIT 1)
                        cursor.execute("""
                            DELETE FROM relations
                            WHERE from_entity_id = ? AND to_entity_id = ? AND relation_type = ?
                              AND id != (
                                  SELECT MIN(id) FROM relations
                                  WHERE from_entity_id = ? AND to_entity_id = ? AND relation_type = ?
                              )
                        """, (from_id, to_id, rel_type, from_id, to_id, rel_type))
if not dry_run:
conn.commit()
result['status'] = 'success'
except Exception as e:
result['status'] = 'error'
result['actions'].append(f"Error: {e}")
return result
def run_full_kg_maintenance(self, dry_run: bool = True) -> Dict:
"""
Run comprehensive KG maintenance across all databases.
Args:
dry_run: If True, preview only
Returns:
Dict with maintenance summary
"""
        maintenance_result = {
            'timestamp': time.time(),
            'dry_run': dry_run,
            'databases_processed': 0,
            'duplicates_found': 0,
            'duplicates_merged': 0,
            'indexes_optimized': 0,
            'relations_strengthened': 0,
            'actions': [],
        }
for db_path in self.KG_DB_PATHS:
if not Path(db_path).exists():
continue
maintenance_result['databases_processed'] += 1
# Find duplicates
duplicates = self.find_duplicate_entities(db_path, similarity_threshold=0.85)
maintenance_result['duplicates_found'] += len(duplicates)
# Merge duplicates (with caution)
for entity1_id, entity2_id, similarity in duplicates[:5]: # Limit to 5 per DB
if similarity > 0.95: # Only auto-merge very high similarity
result = self.merge_duplicate_entities(db_path, entity1_id, entity2_id, dry_run=dry_run)
if result['status'] == 'success':
maintenance_result['duplicates_merged'] += 1
# Optimize indexes
opt_result = self.optimize_indexes(db_path, dry_run=dry_run)
if opt_result['status'] == 'success':
maintenance_result['indexes_optimized'] += 1
# Strengthen relations
rel_result = self.strengthen_relations(db_path, dry_run=dry_run)
if rel_result['status'] == 'success':
maintenance_result['relations_strengthened'] += rel_result['relations_strengthened']
return maintenance_result
    def _string_similarity(self, s1: str, s2: str) -> float:
        """Calculate string similarity (0-1).

        Cheap heuristic: exact match, then substring containment, then
        character-set Jaccard overlap. Not a true Levenshtein distance.
        """
        if s1 == s2:
            return 1.0
        # Normalize strings
        s1 = s1.lower().strip()
        s2 = s2.lower().strip()
        if s1 == s2:
            return 1.0
        # Check for substring match
        if s1 in s2 or s2 in s1:
            return 0.9
        # Both empty after normalization: treat as identical
        if max(len(s1), len(s2)) == 0:
            return 1.0
        # Jaccard overlap of character sets (order-insensitive)
        set1 = set(s1)
        set2 = set(s2)
        return len(set1 & set2) / max(len(set1 | set2), 1)
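    # Illustrative values from the heuristic above (hypothetical inputs):
    #   _string_similarity("SQLite", "sqlite")       -> 1.0 (normalized match)
    #   _string_similarity("postgres", "postgresql") -> 0.9 (substring)
    #   _string_similarity("abc", "cab")             -> 1.0 (same character set;
    #       anagrams are a known false positive of set overlap)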
if __name__ == '__main__':
maintainer = KGMaintainer()
print("=" * 70)
print("KG MAINTENANCE DRY RUN")
print("=" * 70)
result = maintainer.run_full_kg_maintenance(dry_run=True)
print(f"\nDatabases processed: {result['databases_processed']}")
print(f"Duplicates found: {result['duplicates_found']}")
print(f"Would merge: {result['duplicates_merged']}")
print(f"Indexes to optimize: {result['indexes_optimized']}")
print(f"Relations to strengthen: {result['relations_strengthened']}")