Files
luzia/lib/luzia_unified_flow.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

566 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Luzia Unified Flow - Agentic Operating System
Orchestrates all Luzia capabilities:
- Request approval (routine → auto-approve, complex → escalate)
- Research tasks (security/speed/complexity filtering → tool routing)
- Task dispatch (send work to projects)
- Knowledge consolidation (extract findings → research KG)
- Agent collaboration (multi-agent coordination)
Architecture:
1. Ingestion & Intent Mapping
2. Triage & Triangulation (Research Agent)
3. Governance Gate (Approval Orchestrator)
4. Strategic Execution (Task Dispatch & Collaboration)
5. Harvesting & Graphing (Knowledge Consolidation)
6. Closure & Insights
"""
import json
import sqlite3
import time
import uuid
from contextlib import closing
from dataclasses import asdict, dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
class FlowState(Enum):
    """States of the unified Luzia flow state machine.

    The member values are the strings persisted in the ``state`` column of
    the ``tasks`` table and in the ``from_state`` / ``to_state`` columns of
    the ``events`` table.
    """

    # Task has been captured and is waiting to be analysed.
    RECEIVED = "received"
    # Research Agent is running its filters.
    ANALYZING = "analyzing"
    # Held at the Governance Gate until somebody approves it.
    AWAITING_APPROVAL = "awaiting_approval"
    # Multi-agent collaboration is decomposing the work.
    STRATEGIZING = "strategizing"
    # Task Dispatch is sending the work to projects.
    EXECUTING = "executing"
    # Results are being extracted into the knowledge graph.
    CONSOLIDATING = "consolidating"
    # Multi-project results are being synthesised.
    FINALIZING = "finalizing"
    # Task is complete and its findings are stored.
    RESOLVED = "resolved"
    # Error state; diagnostics accompany the task.
    FAILED = "failed"
class TaskSource(Enum):
    """Origin of a task entering the flow.

    The member value is stored in the ``source`` column of the ``tasks``
    table and is embedded in generated task ids.
    """

    # Submitted directly by a user.
    USER_SUBMISSION = "user"
    # Requested by another project.
    PROJECT_REQUEST = "project"
    # Raised by an automated trigger.
    AUTOMATION = "automation"
    # Escalated from the approval system.
    APPROVAL_ESCALATION = "escalation"
@dataclass
class TaskMetadata:
    """Metadata for a task moving through the flow.

    The first six fields are required and positional; every other field is
    optional and is filled in as the task advances through analysis,
    approval, execution and knowledge-graph consolidation.

    The collection-typed fields default to ``None`` (not to an empty
    container), so they are annotated ``Optional[...]``; readers must
    handle ``None`` before iterating or indexing them.
    """

    task_id: str
    source: TaskSource
    submitter: str  # user/project name
    submission_time: float
    description: str
    tags: List[str]  # For categorization

    # Analysis results
    security_level: Optional[str] = None  # critical, sensitive, internal, public
    speed_requirement: Optional[str] = None  # interactive, responsive, thorough, research
    complexity_level: Optional[str] = None  # trivial, straightforward, complex, exploratory
    recommended_tool: Optional[str] = None  # chat, debug, thinkdeep, codereview, consensus, planner

    # Approval tracking
    requires_approval: bool = False
    approved_by: Optional[str] = None
    approval_reason: Optional[str] = None
    approval_time: Optional[float] = None

    # Execution tracking
    assigned_projects: Optional[List[str]] = None  # Which projects are handling sub-tasks
    execution_results: Optional[Dict] = None

    # KG integration
    findings_entity_id: Optional[str] = None  # Reference in research KG
    related_entities: Optional[List[str]] = None  # Links to related research
class FlowEvent:
    """Record of one task moving from one flow state to another.

    The timestamp is taken when the object is constructed and kept as an
    ISO-8601 string produced by ``datetime.now().isoformat()``.
    """

    def __init__(self, task_id: str, from_state: FlowState, to_state: FlowState, metadata: Dict = None):
        """Capture the transition; a missing/empty ``metadata`` becomes ``{}``."""
        self.task_id = task_id
        self.from_state = from_state
        self.to_state = to_state
        self.timestamp = datetime.now().isoformat()
        self.metadata = metadata if metadata else {}

    def to_dict(self):
        """Return a plain dict with states flattened to their string values.

        ``from_state`` is ``None`` when the event has no originating state.
        """
        origin = None
        if self.from_state:
            origin = self.from_state.value
        return dict(
            task_id=self.task_id,
            from_state=origin,
            to_state=self.to_state.value,
            timestamp=self.timestamp,
            metadata=self.metadata,
        )
class LuziaUnifiedFlow:
    """Main orchestrator for unified Luzia flow.

    Every task and every state transition is persisted in the SQLite
    database at ``flow_db``; progress messages are appended to ``log_file``
    and echoed to stdout; consolidated findings are inserted into the
    ``entities`` table of the SQLite database at ``research_kg``.

    The phase methods catch exceptions raised by their database work, log
    them, and signal failure through their return value (``False`` or
    ``None``) instead of raising.
    """

    def __init__(self):
        """Set the storage locations and make sure the flow schema exists."""
        self.flow_db = Path("/opt/server-agents/state/luzia-flow.db")
        self.log_file = Path("/opt/server-agents/logs/luzia-flow.log")
        self.research_kg = Path("/etc/luz-knowledge/research.db")
        # sqlite3 creates a missing database file but not a missing
        # directory, so both parent directories are created up front.
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        self.flow_db.parent.mkdir(parents=True, exist_ok=True)
        # Initialize database
        self._init_db()

    def _init_db(self):
        """Create the ``tasks`` and ``events`` tables if they are missing.

        A ``tasks`` table created before the ``execution_results`` column
        existed is upgraded in place with ``ALTER TABLE``. Any error is
        logged rather than raised.
        """
        try:
            with closing(sqlite3.connect(self.flow_db)) as conn:
                # Tasks table - tracks all tasks through the flow
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS tasks (
                        task_id TEXT PRIMARY KEY,
                        state TEXT NOT NULL,
                        source TEXT NOT NULL,
                        submitter TEXT NOT NULL,
                        description TEXT,
                        tags TEXT,
                        security_level TEXT,
                        speed_requirement TEXT,
                        complexity_level TEXT,
                        recommended_tool TEXT,
                        requires_approval INTEGER,
                        approved_by TEXT,
                        assigned_projects TEXT,
                        execution_results TEXT,
                        findings_entity_id TEXT,
                        created_at REAL,
                        updated_at REAL,
                        metadata TEXT
                    )
                """)
                # consolidate_results() writes execution_results; databases
                # created with the older schema lack that column.
                existing_columns = {
                    row[1] for row in conn.execute("PRAGMA table_info(tasks)")
                }
                if "execution_results" not in existing_columns:
                    conn.execute(
                        "ALTER TABLE tasks ADD COLUMN execution_results TEXT"
                    )
                # Events table - audit trail of state transitions
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS events (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        task_id TEXT NOT NULL,
                        from_state TEXT,
                        to_state TEXT,
                        timestamp TEXT,
                        metadata TEXT,
                        FOREIGN KEY(task_id) REFERENCES tasks(task_id)
                    )
                """)
                conn.commit()
        except Exception as e:
            self.log(f"❌ Error initializing DB: {e}")

    def log(self, message):
        """Append a timestamped line to the log file and print the message.

        The file is opened as UTF-8 explicitly because the messages contain
        emoji, which a non-UTF-8 locale default could not encode.
        """
        timestamp = datetime.now().isoformat()
        log_entry = f"[{timestamp}] {message}\n"
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry)
        print(message)

    def _update_task(self, sql: str, params: tuple, task_id: str) -> None:
        """Run one ``UPDATE tasks ...`` statement and commit it.

        Raises:
            LookupError: if the statement matched no row, i.e. ``task_id``
                is not present in the ``tasks`` table.
        """
        with closing(sqlite3.connect(self.flow_db)) as conn:
            changed = conn.execute(sql, params).rowcount
            conn.commit()
        if not changed:
            raise LookupError(f"unknown task_id {task_id!r}")

    def receive_task(self, description: str, source: TaskSource, submitter: str,
                     tags: Optional[List[str]] = None) -> Optional[str]:
        """
        Phase 1: Ingestion & Intent Mapping
        User submits task, system captures it.

        Inserts a row in state RECEIVED and records the initial event.

        Returns:
            The generated task id, or ``None`` if the task could not be
            stored (the error is logged).
        """
        # NOTE(review): the id has millisecond resolution, so two tasks from
        # the same source within one millisecond collide on the primary key
        # and the second insert is rejected.
        task_id = f"task_{source.value}_{int(time.time() * 1000)}"
        try:
            now = time.time()
            with closing(sqlite3.connect(self.flow_db)) as conn:
                conn.execute("""
                    INSERT INTO tasks
                    (task_id, state, source, submitter, description, tags, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    task_id,
                    FlowState.RECEIVED.value,
                    source.value,
                    submitter,
                    description,
                    json.dumps(tags or []),
                    now,
                    now,
                ))
                conn.commit()
            self.log(f"📥 RECEIVED task {task_id}: {description[:50]}...")
            self._emit_event(task_id, None, FlowState.RECEIVED)
            return task_id
        except Exception as e:
            self.log(f"❌ Error receiving task: {e}")
            return None

    def analyze_task(self, task_id: str, analysis: Dict) -> bool:
        """
        Phase 2: Triage & Triangulation
        Research Agent analyzes security/speed/complexity.

        Reads the keys ``security``, ``speed``, ``complexity``,
        ``recommended_tool`` and ``requires_approval`` from ``analysis``.
        The task moves to AWAITING_APPROVAL when ``requires_approval`` is
        truthy, otherwise straight to STRATEGIZING. Two events are recorded:
        RECEIVED -> ANALYZING (carrying ``analysis``) and ANALYZING -> next.

        Returns:
            ``True`` on success; ``False`` if the task is unknown or the
            update failed (the error is logged).
        """
        try:
            # Extract analysis results
            security = analysis.get('security')
            speed = analysis.get('speed')
            complexity = analysis.get('complexity')
            tool = analysis.get('recommended_tool')
            requires_approval = analysis.get('requires_approval', False)
            # Determine next state
            next_state = FlowState.AWAITING_APPROVAL if requires_approval else FlowState.STRATEGIZING
            self._update_task("""
                UPDATE tasks SET
                    state = ?,
                    security_level = ?,
                    speed_requirement = ?,
                    complexity_level = ?,
                    recommended_tool = ?,
                    requires_approval = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                next_state.value,
                security,
                speed,
                complexity,
                tool,
                1 if requires_approval else 0,
                time.time(),
                task_id,
            ), task_id)
            self.log(f"🔍 ANALYZING task {task_id}")
            self.log(f" Security: {security} | Speed: {speed} | Complexity: {complexity}")
            self.log(f" Recommended tool: {tool}")
            self.log(f" Requires approval: {requires_approval}")
            self._emit_event(task_id, FlowState.RECEIVED, FlowState.ANALYZING, analysis)
            self._emit_event(task_id, FlowState.ANALYZING, next_state)
            return True
        except Exception as e:
            self.log(f"❌ Error analyzing task: {e}")
            return False

    def approve_task(self, task_id: str, approved_by: str, reason: str = "") -> bool:
        """
        Phase 3: Governance Gate
        Approval orchestrator approves/escalates tasks.

        Moves the task to STRATEGIZING and stores ``approved_by`` on the
        task row. ``reason`` is kept only in the metadata of the recorded
        AWAITING_APPROVAL -> STRATEGIZING event.

        Returns:
            ``True`` on success; ``False`` if the task is unknown or the
            update failed (the error is logged).
        """
        try:
            self._update_task("""
                UPDATE tasks SET
                    state = ?,
                    approved_by = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.STRATEGIZING.value,
                approved_by,
                time.time(),
                task_id,
            ), task_id)
            self.log(f"✅ APPROVED task {task_id} by {approved_by}")
            self._emit_event(task_id, FlowState.AWAITING_APPROVAL, FlowState.STRATEGIZING,
                             {'approved_by': approved_by, 'reason': reason})
            return True
        except Exception as e:
            self.log(f"❌ Error approving task: {e}")
            return False

    def assign_projects(self, task_id: str, projects: List[str]) -> bool:
        """
        Phase 4a: Strategic Execution
        Task dispatcher assigns work to projects.

        Moves the task to EXECUTING and stores ``projects`` as a JSON list.

        Returns:
            ``True`` on success; ``False`` if the task is unknown or the
            update failed (the error is logged).
        """
        try:
            self._update_task("""
                UPDATE tasks SET
                    state = ?,
                    assigned_projects = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.EXECUTING.value,
                json.dumps(projects),
                time.time(),
                task_id,
            ), task_id)
            self.log(f"🚀 EXECUTING task {task_id}")
            self.log(f" Assigned to: {', '.join(projects)}")
            self._emit_event(task_id, FlowState.STRATEGIZING, FlowState.EXECUTING,
                             {'assigned_projects': projects})
            return True
        except Exception as e:
            self.log(f"❌ Error assigning projects: {e}")
            return False

    def consolidate_results(self, task_id: str, results: Dict) -> bool:
        """
        Phase 5: Harvesting & Graphing
        Knowledge consolidation extracts findings into research KG.

        Stores ``results`` as a finding in the research KG, then moves the
        task to CONSOLIDATING and saves the JSON-encoded results together
        with the finding id on the task row. If the KG insert fails the
        task is still updated, with a NULL ``findings_entity_id``.

        Returns:
            ``True`` on success; ``False`` if the task is unknown or the
            update failed (the error is logged).
        """
        try:
            # Confirm the task exists before writing to the KG so that an
            # unknown id does not leave an orphaned finding behind.
            with closing(sqlite3.connect(self.flow_db)) as conn:
                known = conn.execute(
                    "SELECT 1 FROM tasks WHERE task_id = ?", (task_id,)
                ).fetchone()
            if known is None:
                raise LookupError(f"unknown task_id {task_id!r}")
            # Create finding entity in research KG
            findings_id = self._store_finding_in_kg(task_id, results)
            self._update_task("""
                UPDATE tasks SET
                    state = ?,
                    execution_results = ?,
                    findings_entity_id = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.CONSOLIDATING.value,
                json.dumps(results),
                findings_id,
                time.time(),
                task_id,
            ), task_id)
            self.log(f"📊 CONSOLIDATING task {task_id}")
            self.log(f" Findings stored in KG: {findings_id}")
            self._emit_event(task_id, FlowState.EXECUTING, FlowState.CONSOLIDATING,
                             {'findings_id': findings_id})
            return True
        except Exception as e:
            self.log(f"❌ Error consolidating results: {e}")
            return False

    def resolve_task(self, task_id: str) -> bool:
        """
        Phase 6: Closure & Insights
        Task complete, user receives findings plus related research.

        Moves the task to RESOLVED and records the
        CONSOLIDATING -> RESOLVED event.

        Returns:
            ``True`` on success; ``False`` if the task is unknown or the
            update failed (the error is logged).
        """
        try:
            self._update_task("""
                UPDATE tasks SET
                    state = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.RESOLVED.value,
                time.time(),
                task_id,
            ), task_id)
            self.log(f"✨ RESOLVED task {task_id}")
            self._emit_event(task_id, FlowState.CONSOLIDATING, FlowState.RESOLVED)
            return True
        except Exception as e:
            self.log(f"❌ Error resolving task: {e}")
            return False

    def fail_task(self, task_id: str, error: str) -> bool:
        """
        Error state with diagnostic metadata.

        Moves the task to FAILED and overwrites its ``metadata`` column with
        ``{"error": error}`` as JSON. No row is added to the ``events``
        table for this transition.

        Returns:
            ``True`` on success; ``False`` if the task is unknown or the
            update failed (the error is logged).
        """
        try:
            self._update_task("""
                UPDATE tasks SET
                    state = ?,
                    metadata = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.FAILED.value,
                json.dumps({'error': error}),
                time.time(),
                task_id,
            ), task_id)
            self.log(f"❌ FAILED task {task_id}: {error}")
            return True
        except Exception as e:
            self.log(f"❌ Error failing task: {e}")
            return False

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Get current task status.

        Returns:
            The task row as a column-name -> value dict, or ``None`` when
            the task does not exist or the query failed (logged).
        """
        try:
            with closing(sqlite3.connect(self.flow_db)) as conn:
                conn.row_factory = sqlite3.Row
                row = conn.execute(
                    "SELECT * FROM tasks WHERE task_id = ?", (task_id,)
                ).fetchone()
            if row is None:
                return None
            return dict(row)
        except Exception as e:
            self.log(f"❌ Error getting task status: {e}")
            return None

    def _store_finding_in_kg(self, task_id: str, results: Dict) -> Optional[str]:
        """Store findings in research KG.

        Inserts one row into the ``entities`` table of the research KG
        database; this method does not create that table.

        Returns:
            The new entity id (a UUID4 string), or ``None`` if the insert
            failed (the error is logged).
        """
        try:
            finding_id = str(uuid.uuid4())
            now = time.time()
            with closing(sqlite3.connect(self.research_kg)) as conn:
                conn.execute("""
                    INSERT INTO entities
                    (id, name, type, domain, content, metadata, created_at, updated_at, source)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    finding_id,
                    f"Task Results: {task_id[:20]}",
                    "task_result",
                    "flow",
                    json.dumps(results),
                    json.dumps({'task_id': task_id}),
                    now,
                    now,
                    'luzia_flow',
                ))
                conn.commit()
            return finding_id
        except Exception as e:
            self.log(f"❌ Error storing finding: {e}")
            return None

    def _emit_event(self, task_id: str, from_state: Optional[FlowState], to_state: FlowState,
                    metadata: Optional[Dict] = None):
        """Emit state transition event.

        Appends one row to the ``events`` audit table. ``from_state`` may be
        ``None`` for the first event of a task. Errors are logged, not
        raised.
        """
        try:
            event = FlowEvent(task_id, from_state, to_state, metadata)
            with closing(sqlite3.connect(self.flow_db)) as conn:
                conn.execute("""
                    INSERT INTO events (task_id, from_state, to_state, timestamp, metadata)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    event.task_id,
                    event.from_state.value if event.from_state else None,
                    event.to_state.value,
                    event.timestamp,
                    json.dumps(event.metadata),
                ))
                conn.commit()
        except Exception as e:
            self.log(f"❌ Error emitting event: {e}")

    def get_flow_status(self) -> Dict:
        """Get overall flow statistics.

        Returns:
            A dict with ``total_tasks``, ``state_distribution`` (state
            string -> task count) and ``total_events``; an empty dict if
            the queries failed (the error is logged).
        """
        try:
            with closing(sqlite3.connect(self.flow_db)) as conn:
                # Count by state
                state_counts = {
                    row[0]: row[1]
                    for row in conn.execute(
                        "SELECT state, COUNT(*) as count FROM tasks GROUP BY state"
                    )
                }
                # Total tasks
                total = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
                # Events count
                total_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
            return {
                'total_tasks': total,
                'state_distribution': state_counts,
                'total_events': total_events,
            }
        except Exception as e:
            self.log(f"❌ Error getting flow status: {e}")
            return {}
if __name__ == '__main__':
    # Script entry point: print the aggregate flow statistics as JSON.
    orchestrator = LuziaUnifiedFlow()
    status_report = orchestrator.get_flow_status()
    print(json.dumps(status_report, indent=2))