#!/usr/bin/env python3 """ MCP Task Queue Integration - Single Source of Truth Registers all dispatched agents in MCP task queue for unified tracking. """ import json import sqlite3 import uuid from pathlib import Path from datetime import datetime MCP_DB = Path("/opt/server-agents/state/task_queue.db") def create_task_links_table(): """Create linking table between job and MCP task systems.""" if not MCP_DB.exists(): print(f"Warning: MCP DB not found at {MCP_DB}") return False try: conn = sqlite3.connect(str(MCP_DB)) cursor = conn.cursor() # Create task_links table if not exists cursor.execute(""" CREATE TABLE IF NOT EXISTS task_links ( id TEXT PRIMARY KEY, job_id TEXT NOT NULL UNIQUE, mcp_task_id TEXT, conductor_path TEXT, project TEXT NOT NULL, task_title TEXT, dispatch_time INTEGER, created_at INTEGER, FOREIGN KEY (mcp_task_id) REFERENCES tasks(id) ) """) # Create job_metadata_extended table cursor.execute(""" CREATE TABLE IF NOT EXISTS job_metadata_extended ( job_id TEXT PRIMARY KEY, claude_session_id TEXT, dispatch_timestamp TEXT, completion_timestamp TEXT, system_load_dispatch TEXT, system_load_completion TEXT, memory_percent_dispatch INTEGER, memory_percent_completion INTEGER, exit_code INTEGER, output_size_bytes INTEGER ) """) conn.commit() conn.close() return True except Exception as e: print(f"Error creating task_links table: {e}") return False def register_job_in_mcp_queue(job_id, project, task_title, claude_session_id=None): """Register a dispatched job in MCP task queue.""" if not MCP_DB.exists(): return None try: conn = sqlite3.connect(str(MCP_DB)) cursor = conn.cursor() # Create MCP task entry mcp_task_id = f"t_{uuid.uuid4().hex[:12]}" now = int(datetime.now().timestamp()) cursor.execute(""" INSERT INTO tasks (id, title, description, status, project, created_at, created_by) VALUES (?, ?, ?, ?, ?, ?, ?) """, (mcp_task_id, task_title, f"Job: {job_id}", 0, project, now, "luzia_orchestrator")) # Link job to MCP task link_id = f"link_{uuid.uuid4().hex[:8]}" cursor.execute(""" INSERT INTO task_links (id, job_id, mcp_task_id, project, task_title, dispatch_time, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (link_id, job_id, mcp_task_id, project, task_title, now, now)) # Record extended metadata if claude_session_id: cursor.execute(""" INSERT INTO job_metadata_extended (job_id, claude_session_id, dispatch_timestamp) VALUES (?, ?, ?) """, (job_id, claude_session_id, datetime.now().isoformat())) conn.commit() conn.close() return { 'mcp_task_id': mcp_task_id, 'link_id': link_id, 'job_id': job_id, } except Exception as e: print(f"Error registering job: {e}") return None def update_job_completion(job_id, exit_code, output_size=0, claude_session_id=None): """Update job completion status in both systems.""" if not MCP_DB.exists(): return False try: conn = sqlite3.connect(str(MCP_DB)) cursor = conn.cursor() now = int(datetime.now().timestamp()) # Get MCP task ID from link cursor.execute("SELECT mcp_task_id FROM task_links WHERE job_id = ?", (job_id,)) result = cursor.fetchone() if result: mcp_task_id = result[0] # Map exit code to status status = 2 if exit_code == 0 else 3 # 2=completed, 3=failed # Update MCP task cursor.execute(""" UPDATE tasks SET status = ?, completed_at = ?, exit_code = ? WHERE id = ? """, (status, now, exit_code, mcp_task_id)) # Update extended metadata cursor.execute(""" INSERT OR REPLACE INTO job_metadata_extended (job_id, completion_timestamp, exit_code, output_size_bytes) VALUES (?, ?, ?, ?) """, (job_id, datetime.now().isoformat(), exit_code, output_size)) conn.commit() conn.close() return True except Exception as e: print(f"Error updating job: {e}") return False if __name__ == "__main__": create_task_links_table() print("✓ MCP task queue integration initialized")