Based on claude-code-tools TmuxCLIController, this refactor:
- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
982 lines
36 KiB
Python
982 lines
36 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Telegram Bridge for Luzia Orchestrator
|
||
|
||
Provides Telegram integration for:
|
||
1. Sending task completion notifications to Bruno
|
||
2. Requesting human approval for sensitive operations
|
||
3. Forwarding cockpit questions to Bruno
|
||
4. Receiving responses via Telegram bot
|
||
|
||
Uses the sarlo-admin MCP telegram tools or direct bot API.
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import logging
|
||
import subprocess
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional, Dict, Any, Tuple
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
|
||
# Configure the root logger at import time: INFO level, timestamped lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)

# Telegram bot configuration
# File read by TelegramBridge.__init__ to obtain the bot token.
TELEGRAM_BOT_TOKEN_PATH = "/etc/telegram-bot/token"
# File read by TelegramBridge.__init__ to obtain the default destination chat ID.
TELEGRAM_CHAT_ID_PATH = "/etc/telegram-bot/bruno_chat_id"
# Directory in which each request is persisted as "<request_id>.json".
PENDING_REQUESTS_DIR = Path("/var/lib/luzia/telegram_requests")
|
||
|
||
|
||
@dataclass
class TelegramRequest:
    """One request sent to Bruno over Telegram, together with its reply state."""

    # Identifier of the request; the bridge stores it as "<request_id>.json".
    request_id: str
    # Kind of request: notification, approval, question
    request_type: str
    # Full, untruncated text of the request.
    message: str
    # Project the request belongs to.
    project: str
    # Related job identifier, if there is one.
    job_id: Optional[str]
    # Creation timestamp as a string (the bridge writes ISO-8601).
    created_at: str
    # Timestamp of the reply; None while unanswered.
    responded_at: Optional[str] = None
    # Reply text; None while unanswered.
    response: Optional[str] = None
    # Lifecycle state: pending, responded, timeout
    # (the bridge also records "approved" / "denied" for approval requests).
    status: str = "pending"
|
||
|
||
|
||
class TelegramBridge:
    """Bridge between Luzia and Telegram for human-in-the-loop communication.

    Sends notifications, approval requests and questions to a Telegram chat,
    persists every request as a JSON file, and polls the Bot API for replies.
    """
|
||
|
||
def __init__(self):
    """Prepare the request directory, read credentials and mark the link state."""
    # Requests are persisted on disk; make sure the directory is there.
    self.pending_dir = PENDING_REQUESTS_DIR
    self.pending_dir.mkdir(parents=True, exist_ok=True)

    # Either credential is None when its file is missing or unreadable.
    read_secret = self._load_credential
    self.bot_token = read_secret(TELEGRAM_BOT_TOKEN_PATH)
    self.bruno_chat_id = read_secret(TELEGRAM_CHAT_ID_PATH)

    # Start pessimistic, then let the connection probe decide.
    self.connected = False
    self._test_connection()

    logger.info(f"TelegramBridge initialized (connected: {self.connected})")
|
||
|
||
def _load_credential(self, path: str) -> Optional[str]:
|
||
"""Load credential from file."""
|
||
try:
|
||
return Path(path).read_text().strip()
|
||
except (FileNotFoundError, PermissionError):
|
||
return None
|
||
|
||
def _test_connection(self) -> None:
|
||
"""Test Telegram connection via MCP."""
|
||
# We'll use the sarlo-admin MCP for Telegram
|
||
# The MCP handles authentication via its own config
|
||
self.connected = True # Assume MCP is available
|
||
|
||
def send_notification(self, message: str, project: str = "luzia",
                      job_id: Optional[str] = None,
                      severity: str = "info") -> bool:
    """Send a short notification to Bruno via Telegram.

    Args:
        message: Notification text; anything past 80 characters is cut and
            replaced with "...".
        project: Project name shown in bold in the header.
        job_id: Optional job identifier; its first 8 characters are shown.
        severity: "info", "warning" or "critical"; other values get a
            generic mailbox icon.

    Returns:
        Whatever ``_send_telegram_message`` reports for the formatted text.
    """
    icons = {"info": "ℹ️", "warning": "⚠️", "critical": "🚨"}
    icon = icons[severity] if severity in icons else "📬"

    body = message if len(message) <= 80 else message[:80] + "..."

    # Header layout: icon project | job
    header = f"{icon} *{project}*"
    if job_id:
        header += f" | `{job_id[:8]}`"

    return self._send_telegram_message(f"{header}\n{body}")
