Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:

- Pre-task git snapshot before agent execution (sketched after the file metadata below)
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
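The controller described above is not part of the file shown on this page, so here is a minimal sketch, assuming tmux is driven through its CLI via subprocess. Only the method names (send_keys, capture_pane, wait_for_prompt, wait_for_idle, wait_for_shell_prompt) and the delay_enter parameter come from the commit message; the session-targeting scheme, polling intervals, defaults, and all other internals are assumptions, not the actual claude-code-tools implementation.

# Minimal sketch of the controller named in the commit message. Assumes tmux
# is driven through its CLI via subprocess; method names and delay_enter come
# from the message, everything else is an assumption.
import hashlib
import re
import subprocess
import time


class DockerTmuxController:
    """Drive a tmux session, e.g. one whose pane runs a docker exec shell."""

    def __init__(self, session: str):
        self.session = session

    def _tmux(self, *args: str) -> str:
        result = subprocess.run(["tmux", *args],
                                capture_output=True, text=True, check=True)
        return result.stdout

    def send_keys(self, keys: str, delay_enter: float = 0.0) -> None:
        # Type the keys, optionally pausing before Enter so slow REPLs
        # finish echoing the input first.
        self._tmux("send-keys", "-t", self.session, keys)
        if delay_enter:
            time.sleep(delay_enter)
        self._tmux("send-keys", "-t", self.session, "Enter")

    def capture_pane(self) -> str:
        # -p prints the pane contents to stdout instead of a tmux buffer.
        return self._tmux("capture-pane", "-t", self.session, "-p")

    def wait_for_prompt(self, pattern: str, timeout: float = 60.0) -> bool:
        # Pattern-based completion detection: poll the pane until it matches.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if re.search(pattern, self.capture_pane(), re.MULTILINE):
                return True
            time.sleep(0.5)
        return False

    def wait_for_idle(self, quiet_secs: float = 2.0,
                      timeout: float = 60.0) -> bool:
        # Content-hash-based idle detection: the pane counts as idle once its
        # captured contents stop changing for quiet_secs.
        deadline = time.monotonic() + timeout
        last_hash, last_change = None, time.monotonic()
        while time.monotonic() < deadline:
            digest = hashlib.sha256(self.capture_pane().encode()).hexdigest()
            now = time.monotonic()
            if digest != last_hash:
                last_hash, last_change = digest, now
            elif now - last_change >= quiet_secs:
                return True
            time.sleep(0.25)
        return False

    def wait_for_shell_prompt(self, timeout: float = 60.0) -> bool:
        # Heuristic: a line ending in "$", "#" or ">" looks like a shell
        # prompt; old prompts in scrollback can cause false positives.
        return self.wait_for_prompt(r"[$#>] *$", timeout=timeout)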
252 lines
8.8 KiB
Python
Executable File
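The pre-task snapshot mentioned in the commit message is likewise not part of this file. A minimal sketch, assuming the snapshot is a plain commit on the current branch made via the git CLI; the snapshot_before_task name and the commit-message format are hypothetical.

# Hypothetical helper for the pre-task git snapshot; the function name,
# message format, and commit-based approach are assumptions, not the
# repository's actual protocol.
import subprocess


def snapshot_before_task(repo_dir: str, task_id: str) -> None:
    """Commit pending changes so an agent's edits can be diffed or reverted."""
    subprocess.run(["git", "-C", repo_dir, "add", "-A"], check=True)
    # --allow-empty keeps a snapshot marker even when the tree is clean.
    subprocess.run(["git", "-C", repo_dir, "commit", "--allow-empty",
                    "-m", f"snapshot: before task {task_id}"], check=True)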
#!/usr/bin/env python3
"""
Luzia Research Consolidator

Extracts research findings from the projects KG and consolidates them into the research KG.
"""

import json
import sqlite3
import time
import traceback
import uuid
from datetime import datetime
from pathlib import Path

class LuziaResearchConsolidator:
    """Consolidate project research into the research KG"""

    def __init__(self):
        self.projects_kg = Path("/etc/zen-swarm/memory/projects.db")
        self.research_kg = Path("/etc/luz-knowledge/research.db")
        self.log_file = Path("/opt/server-agents/logs/research-consolidation.log")
        self.log_file.parent.mkdir(parents=True, exist_ok=True)

    def log(self, message):
        """Log a consolidation action to the log file and stdout"""
        timestamp = datetime.now().isoformat()
        log_entry = f"[{timestamp}] {message}\n"
        with open(self.log_file, 'a') as f:
            f.write(log_entry)
        print(message)

    def get_projects_research(self):
        """Extract all research entities from the projects KG"""
        if not self.projects_kg.exists():
            self.log("⚠️ Projects KG not found")
            return {'entities': [], 'relations': [], 'entity_map': {}}

        try:
            conn = sqlite3.connect(self.projects_kg)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Get all entities from the projects KG (no type filtering available)
            cursor.execute("SELECT id, name, type FROM entities ORDER BY name")

            research = []
            entity_map = {}  # Map name to id for relations

            for row in cursor.fetchall():
                entity = {
                    'id': row['id'],
                    'name': row['name'],
                    'type': row['type'] or 'finding'
                }
                research.append(entity)
                entity_map[row['name']] = row['id']

            self.log(f"📍 Found {len(research)} entities in projects KG")

            # Get relations, resolving endpoint names via the entities table
            cursor.execute("""
                SELECT r.source_id, e1.name as source_name,
                       r.target_id, e2.name as target_name,
                       r.relation, r.context
                FROM relations r
                LEFT JOIN entities e1 ON r.source_id = e1.id
                LEFT JOIN entities e2 ON r.target_id = e2.id
            """)

            relations = []
            for row in cursor.fetchall():
                relations.append({
                    'source_id': row['source_id'],
                    'source_name': row['source_name'],
                    'target_id': row['target_id'],
                    'target_name': row['target_name'],
                    'relation': row['relation'],
                    'context': row['context']
                })

            self.log(f"📍 Found {len(relations)} relations in projects KG")

            conn.close()
            return {'entities': research, 'relations': relations, 'entity_map': entity_map}

        except Exception as e:
            self.log(f"❌ Error reading projects KG: {e}")
            traceback.print_exc()
            return {'entities': [], 'relations': [], 'entity_map': {}}

    def merge_into_research_kg(self, project_research):
        """Merge project research into the research KG"""
        if not project_research['entities']:
            self.log("ℹ️ No research found in projects KG")
            return 0

        try:
            conn = sqlite3.connect(self.research_kg)
            cursor = conn.cursor()

            added = 0
            skipped = 0

            # Add entities with the research KG schema, skipping duplicates by name
            for entity in project_research['entities']:
                try:
                    cursor.execute("""
                        SELECT id FROM entities WHERE name = ?
                    """, (entity['name'],))

                    if cursor.fetchone():
                        skipped += 1
                        continue

                    entity_id = str(uuid.uuid4())
                    now = time.time()

                    cursor.execute("""
                        INSERT INTO entities
                            (id, name, type, domain, content, metadata,
                             created_at, updated_at, source)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        entity_id,
                        entity['name'],
                        entity['type'],
                        'infrastructure',  # domain
                        entity['name'],  # content
                        json.dumps({'project_source': 'projects.db'}),  # metadata
                        now,
                        now,
                        'projects_kg'  # source
                    ))

                    added += 1
                    self.log(f"  ✅ Added: {entity['name']} ({entity['type']})")

                except Exception as e:
                    self.log(f"  ⚠️ Error adding {entity['name']}: {e}")

            # Add relations, re-resolving endpoint IDs in the research KG
            for rel in project_research['relations']:
                try:
                    cursor.execute("SELECT id FROM entities WHERE name = ?",
                                   (rel['source_name'],))
                    source_result = cursor.fetchone()

                    cursor.execute("SELECT id FROM entities WHERE name = ?",
                                   (rel['target_name'],))
                    target_result = cursor.fetchone()

                    if source_result and target_result:
                        relation_id = str(uuid.uuid4())
                        now = time.time()

                        cursor.execute("""
                            INSERT INTO relations
                                (id, source_id, target_id, relation, context,
                                 weight, created_at)
                            VALUES (?, ?, ?, ?, ?, ?, ?)
                        """, (
                            relation_id,
                            source_result[0],
                            target_result[0],
                            rel['relation'],
                            rel['context'],
                            1,
                            now
                        ))
                except Exception as e:
                    self.log(f"  ⚠️ Error adding relation: {e}")

            conn.commit()
            conn.close()

            self.log(f"✨ Consolidation complete: {added} added, {skipped} skipped")
            return added

        except Exception as e:
            self.log(f"❌ Error merging into research KG: {e}")
            traceback.print_exc()
            return 0

    def get_summary(self):
        """Get a summary of the research KG"""
        try:
            conn = sqlite3.connect(self.research_kg)
            cursor = conn.cursor()

            cursor.execute("SELECT COUNT(*) FROM entities")
            total_entities = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(DISTINCT type) FROM entities")
            entity_types = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM relations")
            total_relations = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM observations")
            total_observations = cursor.fetchone()[0]

            conn.close()

            return {
                'total_entities': total_entities,
                'entity_types': entity_types,
                'total_relations': total_relations,
                'total_observations': total_observations
            }

        except Exception as e:
            self.log(f"❌ Error getting summary: {e}")
            traceback.print_exc()
            return {}

    def run(self):
        """Execute the consolidation"""
        self.log("🔄 Luzia Research Consolidator started")
        self.log(f"Source: {self.projects_kg}")
        self.log(f"Target: {self.research_kg}")

        # Get research from projects
        self.log("📥 Extracting research from projects KG...")
        project_research = self.get_projects_research()

        # Merge into research KG
        self.log("📤 Merging into research KG...")
        added = self.merge_into_research_kg(project_research)

        # Get summary
        summary = self.get_summary()
        self.log("📊 Research KG summary:")
        self.log(f"  Entities: {summary.get('total_entities', 0)}")
        self.log(f"  Entity Types: {summary.get('entity_types', 0)}")
        self.log(f"  Relations: {summary.get('total_relations', 0)}")
        self.log(f"  Observations: {summary.get('total_observations', 0)}")

        return {
            'status': 'completed',
            'added': added,
            'summary': summary
        }


if __name__ == '__main__':
    consolidator = LuziaResearchConsolidator()
    result = consolidator.run()
    print(json.dumps(result, indent=2))