# ============================================================ # analyzer_agent_gradio/app.py — Telegram Analyzer Agent (Gradio/Scheduler Only) # Uses Replit Proxy for Telegram API access. # ============================================================ import huggingface_hub import os import json import asyncio from datetime import datetime import logging import threading from functools import wraps from typing import List, Dict, Any, Optional import time # <<< إضافة مكتبة time للـ loop import requests # <<< إضافة مكتبة requests للاتصال بالجسر import gradio as gr from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() # LLM & minimal telegram dependencies import torch from transformers import AutoTokenizer, AutoModelForCausalLM import telegram # لا نحتاج لـ telegram.ext هنا لأننا أزلنا الـ Handlers from llama_cpp import Llama from huggingface_hub import hf_hub_download # ---------------- Logging ---------------- logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") log = logging.getLogger("analyzer") # ---------------- Env & config ---------------- TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN") TG_CHANNEL = os.getenv("TG_CHANNEL") # المتغير الجديد: رابط تطبيق الجسر على Replit REPLIT_PROXY_URL = os.getenv("REPLIT_PROXY_URL") LOG_PATH = os.getenv("ANALYZER_LOG", "analyzer_log.json") POSTS_LIMIT = int(os.getenv("ANALYZER_LIMIT", "80")) MAMBA_MODEL_PATH = os.getenv("MAMBA_MODEL_PATH", "state-spaces/mamba-1.4b-hf") # ---------------- Initialization Check ---------------- if not all([TG_BOT_TOKEN, TG_CHANNEL, REPLIT_PROXY_URL]): log.error("Telegram Bot Token, Channel ID, or Replit Proxy URL are missing in environment variables.") IS_SERVICE_READY = False STATUS_MESSAGE = "❌ النظام غير جاهز: بيانات اعتماد البوت، معرف القناة، أو رابط الجسر مفقودة." else: IS_SERVICE_READY = True STATUS_MESSAGE = "✅ النظام جاهز للتحليل (Gradio & Scheduler)." # ---------------- Helpers for Non-Async Blocking Operations ---------------- def async_wrap_blocking(func): """Wraps a synchronous function to be run in a separate thread.""" @wraps(func) async def wrapper(*args, **kwargs): return await asyncio.to_thread(func, *args, **kwargs) return wrapper # ---------------- Load Mamba ---------------- log.info("Loading Mamba model...") try: mamba_tok = AutoTokenizer.from_pretrained(MAMBA_MODEL_PATH) mamba_model = AutoModelForCausalLM.from_pretrained( MAMBA_MODEL_PATH, torch_dtype=torch.float16, low_cpu_mem_usage=True ) except Exception as e: log.error(f"Failed to load Mamba model: {e}") mamba_tok, mamba_model = None, None IS_SERVICE_READY = False STATUS_MESSAGE = "❌ فشل تحميل نموذج Mamba." # ---------------- Load GGUF LLM (Zephyr 7B) ---------------- log.info("Loading LLM interpreter (GGUF + llama.cpp)...") llm: Optional[Llama] = None try: LLM_GGUF_REPO = "TheBloke/zephyr-7B-beta-GGUF" LLM_GGUF_FILE = "zephyr-7b-beta.Q6_K.gguf" LLM_LOCAL_PATH = os.getenv("LLM_GGUF_PATH", f"./{LLM_GGUF_FILE}") if not os.path.exists(LLM_LOCAL_PATH): log.info("Downloading GGUF model from HuggingFace...") LLM_LOCAL_PATH = hf_hub_download( repo_id=LLM_GGUF_REPO, filename=LLM_GGUF_FILE ) llm = Llama( model_path=LLM_LOCAL_PATH, n_ctx=4096, n_threads=4, n_gpu_layers=0 ) log.info("GGUF model loaded successfully.") except Exception as e: log.error(f"Failed to load GGUF model: {e}") IS_SERVICE_READY = False STATUS_MESSAGE = "❌ فشل تحميل نموذج GGUF." # ---------------- Core Helpers ---------------- def save_log(entry: Dict[str, Any]): """Saves the analysis entry to a JSON log file.""" logs = [] if os.path.exists(LOG_PATH): try: with open(LOG_PATH, "r", encoding="utf-8") as f: logs = json.load(f) except Exception: logs = [] logs.insert(0, entry) with open(LOG_PATH, "w", encoding="utf-8") as f: json.dump(logs, f, ensure_ascii=False, indent=2) def encode_stats_for_mamba(posts: List[Dict[str, Any]]) -> str: """Encodes the list of posts into a single string for Mamba analysis.""" lines = [] for p in posts: if p.get('chat_id'): line = ( f"Channel Name:{p.get('title')} | Members:{p.get('members')} | " f"Admins:{p.get('admins_count')}" ) lines.append(line) break if not lines: return "No sufficient data for sequential analysis. Analyzing overall channel status." return "\n".join(lines) @async_wrap_blocking def run_mamba(text: str) -> str: """Synchronous Mamba generation function.""" if mamba_model is None or mamba_tok is None: return "Error: Mamba model not loaded." inp = mamba_tok(text, return_tensors="pt") with torch.no_grad(): out = mamba_model.generate(**inp, max_new_tokens=64, do_sample=False) return mamba_tok.decode(out[0], skip_special_tokens=True) @async_wrap_blocking def interpret_with_llm(mamba_output: str) -> str: """Synchronous LLM interpretation function using llama.cpp.""" if llm is None: return "Error: LLM model not loaded." prompt = ( "هذه نتائج تحليل إحصائي لقناة تلغرام (معلومات عامة فقط):\n" f"{mamba_output}\n\n" "حلل أداء القناة الأساسي (عدد الأعضاء والمشرفين).\n" "استخرج:\n" "- نقاط القوة (مثل عدد الأعضاء)\n" "- نقاط الضعف (مثل نقص البيانات الزمنية أو التفاعلات)\n" "- استراتيجيات لزيادة الاشتراكات والتفاعل\n" "اكتب التحليل بالعربية وبشكل مرتب ومختصر، مع ذكر القيود على البيانات المتاحة." ) res = llm( prompt, max_tokens=250, temperature=0.3, top_p=0.95 ) return res["choices"][0]["text"].strip() # ---------------- FETCH TELEGRAM STATS (VIA REPLIT PROXY) ---------------- async def fetch_telegram_stats(limit: int = POSTS_LIMIT) -> List[Dict[str, Any]]: """ Fetches general channel statistics by routing requests through the Replit Proxy. """ if not IS_SERVICE_READY or not REPLIT_PROXY_URL: raise RuntimeError(STATUS_MESSAGE) async def fetch_via_proxy(method: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Sends a request to the Replit proxy and returns the Telegram response result.""" url = f"{REPLIT_PROXY_URL}/route_telegram/{method}" try: # نستخدم to_thread لجعل طلب requests متزامن (Blocking) يعمل في خيط منفصل response = await asyncio.to_thread( requests.post, url, json=data if data is not None else {}, timeout=200 ) response.raise_for_status() json_response = response.json() if not json_response.get('ok'): # يتم إلقاء خطأ Telegram API إذا كان الرد سلبياً error_description = json_response.get('description', 'Unknown API Error') raise requests.exceptions.HTTPError( f"Telegram API Error via Proxy: {error_description}", response=response ) return json_response['result'] except requests.exceptions.HTTPError as e: error_details = str(e) log.error(f"Proxy/Telegram HTTP Error during {method}: {error_details}") try: error_data = e.response.json() error_msg = error_data.get("description", error_details) except: error_msg = error_details if 'unauthorized' in error_msg.lower() or 'forbidden' in error_msg.lower(): raise RuntimeError(f"فشل مصادقة Telegram (عبر الجسر): {error_msg}. تأكد من صحة TG_BOT_TOKEN في الجسر.") else: raise RuntimeError(f"خطأ في جلب البيانات عبر الجسر ({method}): {error_msg}.") except Exception as e: log.error(f"An unexpected error occurred during proxy fetch: {e}") raise RuntimeError(f"خطأ غير متوقع في الاتصال بالجسر: {e}.") # --- 1. جلب معلومات القناة الأساسية (getChat) --- chat_info_data = await fetch_via_proxy( "getChat", data={"chat_id": TG_CHANNEL} ) # --- 2. جلب المشرفين (getChatAdministrators) --- admins_list = await fetch_via_proxy( "getChatAdministrators", data={"chat_id": TG_CHANNEL} ) admins_count = len(admins_list) # تحويل البيانات إلى تنسيق موحد للتحليل stats = { "chat_id": chat_info_data.get('id'), "title": chat_info_data.get('title'), "members": chat_info_data.get('members_count', 'N/A'), "admins_count": admins_count, "description": chat_info_data.get('description'), "date": datetime.utcnow().isoformat(), } log.info(f"Successfully fetched stats (via proxy) for channel {stats['title']} with {stats['members']} members.") return [stats] # ---------------- Main Analysis Pipeline ---------------- async def run_analysis_pipeline(limit: int = POSTS_LIMIT) -> Dict[str, Any]: """Runs the full analysis pipeline.""" log.info("Running analysis job...") # 1. جلب البيانات try: posts = await fetch_telegram_stats(limit=limit) except RuntimeError as e: log.warning(f"Failed to fetch stats: {e}") error_detail = str(e) entry = {"time": datetime.utcnow().isoformat(), "error": f"fetch_error: {error_detail}"} save_log(entry) return {"status": "error", "message": error_detail, "log_entry": entry} if not posts: log.warning("No channel stats found for analysis.") entry = {"time": datetime.utcnow().isoformat(), "error": "no_stats"} save_log(entry) return {"status": "warning", "message": "لم يتم العثور على إحصائيات للقناة.", "log_entry": entry} # 2. تشغيل Mamba stats_text = encode_stats_for_mamba(posts) mamba_out = await run_mamba(stats_text) # 3. تفسير LLM interpretation = await interpret_with_llm(mamba_out) entry = { "time": datetime.utcnow().isoformat(), "posts_count": 1, "channel_title": posts[0].get('title'), "channel_members": posts[0].get('members'), "advice": interpretation } save_log(entry) log.info("Analysis saved.") # تنسيق الخرج output_message = ( f"**✅ اكتمل التحليل بنجاح!**\n\n" f"**القناة:** {posts[0].get('title')}\n" f"**عدد الأعضاء:** {posts[0].get('members')}\n" f"**وقت التحليل:** {entry['time']}\n\n" f"--- **توصيات الذكاء الاصطناعي** ---\n" f"{interpretation}\n\n" f"--- **إخراج Mamba الخام** ---\n" f"`{mamba_out.strip()}`" ) return {"status": "success", "message": output_message, "log_entry": entry} # ---------------- Gradio Interface Functions ---------------- async def gradio_run_once(limit: int) -> str: """Gradio function to run the analysis manually.""" result = await run_analysis_pipeline(limit=limit) if result.get("status") == "success": return result["message"] else: return f"**⚠️ فشل التحليل:** {result['message']}" def gradio_get_logs() -> str: """Gradio function to display logs.""" logs = [] if os.path.exists(LOG_PATH): try: with open(LOG_PATH, "r", encoding="utf-8") as f: logs = json.load(f) except Exception as e: log.error(f"Error reading log file: {e}") return "حدث خطأ أثناء قراءة ملف السجلات." if not logs: return "لا توجد سجلات تحليل متاحة." output = "## 📄 سجلات التحليل الأخيرة\n" for i, entry in enumerate(logs): output += f"### {i+1}. تحليل بتاريخ {entry.get('time', 'N/A')}\n" if entry.get('error'): output += f"**❌ خطأ:** {entry['error']}\n" else: output += f"**✅ القناة:** {entry.get('channel_title', 'N/A')}\n" output += f"**💡 النصيحة:**\n```\n{entry.get('advice', 'N/A')[:300]}...\n```\n" output += "---\n" return output # ---------------- Scheduler ---------------- def daily_job_wrapper(): """Synchronous wrapper to run the async job in the scheduler.""" log.info("Running scheduled analysis job via Gradio wrapper...") try: # تشغيل الدالة async في حلقة الحدث الخاصة بها result = asyncio.run(run_analysis_pipeline()) log.info(f"Scheduled job completed. Status: {result.get('status')}") except Exception as e: log.error(f"Error in scheduled job: {e}") # ---------------- Gradio Interface Definition ---------------- # تم وضع التعريف هنا لاستخدامه لاحقاً في خيط منفصل with gr.Blocks(title="Telegram Channel Analyzer Agent") as demo: gr.Markdown("# 🤖 وكيل تحليل قناة Telegram (Gradio & Scheduler)") gr.Markdown(f"**حالة الخدمة:** {STATUS_MESSAGE}") gr.Markdown(f"**القناة المستهدفة:** `{TG_CHANNEL}`") gr.Markdown(f"**جسر Replit:** `{REPLIT_PROXY_URL}`") gr.Markdown("---") with gr.Tab("تشغيل التحليل يدوياً"): gr.Markdown("## ⚙️ تشغيل لمرة واحدة - جلب إحصائيات القناة الأساسية") limit_input = gr.Slider( minimum=10, maximum=300, step=10, value=POSTS_LIMIT, label="الحد الأقصى للمنشورات (غير مستخدم مع Bot API)" ) run_button = gr.Button("🚀 بدء تحليل معلومات القناة الآن") output_textbox = gr.Markdown(label="نتيجة التحليل") run_button.click( fn=gradio_run_once, inputs=[limit_input], outputs=[output_textbox] ) with gr.Tab("سجلات التحليل"): gr.Markdown("## 📋 سجلات التحليل المحفوظة") log_button = gr.Button("🔄 تحديث السجلات") log_output = gr.Markdown(label="السجلات") log_button.click( fn=gradio_get_logs, inputs=[], outputs=[log_output] ) demo.load( fn=gradio_get_logs, inputs=[], outputs=[log_output] ) # ---------------- Main Entry Point (Gradio/Scheduler) ---------------- if __name__ == "__main__": if not IS_SERVICE_READY: log.error("Service is not ready. Exiting.") exit(1) # <<< تم حذف كل ما يتعلق بـ Application و Handlers (Webhook/Polling) >>> # 1. تشغيل Gradio في خيط منفصل def start_gradio(): log.info("Starting Gradio Interface in a separate thread...") # استخدام المنفذ الذي يحدده المضيف (عادةً Render) PORT = int(os.environ.get("PORT", "7860")) try: demo.launch(server_name="0.0.0.0", server_port=PORT) except Exception as e: log.error(f"Failed to start Gradio: {e}") threading.Thread(target=start_gradio, daemon=True).start() # 2. بدء المجدول للمهمة الدورية scheduler.add_job(daily_job_wrapper, "cron", hour=21, minute=10) scheduler.start() log.info("Scheduler started for daily analysis.") # 3. إبقاء الخيط الرئيسي حياً لضمان استمرار عمل المجدول و Gradio try: log.info("Keeping main thread alive for Gradio and Scheduler...") # حلقة بسيطة لإبقاء البرنامج قيد التشغيل (بديل لـ run_polling) while True: time.sleep(1) except (KeyboardInterrupt, SystemExit): log.info("Exiting gracefully.") except Exception as e: log.error(f"Error in main loop: {e}") log.info("Analyzer Agent stopped.")