|
||
|
||
def request_approval(self, action: str, project: str,
                     context: str = "", job_id: Optional[str] = None,
                     timeout_minutes: int = 60) -> Tuple[str, bool]:
    """Ask Bruno to approve or deny a sensitive action.

    The request is written to ``pending_dir`` as ``<request_id>.json`` and a
    compact message with approve / deny / info buttons is sent.

    Args:
        action: Description of the action; the full text is stored, the
            message shows at most 60 characters.
        project: Project the action belongs to.
        context: Extra detail stored with the request for the info button.
        job_id: Optional related job identifier.
        timeout_minutes: Accepted for interface compatibility; this method
            does not read it.

    Returns:
        ``(request_id, success)`` where ``success`` is the send result.
    """
    stamp = datetime.now().strftime('%H%M%S')
    request_id = f"apr-{stamp}-{hash(action) & 0xffff:04x}"

    request = TelegramRequest(
        request_id=request_id,
        request_type="approval",
        message=action,
        project=project,
        job_id=job_id,
        created_at=datetime.now().isoformat(),
    )

    # Persist every dataclass field, plus the context the dataclass lacks.
    stored_fields = (
        "request_id", "request_type", "message", "project", "job_id",
        "created_at", "responded_at", "response", "status",
    )
    request_data = {name: getattr(request, name) for name in stored_fields}
    request_data["context"] = context  # Extra context for info
    target = self.pending_dir / f"{request_id}.json"
    target.write_text(json.dumps(request_data, indent=2))

    # Compact message: lock icon, project, shortened id, shortened action.
    action_short = action if len(action) <= 60 else action[:60] + "..."
    formatted = f"🔐 *{project}* | `{request_id[:12]}`\n{action_short}"

    # Single row of approve / deny / info buttons.
    button_specs = (("✅", "approve"), ("❌", "deny"), ("ℹ️", "info"))
    row = [
        {"text": label, "callback_data": f"{verb}:{request_id}"}
        for label, verb in button_specs
    ]

    success = self._send_with_keyboard(formatted, [row])
    return request_id, success
|
||
|
||
def ask_question(self, question: str, project: str,
                 context: str = "", job_id: Optional[str] = None,
                 options: list = None) -> Tuple[str, bool]:
    """Send Bruno a question, optionally with multiple-choice buttons.

    The request is stored as ``<request_id>.json`` in ``pending_dir``; this
    method returns right after sending and does not wait for the reply.

    Args:
        question: Question text; the message shows at most 60 characters.
        project: Project the question belongs to.
        context: Extra detail stored for the info button.
        job_id: Optional related job identifier.
        options: Optional list of answer strings, rendered two per row.

    Returns:
        ``(request_id, success)`` where ``success`` is the send result.
    """
    stamp = datetime.now().strftime('%H%M%S')
    request_id = f"qst-{stamp}-{hash(question) & 0xffff:04x}"

    record = {
        "request_id": request_id,
        "request_type": "question",
        "message": question,
        "project": project,
        "job_id": job_id,
        "created_at": datetime.now().isoformat(),
        "responded_at": None,
        "response": None,
        "status": "pending",
        "context": context,
        "options": options,
    }
    (self.pending_dir / f"{request_id}.json").write_text(json.dumps(record, indent=2))

    q_short = question if len(question) <= 60 else question[:60] + "..."
    formatted = f"❓ *{project}* | `{request_id[:12]}`\n{q_short}"

    keyboard = []
    if options:
        # One button per option (label capped at 15 chars), two per row.
        buttons = [
            {
                "text": opt if len(opt) <= 15 else opt[:15] + "..",
                "callback_data": f"answer:{request_id}:{idx}",
            }
            for idx, opt in enumerate(options)
        ]
        keyboard = [buttons[start:start + 2] for start in range(0, len(buttons), 2)]

    # The free-text reply and info buttons are always present.
    keyboard.append([
        {"text": "📝 Reply", "callback_data": f"custom:{request_id}"},
        {"text": "ℹ️", "callback_data": f"info:{request_id}"},
    ])

    success = self._send_with_keyboard(formatted, keyboard)
    return request_id, success
|
||
|
||
def check_response(self, request_id: str) -> Optional[TelegramRequest]:
    """Load the stored state of a request.

    Args:
        request_id: Request ID to check.

    Returns:
        A TelegramRequest built from ``<request_id>.json`` (including any
        recorded response), or None when the file is missing or cannot be
        parsed.
    """
    request_file = self.pending_dir / f"{request_id}.json"
    if not request_file.exists():
        return None

    # Stored files may carry extra keys (context, options); pick only the
    # dataclass fields, each with the fallback used when the key is absent.
    field_defaults = (
        ("request_id", ""),
        ("request_type", ""),
        ("message", ""),
        ("project", ""),
        ("job_id", None),
        ("created_at", ""),
        ("responded_at", None),
        ("response", None),
        ("status", "pending"),
    )
    try:
        data = json.loads(request_file.read_text())
        return TelegramRequest(**{key: data.get(key, fallback) for key, fallback in field_defaults})
    except Exception as e:
        logger.warning(f"Failed to load request {request_id}: {e}")
        return None
