Files
luzia/lib/mcp_task_integration.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

151 lines
4.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""
MCP Task Queue Integration - Single Source of Truth
Registers all dispatched agents in MCP task queue for unified tracking.
"""
import json
import sqlite3
import uuid
from pathlib import Path
from datetime import datetime
# Path to the SQLite database file backing the MCP task queue. The functions
# below check that this file exists before opening a connection to it.
MCP_DB = Path("/opt/server-agents/state/task_queue.db")
def create_task_links_table():
    """Create the tables that link luzia jobs to MCP task-queue entries.

    Ensures ``task_links`` and ``job_metadata_extended`` exist in the SQLite
    database at ``MCP_DB``. Both statements use ``IF NOT EXISTS``, so calling
    this more than once does not fail or alter existing tables.

    Returns:
        True when both statements ran and were committed; False when the
        database file is missing or any step raised (the reason is printed).
    """
    if not MCP_DB.exists():
        print(f"Warning: MCP DB not found at {MCP_DB}")
        return False

    conn = None
    try:
        conn = sqlite3.connect(str(MCP_DB))
        cursor = conn.cursor()

        # One row per dispatched job, pointing at its MCP task.
        # NOTE: SQLite only enforces the FOREIGN KEY clause on connections
        # that enable `PRAGMA foreign_keys`; this module does not enable it.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS task_links (
                id TEXT PRIMARY KEY,
                job_id TEXT NOT NULL UNIQUE,
                mcp_task_id TEXT,
                conductor_path TEXT,
                project TEXT NOT NULL,
                task_title TEXT,
                dispatch_time INTEGER,
                created_at INTEGER,
                FOREIGN KEY (mcp_task_id) REFERENCES tasks(id)
            )
        """)

        # Per-job dispatch/completion details keyed by job_id.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS job_metadata_extended (
                job_id TEXT PRIMARY KEY,
                claude_session_id TEXT,
                dispatch_timestamp TEXT,
                completion_timestamp TEXT,
                system_load_dispatch TEXT,
                system_load_completion TEXT,
                memory_percent_dispatch INTEGER,
                memory_percent_completion INTEGER,
                exit_code INTEGER,
                output_size_bytes INTEGER
            )
        """)

        conn.commit()
        return True
    except Exception as e:
        # Best-effort by design: report the problem and signal failure
        # instead of propagating to the caller.
        print(f"Error creating task_links table: {e}")
        return False
    finally:
        # Release the handle on every path, including failures.
        if conn is not None:
            conn.close()
def register_job_in_mcp_queue(job_id, project, task_title, claude_session_id=None):
    """Register a dispatched job in the MCP task queue.

    Inserts a new row into ``tasks``, a row into ``task_links`` tying
    ``job_id`` to that task and, when ``claude_session_id`` is truthy, a row
    into ``job_metadata_extended``. The inserts are committed together at the
    end; if any of them raises, nothing is committed.

    Args:
        job_id: Identifier of the dispatched job (``task_links.job_id`` is
            declared UNIQUE, so registering the same job twice fails).
        project: Project name stored on both the task and the link row.
        task_title: Title stored on both the task and the link row.
        claude_session_id: Optional session id; when given, it is recorded
            together with the dispatch timestamp.

    Returns:
        A dict with ``mcp_task_id``, ``link_id`` and ``job_id`` on success;
        None when the database file is missing or any step raised (the
        error is printed).
    """
    if not MCP_DB.exists():
        return None

    conn = None
    try:
        conn = sqlite3.connect(str(MCP_DB))
        cursor = conn.cursor()

        # Sample the clock once so the epoch and ISO timestamps written
        # below describe the same instant.
        dispatched_at = datetime.now()
        now = int(dispatched_at.timestamp())

        # Create MCP task entry.
        # NOTE(review): status 0 is presumably the queue's "pending" value;
        # confirm against the tasks schema, which is not visible here.
        mcp_task_id = f"t_{uuid.uuid4().hex[:12]}"
        cursor.execute("""
            INSERT INTO tasks (id, title, description, status, project, created_at, created_by)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (mcp_task_id, task_title, f"Job: {job_id}", 0, project, now, "luzia_orchestrator"))

        # Link job to MCP task.
        link_id = f"link_{uuid.uuid4().hex[:8]}"
        cursor.execute("""
            INSERT INTO task_links (id, job_id, mcp_task_id, project, task_title, dispatch_time, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (link_id, job_id, mcp_task_id, project, task_title, now, now))

        # Record extended metadata only when a session id was supplied.
        if claude_session_id:
            cursor.execute("""
                INSERT INTO job_metadata_extended (job_id, claude_session_id, dispatch_timestamp)
                VALUES (?, ?, ?)
            """, (job_id, claude_session_id, dispatched_at.isoformat()))

        conn.commit()
        return {
            'mcp_task_id': mcp_task_id,
            'link_id': link_id,
            'job_id': job_id,
        }
    except Exception as e:
        # Best-effort by design: report and return None rather than raise.
        print(f"Error registering job: {e}")
        return None
    finally:
        # Release the handle on every path; closing without a commit
        # discards any partially applied inserts.
        if conn is not None:
            conn.close()
def update_job_completion(job_id, exit_code, output_size=0, claude_session_id=None):
    """Record a job's completion in the MCP task queue and job metadata.

    Looks up the MCP task linked to ``job_id``; if one exists, its ``tasks``
    row is marked completed (exit code 0) or failed (any other exit code).
    The job's ``job_metadata_extended`` row is then updated in place, or
    created if it does not exist yet, so values written at dispatch time
    (session id, dispatch timestamp) are kept.

    Args:
        job_id: Identifier of the job that finished.
        exit_code: Process exit code; 0 maps to status 2, anything else to 3.
        output_size: Size of the job output in bytes (default 0).
        claude_session_id: Optional session id. When truthy it is stored on
            the metadata row; otherwise any stored value is left untouched.

    Returns:
        True when the updates were committed (also when no link row exists
        for ``job_id``); False when the database file is missing or any step
        raised (the error is printed).
    """
    if not MCP_DB.exists():
        return False

    conn = None
    try:
        conn = sqlite3.connect(str(MCP_DB))
        cursor = conn.cursor()

        # Sample the clock once so both timestamp formats agree.
        finished_at = datetime.now()
        now = int(finished_at.timestamp())
        completion_iso = finished_at.isoformat()
        # Treat an empty session id like "not provided".
        session_id = claude_session_id or None

        # Get MCP task ID from link.
        cursor.execute("SELECT mcp_task_id FROM task_links WHERE job_id = ?", (job_id,))
        result = cursor.fetchone()

        if result:
            mcp_task_id = result[0]

            # Map exit code to status: 2=completed, 3=failed.
            status = 2 if exit_code == 0 else 3

            # Update MCP task.
            cursor.execute("""
                UPDATE tasks SET status = ?, completed_at = ?, exit_code = ?
                WHERE id = ?
            """, (status, now, exit_code, mcp_task_id))

        # Update extended metadata.
        # NOTE(review): written whether or not a link row was found; the
        # original nesting is ambiguous in the reviewed copy, so confirm.
        #
        # UPDATE first and INSERT only when no row exists. A plain
        # INSERT OR REPLACE would delete the existing row and with it the
        # claude_session_id / dispatch_timestamp stored at registration.
        cursor.execute("""
            UPDATE job_metadata_extended
            SET completion_timestamp = ?,
                exit_code = ?,
                output_size_bytes = ?,
                claude_session_id = COALESCE(?, claude_session_id)
            WHERE job_id = ?
        """, (completion_iso, exit_code, output_size, session_id, job_id))

        if cursor.rowcount == 0:
            cursor.execute("""
                INSERT INTO job_metadata_extended
                    (job_id, claude_session_id, completion_timestamp, exit_code, output_size_bytes)
                VALUES (?, ?, ?, ?, ?)
            """, (job_id, session_id, completion_iso, exit_code, output_size))

        conn.commit()
        return True
    except Exception as e:
        # Best-effort by design: report and return False rather than raise.
        print(f"Error updating job: {e}")
        return False
    finally:
        # Release the handle on every path, including failures.
        if conn is not None:
            conn.close()
if __name__ == "__main__":
    # Report success only when the tables were actually created or verified;
    # create_task_links_table() prints its own warning/error on failure.
    if create_task_links_table():
        print("✓ MCP task queue integration initialized")
    else:
        # Non-zero exit status so calling scripts can detect the failure.
        raise SystemExit(1)