#!/usr/bin/env python3
"""
Luzia Unified Flow - Agentic Operating System

Orchestrates all Luzia capabilities:
- Request approval (routine → auto-approve, complex → escalate)
- Research tasks (security/speed/complexity filtering → tool routing)
- Task dispatch (send work to projects)
- Knowledge consolidation (extract findings → research KG)
- Agent collaboration (multi-agent coordination)

Architecture:
1. Ingestion & Intent Mapping
2. Triage & Triangulation (Research Agent)
3. Governance Gate (Approval Orchestrator)
4. Strategic Execution (Task Dispatch & Collaboration)
5. Harvesting & Graphing (Knowledge Consolidation)
6. Closure & Insights
"""
import json
import sqlite3
import uuid
import time
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List
from enum import Enum
from dataclasses import dataclass, asdict

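# Phase-to-method map (from the method docstrings below):
#   Phase 1 Ingestion & Intent Mapping -> LuziaUnifiedFlow.receive_task()
#   Phase 2 Triage & Triangulation     -> LuziaUnifiedFlow.analyze_task()
#   Phase 3 Governance Gate            -> LuziaUnifiedFlow.approve_task()
#   Phase 4 Strategic Execution        -> LuziaUnifiedFlow.assign_projects()
#   Phase 5 Harvesting & Graphing      -> LuziaUnifiedFlow.consolidate_results()
#   Phase 6 Closure & Insights         -> LuziaUnifiedFlow.resolve_task()
# FlowState.FINALIZING is defined but not yet entered by any of these methods.
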
class FlowState(Enum):
    """Unified Luzia Flow - Finite State Machine"""
    RECEIVED = "received"                    # Task captured; awaiting analysis
    ANALYZING = "analyzing"                  # Research Agent running filters
    AWAITING_APPROVAL = "awaiting_approval"  # Blocked by Governance Gate
    STRATEGIZING = "strategizing"            # Multi-agent collaboration decomposing
    EXECUTING = "executing"                  # Task Dispatch sending to projects
    CONSOLIDATING = "consolidating"          # Data extraction into KG
    FINALIZING = "finalizing"                # Synthesis of multi-project results
    RESOLVED = "resolved"                    # Task complete; findings stored
    FAILED = "failed"                        # Error state with diagnostics

class TaskSource(Enum):
    """Where did the task come from?"""
    USER_SUBMISSION = "user"            # Direct user input
    PROJECT_REQUEST = "project"         # From another project
    AUTOMATION = "automation"           # Automated trigger
    APPROVAL_ESCALATION = "escalation"  # From approval system

@dataclass
class TaskMetadata:
    """Metadata for a task moving through the flow"""
    task_id: str
    source: TaskSource
    submitter: str          # user/project name
    submission_time: float
    description: str
    tags: List[str]         # For categorization

    # Analysis results
    security_level: Optional[str] = None     # critical, sensitive, internal, public
    speed_requirement: Optional[str] = None  # interactive, responsive, thorough, research
    complexity_level: Optional[str] = None   # trivial, straightforward, complex, exploratory
    recommended_tool: Optional[str] = None   # chat, debug, thinkdeep, codereview, consensus, planner

    # Approval tracking
    requires_approval: bool = False
    approved_by: Optional[str] = None
    approval_reason: Optional[str] = None
    approval_time: Optional[float] = None

    # Execution tracking
    assigned_projects: Optional[List[str]] = None  # Which projects are handling sub-tasks
    execution_results: Optional[Dict] = None

    # KG integration
    findings_entity_id: Optional[str] = None      # Reference in research KG
    related_entities: Optional[List[str]] = None  # Links to related research

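# NOTE: TaskMetadata mirrors the columns of the `tasks` table created in
# _init_db(); the orchestrator currently reads and writes rows directly via
# SQL rather than round-tripping through this dataclass (hence `asdict` is
# imported but unused).
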
class FlowEvent:
    """State transition event"""
    def __init__(self, task_id: str, from_state: Optional[FlowState], to_state: FlowState, metadata: Optional[Dict] = None):
        self.task_id = task_id
        self.from_state = from_state
        self.to_state = to_state
        self.timestamp = datetime.now().isoformat()
        self.metadata = metadata or {}

    def to_dict(self):
        return {
            'task_id': self.task_id,
            'from_state': self.from_state.value if self.from_state else None,
            'to_state': self.to_state.value,
            'timestamp': self.timestamp,
            'metadata': self.metadata,
        }

class LuziaUnifiedFlow:
    """Main orchestrator for unified Luzia flow"""

    def __init__(self):
        self.flow_db = Path("/opt/server-agents/state/luzia-flow.db")
        self.log_file = Path("/opt/server-agents/logs/luzia-flow.log")
        # Ensure both the log and state directories exist before first use
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        self.flow_db.parent.mkdir(parents=True, exist_ok=True)
        self.research_kg = Path("/etc/luz-knowledge/research.db")

        # Initialize database
        self._init_db()

    def _init_db(self):
        """Initialize flow state database"""
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            # Tasks table - tracks all tasks through the flow
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS tasks (
                    task_id TEXT PRIMARY KEY,
                    state TEXT NOT NULL,
                    source TEXT NOT NULL,
                    submitter TEXT NOT NULL,
                    description TEXT,
                    tags TEXT,
                    security_level TEXT,
                    speed_requirement TEXT,
                    complexity_level TEXT,
                    recommended_tool TEXT,
                    requires_approval INTEGER,
                    approved_by TEXT,
                    assigned_projects TEXT,
                    execution_results TEXT,
                    findings_entity_id TEXT,
                    created_at REAL,
                    updated_at REAL,
                    metadata TEXT
                )
            """)

            # Events table - audit trail of state transitions
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    from_state TEXT,
                    to_state TEXT,
                    timestamp TEXT,
                    metadata TEXT,
                    FOREIGN KEY(task_id) REFERENCES tasks(task_id)
                )
            """)

            conn.commit()
            conn.close()
        except Exception as e:
            self.log(f"❌ Error initializing DB: {e}")

    def log(self, message):
        """Log flow event"""
        timestamp = datetime.now().isoformat()
        log_entry = f"[{timestamp}] {message}\n"
        with open(self.log_file, 'a') as f:
            f.write(log_entry)
        print(message)

    def receive_task(self, description: str, source: TaskSource, submitter: str, tags: Optional[List[str]] = None) -> Optional[str]:
        """
        Phase 1: Ingestion & Intent Mapping
        User submits task, system captures it.
        """
        task_id = f"task_{source.value}_{int(time.time() * 1000)}"

        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            now = time.time()

            cursor.execute("""
                INSERT INTO tasks
                (task_id, state, source, submitter, description, tags, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                task_id,
                FlowState.RECEIVED.value,
                source.value,
                submitter,
                description,
                json.dumps(tags or []),
                now,
                now
            ))

            conn.commit()
            conn.close()

            self.log(f"📥 RECEIVED task {task_id}: {description[:50]}...")
            self._emit_event(task_id, None, FlowState.RECEIVED)

            return task_id

        except Exception as e:
            self.log(f"❌ Error receiving task: {e}")
            return None

    def analyze_task(self, task_id: str, analysis: Dict) -> bool:
        """
        Phase 2: Triage & Triangulation
        Research Agent analyzes security/speed/complexity.
        """
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            # Extract analysis results
            security = analysis.get('security')
            speed = analysis.get('speed')
            complexity = analysis.get('complexity')
            tool = analysis.get('recommended_tool')
            requires_approval = analysis.get('requires_approval', False)

            # Determine next state
            next_state = FlowState.AWAITING_APPROVAL if requires_approval else FlowState.STRATEGIZING

            cursor.execute("""
                UPDATE tasks SET
                    state = ?,
                    security_level = ?,
                    speed_requirement = ?,
                    complexity_level = ?,
                    recommended_tool = ?,
                    requires_approval = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                next_state.value,
                security,
                speed,
                complexity,
                tool,
                1 if requires_approval else 0,
                time.time(),
                task_id
            ))

            conn.commit()
            conn.close()

            self.log(f"🔍 ANALYZING task {task_id}")
            self.log(f"   Security: {security} | Speed: {speed} | Complexity: {complexity}")
            self.log(f"   Recommended tool: {tool}")
            self.log(f"   Requires approval: {requires_approval}")

            self._emit_event(task_id, FlowState.RECEIVED, FlowState.ANALYZING, analysis)
            self._emit_event(task_id, FlowState.ANALYZING, next_state)

            return True

        except Exception as e:
            self.log(f"❌ Error analyzing task: {e}")
            return False
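
    # Illustrative shape of the `analysis` dict consumed above, using the
    # vocabularies documented on TaskMetadata (these values are examples only):
    #   {'security': 'internal', 'speed': 'responsive', 'complexity': 'complex',
    #    'recommended_tool': 'thinkdeep', 'requires_approval': True}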

    def approve_task(self, task_id: str, approved_by: str, reason: str = "") -> bool:
        """
        Phase 3: Governance Gate
        Approval orchestrator approves/escalates tasks.
        """
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            cursor.execute("""
                UPDATE tasks SET
                    state = ?,
                    approved_by = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.STRATEGIZING.value,
                approved_by,
                time.time(),
                task_id
            ))

            conn.commit()
            conn.close()

            self.log(f"✅ APPROVED task {task_id} by {approved_by}")
            self._emit_event(task_id, FlowState.AWAITING_APPROVAL, FlowState.STRATEGIZING,
                             {'approved_by': approved_by, 'reason': reason})

            return True

        except Exception as e:
            self.log(f"❌ Error approving task: {e}")
            return False

    def assign_projects(self, task_id: str, projects: List[str]) -> bool:
        """
        Phase 4a: Strategic Execution
        Task dispatcher assigns work to projects.
        """
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            cursor.execute("""
                UPDATE tasks SET
                    state = ?,
                    assigned_projects = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.EXECUTING.value,
                json.dumps(projects),
                time.time(),
                task_id
            ))

            conn.commit()
            conn.close()

            self.log(f"🚀 EXECUTING task {task_id}")
            self.log(f"   Assigned to: {', '.join(projects)}")

            self._emit_event(task_id, FlowState.STRATEGIZING, FlowState.EXECUTING,
                             {'assigned_projects': projects})

            return True

        except Exception as e:
            self.log(f"❌ Error assigning projects: {e}")
            return False

    def consolidate_results(self, task_id: str, results: Dict) -> bool:
        """
        Phase 5: Harvesting & Graphing
        Knowledge consolidation extracts findings into research KG.
        """
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            # Create finding entity in research KG
            findings_id = self._store_finding_in_kg(task_id, results)

            cursor.execute("""
                UPDATE tasks SET
                    state = ?,
                    execution_results = ?,
                    findings_entity_id = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.CONSOLIDATING.value,
                json.dumps(results),
                findings_id,
                time.time(),
                task_id
            ))

            conn.commit()
            conn.close()

            self.log(f"📊 CONSOLIDATING task {task_id}")
            self.log(f"   Findings stored in KG: {findings_id}")

            self._emit_event(task_id, FlowState.EXECUTING, FlowState.CONSOLIDATING,
                             {'findings_id': findings_id})

            return True

        except Exception as e:
            self.log(f"❌ Error consolidating results: {e}")
            return False

    def resolve_task(self, task_id: str) -> bool:
        """
        Phase 6: Closure & Insights
        Task complete, user receives findings plus related research.
        """
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            # Get task details
            cursor.execute("SELECT findings_entity_id FROM tasks WHERE task_id = ?", (task_id,))
            result = cursor.fetchone()
            findings_id = result[0] if result else None

            cursor.execute("""
                UPDATE tasks SET
                    state = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.RESOLVED.value,
                time.time(),
                task_id
            ))

            conn.commit()
            conn.close()

            self.log(f"✨ RESOLVED task {task_id} (findings: {findings_id})")
            self._emit_event(task_id, FlowState.CONSOLIDATING, FlowState.RESOLVED)

            return True

        except Exception as e:
            self.log(f"❌ Error resolving task: {e}")
            return False

    def fail_task(self, task_id: str, error: str) -> bool:
        """
        Error state with diagnostic metadata.
        """
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            cursor.execute("""
                UPDATE tasks SET
                    state = ?,
                    metadata = ?,
                    updated_at = ?
                WHERE task_id = ?
            """, (
                FlowState.FAILED.value,
                json.dumps({'error': error}),
                time.time(),
                task_id
            ))

            conn.commit()
            conn.close()

            self.log(f"❌ FAILED task {task_id}: {error}")

            return True

        except Exception as e:
            self.log(f"❌ Error failing task: {e}")
            return False

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Get current task status"""
        try:
            conn = sqlite3.connect(self.flow_db)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,))
            row = cursor.fetchone()

            conn.close()

            if not row:
                return None

            return dict(row)

        except Exception as e:
            self.log(f"❌ Error getting task status: {e}")
            return None
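
    # NOTE: the research KG at self.research_kg is assumed to already contain
    # an `entities` table with the columns used below; this module never
    # creates that table itself.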

    def _store_finding_in_kg(self, task_id: str, results: Dict) -> Optional[str]:
        """Store findings in research KG"""
        try:
            conn = sqlite3.connect(self.research_kg)
            cursor = conn.cursor()

            finding_id = str(uuid.uuid4())
            now = time.time()

            cursor.execute("""
                INSERT INTO entities
                (id, name, type, domain, content, metadata, created_at, updated_at, source)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                finding_id,
                f"Task Results: {task_id[:20]}",
                "task_result",
                "flow",
                json.dumps(results),
                json.dumps({'task_id': task_id}),
                now,
                now,
                'luzia_flow'
            ))

            conn.commit()
            conn.close()

            return finding_id

        except Exception as e:
            self.log(f"❌ Error storing finding: {e}")
            return None

    def _emit_event(self, task_id: str, from_state: Optional[FlowState], to_state: FlowState, metadata: Optional[Dict] = None):
        """Emit state transition event"""
        try:
            event = FlowEvent(task_id, from_state, to_state, metadata)

            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            cursor.execute("""
                INSERT INTO events (task_id, from_state, to_state, timestamp, metadata)
                VALUES (?, ?, ?, ?, ?)
            """, (
                event.task_id,
                event.from_state.value if event.from_state else None,
                event.to_state.value,
                event.timestamp,
                json.dumps(event.metadata)
            ))

            conn.commit()
            conn.close()

        except Exception as e:
            self.log(f"❌ Error emitting event: {e}")

    def get_flow_status(self) -> Dict:
        """Get overall flow statistics"""
        try:
            conn = sqlite3.connect(self.flow_db)
            cursor = conn.cursor()

            # Count by state
            cursor.execute("""
                SELECT state, COUNT(*) as count FROM tasks GROUP BY state
            """)

            state_counts = {row[0]: row[1] for row in cursor.fetchall()}

            # Total tasks
            cursor.execute("SELECT COUNT(*) FROM tasks")
            total = cursor.fetchone()[0]

            # Events count
            cursor.execute("SELECT COUNT(*) FROM events")
            total_events = cursor.fetchone()[0]

            conn.close()

            return {
                'total_tasks': total,
                'state_distribution': state_counts,
                'total_events': total_events,
            }

        except Exception as e:
            self.log(f"❌ Error getting flow status: {e}")
            return {}

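# Example end-to-end walkthrough (a sketch, not executed; the task description,
# analysis values, and project names are illustrative assumptions):
#
#   flow = LuziaUnifiedFlow()
#   tid = flow.receive_task("Audit SSH config drift", TaskSource.USER_SUBMISSION,
#                           submitter="admin", tags=["security"])
#   flow.analyze_task(tid, {'security': 'sensitive', 'speed': 'thorough',
#                           'complexity': 'complex', 'recommended_tool': 'codereview',
#                           'requires_approval': True})
#   flow.approve_task(tid, approved_by="governance", reason="routine audit")
#   flow.assign_projects(tid, ["infra", "security"])
#   flow.consolidate_results(tid, {'summary': 'no drift detected'})
#   flow.resolve_task(tid)
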
if __name__ == '__main__':
    flow = LuziaUnifiedFlow()
    print(json.dumps(flow.get_flow_status(), indent=2))