|
||
|
||
def record_response(self, request_id: str, response: str,
                    approved: Optional[bool] = None) -> bool:
    """Record a response from Bruno (called by the bot update handlers).

    Args:
        request_id: Request ID. Must be a bare file-name component.
        response: Bruno's response text.
        approved: For approval requests, True/False; when given, the stored
            status becomes "approved" or "denied" instead of "responded".

    Returns:
        True if recorded successfully; False when the id is invalid, the
        request file does not exist, or reading/writing it fails.
    """
    # request_id originates from Telegram callback data and "/answer" text,
    # i.e. untrusted input. Refuse anything containing a path component so
    # it cannot address files outside pending_dir (e.g. "../../x").
    if not request_id or Path(request_id).name != request_id:
        logger.warning(f"Rejected invalid request id: {request_id!r}")
        return False

    request_file = self.pending_dir / f"{request_id}.json"
    if not request_file.exists():
        logger.warning(f"Request file not found: {request_id}")
        return False

    try:
        # Load raw data to preserve extra fields (context, options)
        data = json.loads(request_file.read_text())

        # Update response fields
        data["response"] = response
        data["responded_at"] = datetime.now().isoformat()
        data["status"] = "responded"

        if data.get("request_type") == "approval" and approved is not None:
            data["status"] = "approved" if approved else "denied"

        # Save back with all fields preserved
        request_file.write_text(json.dumps(data, indent=2))
        logger.info(f"Response recorded: {request_id} -> {response}")
        return True
    except Exception as e:
        logger.error(f"Failed to record response: {e}")
        return False
|
||
|
||
def get_pending_requests(self, project: str = None) -> list:
    """Return pending requests, newest first, optionally for one project.

    Every ``*.json`` file in ``pending_dir`` is examined; files that cannot
    be read or parsed are skipped silently.
    """
    found = []
    for req_file in self.pending_dir.glob("*.json"):
        try:
            data = json.loads(req_file.read_text())
            if data.get("status") != "pending":
                continue
            if project is not None and data.get("project") != project:
                continue
            loaded = self.check_response(data.get("request_id", ""))
            if loaded:
                found.append(loaded)
        except Exception:
            continue
    found.sort(key=lambda r: r.created_at, reverse=True)
    return found
|
||
|
||
def _save_pending_request(self, request: TelegramRequest) -> None:
    """Write ``request`` to ``pending_dir`` as ``<request_id>.json``."""
    stored_fields = (
        "request_id", "request_type", "message", "project", "job_id",
        "created_at", "responded_at", "response", "status",
    )
    record = {name: getattr(request, name) for name in stored_fields}
    target = self.pending_dir / f"{request.request_id}.json"
    target.write_text(json.dumps(record, indent=2))
|
||
|
||
def _send_telegram_message(self, message: str, chat_id: int = None) -> bool:
|
||
"""
|
||
Send message via Telegram using sarlo-admin MCP or direct API.
|
||
|
||
Args:
|
||
message: Markdown-formatted message
|
||
chat_id: Optional chat ID to send to (defaults to bruno_chat_id)
|
||
|
||
Returns:
|
||
True if sent successfully
|
||
"""
|
||
# Direct bot API call if credentials available
|
||
target_chat = chat_id or self.bruno_chat_id
|
||
if self.bot_token and target_chat:
|
||
return self._send_via_bot_api(message, chat_id=target_chat)
|
||
|
||
# Fallback: Write to notification file for manual pickup
|
||
return self._write_notification_file(message)
|
||
|
||
def _send_with_keyboard(self, message: str, keyboard: list) -> bool:
|
||
"""
|
||
Send message with inline keyboard buttons.
|
||
|
||
Args:
|
||
message: Markdown-formatted message
|
||
keyboard: List of button rows, each row is a list of button dicts
|
||
Each button: {"text": "Button Text", "callback_data": "data"}
|
||
|
||
Returns:
|
||
True if sent successfully
|
||
"""
|
||
if self.bot_token and self.bruno_chat_id:
|
||
return self._send_via_bot_api(message, inline_keyboard=keyboard)
|
||
|
||
# Fallback without buttons
|
||
return self._write_notification_file(message)
|
||
|
||
def _send_via_bot_api(self, message: str, inline_keyboard: list = None, chat_id: int = None) -> bool:
    """Send a message through the Telegram Bot API ``sendMessage`` method.

    The text is first sent with ``parse_mode=Markdown``. If Telegram rejects
    it with HTTP 400 because the entities cannot be parsed (common for LLM
    output or error strings containing stray ``_`` / ``*``), it is re-sent
    once as plain text so the message is not lost.

    Args:
        message: Message text (Markdown preferred).
        inline_keyboard: Optional list of button rows.
        chat_id: Destination chat (defaults to ``bruno_chat_id``).

    Returns:
        The ``ok`` flag of the Bot API reply, or False on any error.
    """
    import urllib.error
    import urllib.request

    url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"

    payload = {
        "chat_id": chat_id or self.bruno_chat_id,
        "text": message,
        "parse_mode": "Markdown",
    }

    # Add inline keyboard if provided
    if inline_keyboard:
        payload["reply_markup"] = json.dumps({
            "inline_keyboard": inline_keyboard
        })

    def post(body: dict) -> bool:
        req = urllib.request.Request(
            url,
            data=json.dumps(body).encode('utf-8'),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=10) as response:
            return json.loads(response.read()).get("ok", False)

    try:
        return post(payload)
    except urllib.error.HTTPError as e:
        # The error body carries Telegram's human-readable description.
        try:
            description = json.loads(e.read()).get("description", "")
        except Exception:
            description = ""
        if e.code != 400 or "parse" not in description.lower():
            logger.error(f"Telegram API error: {e} {description}".rstrip())
            return False
        logger.warning(f"Markdown rejected by Telegram, retrying as plain text: {description}")
    except Exception as e:
        logger.error(f"Telegram API error: {e}")
        return False

    # Retry once without parse_mode; keep chat, text and keyboard as they were.
    plain_payload = {key: value for key, value in payload.items() if key != "parse_mode"}
    try:
        return post(plain_payload)
    except Exception as e:
        logger.error(f"Telegram API error: {e}")
        return False
