#!/usr/bin/env python3
"""
Telegram Webhook Handler for Luzia

Handles incoming callback queries (button clicks), text messages, and
voice/audio messages from Bruno. Voice and audio messages are transcribed and
processed with the Gemini model named by ``GEMINI_MODEL``.

Runs as a simple HTTP server that receives Telegram webhook updates.

Environment:
    LUZIA_WEBHOOK_PORT    TCP port to listen on (default 8765).
    LUZIA_WEBHOOK_SECRET  Optional. When set, it is registered with Telegram
                          by ``--setup`` and every incoming request must carry
                          it in the ``X-Telegram-Bot-Api-Secret-Token`` header.
"""

import base64
import hmac
import json
import logging
import os
import shutil
import sys
import tempfile
import urllib.error
import urllib.request
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Optional, Tuple

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Configuration
WEBHOOK_PORT = int(os.environ.get("LUZIA_WEBHOOK_PORT", "8765"))
WEBHOOK_SECRET = os.environ.get("LUZIA_WEBHOOK_SECRET", "")
BOT_TOKEN_PATH = "/etc/telegram-bot/token"
PENDING_REQUESTS_DIR = Path("/var/lib/luzia/telegram_requests")
AUDIO_CACHE_DIR = Path("/var/lib/luzia/audio_cache")
AUTHORIZED_USER_ID = "50639169"  # Bruno's Telegram user ID

# Gemini configuration for audio processing
GEMINI_MODEL = "gemini-2.0-flash"  # Supports audio input

# File extension -> MIME type sent to Gemini; unknown extensions use audio/ogg.
_AUDIO_MIME_TYPES = {
    ".ogg": "audio/ogg",
    ".mp3": "audio/mp3",
    ".wav": "audio/wav",
    ".m4a": "audio/mp4",
    ".oga": "audio/ogg",
}

# Telegram rejects sendMessage texts longer than 4096 characters; stay below
# that so long Gemini answers are split instead of being dropped.
_MAX_MESSAGE_CHARS = 4000

_VOICE_PROMPT = (
    "You are Bruno's AI assistant (Luzia). Listen to this voice message and respond helpfully.\n"
    "\n"
    "First, transcribe what was said, then provide a helpful response.\n"
    "\n"
    "Format:\n"
    "📝 **Transcription:** [what was said]\n"
    "\n"
    "🤖 **Response:** [your helpful response]\n"
    "\n"
    "If this is a command or task request, acknowledge it and explain what actions would be needed."
)

_HELP_TEXT = (
    "*Luzia Telegram Commands:*\n"
    "\n"
    "/approve <id> - Approve a request\n"
    "/deny <id> <reason> - Deny a request (reason optional)\n"
    "/answer <id> <text> - Answer a question\n"
    "/pending - Show pending requests\n"
    "/help - Show this help"
)


