Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
151 lines
4.9 KiB
Python
Executable File
151 lines
4.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
MCP Task Queue Integration - Single Source of Truth
|
|
Registers all dispatched agents in MCP task queue for unified tracking.
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import uuid
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
MCP_DB = Path("/opt/server-agents/state/task_queue.db")
|
|
|
|
def create_task_links_table():
|
|
"""Create linking table between job and MCP task systems."""
|
|
if not MCP_DB.exists():
|
|
print(f"Warning: MCP DB not found at {MCP_DB}")
|
|
return False
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(MCP_DB))
|
|
cursor = conn.cursor()
|
|
|
|
# Create task_links table if not exists
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS task_links (
|
|
id TEXT PRIMARY KEY,
|
|
job_id TEXT NOT NULL UNIQUE,
|
|
mcp_task_id TEXT,
|
|
conductor_path TEXT,
|
|
project TEXT NOT NULL,
|
|
task_title TEXT,
|
|
dispatch_time INTEGER,
|
|
created_at INTEGER,
|
|
FOREIGN KEY (mcp_task_id) REFERENCES tasks(id)
|
|
)
|
|
""")
|
|
|
|
# Create job_metadata_extended table
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS job_metadata_extended (
|
|
job_id TEXT PRIMARY KEY,
|
|
claude_session_id TEXT,
|
|
dispatch_timestamp TEXT,
|
|
completion_timestamp TEXT,
|
|
system_load_dispatch TEXT,
|
|
system_load_completion TEXT,
|
|
memory_percent_dispatch INTEGER,
|
|
memory_percent_completion INTEGER,
|
|
exit_code INTEGER,
|
|
output_size_bytes INTEGER
|
|
)
|
|
""")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error creating task_links table: {e}")
|
|
return False
|
|
|
|
def register_job_in_mcp_queue(job_id, project, task_title, claude_session_id=None):
|
|
"""Register a dispatched job in MCP task queue."""
|
|
if not MCP_DB.exists():
|
|
return None
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(MCP_DB))
|
|
cursor = conn.cursor()
|
|
|
|
# Create MCP task entry
|
|
mcp_task_id = f"t_{uuid.uuid4().hex[:12]}"
|
|
now = int(datetime.now().timestamp())
|
|
|
|
cursor.execute("""
|
|
INSERT INTO tasks (id, title, description, status, project, created_at, created_by)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (mcp_task_id, task_title, f"Job: {job_id}", 0, project, now, "luzia_orchestrator"))
|
|
|
|
# Link job to MCP task
|
|
link_id = f"link_{uuid.uuid4().hex[:8]}"
|
|
cursor.execute("""
|
|
INSERT INTO task_links (id, job_id, mcp_task_id, project, task_title, dispatch_time, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (link_id, job_id, mcp_task_id, project, task_title, now, now))
|
|
|
|
# Record extended metadata
|
|
if claude_session_id:
|
|
cursor.execute("""
|
|
INSERT INTO job_metadata_extended (job_id, claude_session_id, dispatch_timestamp)
|
|
VALUES (?, ?, ?)
|
|
""", (job_id, claude_session_id, datetime.now().isoformat()))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {
|
|
'mcp_task_id': mcp_task_id,
|
|
'link_id': link_id,
|
|
'job_id': job_id,
|
|
}
|
|
except Exception as e:
|
|
print(f"Error registering job: {e}")
|
|
return None
|
|
|
|
def update_job_completion(job_id, exit_code, output_size=0, claude_session_id=None):
|
|
"""Update job completion status in both systems."""
|
|
if not MCP_DB.exists():
|
|
return False
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(MCP_DB))
|
|
cursor = conn.cursor()
|
|
|
|
now = int(datetime.now().timestamp())
|
|
|
|
# Get MCP task ID from link
|
|
cursor.execute("SELECT mcp_task_id FROM task_links WHERE job_id = ?", (job_id,))
|
|
result = cursor.fetchone()
|
|
|
|
if result:
|
|
mcp_task_id = result[0]
|
|
|
|
# Map exit code to status
|
|
status = 2 if exit_code == 0 else 3 # 2=completed, 3=failed
|
|
|
|
# Update MCP task
|
|
cursor.execute("""
|
|
UPDATE tasks SET status = ?, completed_at = ?, exit_code = ?
|
|
WHERE id = ?
|
|
""", (status, now, exit_code, mcp_task_id))
|
|
|
|
# Update extended metadata
|
|
cursor.execute("""
|
|
INSERT OR REPLACE INTO job_metadata_extended
|
|
(job_id, completion_timestamp, exit_code, output_size_bytes)
|
|
VALUES (?, ?, ?, ?)
|
|
""", (job_id, datetime.now().isoformat(), exit_code, output_size))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error updating job: {e}")
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
create_task_links_table()
|
|
print("✓ MCP task queue integration initialized")
|