|
||
|
||
def _write_notification_file(self, message: str) -> bool:
    """Append a notification to the fallback log for manual pickup.

    Args:
        message: Text to queue; it is written together with a timestamp and
            a separator line.

    Returns:
        True when the entry was written, False on any error.
    """
    notify_file = Path("/var/log/luz-orchestrator/telegram_pending.log")
    try:
        # The fallback is the last resort, so do not lose the message just
        # because the log directory has not been created yet.
        notify_file.parent.mkdir(parents=True, exist_ok=True)
        # Messages contain emoji; force UTF-8 rather than depending on the
        # process locale (a C/ASCII locale would raise UnicodeEncodeError).
        with open(notify_file, "a", encoding="utf-8") as f:
            timestamp = datetime.now().isoformat()
            f.write(f"[{timestamp}]\n{message}\n{'='*40}\n")
        return True
    except Exception as e:
        logger.error(f"Failed to write notification: {e}")
        return False
|
||
|
||
def poll_responses(self, timeout: int = 30) -> list:
    """Poll Telegram for updates (button clicks, text, voice, audio).

    Only updates coming from the configured chat (``bruno_chat_id``) are
    processed: messages can trigger ``/answer`` recording and LLM-driven
    command execution, so updates from any other chat are skipped and
    logged. If no chat ID is configured, every update is skipped.

    The update offset is stored in ``pending_dir/.telegram_offset`` and is
    advanced past every update received, including skipped or failing ones,
    so a single bad update cannot be re-delivered forever.

    Args:
        timeout: Long polling timeout in seconds.

    Returns:
        List of result dicts produced by the per-update handlers.
    """
    if not self.bot_token:
        logger.warning("No bot token - cannot poll")
        return []

    import urllib.request

    # Resume from the last stored offset; start at 0 if missing or corrupt.
    offset_file = self.pending_dir / ".telegram_offset"
    try:
        offset = int(offset_file.read_text().strip())
    except (OSError, ValueError):
        offset = 0

    url = f"https://api.telegram.org/bot{self.bot_token}/getUpdates"
    params = {
        "offset": offset,
        "timeout": timeout,
        "allowed_updates": ["callback_query", "message"]
    }

    try:
        data = json.dumps(params).encode('utf-8')
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"}
        )
        # Give the HTTP call a little longer than the long-poll itself.
        with urllib.request.urlopen(req, timeout=timeout + 5) as response:
            result = json.loads(response.read())

        if not result.get("ok"):
            return []

        responses = []
        for update in result.get("result", []):
            update_id = update.get("update_id", 0)
            offset = max(offset, update_id + 1)

            if not self._is_authorized_update(update):
                logger.warning(f"Ignoring update {update_id} from unauthorized chat")
                continue

            try:
                resp = None
                if "callback_query" in update:
                    # Button click
                    resp = self._process_callback(update["callback_query"])
                elif "message" in update:
                    message = update["message"]
                    if "text" in message:
                        resp = self._process_message(message)
                    elif "voice" in message:
                        resp = self._process_voice_message(message)
                    elif "audio" in message:
                        resp = self._process_audio_message(message)
            except Exception as e:
                logger.error(f"Failed to process update {update_id}: {e}")
                continue

            if resp:
                responses.append(resp)

        # Save offset
        offset_file.write_text(str(offset))

        return responses

    except Exception as e:
        logger.error(f"Polling error: {e}")
        return []

def _is_authorized_update(self, update: dict) -> bool:
    """Return True when ``update`` originates from the configured chat."""
    if not self.bruno_chat_id:
        return False
    if "callback_query" in update:
        query = update["callback_query"] or {}
        chat = (query.get("message") or {}).get("chat") or {}
        # Fall back to the clicking user when the message is not attached.
        origin = chat.get("id", (query.get("from") or {}).get("id"))
    else:
        origin = ((update.get("message") or {}).get("chat") or {}).get("id")
    return origin is not None and str(origin) == str(self.bruno_chat_id).strip()
