#!/usr/bin/env python3
"""
Luzia Unified Flow - Agentic Operating System

Orchestrates all Luzia capabilities:
- Request approval (routine → auto-approve, complex → escalate)
- Research tasks (security/speed/complexity filtering → tool routing)
- Task dispatch (send work to projects)
- Knowledge consolidation (extract findings → research KG)
- Agent collaboration (multi-agent coordination)

Architecture:
1. Ingestion & Intent Mapping
2. Triage & Triangulation (Research Agent)
3. Governance Gate (Approval Orchestrator)
4. Strategic Execution (Task Dispatch & Collaboration)
5. Harvesting & Graphing (Knowledge Consolidation)
6. Closure & Insights
"""

import json
import sqlite3
import uuid
import time
from contextlib import closing
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List, Union
from enum import Enum
from dataclasses import dataclass, asdict

# Default on-disk locations; each can be overridden per instance via
# LuziaUnifiedFlow.__init__.
DEFAULT_FLOW_DB = Path("/opt/server-agents/state/luzia-flow.db")
DEFAULT_LOG_FILE = Path("/opt/server-agents/logs/luzia-flow.log")
DEFAULT_RESEARCH_KG = Path("/etc/luz-knowledge/research.db")


class FlowState(Enum):
    """Unified Luzia Flow - Finite State Machine"""
    RECEIVED = "received"                    # Task captured; awaiting analysis
    ANALYZING = "analyzing"                  # Research Agent running filters
    AWAITING_APPROVAL = "awaiting_approval"  # Blocked by Governance Gate
    STRATEGIZING = "strategizing"            # Multi-agent collaboration decomposing
    EXECUTING = "executing"                  # Task Dispatch sending to projects
    CONSOLIDATING = "consolidating"          # Data extraction into KG
    FINALIZING = "finalizing"                # Synthesis of multi-project results
    RESOLVED = "resolved"                    # Task complete; findings stored
    FAILED = "failed"                        # Error state with diagnostics


class TaskSource(Enum):
    """Where did the task come from?"""
    USER_SUBMISSION = "user"            # Direct user input
    PROJECT_REQUEST = "project"         # From another project
    AUTOMATION = "automation"           # Automated trigger
    APPROVAL_ESCALATION = "escalation"  # From approval system


@dataclass
class TaskMetadata:
    """Metadata for a task moving through the flow"""
    task_id: str
    source: TaskSource
    submitter: str  # user/project name
    submission_time: float
    description: str
    tags: List[str]  # For categorization

    # Analysis results
    security_level: Optional[str] = None     # critical, sensitive, internal, public
    speed_requirement: Optional[str] = None  # interactive, responsive, thorough, research
    complexity_level: Optional[str] = None   # trivial, straightforward, complex, exploratory
    recommended_tool: Optional[str] = None   # chat, debug, thinkdeep, codereview, consensus, planner

    # Approval tracking
    requires_approval: bool = False
    approved_by: Optional[str] = None
    approval_reason: Optional[str] = None
    approval_time: Optional[float] = None

    # Execution tracking (None means "not set yet"; the defaults stay None
    # so existing callers observe the same values as before)
    assigned_projects: Optional[List[str]] = None  # Which projects are handling sub-tasks
    execution_results: Optional[Dict] = None

    # KG integration
    findings_entity_id: Optional[str] = None      # Reference in research KG
    related_entities: Optional[List[str]] = None  # Links to related research


class FlowEvent:
    """State transition event"""

    def __init__(self, task_id: str, from_state: Optional[FlowState],
                 to_state: FlowState, metadata: Optional[Dict] = None):
        self.task_id = task_id
        self.from_state = from_state  # None for the very first event of a task
        self.to_state = to_state
        self.timestamp = datetime.now().isoformat()
        self.metadata = metadata or {}

    def to_dict(self):
        """Return the event as a plain dict with states given by their string values."""
        return {
            'task_id': self.task_id,
            'from_state': self.from_state.value if self.from_state else None,
            'to_state': self.to_state.value,
            'timestamp': self.timestamp,
            'metadata': self.metadata,
        }


