#!/usr/bin/env python3
"""
Telegram Bridge for Luzia Orchestrator

Provides Telegram integration for:
1. Sending task completion notifications to Bruno
2. Requesting human approval for sensitive operations
3. Forwarding cockpit questions to Bruno
4. Receiving responses via Telegram bot

Uses the sarlo-admin MCP telegram tools or direct bot API.
"""

import base64
import json
import logging
import os
import re
import secrets
import subprocess
import sys
import time
import urllib.error
import urllib.request
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Telegram bot configuration
TELEGRAM_BOT_TOKEN_PATH = "/etc/telegram-bot/token"
TELEGRAM_CHAT_ID_PATH = "/etc/telegram-bot/bruno_chat_id"
PENDING_REQUESTS_DIR = Path("/var/lib/luzia/telegram_requests")

# Fallback log used when the bot API credentials are not available
NOTIFICATION_LOG_PATH = Path("/var/log/luz-orchestrator/telegram_pending.log")

# LLM endpoints / models
LLM_PROXY_URL = "http://127.0.0.1:11434/v1/chat/completions"
LLM_MODEL = "gemini-3-pro-preview"
GEMINI_AUDIO_URL = (
    "https://generativelanguage.googleapis.com/v1beta/models/"
    "gemini-3-pro-preview:generateContent?key={api_key}"
)

# Places where the Gemini API key may be configured, in order of preference
GEMINI_KEY_ENV_FILES = (
    Path("/opt/server-agents/config/llm-proxy.env"),
    Path("/opt/pal-mcp-server/.env"),
)

LUZIA_BIN = "/opt/server-agents/orchestrator/bin/luzia"
VALID_PROJECTS = ("musica", "overbits", "dss", "librechat", "admin", "assistant")

# Request IDs end up in file names, so only a conservative alphabet is accepted
_REQUEST_ID_RE = re.compile(r"[A-Za-z0-9_-]+")
# Matches a back-quoted "luzia <project> <task>" command inside an LLM reply
_LUZIA_COMMAND_RE = re.compile(r'`luzia\s+(\w+)\s+([^`]+)`')

TEXT_SYSTEM_PROMPT = """You are Luzia, Bruno's AI assistant on the luz.uy server.
Respond helpfully and conversationally.

CAPABILITIES:
- Execute tasks on projects: musica, overbits, dss, librechat, admin
- Run server commands and check system status
- Answer questions and have conversations

RESPONSE GUIDELINES:
- Be concise and friendly
- Spanish or English based on user's language
- If user requests a task, acknowledge it and add: 🚀 **Command:** `luzia <project> <task>`
- Keep responses under 500 characters when possible"""

VOICE_PROMPT = """You are Luzia, Bruno's AI assistant on the luz.uy server.
Listen to this voice message and respond helpfully in a conversational manner.

CAPABILITIES:
- You can execute tasks on projects (musica, overbits, dss, librechat, admin)
- You can run server commands and check system status
- You can answer questions and have conversations

RESPONSE FORMAT:
📝 **Heard:** [brief transcription]

🤖 [your helpful, conversational response]

If the user requests a task (like "deploy musica", "check server status", "run tests on dss"):
- Acknowledge the request
- Add: 🚀 **Command:** `luzia <project> <task>` (the command that would execute this)

Keep responses concise and friendly. Spanish or English based on what user speaks."""


def _write_json_atomic(path: Path, payload: dict) -> None:
    """Write *payload* as JSON to *path* via a temp file + rename.

    The temp file name does not end in ``.json`` so a concurrent
    ``glob("*.json")`` never picks up a half-written file.
    """
    tmp_path = path.with_name(f".{path.name}.tmp")
    tmp_path.write_text(json.dumps(payload, indent=2))
    os.replace(tmp_path, path)


def _read_env_value(env_file: Path, name: str) -> Optional[str]:
    """Return the first non-empty ``NAME=value`` entry from a dotenv-style file."""
    prefix = f"{name}="
    for line in env_file.read_text().split("\n"):
        if line.startswith(prefix):
            value = line.split("=", 1)[1].strip().strip('"\'')
            if value:
                return value
    return None


@dataclass
class TelegramRequest:
    """A pending request sent to Bruno via Telegram."""
    request_id: str
    request_type: str  # notification, approval, question
    message: str
    project: str
    job_id: Optional[str]
    created_at: str
    responded_at: Optional[str] = None
    response: Optional[str] = None
    status: str = "pending"  # pending, responded, timeout (approvals: approved, denied)