class TelegramWebhookHandler(BaseHTTPRequestHandler):
    """Handle incoming Telegram webhook updates."""

    # ------------------------------------------------------------------
    # HTTP layer
    # ------------------------------------------------------------------

    def log_message(self, format, *args):
        """Send http.server's own log lines to the module logger."""
        try:
            rendered = format % args
        except (TypeError, ValueError):
            # Never let a malformed log call break request handling.
            rendered = " ".join(str(part) for part in (format, *args))
        logger.info("Webhook: %s", rendered)

    def do_POST(self):
        """Handle POST request from Telegram.

        Replies 403 when a configured secret token does not match, 400 when
        the body is not a JSON object, 500 when processing raises, and 200
        otherwise.
        """
        provided = self.headers.get("X-Telegram-Bot-Api-Secret-Token")
        if not self._secret_matches(WEBHOOK_SECRET, provided):
            logger.warning("Rejected webhook request with a bad secret token")
            self._send_response(403, {"ok": False, "error": "forbidden"})
            return

        try:
            # A negative length would make read() block until EOF.
            content_length = max(int(self.headers.get("Content-Length", 0)), 0)
            update = json.loads(self.rfile.read(content_length).decode("utf-8"))
        except ValueError as e:  # bad length, bad UTF-8 or bad JSON
            logger.error("Rejected malformed update: %s", e)
            self._send_response(400, {"ok": False, "error": "malformed request"})
            return
        if not isinstance(update, dict):
            self._send_response(400, {"ok": False, "error": "malformed request"})
            return

        try:
            self._process_update(update)
        except Exception as e:
            # Top-level boundary: log the details, keep them out of the reply.
            logger.exception("Error processing update: %s", e)
            self._send_response(500, {"ok": False, "error": "internal error"})
            return
        self._send_response(200, {"ok": True})

    def _send_response(self, status: int, data: dict):
        """Send ``data`` as a JSON response with the given HTTP status."""
        body = json.dumps(data).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _secret_matches(expected: str, provided: Optional[str]) -> bool:
        """Return True when no secret is configured or ``provided`` equals it."""
        if not expected:
            return True
        if provided is None:
            return False
        # Constant-time comparison; bytes so non-ASCII input cannot raise.
        return hmac.compare_digest(expected.encode("utf-8"), provided.encode("utf-8"))

    # ------------------------------------------------------------------
    # Update dispatch
    # ------------------------------------------------------------------

    def _process_update(self, update: dict):
        """Route a Telegram update to the matching handler.

        Callback queries take precedence; for messages the order is voice,
        audio, then text. Anything else is ignored.
        """
        if "callback_query" in update:
            self._handle_callback_query(update["callback_query"])
            return

        message = update.get("message")
        if not isinstance(message, dict):
            return
        if "voice" in message:
            self._handle_voice_message(message)
        elif "audio" in message:
            self._handle_audio_message(message)
        elif "text" in message:
            self._handle_message(message)

    # ------------------------------------------------------------------
    # Voice / audio
    # ------------------------------------------------------------------

    def _handle_voice_message(self, message: dict):
        """Handle voice message - transcribe and process with Gemini."""
        self._handle_incoming_audio(message, "voice")

    def _handle_audio_message(self, message: dict):
        """Handle audio file - transcribe and process with Gemini."""
        self._handle_incoming_audio(message, "audio")

    def _handle_incoming_audio(self, message: dict, kind: str):
        """Download, process and answer a ``voice`` or ``audio`` message.

        Args:
            message: The Telegram message object.
            kind: ``"voice"`` or ``"audio"``; also the key of the media
                object inside ``message``.
        """
        user_id = str(message.get("from", {}).get("id", ""))
        chat_id = message.get("chat", {}).get("id")
        media = message.get(kind) or {}
        file_id = media.get("file_id")
        duration = media.get("duration", 0)

        if kind == "voice":
            label = "voice message"
            file_name = None
            # file_id may be absent; never slice None.
            logger.info(
                "Voice message from %s: %ss, file_id=%s...",
                user_id, duration, (file_id or "")[:20],
            )
        else:
            label = "audio file"
            file_name = media.get("file_name", "audio")
            logger.info("Audio file from %s: %s, %ss", user_id, file_name, duration)

        if user_id != AUTHORIZED_USER_ID:
            return

        # Send typing indicator
        self._send_chat_action(chat_id, "typing")

        audio_path = None
        try:
            audio_path = self._download_telegram_file(file_id) if file_id else None
            if not audio_path:
                self._send_reply(chat_id, f"❌ Failed to download {label}")
                return

            if kind == "voice":
                response = self._process_audio_with_gemini(audio_path, "voice")
            else:
                response = self._process_audio_with_gemini(audio_path, "audio", file_name)

            if response:
                self._send_reply(chat_id, response)
            else:
                self._send_reply(chat_id, f"❌ Failed to process {label}")
        except Exception as e:
            logger.error("Error processing %s: %s", label, e)
            self._send_reply(chat_id, f"❌ Error: {str(e)[:100]}")
        finally:
            # Always remove the cached download, including on failure.
            if audio_path:
                try:
                    Path(audio_path).unlink(missing_ok=True)
                except OSError as e:
                    logger.warning("Could not remove %s: %s", audio_path, e)

    def _download_telegram_file(self, file_id: str) -> Optional[str]:
        """Download a file from Telegram servers.

        Returns:
            The local path of the downloaded file inside ``AUDIO_CACHE_DIR``,
            or None when the token is missing or any step fails.
        """
        bot_token = self._get_bot_token()
        if not bot_token:
            return None

        local_path = None
        try:
            # Get file path from Telegram
            result = self._call_bot_api("getFile", {"file_id": file_id}, timeout=30)
            if not result or not result.get("ok"):
                return None
            file_path = result["result"]["file_path"]

            download_url = f"https://api.telegram.org/file/bot{bot_token}/{file_path}"
            AUDIO_CACHE_DIR.mkdir(parents=True, exist_ok=True)

            ext = Path(file_path).suffix or ".ogg"
            # file_id comes from the request body: keep only filename-safe
            # characters so it can never escape the cache directory.
            stem = "".join(
                ch if ch.isalnum() or ch in "_-" else "_" for ch in str(file_id)[:16]
            )
            local_path = AUDIO_CACHE_DIR / f"{stem}{ext}"

            with urllib.request.urlopen(download_url, timeout=60) as remote, \
                    open(local_path, "wb") as out:
                shutil.copyfileobj(remote, out)

            logger.info("Downloaded audio to %s", local_path)
            return str(local_path)
        except Exception as e:
            logger.error("Failed to download file: %s", e)
            if local_path is not None:
                # Do not leave a partial download behind.
                try:
                    local_path.unlink(missing_ok=True)
                except OSError:
                    pass
            return None

    @staticmethod
    def _resolve_gemini_api_key() -> Optional[str]:
        """Find a Gemini API key in the environment or known credential files.

        Lookup order: ``GEMINI_API_KEY``, ``GOOGLE_API_KEY``, the
        ``GEMINI_API_KEY=`` line of the PAL MCP ``.env`` file, then the
        shared credentials file.
        """
        api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")

        if not api_key:
            pal_env = Path("/opt/pal-mcp-server/.env")
            if pal_env.exists():
                for line in pal_env.read_text().splitlines():
                    if line.startswith("GEMINI_API_KEY="):
                        api_key = line.split("=", 1)[1].strip().strip('"\'')
                        break

        if not api_key:
            creds_file = Path("/etc/shared-ai-credentials/gemini/api_key.txt")
            if creds_file.exists():
                api_key = creds_file.read_text().strip()

        return api_key or None

    @staticmethod
    def _build_audio_prompt(audio_type: str, file_name: Optional[str]) -> str:
        """Return the instruction text sent to Gemini along with the audio."""
        if audio_type == "voice":
            return _VOICE_PROMPT
        return (
            f"Analyze this audio file ({file_name or 'audio'}).\n"
            "\n"
            "If it contains speech, transcribe it. If it's music or other audio, "
            "describe what you hear.\n"
            "\n"
            "Provide any relevant insights or actions that might be helpful."
        )

    def _process_audio_with_gemini(self, audio_path: str, audio_type: str = "voice",
                                   file_name: Optional[str] = None) -> Optional[str]:
        """Process audio with Gemini (``GEMINI_MODEL``) for transcription and response.

        Args:
            audio_path: Local path of the audio file.
            audio_type: ``"voice"`` selects the assistant prompt; any other
                value selects the generic analysis prompt.
            file_name: Original file name, mentioned in the analysis prompt.

        Returns:
            The model's text, or a user-facing ``❌ ...`` message describing
            why no answer is available. Exceptions are caught and reported
            the same way.
        """
        try:
            import google.generativeai as genai

            api_key = self._resolve_gemini_api_key()
            if not api_key:
                logger.error("No Google API key found")
                return "❌ Gemini API not configured"
            genai.configure(api_key=api_key)

            audio_data = Path(audio_path).read_bytes()
            mime_type = _AUDIO_MIME_TYPES.get(Path(audio_path).suffix.lower(), "audio/ogg")

            model = genai.GenerativeModel(GEMINI_MODEL)
            audio_part = {
                "inline_data": {
                    "mime_type": mime_type,
                    "data": base64.b64encode(audio_data).decode(),
                }
            }
            prompt = self._build_audio_prompt(audio_type, file_name)

            response = model.generate_content([prompt, audio_part])
            if response and response.text:
                logger.info("Gemini processed audio successfully")
                return response.text
            return "❌ No response from Gemini"
        except ImportError:
            logger.error("google-generativeai not installed")
            return "❌ Gemini SDK not installed. Run: pip install google-generativeai"
        except Exception as e:
            logger.error("Gemini processing failed: %s", e)
            return f"❌ Gemini error: {str(e)[:200]}"

    # ------------------------------------------------------------------
    # Telegram Bot API helpers
    # ------------------------------------------------------------------

    def _call_bot_api(self, method: str, payload: dict, timeout: float = 5) -> Optional[dict]:
        """POST ``payload`` as JSON to a Bot API method.

        Returns:
            The decoded JSON reply, or None when no bot token is available.

        Raises:
            urllib.error.URLError, OSError, ValueError: on network, HTTP or
                decoding failures; callers decide how to report them.
        """
        bot_token = self._get_bot_token()
        if not bot_token:
            return None
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot_token}/{method}",
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
        )
        # The context manager closes the connection once the reply is read.
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return json.loads(response.read())

    def _send_chat_action(self, chat_id: int, action: str = "typing"):
        """Send chat action (typing indicator). Failures are only logged."""
        try:
            self._call_bot_api("sendChatAction", {"chat_id": chat_id, "action": action})
        except Exception as e:
            logger.debug("Failed to send chat action: %s", e)

    def _answer_callback(self, callback_id: str, text: str):
        """Answer a callback query (acknowledge button click)."""
        try:
            self._call_bot_api("answerCallbackQuery", {
                "callback_query_id": callback_id,
                "text": text,
                "show_alert": False,
            })
        except Exception as e:
            logger.error("Failed to answer callback: %s", e)

    def _send_reply(self, chat_id: int, text: str):
        """Send a text reply, split into chunks Telegram will accept.

        Each chunk is first sent with ``parse_mode=Markdown``. If Telegram
        answers HTTP 400 (typically unparseable markup in free-form text
        such as model output), the chunk is re-sent as plain text so the
        reply is not lost.
        """
        for start in range(0, len(text), _MAX_MESSAGE_CHARS):
            payload = {
                "chat_id": chat_id,
                "text": text[start:start + _MAX_MESSAGE_CHARS],
                "parse_mode": "Markdown",
            }
            try:
                try:
                    self._call_bot_api("sendMessage", payload)
                except urllib.error.HTTPError as e:
                    if e.code != 400:
                        raise
                    del payload["parse_mode"]
                    self._call_bot_api("sendMessage", payload)
            except Exception as e:
                logger.error("Failed to send reply: %s", e)

    def _get_bot_token(self) -> Optional[str]:
        """Get bot token from file; None when the file cannot be read."""
        try:
            return Path(BOT_TOKEN_PATH).read_text().strip()
        except (OSError, UnicodeError) as e:
            logger.error("Cannot read bot token from %s: %s", BOT_TOKEN_PATH, e)
            return None

    # ------------------------------------------------------------------
    # Callback queries and text commands
    # ------------------------------------------------------------------

    def _handle_callback_query(self, query: dict):
        """Handle inline keyboard button click.

        Callback data has the form ``action:request_id[:option_index]`` with
        actions ``approve``, ``deny``, ``answer`` and ``custom``.
        """
        callback_id = query.get("id")
        user_id = str(query.get("from", {}).get("id", ""))
        data = query.get("data", "")

        logger.info("Callback query from %s: %s", user_id, data)

        # Verify user authorization
        if user_id != AUTHORIZED_USER_ID:
            self._answer_callback(callback_id, "Unauthorized")
            return

        parts = data.split(":")
        if len(parts) < 2:
            self._answer_callback(callback_id, "Invalid callback data")
            return

        action, request_id = parts[0], parts[1]
        option_index = parts[2] if len(parts) > 2 else None

        if action == "approve":
            self._record_response(request_id, "approved", approved=True)
            self._answer_callback(callback_id, "✅ Approved!")
            self._notify_luzia(request_id, "approved")
        elif action == "deny":
            self._record_response(request_id, "denied", approved=False)
            self._answer_callback(callback_id, "❌ Denied")
            self._notify_luzia(request_id, "denied")
        elif action == "answer":
            request = self._load_request(request_id)
            if request and option_index:
                # For now, record the option index as the response
                self._record_response(request_id, f"option_{option_index}")
                self._answer_callback(callback_id, "✅ Answer recorded")
                self._notify_luzia(request_id, f"option_{option_index}")
            else:
                self._answer_callback(callback_id, "Request not found")
        elif action == "custom":
            self._answer_callback(callback_id, "Reply to this message with your answer")
            # The next free-text message is taken as the answer.
            self._set_expecting_reply(request_id)
        else:
            self._answer_callback(callback_id, "Unknown action")

    def _handle_message(self, message: dict):
        """Handle text message from Bruno.

        Recognises ``/approve``, ``/deny``, ``/answer``, ``/pending`` and
        ``/help``. Any other text is recorded as the answer to the request
        flagged by a preceding ``custom`` button click, if there is one.
        """
        user_id = str(message.get("from", {}).get("id", ""))
        text = message.get("text", "")
        chat_id = message.get("chat", {}).get("id")

        logger.info("Message from %s: %s...", user_id, text[:50])

        if user_id != AUTHORIZED_USER_ID:
            return

        if text.startswith("/approve "):
            parts = text.split()
            request_id = parts[1] if len(parts) > 1 else None
            if request_id:
                self._record_response(request_id, "approved", approved=True)
                self._send_reply(chat_id, f"✅ Request {request_id} approved")
                self._notify_luzia(request_id, "approved")

        elif text.startswith("/deny "):
            parts = text.split(maxsplit=2)
            request_id = parts[1] if len(parts) > 1 else None
            reason = parts[2] if len(parts) > 2 else "No reason given"
            if request_id:
                self._record_response(request_id, f"denied: {reason}", approved=False)
                self._send_reply(chat_id, f"❌ Request {request_id} denied")
                self._notify_luzia(request_id, f"denied: {reason}")

        elif text.startswith("/answer "):
            parts = text.split(maxsplit=2)
            request_id = parts[1] if len(parts) > 1 else None
            answer = parts[2] if len(parts) > 2 else ""
            if request_id and answer:
                self._record_response(request_id, answer)
                self._send_reply(chat_id, f"✅ Answer recorded for {request_id}")
                self._notify_luzia(request_id, answer)

        elif text.startswith("/pending"):
            pending = self._get_pending_requests()
            if pending:
                self._send_reply(chat_id, self._format_pending(pending))
            else:
                self._send_reply(chat_id, "No pending requests")

        elif text.startswith("/help"):
            self._send_reply(chat_id, _HELP_TEXT)

        else:
            # Check if we're expecting a custom reply
            expecting = self._check_expecting_reply()
            if expecting:
                self._record_response(expecting, text)
                self._send_reply(chat_id, "✅ Answer recorded")
                self._notify_luzia(expecting, text)
                self._clear_expecting_reply()

    @staticmethod
    def _format_pending(pending: list) -> str:
        """Render up to ten pending requests as a Markdown message.

        Missing keys are shown as ``?`` or an empty summary so one
        incomplete request file cannot break the listing.
        """
        chunks = ["*Pending Requests:*\n\n"]
        for req in pending[:10]:
            request_id = req.get("request_id", "?")
            request_type = req.get("request_type", "?")
            summary = str(req.get("message", ""))[:50]
            chunks.append(f"• `{request_id}` [{request_type}]\n")
            chunks.append(f" {summary}...\n\n")
        return "".join(chunks)

    # ------------------------------------------------------------------
    # Request storage
    # ------------------------------------------------------------------

    @staticmethod
    def _request_file(request_id) -> Optional[Path]:
        """Map a request id to its JSON file, or None if the id is unsafe.

        Ids arrive in callback data and chat commands, so anything that
        could leave ``PENDING_REQUESTS_DIR`` (path separators, NUL) or is
        not a non-empty string is rejected.
        """
        if not isinstance(request_id, str) or not request_id:
            return None
        if any(bad in request_id for bad in ("/", "\\", "\0")):
            return None
        return PENDING_REQUESTS_DIR / f"{request_id}.json"

    def _load_request(self, request_id: str) -> Optional[dict]:
        """Load a pending request; None when missing, unreadable or invalid."""
        request_file = self._request_file(request_id)
        if request_file is None or not request_file.exists():
            return None
        try:
            return json.loads(request_file.read_text())
        except (OSError, ValueError) as e:
            logger.warning("Could not load request %s: %s", request_id, e)
            return None

    def _record_response(self, request_id: str, response: str,
                         approved: Optional[bool] = None):
        """Record a response to a pending request.

        Stores ``response``, a ``responded_at`` timestamp and a ``status``
        of ``approved`` / ``denied`` (``approved`` True / False) or
        ``responded`` (``approved`` None) in the request's JSON file. An
        unknown request id is logged and ignored.
        """
        request_file = self._request_file(request_id)
        if request_file is None or not request_file.exists():
            logger.warning("Request not found: %s", request_id)
            return

        if approved is True:
            status = "approved"
        elif approved is False:
            status = "denied"
        else:
            status = "responded"

        try:
            data = json.loads(request_file.read_text())
            data["response"] = response
            data["responded_at"] = datetime.now().isoformat()
            data["status"] = status
            request_file.write_text(json.dumps(data, indent=2))
            logger.info("Response recorded for %s: %s", request_id, response[:50])
        except Exception as e:
            logger.error("Failed to record response: %s", e)

    def _notify_luzia(self, request_id: str, response: str):
        """Notify Luzia that a response was received.

        Appends ``timestamp|request_id|response`` to a log file that Luzia
        can watch.
        """
        # NOTE(review): a response containing newlines or "|" is written
        # verbatim and spans or splits fields; confirm how the reader parses
        # this file before changing the format.
        notify_file = Path("/var/lib/luzia/telegram_responses.log")
        try:
            with open(notify_file, "a") as f:
                f.write(f"{datetime.now().isoformat()}|{request_id}|{response}\n")
        except Exception as e:
            logger.error("Failed to notify luzia: %s", e)

    def _get_pending_requests(self) -> list:
        """Get list of pending requests, newest ``created_at`` first."""
        pending = []
        for req_file in PENDING_REQUESTS_DIR.glob("*.json"):
            try:
                data = json.loads(req_file.read_text())
            except (OSError, ValueError) as e:
                logger.debug("Skipping unreadable request file %s: %s", req_file, e)
                continue
            if isinstance(data, dict) and data.get("status") == "pending":
                pending.append(data)
        return sorted(pending, key=lambda r: r.get("created_at", ""), reverse=True)

    def _set_expecting_reply(self, request_id: str):
        """Set a flag that we're expecting a text reply."""
        (PENDING_REQUESTS_DIR / ".expecting_reply").write_text(request_id)

    def _check_expecting_reply(self) -> Optional[str]:
        """Return the request id awaiting a text reply, or None."""
        try:
            return (PENDING_REQUESTS_DIR / ".expecting_reply").read_text().strip()
        except FileNotFoundError:
            return None

    def _clear_expecting_reply(self):
        """Clear the expecting reply flag."""
        (PENDING_REQUESTS_DIR / ".expecting_reply").unlink(missing_ok=True)