|
||
|
||
def _process_callback(self, query: dict) -> Optional[dict]:
    """Handle an inline-button click.

    Callback data has the form ``<action>:<request_id>[:<option index>]``.
    The callback is acknowledged first, then the action is dispatched.

    Returns:
        ``{"request_id": ..., "action": ...}`` for approve, deny, answer and
        info; None for malformed data or any other action.
    """
    callback_id = query.get("id")
    data = query.get("data", "")
    user_id = str(query.get("from", {}).get("id", ""))

    logger.info(f"Callback from {user_id}: {data}")

    parts = data.split(":")
    if len(parts) < 2:
        return None
    action, request_id = parts[0], parts[1]

    # Acknowledge right away so the button stops showing a spinner.
    self._answer_callback(callback_id, f"✅ {action.title()}!")

    if action in ("approve", "deny"):
        is_approval = action == "approve"
        verdict = "approved" if is_approval else "denied"
        self.record_response(request_id, verdict, approved=is_approval)
        return {"request_id": request_id, "action": verdict}

    if action == "answer":
        option_idx = parts[2] if len(parts) > 2 else "0"
        choice = f"option_{option_idx}"
        self.record_response(request_id, choice)
        return {"request_id": request_id, "action": choice}

    if action == "info":
        # Send detailed info about request
        self._send_request_details(request_id)
        return {"request_id": request_id, "action": "info_requested"}

    return None
|
||
|
||
def _process_message(self, message: dict) -> Optional[dict]:
|
||
"""Process a text message."""
|
||
text = message.get("text", "")
|
||
chat_id = message.get("chat", {}).get("id")
|
||
user_id = str(message.get("from", {}).get("id", ""))
|
||
|
||
# Check for command responses
|
||
if text.startswith("/answer "):
|
||
parts = text.split(maxsplit=2)
|
||
if len(parts) >= 3:
|
||
request_id = parts[1]
|
||
answer = parts[2]
|
||
self.record_response(request_id, answer)
|
||
return {"request_id": request_id, "action": "answered", "response": answer}
|
||
|
||
# Skip bot commands
|
||
if text.startswith("/"):
|
||
return None
|
||
|
||
# Process regular text with Gemini
|
||
logger.info(f"Text message from {user_id}: {text[:50]}...")
|
||
result = self._process_text_with_gemini(text, chat_id)
|
||
return result
|
||
|
||
def _process_text_with_gemini(self, text: str, chat_id: int) -> Optional[dict]:
    """Answer a text message through the local OpenAI-compatible LLM proxy.

    The reply is sent back to ``chat_id``; if it contains a backticked
    ``luzia <project> <task>`` command, that command is executed as well.

    Returns:
        ``{"action": "text_processed", "response": <reply>}`` when a
        non-empty reply was produced, otherwise None. On error a short
        message is sent to the chat and None is returned.
    """
    try:
        import urllib.request
        import json as json_module

        # Send typing indicator
        self._send_chat_action(chat_id, "typing")

        system_prompt = """You are Luzia, Bruno's AI assistant on the luz.uy server.
Respond helpfully and conversationally.

CAPABILITIES:
- Execute tasks on projects: musica, overbits, dss, librechat, admin
- Run server commands and check system status
- Answer questions and have conversations

RESPONSE GUIDELINES:
- Be concise and friendly
- Spanish or English based on user's language
- If user requests a task, acknowledge it and add:
🚀 **Command:** `luzia <project> <task description>`
- Keep responses under 500 characters when possible"""

        conversation = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": text}
        ]
        payload = {
            "model": "gemini-3-pro-preview",
            "messages": conversation,
            "max_tokens": 1024,
            "temperature": 0.7
        }

        # Use local LLM Proxy (OpenAI-compatible API)
        req = urllib.request.Request(
            "http://127.0.0.1:11434/v1/chat/completions",
            data=json_module.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json_module.loads(resp.read())

        if not ("choices" in result and result["choices"]):
            return None

        response_text = result["choices"][0].get("message", {}).get("content", "")
        if not response_text:
            return None

        self._send_telegram_message(response_text, chat_id=chat_id)
        logger.info(f"LLM Proxy text response sent to chat {chat_id}")

        # Check for Luzia command and auto-execute
        luzia_cmd = self._extract_luzia_command(response_text)
        if luzia_cmd:
            self._execute_luzia_command(luzia_cmd, chat_id)

        return {"action": "text_processed", "response": response_text}

    except Exception as e:
        logger.error(f"Text processing error: {e}")
        self._send_telegram_message(f"❌ Error: {str(e)[:80]}", chat_id=chat_id)
        return None
|
||
|
||
def _process_voice_message(self, message: dict) -> Optional[dict]:
    """Hand a Telegram voice note to the Gemini audio pipeline.

    Returns whatever ``_process_audio_with_gemini`` returns for the note.
    """
    voice = message.get("voice", {})
    file_id, duration = voice.get("file_id"), voice.get("duration", 0)
    chat_id = message.get("chat", {}).get("id")
    user_id = str(message.get("from", {}).get("id", ""))

    logger.info(f"Voice message from {user_id}: {duration}s")

    return self._process_audio_with_gemini(file_id, chat_id, "voice")
|
||
|
||
def _process_audio_message(self, message: dict) -> Optional[dict]:
    """Hand a Telegram audio file to the Gemini audio pipeline.

    The MIME type reported by Telegram is forwarded so the file is not
    mislabelled; "audio/ogg" is used only when Telegram reports none.
    """
    audio = message.get("audio", {})
    file_id = audio.get("file_id")
    chat_id = message.get("chat", {}).get("id")
    user_id = str(message.get("from", {}).get("id", ""))
    file_name = audio.get("file_name", "audio")
    mime_type = audio.get("mime_type") or "audio/ogg"

    logger.info(f"Audio file from {user_id}: {file_name}")

    # Process with Gemini
    return self._process_audio_with_gemini(file_id, chat_id, "audio", file_name,
                                           mime_type=mime_type)

def _process_audio_with_gemini(self, file_id: str, chat_id: int,
                               audio_type: str = "voice",
                               file_name: str = None,
                               mime_type: str = "audio/ogg") -> Optional[dict]:
    """Download a Telegram audio file and have Gemini respond to it.

    Args:
        file_id: Telegram file identifier of the audio.
        chat_id: Chat that receives the reply or error message.
        audio_type: "voice" selects the conversational assistant prompt;
            anything else selects the generic audio-analysis prompt.
        file_name: Optional file name mentioned in the analysis prompt.
        mime_type: MIME type declared for the inline audio data. Defaults
            to "audio/ogg", the value previously hard-coded for all input.
            NOTE(review): Telegram's value is passed through unchanged;
            confirm Gemini accepts every type Telegram may report.

    Returns:
        ``{"action": "voice_processed", "response": <reply>}`` on success,
        otherwise None (the chat is told what went wrong).
    """
    try:
        import base64
        import urllib.request
        import json as json_module

        # Send typing indicator
        self._send_chat_action(chat_id, "typing")

        # Download audio file from Telegram
        audio_data = self._download_telegram_file(file_id)
        if not audio_data:
            self._send_telegram_message("❌ Failed to download audio", chat_id=chat_id)
            return None

        # Get Gemini API key
        api_key = self._get_gemini_api_key()
        if not api_key:
            self._send_telegram_message("❌ Gemini API not configured", chat_id=chat_id)
            return None

        # Prepare Gemini request
        audio_b64 = base64.b64encode(audio_data).decode()

        if audio_type == "voice":
            prompt = """You are Luzia, Bruno's AI assistant on the luz.uy server.
Listen to this voice message and respond helpfully in a conversational manner.

CAPABILITIES:
- You can execute tasks on projects (musica, overbits, dss, librechat, admin)
- You can run server commands and check system status
- You can answer questions and have conversations

RESPONSE FORMAT:
📝 **Heard:** [brief transcription]

🤖 [your helpful, conversational response]

If the user requests a task (like "deploy musica", "check server status", "run tests on dss"):
- Acknowledge the request
- Add: 🚀 **Command:** `luzia <project> <task>` (the command that would execute this)

Keep responses concise and friendly. Spanish or English based on what user speaks."""
        else:
            prompt = f"""Analyze this audio file ({file_name or 'audio'}).

If it contains speech, transcribe and respond conversationally.
If it's music or other audio, describe what you hear.

Keep response concise and helpful."""

        payload = {
            "contents": [{
                "parts": [
                    {"inline_data": {"mime_type": mime_type or "audio/ogg", "data": audio_b64}},
                    {"text": prompt}
                ]
            }],
            "generationConfig": {"temperature": 0.3, "maxOutputTokens": 2048}
        }

        # Send the key in a header rather than the query string, so it does
        # not end up in URLs recorded by proxies or logs.
        url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-3-pro-preview:generateContent"
        data = json_module.dumps(payload).encode()
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json", "x-goog-api-key": api_key},
        )

        with urllib.request.urlopen(req, timeout=60) as resp:
            result = json_module.loads(resp.read())

        # Extract response
        if "candidates" in result and result["candidates"]:
            content = result["candidates"][0].get("content", {})
            parts = content.get("parts", [])
            response_text = parts[0].get("text", "") if parts else ""
            # An empty reply is treated like no reply instead of being sent.
            if response_text:
                # Send response to Telegram
                self._send_telegram_message(response_text, chat_id=chat_id)
                logger.info(f"Gemini audio response sent to chat {chat_id}")

                # Check for Luzia command in response and auto-execute
                luzia_cmd = self._extract_luzia_command(response_text)
                if luzia_cmd:
                    self._execute_luzia_command(luzia_cmd, chat_id)

                return {"action": "voice_processed", "response": response_text}

        self._send_telegram_message("❌ No response from Gemini", chat_id=chat_id)
        return None

    except Exception as e:
        logger.error(f"Audio processing error: {e}")
        self._send_telegram_message(f"❌ Audio processing error: {str(e)[:100]}", chat_id=chat_id)
        return None
