#!/usr/bin/env python3
"""
Luzia Research Consolidator
Extracts research findings from projects KG and consolidates into research KG
"""

import json
import sqlite3
import time
import traceback
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path


class LuziaResearchConsolidator:
    """Consolidate project research into the research KG.

    Reads every entity and relation from the projects knowledge-graph SQLite
    database and copies them into the research knowledge-graph database.
    Entities already present (matched by name) and relations already present
    (matched by source id, target id and relation label) are skipped, so the
    consolidation can be re-run without creating duplicates.
    """

    DEFAULT_PROJECTS_KG = Path("/etc/zen-swarm/memory/projects.db")
    DEFAULT_RESEARCH_KG = Path("/etc/luz-knowledge/research.db")
    DEFAULT_LOG_FILE = Path("/opt/server-agents/logs/research-consolidation.log")

    def __init__(self, projects_kg=None, research_kg=None, log_file=None):
        """Configure source/target databases and the log file.

        Args:
            projects_kg: Path of the source projects KG; defaults to
                ``DEFAULT_PROJECTS_KG``.
            research_kg: Path of the target research KG; defaults to
                ``DEFAULT_RESEARCH_KG``.
            log_file: Path of the append-only consolidation log; defaults to
                ``DEFAULT_LOG_FILE``. Its parent directory is created if needed.
        """
        self.projects_kg = (
            Path(projects_kg) if projects_kg is not None else self.DEFAULT_PROJECTS_KG
        )
        self.research_kg = (
            Path(research_kg) if research_kg is not None else self.DEFAULT_RESEARCH_KG
        )
        self.log_file = Path(log_file) if log_file is not None else self.DEFAULT_LOG_FILE
        self.log_file.parent.mkdir(parents=True, exist_ok=True)

    def log(self, message):
        """Append a timestamped line to the log file and echo it to stdout."""
        timestamp = datetime.now().isoformat()
        # Messages contain emoji: force UTF-8 so a non-UTF-8 locale cannot make
        # logging itself raise UnicodeEncodeError.
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(f"[{timestamp}] {message}\n")
        print(message)

    @staticmethod
    def _empty_research():
        """Return a fresh, empty result in the shape of get_projects_research()."""
        return {'entities': [], 'relations': [], 'entity_map': {}}

    def get_projects_research(self):
        """Extract all entities and relations from the projects KG.

        Returns:
            dict with keys:
              - 'entities': list of {'id', 'name', 'type'} (a NULL type
                becomes 'finding'), ordered by name;
              - 'relations': list of dicts with 'source_id', 'source_name',
                'target_id', 'target_name', 'relation', 'context' (names are
                None when the referenced entity row is missing);
              - 'entity_map': entity name -> projects-KG id.
            All three keys are always present; they are empty when the KG is
            missing or cannot be read.
        """
        if not self.projects_kg.exists():
            self.log("⚠️ Projects KG not found")
            return self._empty_research()

        try:
            with closing(sqlite3.connect(self.projects_kg)) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                # Get all entities from projects KG (no type filtering available)
                cursor.execute("SELECT id, name, type FROM entities ORDER BY name")
                research = []
                entity_map = {}  # name -> id, used to resolve relations
                for row in cursor.fetchall():
                    research.append({
                        'id': row['id'],
                        'name': row['name'],
                        'type': row['type'] or 'finding',
                    })
                    entity_map[row['name']] = row['id']
                self.log(f"📍 Found {len(research)} entities in projects KG")

                # LEFT JOIN keeps relations whose endpoints are missing; their
                # names come back as None and are skipped at merge time.
                cursor.execute("""
                    SELECT r.source_id, e1.name AS source_name,
                           r.target_id, e2.name AS target_name,
                           r.relation, r.context
                    FROM relations r
                    LEFT JOIN entities e1 ON r.source_id = e1.id
                    LEFT JOIN entities e2 ON r.target_id = e2.id
                """)
                relations = [
                    {
                        'source_id': row['source_id'],
                        'source_name': row['source_name'],
                        'target_id': row['target_id'],
                        'target_name': row['target_name'],
                        'relation': row['relation'],
                        'context': row['context'],
                    }
                    for row in cursor.fetchall()
                ]
                self.log(f"📍 Found {len(relations)} relations in projects KG")
        except Exception as e:
            self.log(f"❌ Error reading projects KG: {e}")
            traceback.print_exc()
            return self._empty_research()

        return {'entities': research, 'relations': relations, 'entity_map': entity_map}

    @staticmethod
    def _find_entity_id(cursor, name):
        """Return the research-KG id of the entity named *name*, or None."""
        cursor.execute("SELECT id FROM entities WHERE name = ?", (name,))
        row = cursor.fetchone()
        return row[0] if row else None

    def _merge_entities(self, cursor, entities):
        """Insert entities whose name is not yet in the research KG.

        Returns:
            (added, skipped) counts. Per-entity errors are logged and the
            entity is counted in neither.
        """
        added = skipped = 0
        for entity in entities:
            try:
                if self._find_entity_id(cursor, entity['name']) is not None:
                    skipped += 1
                    continue

                now = time.time()
                cursor.execute("""
                    INSERT INTO entities
                        (id, name, type, domain, content, metadata,
                         created_at, updated_at, source)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    str(uuid.uuid4()),
                    entity['name'],
                    entity['type'],
                    'infrastructure',                                 # domain
                    entity['name'],                                   # content
                    json.dumps({'project_source': 'projects.db'}),    # metadata
                    now,
                    now,
                    'projects_kg',                                    # source
                ))
                added += 1
                self.log(f"  ✅ Added: {entity['name']} ({entity['type']})")
            except Exception as e:
                self.log(f"  ⚠️ Error adding {entity['name']}: {e}")
        return added, skipped

    def _merge_relations(self, cursor, relations):
        """Insert relations whose endpoints exist and which are not yet present.

        Endpoints are resolved by entity name in the research KG. A relation is
        considered present when a row with the same source id, target id and
        relation label exists; without this check every run re-inserted every
        relation.

        Returns:
            (added, skipped) counts. Per-relation errors are logged and the
            relation is counted in neither.
        """
        added = skipped = 0
        for rel in relations:
            try:
                source_id = self._find_entity_id(cursor, rel['source_name'])
                target_id = self._find_entity_id(cursor, rel['target_name'])
                if source_id is None or target_id is None:
                    skipped += 1
                    continue

                # IS (rather than =) so a NULL relation label matches an existing NULL.
                cursor.execute(
                    "SELECT 1 FROM relations "
                    "WHERE source_id = ? AND target_id = ? AND relation IS ?",
                    (source_id, target_id, rel['relation']),
                )
                if cursor.fetchone():
                    skipped += 1
                    continue

                cursor.execute("""
                    INSERT INTO relations
                        (id, source_id, target_id, relation, context, weight, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    str(uuid.uuid4()),
                    source_id,
                    target_id,
                    rel['relation'],
                    rel['context'],
                    1,
                    time.time(),
                ))
                added += 1
            except Exception as e:
                self.log(f"  ⚠️ Error adding relation: {e}")
        return added, skipped

    def merge_into_research_kg(self, project_research):
        """Merge project research into the research KG.

        Args:
            project_research: dict as returned by get_projects_research().

        Returns:
            Number of entities added; 0 when there is nothing to merge, the
            research KG does not exist, or the merge fails.
        """
        if not project_research.get('entities'):
            self.log("ℹ️ No research found in projects KG")
            return 0
        if not self.research_kg.exists():
            # sqlite3.connect() would otherwise create an empty database file here.
            self.log(f"❌ Research KG not found: {self.research_kg}")
            return 0

        try:
            with closing(sqlite3.connect(self.research_kg)) as conn:
                cursor = conn.cursor()
                added, skipped = self._merge_entities(
                    cursor, project_research['entities'])
                rel_added, rel_skipped = self._merge_relations(
                    cursor, project_research.get('relations', []))
                conn.commit()
        except Exception as e:
            self.log(f"❌ Error merging into research KG: {e}")
            traceback.print_exc()
            return 0

        self.log(f"✨ Consolidation complete: {added} added, {skipped} skipped")
        self.log(f"🔗 Relations: {rel_added} added, {rel_skipped} skipped")
        return added

    def get_summary(self):
        """Return row counts for the research KG, or {} if it cannot be read.

        Keys: 'total_entities', 'entity_types' (distinct entity types),
        'total_relations', 'total_observations'.
        """
        if not self.research_kg.exists():
            # Avoid sqlite3.connect() creating an empty database file.
            self.log(f"❌ Error getting summary: research KG not found: {self.research_kg}")
            return {}

        queries = {
            'total_entities': "SELECT COUNT(*) FROM entities",
            'entity_types': "SELECT COUNT(DISTINCT type) FROM entities",
            'total_relations': "SELECT COUNT(*) FROM relations",
            'total_observations': "SELECT COUNT(*) FROM observations",
        }
        try:
            with closing(sqlite3.connect(self.research_kg)) as conn:
                return {key: conn.execute(sql).fetchone()[0]
                        for key, sql in queries.items()}
        except Exception as e:
            self.log(f"❌ Error getting summary: {e}")
            traceback.print_exc()
            return {}

    def run(self):
        """Execute the consolidation and return a status report.

        Returns:
            {'status': 'completed', 'added': <entities added>,
             'summary': <get_summary() result>}
        """
        self.log("🔄 Luzia Research Consolidator started")
        self.log(f"Source: {self.projects_kg}")
        self.log(f"Target: {self.research_kg}")

        self.log("📥 Extracting research from projects KG...")
        project_research = self.get_projects_research()

        self.log("📤 Merging into research KG...")
        added = self.merge_into_research_kg(project_research)

        summary = self.get_summary()
        self.log("📊 Research KG summary:")
        self.log(f"  Entities: {summary.get('total_entities', 0)}")
        self.log(f"  Entity Types: {summary.get('entity_types', 0)}")
        self.log(f"  Relations: {summary.get('total_relations', 0)}")
        self.log(f"  Observations: {summary.get('total_observations', 0)}")

        return {
            'status': 'completed',
            'added': added,
            'summary': summary,
        }


if __name__ == '__main__':
    consolidator = LuziaResearchConsolidator()
    result = consolidator.run()
    print(json.dumps(result, indent=2))