Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
337 lines
9.6 KiB
Python
337 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Luzia Pending Requests Migrator
|
|
|
|
Migrates all pending requests from pending-requests.json to the task queue
|
|
with appropriate priority levels.
|
|
|
|
Features:
|
|
- Batch migration of historical requests
|
|
- Priority detection (URGENT keywords, approval status)
|
|
- Metadata preservation
|
|
- Dry-run mode for validation
|
|
- Migration tracking
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Tuple, Optional
|
|
|
|
from luzia_queue_manager import LuziaQueueManager, TaskPriority, TaskStatus
|
|
|
|
|
|
class PendingRequestsMigrator:
|
|
"""Migrate requests from pending-requests.json to task queue"""
|
|
|
|
def __init__(self, queue_manager: Optional[LuziaQueueManager] = None):
|
|
"""
|
|
Initialize migrator.
|
|
|
|
Args:
|
|
queue_manager: LuziaQueueManager instance
|
|
"""
|
|
self.queue_manager = queue_manager or LuziaQueueManager()
|
|
self.pending_file = Path("/opt/server-agents/state/pending-requests.json")
|
|
|
|
def load_pending_requests(self) -> Dict:
|
|
"""
|
|
Load pending requests from file.
|
|
|
|
Returns:
|
|
Request data dict
|
|
"""
|
|
if not self.pending_file.exists():
|
|
raise FileNotFoundError(f"File not found: {self.pending_file}")
|
|
|
|
with open(self.pending_file) as f:
|
|
return json.load(f)
|
|
|
|
def migrate_all(self, dry_run: bool = False) -> Tuple[int, int, List[str]]:
|
|
"""
|
|
Migrate all pending requests to queue.
|
|
|
|
Args:
|
|
dry_run: If True, don't actually insert tasks
|
|
|
|
Returns:
|
|
(success_count, error_count, error_messages)
|
|
"""
|
|
try:
|
|
data = self.load_pending_requests()
|
|
except Exception as e:
|
|
return 0, 1, [f"Failed to load pending requests: {e}"]
|
|
|
|
pending_list = data.get("pending", [])
|
|
if not pending_list:
|
|
return 0, 0, []
|
|
|
|
success_count = 0
|
|
error_count = 0
|
|
error_messages = []
|
|
|
|
for req in pending_list:
|
|
try:
|
|
result = self._migrate_single(req, dry_run=dry_run)
|
|
if result:
|
|
success_count += 1
|
|
else:
|
|
error_count += 1
|
|
error_messages.append(f"Failed to migrate request {req.get('id')}")
|
|
except Exception as e:
|
|
error_count += 1
|
|
error_messages.append(f"Error migrating {req.get('id')}: {e}")
|
|
|
|
return success_count, error_count, error_messages
|
|
|
|
def _migrate_single(self, req: Dict, dry_run: bool = False) -> bool:
|
|
"""
|
|
Migrate a single request.
|
|
|
|
Args:
|
|
req: Request dict
|
|
dry_run: If True, don't actually insert
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
# Extract request fields
|
|
req_id = req.get("id")
|
|
req_type = req.get("type")
|
|
user = req.get("user", "unknown")
|
|
reason = req.get("reason", "")
|
|
parameter = req.get("parameter", "")
|
|
status = req.get("status", "pending")
|
|
|
|
# Skip already completed or cancelled requests
|
|
if status in ["completed", "cancelled"]:
|
|
return True
|
|
|
|
# Determine priority
|
|
priority = self._determine_priority(reason, status)
|
|
|
|
# Build task description
|
|
if parameter:
|
|
task_desc = f"{req_type}: {parameter}"
|
|
else:
|
|
task_desc = f"{req_type} from {user}"
|
|
|
|
# Limit description length
|
|
task_desc = task_desc[:200]
|
|
|
|
# Build metadata
|
|
metadata = {
|
|
"original_request_id": req_id,
|
|
"request_type": req_type,
|
|
"user": user,
|
|
"request_status": status,
|
|
"parameter": parameter,
|
|
"migrated_at": datetime.now().isoformat(),
|
|
}
|
|
|
|
# If dry run, just return True without inserting
|
|
if dry_run:
|
|
return True
|
|
|
|
# Insert into queue
|
|
try:
|
|
task_id = self.queue_manager.enqueue_task(
|
|
project=user,
|
|
task=task_desc,
|
|
priority=priority,
|
|
metadata=metadata,
|
|
)
|
|
print(f"Migrated {req_id} -> {task_id}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error migrating {req_id}: {e}", file=sys.stderr)
|
|
return False
|
|
|
|
@staticmethod
|
|
def _determine_priority(reason: str, status: str) -> TaskPriority:
|
|
"""
|
|
Determine task priority based on request content.
|
|
|
|
Args:
|
|
reason: Request reason/description
|
|
status: Current request status
|
|
|
|
Returns:
|
|
TaskPriority level
|
|
"""
|
|
# Check for URGENT keyword
|
|
if "URGENT" in reason.upper() or "CRITICAL" in reason.upper():
|
|
return TaskPriority.HIGH
|
|
|
|
# Check for approval status (approved = higher priority)
|
|
if status in ["approved", "approved_by"]:
|
|
return TaskPriority.HIGH
|
|
|
|
# Default to normal
|
|
return TaskPriority.NORMAL
|
|
|
|
def get_migration_summary(self) -> Dict:
|
|
"""
|
|
Get summary of what would be migrated.
|
|
|
|
Returns:
|
|
Summary statistics
|
|
"""
|
|
try:
|
|
data = self.load_pending_requests()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
pending_list = data.get("pending", [])
|
|
|
|
# Categorize requests
|
|
by_status = {}
|
|
by_type = {}
|
|
by_priority = {}
|
|
|
|
for req in pending_list:
|
|
# Skip completed/cancelled
|
|
if req.get("status") in ["completed", "cancelled"]:
|
|
continue
|
|
|
|
status = req.get("status", "unknown")
|
|
by_status[status] = by_status.get(status, 0) + 1
|
|
|
|
req_type = req.get("type", "unknown")
|
|
by_type[req_type] = by_type.get(req_type, 0) + 1
|
|
|
|
reason = req.get("reason", "")
|
|
priority = self._determine_priority(reason, status)
|
|
priority_name = priority.name
|
|
by_priority[priority_name] = by_priority.get(priority_name, 0) + 1
|
|
|
|
return {
|
|
"total_pending": len(pending_list),
|
|
"by_status": by_status,
|
|
"by_type": by_type,
|
|
"by_priority": by_priority,
|
|
}
|
|
|
|
@staticmethod
|
|
def backup_original(backup_dir: Optional[Path] = None) -> bool:
|
|
"""
|
|
Backup original pending-requests.json before migration.
|
|
|
|
Args:
|
|
backup_dir: Directory to store backup
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
source = Path("/opt/server-agents/state/pending-requests.json")
|
|
if not source.exists():
|
|
return False
|
|
|
|
backup_dir = backup_dir or Path("/opt/server-agents/state/backups")
|
|
backup_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
backup_file = backup_dir / f"pending-requests.{timestamp}.json"
|
|
|
|
try:
|
|
with open(source) as f:
|
|
data = json.load(f)
|
|
|
|
with open(backup_file, "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"Backup failed: {e}", file=sys.stderr)
|
|
return False
|
|
|
|
|
|
def main():
|
|
"""CLI entry point for migration"""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="Migrate pending requests to task queue"
|
|
)
|
|
parser.add_argument("--dry-run", action="store_true", help="Show what would be migrated")
|
|
parser.add_argument("--backup", action="store_true", help="Backup original file")
|
|
parser.add_argument("--summary", action="store_true", help="Show migration summary")
|
|
parser.add_argument("--force", action="store_true", help="Skip confirmations")
|
|
|
|
args = parser.parse_args()
|
|
|
|
migrator = PendingRequestsMigrator()
|
|
|
|
# Show summary
|
|
print("\n" + "="*70)
|
|
print("PENDING REQUESTS MIGRATION".center(70))
|
|
print("="*70 + "\n")
|
|
|
|
summary = migrator.get_migration_summary()
|
|
|
|
if "error" in summary:
|
|
print(f"Error: {summary['error']}")
|
|
return 1
|
|
|
|
print(f"Pending Requests: {summary.get('total_pending', 0)}\n")
|
|
|
|
if summary.get("by_status"):
|
|
print("By Status:")
|
|
for status, count in summary["by_status"].items():
|
|
print(f" {status:12s}: {count}")
|
|
print()
|
|
|
|
if summary.get("by_type"):
|
|
print("By Type:")
|
|
for req_type, count in summary["by_type"].items():
|
|
print(f" {req_type:20s}: {count}")
|
|
print()
|
|
|
|
if summary.get("by_priority"):
|
|
print("By Priority:")
|
|
for priority, count in summary["by_priority"].items():
|
|
print(f" {priority:12s}: {count}")
|
|
print()
|
|
|
|
if args.summary:
|
|
return 0
|
|
|
|
# Backup if requested
|
|
if args.backup:
|
|
print("Backing up original file...")
|
|
if migrator.backup_original():
|
|
print("Backup created successfully\n")
|
|
else:
|
|
print("Warning: Backup failed\n")
|
|
|
|
# Confirm migration
|
|
if not args.force and not args.dry_run:
|
|
response = input("Proceed with migration? [y/N] ")
|
|
if response.lower() != "y":
|
|
print("Cancelled")
|
|
return 1
|
|
|
|
# Run migration
|
|
dry_run = args.dry_run
|
|
print(f"\nRunning migration ({'dry-run' if dry_run else 'for real'})...\n")
|
|
|
|
success, errors, error_msgs = migrator.migrate_all(dry_run=dry_run)
|
|
|
|
print(f"\nResults:")
|
|
print(f" Successful: {success}")
|
|
print(f" Failed: {errors}")
|
|
|
|
if error_msgs:
|
|
print(f"\nErrors:")
|
|
for msg in error_msgs:
|
|
print(f" - {msg}")
|
|
|
|
print("\n" + "="*70 + "\n")
|
|
|
|
return 0 if errors == 0 else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|