#!/usr/bin/env python3 """ Luzia Claude Interface Bridge Bridges status events from Luzia to Claude CLI with formatting Usage: bridge = LuziaClaudeBridge(publisher) asyncio.create_task(bridge.stream_status_updates()) """ import asyncio import json from datetime import datetime from typing import Optional, Callable, List, Dict, Any from dataclasses import dataclass import logging from luzia_status_publisher_impl import LuziaStatusPublisher, StatusMessage, StatusMessageType logger = logging.getLogger(__name__) @dataclass class BufferedMessage: """Message stored in buffer""" timestamp: int text: str message_type: str task_id: str severity: str project: str class LuziaClaudeBridge: """ Bridges Luzia status events to Claude CLI interface Formats messages for terminal display with emojis/colors """ def __init__( self, status_publisher: LuziaStatusPublisher, output_fn: Optional[Callable] = None, max_buffer_size: int = 50 ): """ Initialize Claude bridge Args: status_publisher: LuziaStatusPublisher instance output_fn: Optional output function (default: print) max_buffer_size: Max messages to keep in buffer """ self.publisher = status_publisher self.output_fn = output_fn or self._default_output self.message_buffer: List[BufferedMessage] = [] self.max_buffer_size = max_buffer_size self.last_task_display: Dict[str, str] = {} self.task_groups: Dict[str, List[str]] = {} # project -> [task_ids] def _default_output(self, message: str): """Default output to stdout""" print(message) async def handle_status_event(self, msg: StatusMessage): """Handle incoming status message from Luzia""" display_text = msg.to_compact_display() # Create buffered message buffered = BufferedMessage( timestamp=msg.timestamp, text=display_text, message_type=msg.type.value, task_id=msg.task_id, severity=msg.severity.value, project=msg.project ) # Buffer message self.message_buffer.append(buffered) # Keep buffer size reasonable if len(self.message_buffer) > self.max_buffer_size: self.message_buffer = self.message_buffer[-self.max_buffer_size:] # Track task groups if msg.project not in self.task_groups: self.task_groups[msg.project] = [] if msg.task_id not in self.task_groups[msg.project]: self.task_groups[msg.project].append(msg.task_id) # Output to user self.output_fn(display_text) # Log to file if configured await self._log_event(msg) async def _log_event(self, msg: StatusMessage, log_file: Optional[str] = None): """Optionally log event to file""" if not log_file: log_file = "/tmp/luzia_status.jsonl" try: with open(log_file, "a") as f: f.write(msg.to_json() + "\n") except Exception as e: logger.error(f"Failed to log event: {e}") async def stream_status_updates(self): """ Main async loop for streaming updates to Claude Run this as a background task """ try: async for event in self.publisher.get_events_stream(): await self.handle_status_event(event) except asyncio.CancelledError: logger.info("Status streaming cancelled") except Exception as e: logger.error(f"Error in status streaming: {e}") def get_recent_updates(self, limit: int = 10) -> str: """Get last N updates formatted for display""" recent = self.message_buffer[-limit:] if self.message_buffer else [] if not recent: return "No recent updates" result = "📋 Recent Luzia Activity:\n" + "─" * 48 + "\n" for msg in recent: # Add timestamp ts = datetime.fromtimestamp(msg.timestamp) result += f"[{ts.strftime('%H:%M:%S')}] {msg.text}\n" return result def get_dashboard(self) -> str: """Get current system dashboard""" summary = self.publisher.get_active_tasks_summary() # Build dashboard dashboard_lines = [ "╔════════════════════════════════════════╗", "║ LUZIA STATUS DASHBOARD ║", "╚════════════════════════════════════════╝", "", f"Active Tasks: {summary['active_count']}", "" ] # Show active tasks by project if summary["active_count"] > 0: dashboard_lines.append("Active by Project:") for project, task_id in summary["tasks"].items(): dashboard_lines.append(f" • {project}: {task_id}") dashboard_lines.append("") # Show recent updates dashboard_lines.append(self.get_recent_updates(5)) return "\n".join(dashboard_lines) def get_task_summary(self, task_id: str) -> Optional[str]: """Get summary for specific task""" # Find all messages for this task task_messages = [m for m in self.message_buffer if m.task_id == task_id] if not task_messages: return None # Show timeline result = f"Task: {task_id}\n" + "─" * 40 + "\n" for msg in task_messages: ts = datetime.fromtimestamp(msg.timestamp) result += f"[{ts.strftime('%H:%M:%S')}] {msg.message_type}\n" result += f" → {msg.text}\n" return result def get_project_summary(self, project: str) -> str: """Get summary for specific project""" project_messages = [m for m in self.message_buffer if m.project == project] if not project_messages: return f"No activity for project: {project}" result = f"Project: {project}\n" + "─" * 40 + "\n" # Count by type type_counts = {} for msg in project_messages: type_counts[msg.message_type] = type_counts.get(msg.message_type, 0) + 1 result += "Summary:\n" for msg_type, count in type_counts.items(): result += f" • {msg_type}: {count}\n" result += "\nRecent:\n" for msg in project_messages[-5:]: ts = datetime.fromtimestamp(msg.timestamp) result += f" [{ts.strftime('%H:%M:%S')}] {msg.text.split(chr(10))[0]}\n" return result def get_alerts_only(self) -> str: """Get only warning and error messages""" alerts = [m for m in self.message_buffer if m.severity in ("warning", "error", "critical")] if not alerts: return "✅ No alerts" result = "⚠️ ALERTS\n" + "─" * 40 + "\n" for alert in alerts: ts = datetime.fromtimestamp(alert.timestamp) result += f"[{ts.strftime('%H:%M:%S')}] [{alert.severity.upper()}] {alert.project}\n" result += f" {alert.text}\n" return result def export_to_json(self, filepath: str): """Export message history to JSON""" data = [ { "timestamp": m.timestamp, "type": m.message_type, "project": m.project, "task_id": m.task_id, "severity": m.severity, "text": m.text } for m in self.message_buffer ] with open(filepath, "w") as f: json.dump(data, f, indent=2) logger.info(f"Exported {len(data)} messages to {filepath}") def export_to_markdown(self, filepath: str): """Export message history to Markdown""" lines = [ "# Luzia Status Report", f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "", f"Total Messages: {len(self.message_buffer)}", "", "## Timeline", "" ] # Group by project by_project = {} for msg in self.message_buffer: if msg.project not in by_project: by_project[msg.project] = [] by_project[msg.project].append(msg) for project in sorted(by_project.keys()): lines.append(f"### {project}") lines.append("") for msg in by_project[project]: ts = datetime.fromtimestamp(msg.timestamp) lines.append(f"**{ts.strftime('%H:%M:%S')}** - `{msg.message_type}`") lines.append(f"> {msg.text}") lines.append("") with open(filepath, "w") as f: f.write("\n".join(lines)) logger.info(f"Exported to {filepath}") class CLIStatusHelper: """Helper for CLI commands related to status""" def __init__(self, bridge: LuziaClaudeBridge): self.bridge = bridge async def handle_command(self, command: str, args: List[str]) -> str: """ Handle CLI status commands Commands: status - Show dashboard status - Show specific task status --project - Show project summary status --alerts - Show only alerts status --recent - Show last n updates status --export json - Export to JSON status --export markdown - Export to Markdown """ if not args: return self.bridge.get_dashboard() if args[0].startswith("--"): flag = args[0] if flag == "--alerts": return self.bridge.get_alerts_only() elif flag == "--recent": limit = int(args[1]) if len(args) > 1 else 10 return self.bridge.get_recent_updates(limit) elif flag == "--project": project = args[1] if len(args) > 1 else None if project: return self.bridge.get_project_summary(project) return "Usage: status --project " elif flag == "--export": format_type = args[1] if len(args) > 1 else "json" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if format_type == "json": filepath = f"/tmp/luzia_status_{timestamp}.json" self.bridge.export_to_json(filepath) return f"✅ Exported to {filepath}" elif format_type == "markdown": filepath = f"/tmp/luzia_status_{timestamp}.md" self.bridge.export_to_markdown(filepath) return f"✅ Exported to {filepath}" # Treat as task ID task_id = args[0] result = self.bridge.get_task_summary(task_id) return result or f"Task not found: {task_id}" async def example_usage(): """Example usage of Claude bridge""" # Create publisher publisher = LuziaStatusPublisher() publisher.set_verbosity("normal") # Create bridge bridge = LuziaClaudeBridge(publisher) # Start streaming in background stream_task = asyncio.create_task(bridge.stream_status_updates()) # Simulate some events await publisher.publish_task_started( task_id="test-001", project="musica", description="Test audio engine", estimated_duration_seconds=60 ) await asyncio.sleep(1) await publisher.publish_progress( task_id="test-001", progress_percent=50, current_step=2, total_steps=4, current_step_name="Testing synthesis", elapsed_seconds=30, estimated_remaining_seconds=30 ) await asyncio.sleep(1) # Show dashboard print("\n" + bridge.get_dashboard()) # Show alerts print("\n" + bridge.get_alerts_only()) # Cancel streaming stream_task.cancel() if __name__ == "__main__": asyncio.run(example_usage())