|
||
|
||
def _download_telegram_file(self, file_id: str) -> Optional[bytes]:
    """Fetch the raw bytes of a Telegram file.

    Resolves ``file_id`` to a server path with ``getFile`` and then
    downloads that path.

    Returns:
        The file content, or None when the lookup is rejected or any error
        occurs.
    """
    import urllib.request

    try:
        # Step 1: resolve the file id to a path on Telegram's file server.
        lookup = urllib.request.Request(
            f"https://api.telegram.org/bot{self.bot_token}/getFile",
            data=json.dumps({"file_id": file_id}).encode(),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(lookup, timeout=10) as resp:
            result = json.loads(resp.read())

        if not result.get("ok"):
            return None
        file_path = result["result"]["file_path"]

        # Step 2: download the content itself.
        download_url = f"https://api.telegram.org/file/bot{self.bot_token}/{file_path}"
        with urllib.request.urlopen(download_url, timeout=30) as resp:
            return resp.read()

    except Exception as e:
        logger.error(f"Failed to download Telegram file: {e}")
        return None
|
||
|
||
def _get_gemini_api_key(self) -> Optional[str]:
|
||
"""Get Gemini API key from LLM proxy config or PAL MCP env."""
|
||
try:
|
||
# First try LLM Proxy config (preferred, centralized)
|
||
llm_proxy_env = Path("/opt/server-agents/config/llm-proxy.env")
|
||
if llm_proxy_env.exists():
|
||
for line in llm_proxy_env.read_text().split("\n"):
|
||
if line.startswith("GEMINI_API_KEY=") and "=" in line:
|
||
key = line.split("=", 1)[1].strip().strip('"\'')
|
||
if key:
|
||
return key
|
||
|
||
# Fallback to PAL MCP env
|
||
pal_env = Path("/opt/pal-mcp-server/.env")
|
||
if pal_env.exists():
|
||
for line in pal_env.read_text().split("\n"):
|
||
if line.startswith("GEMINI_API_KEY="):
|
||
return line.split("=", 1)[1].strip().strip('"\'')
|
||
except Exception as e:
|
||
logger.error(f"Failed to load Gemini API key: {e}")
|
||
return None
|
||
|
||
def _send_chat_action(self, chat_id: int, action: str = "typing"):
    """Send a chat action (e.g. the typing indicator) to ``chat_id``.

    Best effort: failures are logged at debug level and never raised,
    because the indicator is purely cosmetic.
    """
    import urllib.request

    url = f"https://api.telegram.org/bot{self.bot_token}/sendChatAction"
    data = json.dumps({"chat_id": chat_id, "action": action}).encode()

    try:
        req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
        # The context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req, timeout=5):
            pass
    except Exception as e:
        # Exception (not a bare except) lets KeyboardInterrupt / SystemExit
        # still propagate.
        logger.debug(f"Failed to send chat action: {e}")
|
||
|
||
def _extract_luzia_command(self, response_text: str) -> Optional[str]:
|
||
"""Extract luzia command from Gemini response if present."""
|
||
import re
|
||
# Look for pattern: `luzia <project> <task>`
|
||
match = re.search(r'`luzia\s+(\w+)\s+([^`]+)`', response_text)
|
||
if match:
|
||
project = match.group(1)
|
||
task = match.group(2).strip()
|
||
return f"{project} {task}"
|
||
return None
|
||
|
||
def _execute_luzia_command(self, command: str, chat_id: int):
|
||
"""Execute a luzia command and notify via Telegram."""
|
||
import subprocess
|
||
|
||
try:
|
||
# Parse command
|
||
parts = command.split(maxsplit=1)
|
||
if len(parts) < 2:
|
||
return
|
||
|
||
project, task = parts[0], parts[1]
|
||
|
||
# Validate project (security)
|
||
valid_projects = ["musica", "overbits", "dss", "librechat", "admin", "assistant"]
|
||
if project not in valid_projects:
|
||
self._send_telegram_message(f"⚠️ Unknown project: {project}", chat_id=chat_id)
|
||
return
|
||
|
||
# Send confirmation
|
||
self._send_telegram_message(f"🚀 Executing: `luzia {project} {task[:50]}...`", chat_id=chat_id)
|
||
|
||
# Execute asynchronously (don't block)
|
||
subprocess.Popen(
|
||
["/opt/server-agents/orchestrator/bin/luzia", project, task],
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
start_new_session=True
|
||
)
|
||
|
||
logger.info(f"Luzia command dispatched: {project} {task[:50]}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to execute luzia command: {e}")
|
||
self._send_telegram_message(f"❌ Command failed: {str(e)[:50]}", chat_id=chat_id)
|
||
|
||
def _answer_callback(self, callback_id: str, text: str):
    """Acknowledge a callback query so the client stops its loading state.

    Args:
        callback_id: ID of the callback query being answered.
        text: Short text sent along with the acknowledgement.

    Failures are logged as warnings and never raised.
    """
    import urllib.request

    url = f"https://api.telegram.org/bot{self.bot_token}/answerCallbackQuery"
    data = json.dumps({
        "callback_query_id": callback_id,
        "text": text
    }).encode('utf-8')

    try:
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"}
        )
        # The context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req, timeout=5):
            pass
    except Exception as e:
        logger.warning(f"Failed to answer callback: {e}")