def run_webhook_server(port: int = WEBHOOK_PORT):
    """Run the webhook server until interrupted with Ctrl-C."""
    PENDING_REQUESTS_DIR.mkdir(parents=True, exist_ok=True)

    server = HTTPServer(('0.0.0.0', port), TelegramWebhookHandler)
    logger.info("Luzia Telegram webhook server starting on port %s", port)
    logger.info("Set webhook URL to: https://your-domain.com/telegram-webhook")
    if not WEBHOOK_SECRET:
        logger.warning(
            "LUZIA_WEBHOOK_SECRET is not set; incoming requests are not authenticated"
        )

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        logger.info("Shutting down webhook server")
    finally:
        # shutdown() is only for stopping serve_forever() from another
        # thread; here the loop has already exited, so release the socket.
        server.server_close()


def _setup_webhook(webhook_url: str) -> dict:
    """Register ``webhook_url`` with Telegram and return the API reply."""
    bot_token = Path(BOT_TOKEN_PATH).read_text().strip()
    payload = {"url": webhook_url}
    if WEBHOOK_SECRET:
        payload["secret_token"] = WEBHOOK_SECRET
    req = urllib.request.Request(
        f"https://api.telegram.org/bot{bot_token}/setWebhook",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=30) as response:
        return json.loads(response.read())


def main(argv: Optional[list] = None) -> int:
    """Command-line entry point.

    ``--setup <webhook_url>`` registers the webhook with Telegram; with no
    arguments the webhook server is started. Returns the process exit code.
    """
    args = sys.argv[1:] if argv is None else argv

    if args and args[0] == "--setup":
        webhook_url = args[1] if len(args) > 1 else None
        if not webhook_url:
            print("Usage: telegram_webhook.py --setup <webhook_url>")
            print("Example: telegram_webhook.py --setup https://luz.uy/luzia-webhook")
            return 1
        try:
            result = _setup_webhook(webhook_url)
        except Exception as e:
            print(f"Failed to setup webhook: {e}")
            return 1
        print(f"Webhook setup: {result}")
        return 0

    run_webhook_server()
    return 0


if __name__ == "__main__":
    sys.exit(main())