# ============================================================
# analyzer_agent_gradio/app.py — Telegram Analyzer Agent (Gradio/Scheduler Only)
# Uses Replit Proxy for Telegram API access.
# ============================================================
import os
import json
import asyncio
from datetime import datetime
import logging
import threading
from functools import wraps
from typing import List, Dict, Any, Optional
import time      # used by the keep-alive loop in the main thread
import requests  # used for HTTP calls to the Replit proxy bridge

import gradio as gr
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# LLM dependencies
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
# python-telegram-bot is no longer imported here: the Handlers were removed,
# and all Telegram API calls are routed through the Replit proxy via requests.
from llama_cpp import Llama
from huggingface_hub import hf_hub_download

# ---------------- Logging ----------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("analyzer")

# ---------------- Env & config ----------------
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
TG_CHANNEL = os.getenv("TG_CHANNEL")
# URL of the bridge app hosted on Replit
REPLIT_PROXY_URL = os.getenv("REPLIT_PROXY_URL")
LOG_PATH = os.getenv("ANALYZER_LOG", "analyzer_log.json")
POSTS_LIMIT = int(os.getenv("ANALYZER_LIMIT", "80"))
MAMBA_MODEL_PATH = os.getenv("MAMBA_MODEL_PATH", "state-spaces/mamba-1.4b-hf")
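# Example environment values (illustrative only — substitute your own):
#   TG_BOT_TOKEN     = "123456:ABC-..."            (token from @BotFather)
#   TG_CHANNEL       = "@my_channel" or "-100..."  (channel username or numeric chat id)
#   REPLIT_PROXY_URL = "https://my-proxy.repl.co"  (no trailing slash)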

# ---------------- Initialization Check ----------------
if not all([TG_BOT_TOKEN, TG_CHANNEL, REPLIT_PROXY_URL]):
    log.error("Telegram Bot Token, Channel ID, or Replit Proxy URL is missing from environment variables.")
    IS_SERVICE_READY = False
    STATUS_MESSAGE = "❌ النظام غير جاهز: بيانات اعتماد البوت، معرف القناة، أو رابط الجسر مفقودة."
else:
    IS_SERVICE_READY = True
    STATUS_MESSAGE = "✅ النظام جاهز للتحليل (Gradio & Scheduler)."

# ---------------- Helpers for Non-Async Blocking Operations ----------------
def async_wrap_blocking(func):
    """Wraps a synchronous function so it can be awaited; runs it in a worker thread."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper
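# Usage: decorate a blocking function, then `await` it from async code
# (see run_mamba and interpret_with_llm below).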

# ---------------- Load Mamba ----------------
log.info("Loading Mamba model...")
try:
    mamba_tok = AutoTokenizer.from_pretrained(MAMBA_MODEL_PATH)
    mamba_model = AutoModelForCausalLM.from_pretrained(
        MAMBA_MODEL_PATH,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True
    )
except Exception as e:
    log.error(f"Failed to load Mamba model: {e}")
    mamba_tok, mamba_model = None, None
    IS_SERVICE_READY = False
    STATUS_MESSAGE = "❌ فشل تحميل نموذج Mamba."

# ---------------- Load GGUF LLM (Zephyr 7B) ----------------
log.info("Loading LLM interpreter (GGUF + llama.cpp)...")
llm: Optional[Llama] = None
try:
    LLM_GGUF_REPO = "TheBloke/zephyr-7B-beta-GGUF"
    LLM_GGUF_FILE = "zephyr-7b-beta.Q6_K.gguf"
    LLM_LOCAL_PATH = os.getenv("LLM_GGUF_PATH", f"./{LLM_GGUF_FILE}")
    if not os.path.exists(LLM_LOCAL_PATH):
        log.info("Downloading GGUF model from HuggingFace...")
        LLM_LOCAL_PATH = hf_hub_download(
            repo_id=LLM_GGUF_REPO,
            filename=LLM_GGUF_FILE
        )
    llm = Llama(
        model_path=LLM_LOCAL_PATH,
        n_ctx=4096,
        n_threads=4,
        n_gpu_layers=0
    )
    log.info("GGUF model loaded successfully.")
except Exception as e:
    log.error(f"Failed to load GGUF model: {e}")
    IS_SERVICE_READY = False
    STATUS_MESSAGE = "❌ فشل تحميل نموذج GGUF."

# ---------------- Core Helpers ----------------
def save_log(entry: Dict[str, Any]):
    """Saves the analysis entry to a JSON log file (newest entry first)."""
    logs = []
    if os.path.exists(LOG_PATH):
        try:
            with open(LOG_PATH, "r", encoding="utf-8") as f:
                logs = json.load(f)
        except Exception:
            logs = []
    logs.insert(0, entry)
    with open(LOG_PATH, "w", encoding="utf-8") as f:
        json.dump(logs, f, ensure_ascii=False, indent=2)
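# Note: save_log does an unlocked read-modify-write; the scheduler thread and Gradio
# requests could in principle race. Guarding it with a module-level threading.Lock
# would be a straightforward hardening step if that ever becomes an issue.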

def encode_stats_for_mamba(posts: List[Dict[str, Any]]) -> str:
    """Encodes the list of posts into a single string for Mamba analysis."""
    lines = []
    for p in posts:
        if p.get('chat_id'):
            line = (
                f"Channel Name:{p.get('title')} | Members:{p.get('members')} | "
                f"Admins:{p.get('admins_count')}"
            )
            lines.append(line)
            break  # only one channel-level stats dict is expected; stop after the first
    if not lines:
        return "No sufficient data for sequential analysis. Analyzing overall channel status."
    return "\n".join(lines)
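# Example of an encoded line (illustrative values):
#   Channel Name:My Channel | Members:1234 | Admins:5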

@async_wrap_blocking
def run_mamba(text: str) -> str:
    """Mamba generation; the blocking work runs in a worker thread, so it is awaitable."""
    if mamba_model is None or mamba_tok is None:
        return "Error: Mamba model not loaded."
    inp = mamba_tok(text, return_tensors="pt")
    with torch.no_grad():
        out = mamba_model.generate(**inp, max_new_tokens=64, do_sample=False)
    return mamba_tok.decode(out[0], skip_special_tokens=True)

@async_wrap_blocking
def interpret_with_llm(mamba_output: str) -> str:
    """LLM interpretation via llama.cpp; the blocking work runs in a worker thread."""
    if llm is None:
        return "Error: LLM model not loaded."
    prompt = (
        "هذه نتائج تحليل إحصائي لقناة تلغرام (معلومات عامة فقط):\n"
        f"{mamba_output}\n\n"
        "حلل أداء القناة الأساسي (عدد الأعضاء والمشرفين).\n"
        "استخرج:\n"
        "- نقاط القوة (مثل عدد الأعضاء)\n"
        "- نقاط الضعف (مثل نقص البيانات الزمنية أو التفاعلات)\n"
        "- استراتيجيات لزيادة الاشتراكات والتفاعل\n"
        "اكتب التحليل بالعربية وبشكل مرتب ومختصر، مع ذكر القيود على البيانات المتاحة."
    )
    res = llm(
        prompt,
        max_tokens=250,
        temperature=0.3,
        top_p=0.95
    )
    return res["choices"][0]["text"].strip()
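# Zephyr is a chat-tuned model; if output quality becomes an issue, wrapping the prompt
# in its chat format (e.g. llama-cpp-python's llm.create_chat_completion with a messages
# list) may help. The raw-completion call above works but is a simpler approximation.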

# ---------------- FETCH TELEGRAM STATS (VIA REPLIT PROXY) ----------------
async def fetch_telegram_stats(limit: int = POSTS_LIMIT) -> List[Dict[str, Any]]:
    """
    Fetches general channel statistics by routing requests through the Replit Proxy.
    """
    if not IS_SERVICE_READY or not REPLIT_PROXY_URL:
        raise RuntimeError(STATUS_MESSAGE)
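    # Assumed proxy contract (inferred from the calls below): the bridge exposes
    # POST {REPLIT_PROXY_URL}/route_telegram/<method>, forwards the JSON body to
    # https://api.telegram.org/bot<TG_BOT_TOKEN>/<method>, and returns Telegram's
    # raw JSON response ({"ok": ..., "result": ...}) unchanged.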
    async def fetch_via_proxy(method: str, data: Optional[Dict[str, Any]] = None) -> Any:
        """Sends a request to the Replit proxy and returns the Telegram response result."""
        url = f"{REPLIT_PROXY_URL}/route_telegram/{method}"
        try:
            # Run the blocking requests call in a worker thread so it does not block the event loop
            response = await asyncio.to_thread(
                requests.post,
                url,
                json=data if data is not None else {},
                timeout=200
            )
            response.raise_for_status()
            json_response = response.json()
            if not json_response.get('ok'):
                # Raise a Telegram API error when the response is negative
                error_description = json_response.get('description', 'Unknown API Error')
                raise requests.exceptions.HTTPError(
                    f"Telegram API Error via Proxy: {error_description}",
                    response=response
                )
            return json_response['result']
        except requests.exceptions.HTTPError as e:
            error_details = str(e)
            log.error(f"Proxy/Telegram HTTP Error during {method}: {error_details}")
            try:
                error_data = e.response.json()
                error_msg = error_data.get("description", error_details)
            except Exception:
                error_msg = error_details
            if 'unauthorized' in error_msg.lower() or 'forbidden' in error_msg.lower():
                raise RuntimeError(f"فشل مصادقة Telegram (عبر الجسر): {error_msg}. تأكد من صحة TG_BOT_TOKEN في الجسر.")
            else:
                raise RuntimeError(f"خطأ في جلب البيانات عبر الجسر ({method}): {error_msg}.")
        except Exception as e:
            log.error(f"An unexpected error occurred during proxy fetch: {e}")
            raise RuntimeError(f"خطأ غير متوقع في الاتصال بالجسر: {e}.")
    # --- 1. Fetch basic channel info (getChat) ---
    chat_info_data = await fetch_via_proxy(
        "getChat",
        data={"chat_id": TG_CHANNEL}
    )
    # --- 2. Fetch administrators (getChatAdministrators) ---
    admins_list = await fetch_via_proxy(
        "getChatAdministrators",
        data={"chat_id": TG_CHANNEL}
    )
    admins_count = len(admins_list)
    # --- 3. Fetch the member count: getChat does not return it, so call getChatMemberCount ---
    try:
        members_count = await fetch_via_proxy(
            "getChatMemberCount",
            data={"chat_id": TG_CHANNEL}
        )
    except RuntimeError:
        members_count = 'N/A'
    # Convert the data into a unified format for analysis
    stats = {
        "chat_id": chat_info_data.get('id'),
        "title": chat_info_data.get('title'),
        "members": members_count,
        "admins_count": admins_count,
        "description": chat_info_data.get('description'),
        "date": datetime.utcnow().isoformat(),
    }
    log.info(f"Successfully fetched stats (via proxy) for channel {stats['title']} with {stats['members']} members.")
    return [stats]
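# For reference, a minimal sketch of what the Replit-side bridge is assumed to look like
# (illustrative only — the framework and handler names are assumptions, not the actual
# proxy source):
#
#   from flask import Flask, request
#   import os, requests
#   app = Flask(__name__)
#
#   @app.post("/route_telegram/<method>")
#   def route_telegram(method):
#       token = os.environ["TG_BOT_TOKEN"]
#       r = requests.post(f"https://api.telegram.org/bot{token}/{method}",
#                         json=request.get_json(silent=True) or {})
#       return r.json(), r.status_code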

# ---------------- Main Analysis Pipeline ----------------
async def run_analysis_pipeline(limit: int = POSTS_LIMIT) -> Dict[str, Any]:
    """Runs the full analysis pipeline."""
    log.info("Running analysis job...")
    # 1. Fetch the data
    try:
        posts = await fetch_telegram_stats(limit=limit)
    except RuntimeError as e:
        log.warning(f"Failed to fetch stats: {e}")
        error_detail = str(e)
        entry = {"time": datetime.utcnow().isoformat(), "error": f"fetch_error: {error_detail}"}
        save_log(entry)
        return {"status": "error", "message": error_detail, "log_entry": entry}
    if not posts:
        log.warning("No channel stats found for analysis.")
        entry = {"time": datetime.utcnow().isoformat(), "error": "no_stats"}
        save_log(entry)
        return {"status": "warning", "message": "لم يتم العثور على إحصائيات للقناة.", "log_entry": entry}
    # 2. Run Mamba (awaitable thanks to the async_wrap_blocking decorator)
    stats_text = encode_stats_for_mamba(posts)
    mamba_out = await run_mamba(stats_text)
    # 3. LLM interpretation
    interpretation = await interpret_with_llm(mamba_out)
    entry = {
        "time": datetime.utcnow().isoformat(),
        "posts_count": 1,
        "channel_title": posts[0].get('title'),
        "channel_members": posts[0].get('members'),
        "advice": interpretation
    }
    save_log(entry)
    log.info("Analysis saved.")
    # Format the output
    output_message = (
        f"**✅ اكتمل التحليل بنجاح!**\n\n"
        f"**القناة:** {posts[0].get('title')}\n"
        f"**عدد الأعضاء:** {posts[0].get('members')}\n"
        f"**وقت التحليل:** {entry['time']}\n\n"
        f"--- **توصيات الذكاء الاصطناعي** ---\n"
        f"{interpretation}\n\n"
        f"--- **إخراج Mamba الخام** ---\n"
        f"`{mamba_out.strip()}`"
    )
    return {"status": "success", "message": output_message, "log_entry": entry}

# ---------------- Gradio Interface Functions ----------------
async def gradio_run_once(limit: int) -> str:
    """Gradio function to run the analysis manually."""
    result = await run_analysis_pipeline(limit=limit)
    if result.get("status") == "success":
        return result["message"]
    else:
        return f"**⚠️ فشل التحليل:** {result['message']}"

def gradio_get_logs() -> str:
    """Gradio function to display logs."""
    logs = []
    if os.path.exists(LOG_PATH):
        try:
            with open(LOG_PATH, "r", encoding="utf-8") as f:
                logs = json.load(f)
        except Exception as e:
            log.error(f"Error reading log file: {e}")
            return "حدث خطأ أثناء قراءة ملف السجلات."
    if not logs:
        return "لا توجد سجلات تحليل متاحة."
    output = "## 📄 سجلات التحليل الأخيرة\n"
    for i, entry in enumerate(logs):
        output += f"### {i+1}. تحليل بتاريخ {entry.get('time', 'N/A')}\n"
        if entry.get('error'):
            output += f"**❌ خطأ:** {entry['error']}\n"
        else:
            output += f"**✅ القناة:** {entry.get('channel_title', 'N/A')}\n"
            output += f"**💡 النصيحة:**\n```\n{entry.get('advice', 'N/A')[:300]}...\n```\n"
        output += "---\n"
    return output

# ---------------- Scheduler ----------------
def daily_job_wrapper():
    """Synchronous wrapper to run the async job in the scheduler."""
    log.info("Running scheduled analysis job via Gradio wrapper...")
    try:
        # Run the async pipeline in its own event loop
        result = asyncio.run(run_analysis_pipeline())
        log.info(f"Scheduled job completed. Status: {result.get('status')}")
    except Exception as e:
        log.error(f"Error in scheduled job: {e}")

# ---------------- Gradio Interface Definition ----------------
# Defined here so it can be launched later in a separate thread
with gr.Blocks(title="Telegram Channel Analyzer Agent") as demo:
    gr.Markdown("# 🤖 وكيل تحليل قناة Telegram (Gradio & Scheduler)")
    gr.Markdown(f"**حالة الخدمة:** {STATUS_MESSAGE}")
    gr.Markdown(f"**القناة المستهدفة:** `{TG_CHANNEL}`")
    gr.Markdown(f"**جسر Replit:** `{REPLIT_PROXY_URL}`")
    gr.Markdown("---")
    with gr.Tab("تشغيل التحليل يدوياً"):
        gr.Markdown("## ⚙️ تشغيل لمرة واحدة - جلب إحصائيات القناة الأساسية")
        limit_input = gr.Slider(
            minimum=10,
            maximum=300,
            step=10,
            value=POSTS_LIMIT,
            label="الحد الأقصى للمنشورات (غير مستخدم مع Bot API)"
        )
        run_button = gr.Button("🚀 بدء تحليل معلومات القناة الآن")
        output_textbox = gr.Markdown(label="نتيجة التحليل")
        run_button.click(
            fn=gradio_run_once,
            inputs=[limit_input],
            outputs=[output_textbox]
        )
    with gr.Tab("سجلات التحليل"):
        gr.Markdown("## 📋 سجلات التحليل المحفوظة")
        log_button = gr.Button("🔄 تحديث السجلات")
        log_output = gr.Markdown(label="السجلات")
        log_button.click(
            fn=gradio_get_logs,
            inputs=[],
            outputs=[log_output]
        )
    demo.load(
        fn=gradio_get_logs,
        inputs=[],
        outputs=[log_output]
    )

# ---------------- Main Entry Point (Gradio/Scheduler) ----------------
if __name__ == "__main__":
    if not IS_SERVICE_READY:
        log.error("Service is not ready. Exiting.")
        exit(1)
    # <<< Everything related to Application and Handlers (Webhook/Polling) has been removed >>>
    # 1. Launch Gradio in a separate thread
    def start_gradio():
        log.info("Starting Gradio Interface in a separate thread...")
        # Use the port assigned by the host (typically Render)
        PORT = int(os.environ.get("PORT", "7860"))
        try:
            demo.launch(server_name="0.0.0.0", server_port=PORT)
        except Exception as e:
            log.error(f"Failed to start Gradio: {e}")
    threading.Thread(target=start_gradio, daemon=True).start()
    # 2. Start the scheduler for the periodic job
    scheduler.add_job(daily_job_wrapper, "cron", hour=21, minute=10)
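    # Note: the cron trigger fires at 21:10 in the scheduler's timezone, which defaults
    # to the host's local time; passing e.g. timezone="UTC" to add_job pins it explicitly.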
    scheduler.start()
    log.info("Scheduler started for daily analysis.")
    # 3. Keep the main thread alive so the scheduler and Gradio keep running
    try:
        log.info("Keeping main thread alive for Gradio and Scheduler...")
        # Simple loop to keep the process running (replacement for run_polling)
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        log.info("Exiting gracefully.")
    except Exception as e:
        log.error(f"Error in main loop: {e}")
    log.info("Analyzer Agent stopped.")