class TelegramBridge:
    """Bridge between Luzia and Telegram for human-in-the-loop communication.

    Requests are persisted as one JSON file per request under ``pending_dir``.
    Incoming updates are only honoured when they originate from the configured
    chat (``bruno_chat_id``); everything else is logged and ignored.
    """

    # Fallback file for messages that cannot be sent through the bot API.
    notification_log = NOTIFICATION_LOG_PATH

    def __init__(self):
        """Initialize Telegram bridge."""
        self.pending_dir = PENDING_REQUESTS_DIR
        self.pending_dir.mkdir(parents=True, exist_ok=True)

        # Try to load bot credentials
        self.bot_token = self._load_credential(TELEGRAM_BOT_TOKEN_PATH)
        self.bruno_chat_id = self._load_credential(TELEGRAM_CHAT_ID_PATH)

        # Track connection status
        self.connected = False
        self._test_connection()

        logger.info("TelegramBridge initialized (connected: %s)", self.connected)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _load_credential(self, path: str) -> Optional[str]:
        """Load credential from file; return None if unreadable or empty."""
        try:
            return Path(path).read_text().strip() or None
        except OSError:
            return None

    def _test_connection(self) -> None:
        """Test Telegram connection via MCP."""
        # We'll use the sarlo-admin MCP for Telegram
        # The MCP handles authentication via its own config
        self.connected = True  # Assume MCP is available

    def _new_request_id(self, prefix: str) -> str:
        """Build a request ID of the form ``<prefix>-HHMMSS-xxxx``.

        The suffix is random so two requests created within the same second
        do not overwrite each other's file.
        """
        return f"{prefix}-{datetime.now().strftime('%H%M%S')}-{secrets.token_hex(2)}"

    def _request_path(self, request_id: str) -> Optional[Path]:
        """Map a request ID to its JSON file, or None if the ID is not safe.

        Request IDs arrive from Telegram (callback data, ``/answer``), so they
        are validated before being used as part of a file name.
        """
        if not isinstance(request_id, str) or not _REQUEST_ID_RE.fullmatch(request_id):
            return None
        return self.pending_dir / f"{request_id}.json"

    def _is_authorized(self, *candidate_ids: Any) -> bool:
        """Return True if any of the given chat/user IDs is the configured chat.

        Fails closed: when no chat ID is configured nothing is authorized.
        """
        expected = str(self.bruno_chat_id or "").strip()
        if not expected:
            return False
        return any(str(cid) == expected for cid in candidate_ids if cid is not None)

    def _message_is_authorized(self, message: dict) -> bool:
        """Check the chat and sender of an incoming message against the configured chat."""
        chat_id = (message.get("chat") or {}).get("id")
        user_id = (message.get("from") or {}).get("id")
        if self._is_authorized(chat_id, user_id):
            return True
        logger.warning("Ignoring message from unauthorized chat %s (user %s)", chat_id, user_id)
        return False

    @staticmethod
    def _request_from_dict(data: dict) -> TelegramRequest:
        """Build a TelegramRequest from stored JSON, ignoring extra fields."""
        return TelegramRequest(
            request_id=data.get("request_id", ""),
            request_type=data.get("request_type", ""),
            message=data.get("message", ""),
            project=data.get("project", ""),
            job_id=data.get("job_id"),
            created_at=data.get("created_at") or "",
            responded_at=data.get("responded_at"),
            response=data.get("response"),
            status=data.get("status", "pending"),
        )

    def _call_bot_api(self, method: str, payload: dict, timeout: float = 10) -> dict:
        """POST *payload* as JSON to a Bot API method and return the decoded reply.

        Raises whatever ``urllib`` / ``json`` raise; callers decide how to handle it.
        """
        url = f"https://api.telegram.org/bot{self.bot_token}/{method}"
        req = urllib.request.Request(
            url,
            data=json.dumps(payload).encode('utf-8'),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return json.loads(response.read())

    # ------------------------------------------------------------------
    # Outgoing requests
    # ------------------------------------------------------------------

    def send_notification(self, message: str, project: str = "luzia",
                          job_id: Optional[str] = None,
                          severity: str = "info") -> bool:
        """
        Send a notification to Bruno via Telegram.

        Uses concise micro-linked format.

        Args:
            message: Notification text (truncated to 80 characters)
            project: Project the notification belongs to
            job_id: Optional job ID (first 8 characters are shown)
            severity: "info", "warning" or "critical"; selects the icon

        Returns:
            True if the message was sent (or written to the fallback log)
        """
        # Concise icon and format
        icon = {"info": "ℹ️", "warning": "⚠️", "critical": "🚨"}.get(severity, "📬")

        # Micro-linked: icon project | job
        msg_short = message[:80] + "..." if len(message) > 80 else message

        if job_id:
            formatted = f"{icon} *{project}* | `{job_id[:8]}`\n{msg_short}"
        else:
            formatted = f"{icon} *{project}*\n{msg_short}"

        return self._send_telegram_message(formatted)

    def request_approval(self, action: str, project: str,
                         context: str = "", job_id: Optional[str] = None,
                         timeout_minutes: int = 60) -> Tuple[str, bool]:
        """
        Request approval from Bruno for a sensitive action.

        Uses concise micro-linked format.

        Args:
            action: Description of the action needing approval
            project: Project the action belongs to
            context: Extra detail shown when the info button is pressed
            job_id: Optional related job ID
            timeout_minutes: Stored with the request; not enforced by this class

        Returns:
            (request_id, sent) where ``sent`` tells whether the message went out
        """
        request_id = self._new_request_id("apr")

        # Create pending request with full context for info lookup
        request = TelegramRequest(
            request_id=request_id,
            request_type="approval",
            message=action,
            project=project,
            job_id=job_id,
            created_at=datetime.now().isoformat(),
        )

        # Store context separately for info button
        request_data = asdict(request)
        request_data["context"] = context
        request_data["timeout_minutes"] = timeout_minutes
        _write_json_atomic(self.pending_dir / f"{request_id}.json", request_data)

        # Concise micro-linked message. The full ID is shown because it is
        # what /answer and the request files are keyed on.
        action_short = action[:60] + "..." if len(action) > 60 else action
        formatted = f"🔐 *{project}* | `{request_id}`\n{action_short}"

        # Create inline keyboard with approve/deny/info buttons
        keyboard = [
            [
                {"text": "✅", "callback_data": f"approve:{request_id}"},
                {"text": "❌", "callback_data": f"deny:{request_id}"},
                {"text": "ℹ️", "callback_data": f"info:{request_id}"},
            ]
        ]

        success = self._send_with_keyboard(formatted, keyboard)
        return request_id, success

    def ask_question(self, question: str, project: str,
                     context: str = "", job_id: Optional[str] = None,
                     options: Optional[list] = None) -> Tuple[str, bool]:
        """
        Ask Bruno a question; the answer is collected later via polling.

        Uses concise micro-linked format.

        Args:
            question: Question text
            project: Project the question belongs to
            context: Extra detail shown when the info button is pressed
            job_id: Optional related job ID
            options: Optional list of answer strings rendered as buttons

        Returns:
            (request_id, sent) where ``sent`` tells whether the message went out
        """
        request_id = self._new_request_id("qst")

        # Store with context for info button
        request_data = {
            "request_id": request_id,
            "request_type": "question",
            "message": question,
            "project": project,
            "job_id": job_id,
            "created_at": datetime.now().isoformat(),
            "responded_at": None,
            "response": None,
            "status": "pending",
            "context": context,
            "options": options,
        }
        _write_json_atomic(self.pending_dir / f"{request_id}.json", request_data)

        # Concise micro-linked question
        q_short = question[:60] + "..." if len(question) > 60 else question
        formatted = f"❓ *{project}* | `{request_id}`\n{q_short}"

        # Create inline keyboard
        keyboard = []
        if options:
            # Compact option buttons (2 per row)
            buttons = [
                {
                    "text": opt[:15] + ".." if len(opt) > 15 else opt,
                    "callback_data": f"answer:{request_id}:{i}",
                }
                for i, opt in enumerate(options)
            ]
            keyboard.extend(buttons[i:i + 2] for i in range(0, len(buttons), 2))

        # Always add reply + info buttons
        keyboard.append([
            {"text": "📝 Reply", "callback_data": f"custom:{request_id}"},
            {"text": "ℹ️", "callback_data": f"info:{request_id}"},
        ])

        success = self._send_with_keyboard(formatted, keyboard)
        return request_id, success

    # ------------------------------------------------------------------
    # Request storage
    # ------------------------------------------------------------------

    def check_response(self, request_id: str) -> Optional[TelegramRequest]:
        """
        Check if Bruno has responded to a request.

        Args:
            request_id: Request ID to check

        Returns:
            TelegramRequest with response if available, None if the ID is
            invalid, the request is not found, or its file cannot be parsed
        """
        request_file = self._request_path(request_id)
        if request_file is None or not request_file.exists():
            return None

        try:
            data = json.loads(request_file.read_text())
        except (OSError, ValueError) as e:
            logger.warning("Failed to load request %s: %s", request_id, e)
            return None
        if not isinstance(data, dict):
            logger.warning("Failed to load request %s: not a JSON object", request_id)
            return None

        # Only extract fields that TelegramRequest expects
        return self._request_from_dict(data)

    def record_response(self, request_id: str, response: str,
                        approved: Optional[bool] = None) -> bool:
        """
        Record a response from Bruno (called by bot webhook handler).

        Args:
            request_id: Request ID
            response: Bruno's response
            approved: For approval requests, True/False

        Returns:
            True if recorded successfully
        """
        request_file = self._request_path(request_id)
        if request_file is None or not request_file.exists():
            logger.warning("Request file not found: %s", request_id)
            return False

        try:
            # Load raw data to preserve extra fields (context, options)
            data = json.loads(request_file.read_text())
            if not isinstance(data, dict):
                raise ValueError("request file is not a JSON object")

            # Update response fields
            data["response"] = response
            data["responded_at"] = datetime.now().isoformat()
            data["status"] = "responded"

            if data.get("request_type") == "approval" and approved is not None:
                data["status"] = "approved" if approved else "denied"

            # Save back with all fields preserved
            _write_json_atomic(request_file, data)
        except (OSError, ValueError, TypeError) as e:
            logger.error("Failed to record response: %s", e)
            return False

        logger.info("Response recorded: %s -> %s", request_id, response)
        return True

    def get_pending_requests(self, project: Optional[str] = None) -> list:
        """Get all pending requests (newest first), optionally filtered by project."""
        requests = []
        for req_file in self.pending_dir.glob("*.json"):
            try:
                data = json.loads(req_file.read_text())
            except (OSError, ValueError):
                continue
            if not isinstance(data, dict) or not data.get("request_id"):
                continue
            if data.get("status") != "pending":
                continue
            if project is not None and data.get("project") != project:
                continue
            requests.append(self._request_from_dict(data))

        return sorted(requests, key=lambda r: r.created_at, reverse=True)

    def _save_pending_request(self, request: TelegramRequest) -> None:
        """Save pending request to disk."""
        request_file = self.pending_dir / f"{request.request_id}.json"
        _write_json_atomic(request_file, asdict(request))

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    def _send_telegram_message(self, message: str, chat_id: int = None) -> bool:
        """
        Send message via the bot API, or fall back to the notification log.

        Args:
            message: Markdown-formatted message
            chat_id: Optional chat ID to send to (defaults to bruno_chat_id)

        Returns:
            True if sent successfully
        """
        # Direct bot API call if credentials available
        target_chat = chat_id or self.bruno_chat_id

        if self.bot_token and target_chat:
            return self._send_via_bot_api(message, chat_id=target_chat)

        # Fallback: Write to notification file for manual pickup
        return self._write_notification_file(message)

    def _send_with_keyboard(self, message: str, keyboard: list) -> bool:
        """
        Send message with inline keyboard buttons.

        Args:
            message: Markdown-formatted message
            keyboard: List of button rows, each row is a list of button dicts
                      Each button: {"text": "Button Text", "callback_data": "data"}

        Returns:
            True if sent successfully
        """
        if self.bot_token and self.bruno_chat_id:
            return self._send_via_bot_api(message, inline_keyboard=keyboard)

        # Fallback without buttons
        return self._write_notification_file(message)

    def _send_via_bot_api(self, message: str, inline_keyboard: list = None, chat_id: int = None) -> bool:
        """Send message directly via Telegram Bot API with optional inline keyboard.

        If Telegram rejects the message with HTTP 400 (typically unbalanced
        Markdown entities in free-form text), the message is retried once as
        plain text so that it is not lost.
        """
        if not self.bot_token:
            return False

        payload = {
            "chat_id": chat_id or self.bruno_chat_id,
            "text": message,
            "parse_mode": "Markdown",
        }

        # Add inline keyboard if provided
        if inline_keyboard:
            payload["reply_markup"] = json.dumps({
                "inline_keyboard": inline_keyboard
            })

        try:
            result = self._call_bot_api("sendMessage", payload, timeout=10)
        except urllib.error.HTTPError as e:
            if e.code != 400:
                logger.error("Telegram API error: %s", e)
                return False
            logger.warning("Telegram rejected message (%s); retrying as plain text", e)
            payload.pop("parse_mode", None)
            try:
                result = self._call_bot_api("sendMessage", payload, timeout=10)
            except Exception as retry_error:
                logger.error("Telegram API error: %s", retry_error)
                return False
        except Exception as e:
            logger.error("Telegram API error: %s", e)
            return False

        return bool(result.get("ok", False))

    def _write_notification_file(self, message: str) -> bool:
        """Write notification to file for manual pickup."""
        try:
            with open(self.notification_log, "a", encoding="utf-8") as f:
                timestamp = datetime.now().isoformat()
                f.write(f"[{timestamp}]\n{message}\n{'='*40}\n")
            return True
        except OSError as e:
            logger.error("Failed to write notification: %s", e)
            return False

    # ------------------------------------------------------------------
    # Receiving
    # ------------------------------------------------------------------

    def poll_responses(self, timeout: int = 30) -> list:
        """
        Poll Telegram for updates (button clicks, text, voice, audio).

        Each update is handled independently: a failure while handling one
        update is logged and does not stop the stored offset from advancing,
        so a single bad update cannot be re-delivered forever.

        Args:
            timeout: Long polling timeout in seconds

        Returns:
            List of processed responses
        """
        if not self.bot_token:
            logger.warning("No bot token - cannot poll")
            return []

        # Get updates from Telegram
        offset_file = self.pending_dir / ".telegram_offset"
        try:
            offset = int(offset_file.read_text().strip())
        except (OSError, ValueError):
            offset = 0

        params = {
            "offset": offset,
            "timeout": timeout,
            "allowed_updates": ["callback_query", "message"],
        }

        try:
            result = self._call_bot_api("getUpdates", params, timeout=timeout + 5)
        except Exception as e:
            logger.error("Polling error: %s", e)
            return []

        if not result.get("ok"):
            return []

        responses = []
        for update in result.get("result", []):
            update_id = update.get("update_id", 0)
            try:
                resp = self._dispatch_update(update)
            except Exception as e:
                logger.error("Failed to process update %s: %s", update_id, e)
                resp = None
            if resp:
                responses.append(resp)

            # Update offset
            offset = max(offset, update_id + 1)

        # Save offset
        try:
            offset_file.write_text(str(offset))
        except OSError as e:
            logger.error("Failed to save polling offset: %s", e)

        return responses

    def _dispatch_update(self, update: dict) -> Optional[dict]:
        """Route one Telegram update to the matching handler."""
        # Process callback query (button click)
        if "callback_query" in update:
            return self._process_callback(update["callback_query"])

        message = update.get("message")
        if not isinstance(message, dict):
            return None
        # Process message (text reply)
        if "text" in message:
            return self._process_message(message)
        # Process voice message
        if "voice" in message:
            return self._process_voice_message(message)
        # Process audio file
        if "audio" in message:
            return self._process_audio_message(message)
        return None

    def _process_callback(self, query: dict) -> Optional[dict]:
        """Process a callback query (button click).

        Returns a dict describing the action taken, or None when the click is
        unauthorized, malformed, or could not be recorded.
        """
        callback_id = query.get("id")
        data = query.get("data", "")
        sender_id = (query.get("from") or {}).get("id")
        chat_id = ((query.get("message") or {}).get("chat") or {}).get("id")

        logger.info("Callback from %s: %s", sender_id if sender_id is not None else "", data)

        if not self._is_authorized(chat_id, sender_id):
            logger.warning("Ignoring callback from unauthorized chat %s (user %s)", chat_id, sender_id)
            return None

        # Parse callback data
        parts = data.split(":")
        if len(parts) < 2:
            return None

        action = parts[0]
        request_id = parts[1]

        # Answer the callback to remove loading state
        self._answer_callback(callback_id, f"✅ {action.title()}!")

        if self._request_path(request_id) is None:
            logger.warning("Ignoring callback with invalid request id: %r", request_id)
            return None

        # Process based on action
        if action == "approve":
            if self.record_response(request_id, "approved", approved=True):
                return {"request_id": request_id, "action": "approved"}
        elif action == "deny":
            if self.record_response(request_id, "denied", approved=False):
                return {"request_id": request_id, "action": "denied"}
        elif action == "answer":
            option_idx = parts[2] if len(parts) > 2 else "0"
            if self.record_response(request_id, f"option_{option_idx}"):
                return {"request_id": request_id, "action": f"option_{option_idx}"}
        elif action == "custom":
            # Free-text answers are collected through the /answer command
            self._send_telegram_message(
                f"✏️ Reply with:\n`/answer {request_id} <your answer>`"
            )
            return {"request_id": request_id, "action": "custom_requested"}
        elif action == "info":
            # Send detailed info about request
            self._send_request_details(request_id)
            return {"request_id": request_id, "action": "info_requested"}

        return None

    def _process_message(self, message: dict) -> Optional[dict]:
        """Process a text message (``/answer`` command or free text for the LLM)."""
        text = message.get("text", "")
        chat_id = (message.get("chat") or {}).get("id")
        user_id = str((message.get("from") or {}).get("id", ""))

        if not self._message_is_authorized(message):
            return None

        # Check for command responses
        if text.startswith("/answer "):
            parts = text.split(maxsplit=2)
            if len(parts) >= 3:
                request_id, answer = parts[1], parts[2]
                if self.record_response(request_id, answer):
                    return {"request_id": request_id, "action": "answered", "response": answer}
                return None

        # Skip bot commands
        if text.startswith("/"):
            return None

        # Process regular text with Gemini
        logger.info("Text message from %s: %s...", user_id, text[:50])
        return self._process_text_with_gemini(text, chat_id)

    def _process_text_with_gemini(self, text: str, chat_id: int) -> Optional[dict]:
        """Process text message via LLM Proxy (centralized Gemini/Claude gateway)."""
        try:
            # Send typing indicator
            self._send_chat_action(chat_id, "typing")

            # Use local LLM Proxy (OpenAI-compatible API)
            payload = {
                "model": LLM_MODEL,
                "messages": [
                    {"role": "system", "content": TEXT_SYSTEM_PROMPT},
                    {"role": "user", "content": text},
                ],
                "max_tokens": 1024,
                "temperature": 0.7,
            }

            req = urllib.request.Request(
                LLM_PROXY_URL,
                data=json.dumps(payload).encode(),
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req, timeout=30) as resp:
                result = json.loads(resp.read())

            choices = result.get("choices") or []
            if not choices:
                return None
            response_text = (choices[0].get("message") or {}).get("content", "")
            if not response_text:
                return None

            self._send_telegram_message(response_text, chat_id=chat_id)
            logger.info("LLM Proxy text response sent to chat %s", chat_id)

            # Check for Luzia command and auto-execute
            luzia_cmd = self._extract_luzia_command(response_text)
            if luzia_cmd:
                self._execute_luzia_command(luzia_cmd, chat_id)

            return {"action": "text_processed", "response": response_text}

        except Exception as e:
            logger.error("Text processing error: %s", e)
            self._send_telegram_message(f"❌ Error: {str(e)[:80]}", chat_id=chat_id)
            return None

    def _process_voice_message(self, message: dict) -> Optional[dict]:
        """Process a voice message using Gemini 3."""
        if not self._message_is_authorized(message):
            return None

        voice = message.get("voice", {})
        file_id = voice.get("file_id")
        chat_id = (message.get("chat") or {}).get("id")
        user_id = str((message.get("from") or {}).get("id", ""))
        duration = voice.get("duration", 0)

        logger.info("Voice message from %s: %ss", user_id, duration)

        # Process with Gemini
        return self._process_audio_with_gemini(
            file_id, chat_id, "voice",
            mime_type=voice.get("mime_type") or "audio/ogg",
        )

    def _process_audio_message(self, message: dict) -> Optional[dict]:
        """Process an audio file using Gemini 3."""
        if not self._message_is_authorized(message):
            return None

        audio = message.get("audio", {})
        file_id = audio.get("file_id")
        chat_id = (message.get("chat") or {}).get("id")
        user_id = str((message.get("from") or {}).get("id", ""))
        file_name = audio.get("file_name", "audio")

        logger.info("Audio file from %s: %s", user_id, file_name)

        # Process with Gemini
        return self._process_audio_with_gemini(
            file_id, chat_id, "audio", file_name,
            mime_type=audio.get("mime_type") or "audio/ogg",
        )

    def _process_audio_with_gemini(self, file_id: str, chat_id: int,
                                   audio_type: str = "voice",
                                   file_name: str = None,
                                   mime_type: str = "audio/ogg") -> Optional[dict]:
        """Download and process audio with Gemini 3.

        Args:
            file_id: Telegram file ID of the audio
            chat_id: Chat to reply to
            audio_type: "voice" selects the assistant prompt; anything else
                        selects the generic audio-analysis prompt
            file_name: Original file name, used in the analysis prompt
            mime_type: MIME type declared to Gemini for the inline audio

        Returns:
            {"action": "voice_processed", "response": ...} on success, else None
        """
        try:
            # Send typing indicator
            self._send_chat_action(chat_id, "typing")

            # Download audio file from Telegram
            audio_data = self._download_telegram_file(file_id)
            if not audio_data:
                self._send_telegram_message("❌ Failed to download audio", chat_id=chat_id)
                return None

            # Get Gemini API key
            api_key = self._get_gemini_api_key()
            if not api_key:
                self._send_telegram_message("❌ Gemini API not configured", chat_id=chat_id)
                return None

            # Prepare Gemini request
            audio_b64 = base64.b64encode(audio_data).decode()

            if audio_type == "voice":
                prompt = VOICE_PROMPT
            else:
                prompt = f"""Analyze this audio file ({file_name or 'audio'}).

If it contains speech, transcribe and respond conversationally.
If it's music or other audio, describe what you hear.

Keep response concise and helpful."""

            # Call Gemini API
            payload = {
                "contents": [{
                    "parts": [
                        {"inline_data": {"mime_type": mime_type, "data": audio_b64}},
                        {"text": prompt},
                    ]
                }],
                "generationConfig": {"temperature": 0.3, "maxOutputTokens": 2048},
            }

            req = urllib.request.Request(
                GEMINI_AUDIO_URL.format(api_key=api_key),
                data=json.dumps(payload).encode(),
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req, timeout=60) as resp:
                result = json.loads(resp.read())

            # Extract response
            candidates = result.get("candidates") or []
            parts = (candidates[0].get("content") or {}).get("parts", []) if candidates else []
            response_text = parts[0].get("text", "") if parts else ""

            if not response_text:
                self._send_telegram_message("❌ No response from Gemini", chat_id=chat_id)
                return None

            # Send response to Telegram
            self._send_telegram_message(response_text, chat_id=chat_id)
            logger.info("Gemini audio response sent to chat %s", chat_id)

            # Check for Luzia command in response and auto-execute
            luzia_cmd = self._extract_luzia_command(response_text)
            if luzia_cmd:
                self._execute_luzia_command(luzia_cmd, chat_id)

            return {"action": "voice_processed", "response": response_text}

        except Exception as e:
            logger.error("Audio processing error: %s", e)
            self._send_telegram_message(f"❌ Audio processing error: {str(e)[:100]}", chat_id=chat_id)
            return None

    def _download_telegram_file(self, file_id: str) -> Optional[bytes]:
        """Download a file from Telegram servers; return None on any failure."""
        if not self.bot_token or not file_id:
            return None

        try:
            # Get file path
            result = self._call_bot_api("getFile", {"file_id": file_id}, timeout=10)
            if not result.get("ok"):
                return None
            file_path = result["result"]["file_path"]

            # Download file
            download_url = f"https://api.telegram.org/file/bot{self.bot_token}/{file_path}"
            with urllib.request.urlopen(download_url, timeout=30) as resp:
                return resp.read()

        except Exception as e:
            logger.error("Failed to download Telegram file: %s", e)
            return None

    def _get_gemini_api_key(self) -> Optional[str]:
        """Get Gemini API key from LLM proxy config or PAL MCP env."""
        try:
            # LLM Proxy config is preferred (centralized); PAL MCP env is the fallback
            for env_file in GEMINI_KEY_ENV_FILES:
                if env_file.exists():
                    key = _read_env_value(env_file, "GEMINI_API_KEY")
                    if key:
                        return key
        except OSError as e:
            logger.error("Failed to load Gemini API key: %s", e)
        return None

    def _send_chat_action(self, chat_id: int, action: str = "typing"):
        """Send chat action (typing indicator). Best effort: failures are ignored."""
        if not self.bot_token:
            return
        try:
            self._call_bot_api("sendChatAction", {"chat_id": chat_id, "action": action}, timeout=5)
        except Exception as e:
            logger.debug("Failed to send chat action: %s", e)

    def _extract_luzia_command(self, response_text: str) -> Optional[str]:
        """Extract luzia command from Gemini response if present.

        Looks for a back-quoted ``luzia <project> <task>`` and returns
        ``"<project> <task>"``, or None when no such command is found.
        """
        match = _LUZIA_COMMAND_RE.search(response_text)
        if match:
            project = match.group(1)
            task = match.group(2).strip()
            return f"{project} {task}"
        return None

    def _execute_luzia_command(self, command: str, chat_id: int):
        """Execute a luzia command and notify via Telegram."""
        try:
            # Parse command
            parts = command.split(maxsplit=1)
            if len(parts) < 2:
                return

            project, task = parts[0], parts[1]

            # Validate project (security)
            if project not in VALID_PROJECTS:
                self._send_telegram_message(f"⚠️ Unknown project: {project}", chat_id=chat_id)
                return

            # Send confirmation
            self._send_telegram_message(f"🚀 Executing: `luzia {project} {task[:50]}...`", chat_id=chat_id)

            # Execute asynchronously (don't block); argv list, no shell involved
            subprocess.Popen(
                [LUZIA_BIN, project, task],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True
            )

            logger.info("Luzia command dispatched: %s %s", project, task[:50])

        except Exception as e:
            logger.error("Failed to execute luzia command: %s", e)
            self._send_telegram_message(f"❌ Command failed: {str(e)[:50]}", chat_id=chat_id)

    def _answer_callback(self, callback_id: str, text: str):
        """Answer a callback query. Best effort: failures are only logged."""
        if not self.bot_token:
            return
        try:
            self._call_bot_api(
                "answerCallbackQuery",
                {"callback_query_id": callback_id, "text": text},
                timeout=5,
            )
        except Exception as e:
            logger.warning("Failed to answer callback: %s", e)

    def _send_request_details(self, request_id: str):
        """Send detailed info about a request (triggered by info button)."""
        request_file = self._request_path(request_id)
        if request_file is None or not request_file.exists():
            self._send_telegram_message(f"❓ Request `{request_id}` not found")
            return

        try:
            data = json.loads(request_file.read_text())
            if not isinstance(data, dict):
                raise ValueError("request file is not a JSON object")
        except (OSError, ValueError):
            self._send_telegram_message("❓ Error reading request")
            return

        # Build detailed info message
        lines = [
            "📋 *Request Details*\n",
            f"*ID:* `{data.get('request_id', 'unknown')}`",
            f"*Type:* {data.get('request_type', 'unknown')}",
            f"*Project:* {data.get('project', 'unknown')}",
        ]
        if data.get('job_id'):
            lines.append(f"*Job:* `{data['job_id']}`")
        lines.append(f"*Status:* {data.get('status', 'unknown')}")
        # Timestamps may be stored as null, hence the explicit str()/or
        lines.append(f"*Created:* {str(data.get('created_at') or 'unknown')[:19]}\n")
        lines.append(f"*Full Message:*\n{data.get('message', 'N/A')}")

        if data.get('context'):
            lines.append(f"\n*Context:*\n{data['context']}")

        if data.get('options'):
            lines.append("\n*Options:*")
            lines.extend(f"  {i+1}. {opt}" for i, opt in enumerate(data['options']))

        if data.get('response'):
            lines.append(f"\n*Response:* {data['response']}")
            lines.append(f"*Responded:* {str(data.get('responded_at') or 'unknown')[:19]}")

        self._send_telegram_message("\n".join(lines) + "\n")


# Convenience functions for use from luzia CLI
_bridge_instance = None


def get_bridge() -> TelegramBridge:
    """Get or create singleton bridge instance."""
    global _bridge_instance
    if _bridge_instance is None:
        _bridge_instance = TelegramBridge()
    return _bridge_instance


def notify_bruno(message: str, project: str = "luzia",
                 severity: str = "info", job_id: str = None) -> bool:
    """Send notification to Bruno."""
    return get_bridge().send_notification(message, project, job_id, severity)


def ask_bruno(question: str, project: str = "luzia",
              context: str = "", options: list = None) -> Tuple[str, bool]:
    """Ask Bruno a question."""
    return get_bridge().ask_question(question, project, context, options=options)


def request_approval_from_bruno(action: str, project: str,
                                context: str = "") -> Tuple[str, bool]:
    """Request approval from Bruno."""
    return get_bridge().request_approval(action, project, context)


def poll_for_responses(timeout: int = 5) -> list:
    """Poll Telegram for button responses."""
    return get_bridge().poll_responses(timeout)


def _print_usage() -> None:
    """Print CLI usage."""
    print("Usage:")
    print("  telegram_bridge.py notify <message>")
    print("  telegram_bridge.py ask <question>")
    print("  telegram_bridge.py approve <action>")
    print("  telegram_bridge.py pending")
    print("  telegram_bridge.py check <request_id>")
    print("  telegram_bridge.py poll [timeout_seconds]")


def main() -> None:
    """CLI for testing."""
    bridge = TelegramBridge()
    args = sys.argv[1:]
    if not args:
        _print_usage()
        return

    cmd, rest = args[0], args[1:]

    if cmd == "notify":
        msg = " ".join(rest) if rest else "Test notification from Luzia"
        success = bridge.send_notification(msg, "test")
        print(f"Notification sent: {success}")

    elif cmd == "ask":
        question = " ".join(rest) if rest else "Test question?"
        req_id, success = bridge.ask_question(question, "test")
        print(f"Question sent: {success}, request_id: {req_id}")

    elif cmd == "approve":
        action = " ".join(rest) if rest else "Test action"
        req_id, success = bridge.request_approval(action, "test")
        print(f"Approval request sent: {success}, request_id: {req_id}")

    elif cmd == "pending":
        requests = bridge.get_pending_requests()
        print(f"Pending requests: {len(requests)}")
        for req in requests:
            print(f"  [{req.request_type}] {req.request_id}: {req.message[:50]}...")

    elif cmd == "check":
        if not rest:
            print("  telegram_bridge.py check <request_id>")
            return
        req = bridge.check_response(rest[0])
        if req:
            print(f"Status: {req.status}")
            print(f"Response: {req.response}")
        else:
            print("Request not found")

    elif cmd == "poll":
        timeout = int(rest[0]) if rest else 5
        print(f"Polling for responses (timeout: {timeout}s)...")
        responses = bridge.poll_responses(timeout)
        if responses:
            print(f"Got {len(responses)} response(s):")
            for r in responses:
                # Text/voice results carry no request_id
                print(f"  {r.get('request_id', '-')}: {r.get('action', '-')}")
        else:
            print("No new responses")

    else:
        _print_usage()


if __name__ == "__main__":
    main()