class LuziaUnifiedFlow:
    """Main orchestrator for unified Luzia flow

    Task rows and the audit trail of state transitions live in a SQLite
    database (``flow_db``); findings are written to a separate research
    knowledge-graph database (``research_kg``). Public methods report
    failure through their return value (False / None / {}) and log the
    error instead of raising.
    """

    def __init__(self,
                 flow_db: Union[str, Path, None] = None,
                 log_file: Union[str, Path, None] = None,
                 research_kg: Union[str, Path, None] = None):
        """
        Args:
            flow_db: Path of the flow-state database (default: DEFAULT_FLOW_DB).
            log_file: Path of the log file (default: DEFAULT_LOG_FILE).
            research_kg: Path of the research KG database
                (default: DEFAULT_RESEARCH_KG).
        """
        self.flow_db = Path(flow_db) if flow_db is not None else DEFAULT_FLOW_DB
        self.log_file = Path(log_file) if log_file is not None else DEFAULT_LOG_FILE
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        self.research_kg = (Path(research_kg) if research_kg is not None
                            else DEFAULT_RESEARCH_KG)

        # sqlite3 cannot create missing directories, so make sure the state
        # directory exists. Best-effort: a failure is logged here and then
        # reported again by _init_db when the connection fails.
        try:
            self.flow_db.parent.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            self.log(f"❌ Error creating state directory: {e}")

        # Initialize database
        self._init_db()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _connect(self, db_path: Optional[Path] = None):
        """Open a SQLite connection that is closed when the ``with`` block exits.

        ``closing`` guarantees the connection is released even when a
        statement raises, which a trailing ``conn.close()`` does not.
        """
        return closing(sqlite3.connect(db_path if db_path is not None else self.flow_db))

    def _update_task(self, task_id: str, state: FlowState, **columns) -> bool:
        """Set ``state``, ``updated_at`` and the given columns on one task.

        Column names come from keyword arguments written in this module
        (never from user input), so interpolating them into the SQL text is
        safe; all values are bound as parameters.

        Returns:
            True if a task row was updated, False if ``task_id`` is unknown.
        """
        assignments = {'state': state.value, **columns, 'updated_at': time.time()}
        set_clause = ", ".join(f"{name} = ?" for name in assignments)
        with self._connect() as conn:
            cursor = conn.execute(
                f"UPDATE tasks SET {set_clause} WHERE task_id = ?",
                (*assignments.values(), task_id),
            )
            conn.commit()
            return cursor.rowcount > 0

    def _task_exists(self, task_id: str) -> bool:
        """Return True if a task row with this id exists."""
        with self._connect() as conn:
            row = conn.execute(
                "SELECT 1 FROM tasks WHERE task_id = ?", (task_id,)
            ).fetchone()
        return row is not None

    def _report_missing(self, task_id: str) -> bool:
        """Log an unknown task id; always returns False for use in ``return``."""
        self.log(f"❌ Task {task_id} not found")
        return False

    def _init_db(self):
        """Initialize flow state database"""
        try:
            with self._connect() as conn:
                # Tasks table - tracks all tasks through the flow
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS tasks (
                        task_id TEXT PRIMARY KEY,
                        state TEXT NOT NULL,
                        source TEXT NOT NULL,
                        submitter TEXT NOT NULL,
                        description TEXT,
                        tags TEXT,
                        security_level TEXT,
                        speed_requirement TEXT,
                        complexity_level TEXT,
                        recommended_tool TEXT,
                        requires_approval INTEGER,
                        approved_by TEXT,
                        assigned_projects TEXT,
                        execution_results TEXT,
                        findings_entity_id TEXT,
                        created_at REAL,
                        updated_at REAL,
                        metadata TEXT
                    )
                """)

                # Databases created before execution_results was part of the
                # schema lack the column consolidate_results writes to;
                # CREATE TABLE IF NOT EXISTS will not add it, so migrate.
                existing = {row[1] for row in conn.execute("PRAGMA table_info(tasks)")}
                if 'execution_results' not in existing:
                    conn.execute("ALTER TABLE tasks ADD COLUMN execution_results TEXT")

                # Events table - audit trail of state transitions
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS events (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        task_id TEXT NOT NULL,
                        from_state TEXT,
                        to_state TEXT,
                        timestamp TEXT,
                        metadata TEXT,
                        FOREIGN KEY(task_id) REFERENCES tasks(task_id)
                    )
                """)
                conn.commit()
        except Exception as e:
            self.log(f"❌ Error initializing DB: {e}")

    def log(self, message):
        """Log flow event

        Appends a timestamped line to the log file and echoes the message to
        stdout.
        """
        timestamp = datetime.now().isoformat()
        log_entry = f"[{timestamp}] {message}\n"
        # Messages contain emoji; an explicit encoding keeps the write from
        # failing under a non-UTF-8 locale.
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_entry)
        try:
            print(message)
        except UnicodeEncodeError:
            # stdout cannot represent the message; fall back to ASCII escapes
            print(message.encode('ascii', 'backslashreplace').decode('ascii'))

    # ------------------------------------------------------------------
    # Flow phases
    # ------------------------------------------------------------------

    def receive_task(self, description: str, source: TaskSource, submitter: str,
                     tags: Optional[List[str]] = None) -> Optional[str]:
        """
        Phase 1: Ingestion & Intent Mapping
        User submits task, system captures it.

        Returns:
            The new task id, or None if the task could not be stored.
        """
        # The millisecond timestamp alone collides when two tasks from the
        # same source arrive within one millisecond; a random suffix keeps
        # the primary key unique while preserving the original prefix.
        task_id = f"task_{source.value}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"

        try:
            now = time.time()
            with self._connect() as conn:
                conn.execute("""
                    INSERT INTO tasks
                    (task_id, state, source, submitter, description, tags, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    task_id,
                    FlowState.RECEIVED.value,
                    source.value,
                    submitter,
                    description,
                    json.dumps(tags or []),
                    now,
                    now,
                ))
                conn.commit()

            self.log(f"📥 RECEIVED task {task_id}: {(description or '')[:50]}...")
            self._emit_event(task_id, None, FlowState.RECEIVED)

            return task_id

        except Exception as e:
            self.log(f"❌ Error receiving task: {e}")
            return None

    def analyze_task(self, task_id: str, analysis: Dict) -> bool:
        """
        Phase 2: Triage & Triangulation
        Research Agent analyzes security/speed/complexity.

        Args:
            task_id: Task to update.
            analysis: Dict read for the keys 'security', 'speed',
                'complexity', 'recommended_tool' and 'requires_approval'.

        Returns:
            True on success; False if the task is unknown or an error occurred.
        """
        try:
            # Extract analysis results
            security = analysis.get('security')
            speed = analysis.get('speed')
            complexity = analysis.get('complexity')
            tool = analysis.get('recommended_tool')
            requires_approval = analysis.get('requires_approval', False)

            # Determine next state
            next_state = (FlowState.AWAITING_APPROVAL if requires_approval
                          else FlowState.STRATEGIZING)

            updated = self._update_task(
                task_id, next_state,
                security_level=security,
                speed_requirement=speed,
                complexity_level=complexity,
                recommended_tool=tool,
                requires_approval=1 if requires_approval else 0,
            )
            if not updated:
                return self._report_missing(task_id)

            self.log(f"🔍 ANALYZING task {task_id}")
            self.log(f" Security: {security} | Speed: {speed} | Complexity: {complexity}")
            self.log(f" Recommended tool: {tool}")
            self.log(f" Requires approval: {requires_approval}")

            # ANALYZING is never persisted as a task state; it is recorded
            # only in the audit trail as an intermediate step.
            self._emit_event(task_id, FlowState.RECEIVED, FlowState.ANALYZING, analysis)
            self._emit_event(task_id, FlowState.ANALYZING, next_state)

            return True

        except Exception as e:
            self.log(f"❌ Error analyzing task: {e}")
            return False

    def approve_task(self, task_id: str, approved_by: str, reason: str = "") -> bool:
        """
        Phase 3: Governance Gate
        Approval orchestrator approves/escalates tasks.

        The approver is stored on the task; ``reason`` is recorded only in
        the emitted event's metadata.

        Returns:
            True on success; False if the task is unknown or an error occurred.
        """
        try:
            if not self._update_task(task_id, FlowState.STRATEGIZING,
                                     approved_by=approved_by):
                return self._report_missing(task_id)

            self.log(f"✅ APPROVED task {task_id} by {approved_by}")
            self._emit_event(task_id, FlowState.AWAITING_APPROVAL, FlowState.STRATEGIZING,
                             {'approved_by': approved_by, 'reason': reason})

            return True

        except Exception as e:
            self.log(f"❌ Error approving task: {e}")
            return False

    def assign_projects(self, task_id: str, projects: List[str]) -> bool:
        """
        Phase 4a: Strategic Execution
        Task dispatcher assigns work to projects.

        Returns:
            True on success; False if the task is unknown or an error occurred.
        """
        try:
            if not self._update_task(task_id, FlowState.EXECUTING,
                                     assigned_projects=json.dumps(projects)):
                return self._report_missing(task_id)

            self.log(f"🚀 EXECUTING task {task_id}")
            self.log(f" Assigned to: {', '.join(projects)}")

            self._emit_event(task_id, FlowState.STRATEGIZING, FlowState.EXECUTING,
                             {'assigned_projects': projects})

            return True

        except Exception as e:
            self.log(f"❌ Error assigning projects: {e}")
            return False

    def consolidate_results(self, task_id: str, results: Dict) -> bool:
        """
        Phase 5: Harvesting & Graphing
        Knowledge consolidation extracts findings into research KG.

        The results are stored as JSON on the task row and as a new entity
        in the research KG. If the KG write fails, the task still moves to
        CONSOLIDATING with ``findings_entity_id`` left NULL.

        Returns:
            True on success; False if the task is unknown or an error occurred.
        """
        try:
            # Serialize first so unserializable results fail before anything
            # is written, and verify the task exists so an unknown id does
            # not leave an orphan entity in the KG.
            serialized = json.dumps(results)
            if not self._task_exists(task_id):
                return self._report_missing(task_id)

            # Create finding entity in research KG
            findings_id = self._store_finding_in_kg(task_id, results)

            if not self._update_task(task_id, FlowState.CONSOLIDATING,
                                     execution_results=serialized,
                                     findings_entity_id=findings_id):
                return self._report_missing(task_id)

            self.log(f"📊 CONSOLIDATING task {task_id}")
            self.log(f" Findings stored in KG: {findings_id}")

            self._emit_event(task_id, FlowState.EXECUTING, FlowState.CONSOLIDATING,
                             {'findings_id': findings_id})

            return True

        except Exception as e:
            self.log(f"❌ Error consolidating results: {e}")
            return False

    def resolve_task(self, task_id: str) -> bool:
        """
        Phase 6: Closure & Insights
        Task complete, user receives findings plus related research.

        Returns:
            True on success; False if the task is unknown or an error occurred.
        """
        try:
            if not self._update_task(task_id, FlowState.RESOLVED):
                return self._report_missing(task_id)

            self.log(f"✨ RESOLVED task {task_id}")
            self._emit_event(task_id, FlowState.CONSOLIDATING, FlowState.RESOLVED)

            return True

        except Exception as e:
            self.log(f"❌ Error resolving task: {e}")
            return False

    def fail_task(self, task_id: str, error: str) -> bool:
        """
        Error state with diagnostic metadata.

        Stores ``{'error': error}`` as JSON in the task's metadata column
        and records the transition in the audit trail.

        Returns:
            True on success; False if the task is unknown or an error occurred.
        """
        try:
            with self._connect() as conn:
                row = conn.execute(
                    "SELECT state FROM tasks WHERE task_id = ?", (task_id,)
                ).fetchone()
                if row is None:
                    return self._report_missing(task_id)

                conn.execute("""
                    UPDATE tasks
                    SET state = ?, metadata = ?, updated_at = ?
                    WHERE task_id = ?
                """, (
                    FlowState.FAILED.value,
                    json.dumps({'error': error}),
                    time.time(),
                    task_id,
                ))
                conn.commit()

            # A task can fail from any state, so the event's from_state is
            # taken from the row rather than hard-coded.
            try:
                previous = FlowState(row[0])
            except ValueError:
                previous = None  # stored state is not a known FlowState value

            self.log(f"❌ FAILED task {task_id}: {error}")
            self._emit_event(task_id, previous, FlowState.FAILED, {'error': error})
            return True

        except Exception as e:
            self.log(f"❌ Error failing task: {e}")
            return False

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Get current task status

        Returns:
            The task row as a dict keyed by column name, or None if the task
            does not exist or the lookup failed.
        """
        try:
            with self._connect() as conn:
                conn.row_factory = sqlite3.Row
                row = conn.execute(
                    "SELECT * FROM tasks WHERE task_id = ?", (task_id,)
                ).fetchone()

            if not row:
                return None

            return dict(row)

        except Exception as e:
            self.log(f"❌ Error getting task status: {e}")
            return None

    def _store_finding_in_kg(self, task_id: str, results: Dict) -> Optional[str]:
        """Store findings in research KG

        Inserts one row into the KG's ``entities`` table; that table is
        expected to exist already and is not created here.

        Returns:
            The new entity id, or None if the insert failed.
        """
        try:
            finding_id = str(uuid.uuid4())
            now = time.time()

            with self._connect(self.research_kg) as conn:
                conn.execute("""
                    INSERT INTO entities
                    (id, name, type, domain, content, metadata, created_at, updated_at, source)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    finding_id,
                    f"Task Results: {task_id[:20]}",
                    "task_result",
                    "flow",
                    json.dumps(results),
                    json.dumps({'task_id': task_id}),
                    now,
                    now,
                    'luzia_flow',
                ))
                conn.commit()

            return finding_id

        except Exception as e:
            self.log(f"❌ Error storing finding: {e}")
            return None

    def _emit_event(self, task_id: str, from_state: Optional[FlowState],
                    to_state: FlowState, metadata: Optional[Dict] = None):
        """Emit state transition event

        Best-effort: a failure to record the event is logged and does not
        propagate to the caller.
        """
        try:
            event = FlowEvent(task_id, from_state, to_state, metadata)

            with self._connect() as conn:
                conn.execute("""
                    INSERT INTO events
                    (task_id, from_state, to_state, timestamp, metadata)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    event.task_id,
                    event.from_state.value if event.from_state else None,
                    event.to_state.value,
                    event.timestamp,
                    json.dumps(event.metadata),
                ))
                conn.commit()

        except Exception as e:
            self.log(f"❌ Error emitting event: {e}")

    def get_flow_status(self) -> Dict:
        """Get overall flow statistics

        Returns:
            Dict with 'total_tasks', 'state_distribution' (state → count)
            and 'total_events'; an empty dict if the query failed.
        """
        try:
            with self._connect() as conn:
                # Count by state
                state_counts = {
                    state: count
                    for state, count in conn.execute(
                        "SELECT state, COUNT(*) as count FROM tasks GROUP BY state"
                    )
                }

                # Total tasks
                total = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]

                # Events count
                total_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]

            return {
                'total_tasks': total,
                'state_distribution': state_counts,
                'total_events': total_events,
            }

        except Exception as e:
            self.log(f"❌ Error getting flow status: {e}")
            return {}


if __name__ == '__main__':
    flow = LuziaUnifiedFlow()
    print(json.dumps(flow.get_flow_status(), indent=2))