|
||
|
||
def _send_request_details(self, request_id: str):
|
||
"""Send detailed info about a request (triggered by info button)."""
|
||
request_file = self.pending_dir / f"{request_id}.json"
|
||
if not request_file.exists():
|
||
self._send_telegram_message(f"❓ Request `{request_id}` not found")
|
||
return
|
||
|
||
try:
|
||
data = json.loads(request_file.read_text())
|
||
except:
|
||
self._send_telegram_message(f"❓ Error reading request")
|
||
return
|
||
|
||
# Build detailed info message
|
||
msg = f"📋 *Request Details*\n\n"
|
||
msg += f"*ID:* `{data.get('request_id', 'unknown')}`\n"
|
||
msg += f"*Type:* {data.get('request_type', 'unknown')}\n"
|
||
msg += f"*Project:* {data.get('project', 'unknown')}\n"
|
||
if data.get('job_id'):
|
||
msg += f"*Job:* `{data['job_id']}`\n"
|
||
msg += f"*Status:* {data.get('status', 'unknown')}\n"
|
||
msg += f"*Created:* {data.get('created_at', 'unknown')[:19]}\n\n"
|
||
|
||
msg += f"*Full Message:*\n{data.get('message', 'N/A')}\n"
|
||
|
||
if data.get('context'):
|
||
msg += f"\n*Context:*\n{data['context']}\n"
|
||
|
||
if data.get('options'):
|
||
msg += f"\n*Options:*\n"
|
||
for i, opt in enumerate(data['options']):
|
||
msg += f" {i+1}. {opt}\n"
|
||
|
||
if data.get('response'):
|
||
msg += f"\n*Response:* {data['response']}\n"
|
||
msg += f"*Responded:* {data.get('responded_at', 'unknown')[:19]}\n"
|
||
|
||
self._send_telegram_message(msg)
|
||
|
||
|
||
# Convenience functions for use from luzia CLI
|
||
# Lazily created, process-wide TelegramBridge shared by the helpers below.
_bridge_instance = None


def get_bridge() -> TelegramBridge:
    """Return the shared TelegramBridge, creating it on first use."""
    global _bridge_instance
    if _bridge_instance is not None:
        return _bridge_instance
    _bridge_instance = TelegramBridge()
    return _bridge_instance
|
||
|
||
|
||
def notify_bruno(message: str, project: str = "luzia",
                 severity: str = "info", job_id: str = None) -> bool:
    """Send Bruno a notification through the shared bridge; return the send result."""
    bridge = get_bridge()
    return bridge.send_notification(message, project=project, job_id=job_id, severity=severity)
|
||
|
||
|
||
def ask_bruno(question: str, project: str = "luzia",
              context: str = "", options: list = None) -> Tuple[str, bool]:
    """Ask Bruno a question through the shared bridge; return ``(request_id, success)``."""
    bridge = get_bridge()
    return bridge.ask_question(question, project, context=context, options=options)
|
||
|
||
|
||
def request_approval_from_bruno(action: str, project: str,
                                context: str = "") -> Tuple[str, bool]:
    """Request Bruno's approval through the shared bridge; return ``(request_id, success)``."""
    bridge = get_bridge()
    return bridge.request_approval(action, project, context=context)
|
||
|
||
|
||
def poll_for_responses(timeout: int = 5) -> list:
    """Poll Telegram once through the shared bridge and return the processed responses."""
    bridge = get_bridge()
    return bridge.poll_responses(timeout=timeout)
|
||
|
||
|
||
# CLI for testing
|
||
if __name__ == "__main__":
    # Small manual test CLI: telegram_bridge.py <command> [args...]
    import sys

    def _print_usage():
        print("Usage:")
        print(" telegram_bridge.py notify <message>")
        print(" telegram_bridge.py ask <question>")
        print(" telegram_bridge.py approve <action>")
        print(" telegram_bridge.py pending")
        print(" telegram_bridge.py check <request_id>")
        print(" telegram_bridge.py poll [timeout_seconds]")

    bridge = TelegramBridge()

    cmd = sys.argv[1] if len(sys.argv) > 1 else None
    extra = sys.argv[2:]

    if cmd == "notify":
        msg = " ".join(extra) if extra else "Test notification from Luzia"
        success = bridge.send_notification(msg, "test")
        print(f"Notification sent: {success}")

    elif cmd == "ask":
        question = " ".join(extra) if extra else "Test question?"
        req_id, success = bridge.ask_question(question, "test")
        print(f"Question sent: {success}, request_id: {req_id}")

    elif cmd == "approve":
        action = " ".join(extra) if extra else "Test action"
        req_id, success = bridge.request_approval(action, "test")
        print(f"Approval request sent: {success}, request_id: {req_id}")

    elif cmd == "pending":
        requests = bridge.get_pending_requests()
        print(f"Pending requests: {len(requests)}")
        for req in requests:
            print(f" [{req.request_type}] {req.request_id}: {req.message[:50]}...")

    elif cmd == "check":
        if extra:
            req = bridge.check_response(extra[0])
            if req:
                print(f"Status: {req.status}")
                print(f"Response: {req.response}")
            else:
                print("Request not found")
        else:
            # Previously a missing id printed nothing at all.
            _print_usage()

    elif cmd == "poll":
        timeout = int(extra[0]) if extra else 5
        print(f"Polling for responses (timeout: {timeout}s)...")
        responses = bridge.poll_responses(timeout)
        if responses:
            print(f"Got {len(responses)} response(s):")
            for r in responses:
                # Text/voice results carry no request_id; do not crash on them.
                print(f" {r.get('request_id', '-')}: {r.get('action', '-')}")
        else:
            print("No new responses")

    else:
        # No command or an unknown one.
        _print_usage()
|