AnalizerAgent / app.py
Mustafa-albakkar's picture
Update app.py
b872dab verified
# ============================================================
# analyzer_agent_gradio/app.py — Telegram Analyzer Agent (Gradio/Scheduler Only)
# Uses Replit Proxy for Telegram API access.
# ============================================================
import huggingface_hub
import os
import json
import asyncio
from datetime import datetime
import logging
import threading
from functools import wraps
from typing import List, Dict, Any, Optional
import time # <<< إضافة مكتبة time للـ loop
import requests # <<< إضافة مكتبة requests للاتصال بالجسر
import gradio as gr
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# LLM & minimal telegram dependencies
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import telegram
# لا نحتاج لـ telegram.ext هنا لأننا أزلنا الـ Handlers
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
# ---------------- Logging ----------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("analyzer")
# ---------------- Env & config ----------------
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
TG_CHANNEL = os.getenv("TG_CHANNEL")
# المتغير الجديد: رابط تطبيق الجسر على Replit
REPLIT_PROXY_URL = os.getenv("REPLIT_PROXY_URL")
LOG_PATH = os.getenv("ANALYZER_LOG", "analyzer_log.json")
POSTS_LIMIT = int(os.getenv("ANALYZER_LIMIT", "80"))
MAMBA_MODEL_PATH = os.getenv("MAMBA_MODEL_PATH", "state-spaces/mamba-1.4b-hf")
# ---------------- Initialization Check ----------------
if not all([TG_BOT_TOKEN, TG_CHANNEL, REPLIT_PROXY_URL]):
log.error("Telegram Bot Token, Channel ID, or Replit Proxy URL are missing in environment variables.")
IS_SERVICE_READY = False
STATUS_MESSAGE = "❌ النظام غير جاهز: بيانات اعتماد البوت، معرف القناة، أو رابط الجسر مفقودة."
else:
IS_SERVICE_READY = True
STATUS_MESSAGE = "✅ النظام جاهز للتحليل (Gradio & Scheduler)."
# ---------------- Helpers for Non-Async Blocking Operations ----------------
def async_wrap_blocking(func):
"""Wraps a synchronous function to be run in a separate thread."""
@wraps(func)
async def wrapper(*args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)
return wrapper
# ---------------- Load Mamba ----------------
log.info("Loading Mamba model...")
try:
mamba_tok = AutoTokenizer.from_pretrained(MAMBA_MODEL_PATH)
mamba_model = AutoModelForCausalLM.from_pretrained(
MAMBA_MODEL_PATH,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
except Exception as e:
log.error(f"Failed to load Mamba model: {e}")
mamba_tok, mamba_model = None, None
IS_SERVICE_READY = False
STATUS_MESSAGE = "❌ فشل تحميل نموذج Mamba."
# ---------------- Load GGUF LLM (Zephyr 7B) ----------------
log.info("Loading LLM interpreter (GGUF + llama.cpp)...")
llm: Optional[Llama] = None
try:
LLM_GGUF_REPO = "TheBloke/zephyr-7B-beta-GGUF"
LLM_GGUF_FILE = "zephyr-7b-beta.Q6_K.gguf"
LLM_LOCAL_PATH = os.getenv("LLM_GGUF_PATH", f"./{LLM_GGUF_FILE}")
if not os.path.exists(LLM_LOCAL_PATH):
log.info("Downloading GGUF model from HuggingFace...")
LLM_LOCAL_PATH = hf_hub_download(
repo_id=LLM_GGUF_REPO,
filename=LLM_GGUF_FILE
)
llm = Llama(
model_path=LLM_LOCAL_PATH,
n_ctx=4096,
n_threads=4,
n_gpu_layers=0
)
log.info("GGUF model loaded successfully.")
except Exception as e:
log.error(f"Failed to load GGUF model: {e}")
IS_SERVICE_READY = False
STATUS_MESSAGE = "❌ فشل تحميل نموذج GGUF."
# ---------------- Core Helpers ----------------
def save_log(entry: Dict[str, Any]):
"""Saves the analysis entry to a JSON log file."""
logs = []
if os.path.exists(LOG_PATH):
try:
with open(LOG_PATH, "r", encoding="utf-8") as f:
logs = json.load(f)
except Exception:
logs = []
logs.insert(0, entry)
with open(LOG_PATH, "w", encoding="utf-8") as f:
json.dump(logs, f, ensure_ascii=False, indent=2)
def encode_stats_for_mamba(posts: List[Dict[str, Any]]) -> str:
"""Encodes the list of posts into a single string for Mamba analysis."""
lines = []
for p in posts:
if p.get('chat_id'):
line = (
f"Channel Name:{p.get('title')} | Members:{p.get('members')} | "
f"Admins:{p.get('admins_count')}"
)
lines.append(line)
break
if not lines:
return "No sufficient data for sequential analysis. Analyzing overall channel status."
return "\n".join(lines)
@async_wrap_blocking
def run_mamba(text: str) -> str:
"""Synchronous Mamba generation function."""
if mamba_model is None or mamba_tok is None:
return "Error: Mamba model not loaded."
inp = mamba_tok(text, return_tensors="pt")
with torch.no_grad():
out = mamba_model.generate(**inp, max_new_tokens=64, do_sample=False)
return mamba_tok.decode(out[0], skip_special_tokens=True)
@async_wrap_blocking
def interpret_with_llm(mamba_output: str) -> str:
"""Synchronous LLM interpretation function using llama.cpp."""
if llm is None:
return "Error: LLM model not loaded."
prompt = (
"هذه نتائج تحليل إحصائي لقناة تلغرام (معلومات عامة فقط):\n"
f"{mamba_output}\n\n"
"حلل أداء القناة الأساسي (عدد الأعضاء والمشرفين).\n"
"استخرج:\n"
"- نقاط القوة (مثل عدد الأعضاء)\n"
"- نقاط الضعف (مثل نقص البيانات الزمنية أو التفاعلات)\n"
"- استراتيجيات لزيادة الاشتراكات والتفاعل\n"
"اكتب التحليل بالعربية وبشكل مرتب ومختصر، مع ذكر القيود على البيانات المتاحة."
)
res = llm(
prompt,
max_tokens=250,
temperature=0.3,
top_p=0.95
)
return res["choices"][0]["text"].strip()
# ---------------- FETCH TELEGRAM STATS (VIA REPLIT PROXY) ----------------
async def fetch_telegram_stats(limit: int = POSTS_LIMIT) -> List[Dict[str, Any]]:
"""
Fetches general channel statistics by routing requests through the Replit Proxy.
"""
if not IS_SERVICE_READY or not REPLIT_PROXY_URL:
raise RuntimeError(STATUS_MESSAGE)
async def fetch_via_proxy(method: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Sends a request to the Replit proxy and returns the Telegram response result."""
url = f"{REPLIT_PROXY_URL}/route_telegram/{method}"
try:
# نستخدم to_thread لجعل طلب requests متزامن (Blocking) يعمل في خيط منفصل
response = await asyncio.to_thread(
requests.post,
url,
json=data if data is not None else {},
timeout=200
)
response.raise_for_status()
json_response = response.json()
if not json_response.get('ok'):
# يتم إلقاء خطأ Telegram API إذا كان الرد سلبياً
error_description = json_response.get('description', 'Unknown API Error')
raise requests.exceptions.HTTPError(
f"Telegram API Error via Proxy: {error_description}",
response=response
)
return json_response['result']
except requests.exceptions.HTTPError as e:
error_details = str(e)
log.error(f"Proxy/Telegram HTTP Error during {method}: {error_details}")
try:
error_data = e.response.json()
error_msg = error_data.get("description", error_details)
except:
error_msg = error_details
if 'unauthorized' in error_msg.lower() or 'forbidden' in error_msg.lower():
raise RuntimeError(f"فشل مصادقة Telegram (عبر الجسر): {error_msg}. تأكد من صحة TG_BOT_TOKEN في الجسر.")
else:
raise RuntimeError(f"خطأ في جلب البيانات عبر الجسر ({method}): {error_msg}.")
except Exception as e:
log.error(f"An unexpected error occurred during proxy fetch: {e}")
raise RuntimeError(f"خطأ غير متوقع في الاتصال بالجسر: {e}.")
# --- 1. جلب معلومات القناة الأساسية (getChat) ---
chat_info_data = await fetch_via_proxy(
"getChat",
data={"chat_id": TG_CHANNEL}
)
# --- 2. جلب المشرفين (getChatAdministrators) ---
admins_list = await fetch_via_proxy(
"getChatAdministrators",
data={"chat_id": TG_CHANNEL}
)
admins_count = len(admins_list)
# تحويل البيانات إلى تنسيق موحد للتحليل
stats = {
"chat_id": chat_info_data.get('id'),
"title": chat_info_data.get('title'),
"members": chat_info_data.get('members_count', 'N/A'),
"admins_count": admins_count,
"description": chat_info_data.get('description'),
"date": datetime.utcnow().isoformat(),
}
log.info(f"Successfully fetched stats (via proxy) for channel {stats['title']} with {stats['members']} members.")
return [stats]
# ---------------- Main Analysis Pipeline ----------------
async def run_analysis_pipeline(limit: int = POSTS_LIMIT) -> Dict[str, Any]:
"""Runs the full analysis pipeline."""
log.info("Running analysis job...")
# 1. جلب البيانات
try:
posts = await fetch_telegram_stats(limit=limit)
except RuntimeError as e:
log.warning(f"Failed to fetch stats: {e}")
error_detail = str(e)
entry = {"time": datetime.utcnow().isoformat(), "error": f"fetch_error: {error_detail}"}
save_log(entry)
return {"status": "error", "message": error_detail, "log_entry": entry}
if not posts:
log.warning("No channel stats found for analysis.")
entry = {"time": datetime.utcnow().isoformat(), "error": "no_stats"}
save_log(entry)
return {"status": "warning", "message": "لم يتم العثور على إحصائيات للقناة.", "log_entry": entry}
# 2. تشغيل Mamba
stats_text = encode_stats_for_mamba(posts)
mamba_out = await run_mamba(stats_text)
# 3. تفسير LLM
interpretation = await interpret_with_llm(mamba_out)
entry = {
"time": datetime.utcnow().isoformat(),
"posts_count": 1,
"channel_title": posts[0].get('title'),
"channel_members": posts[0].get('members'),
"advice": interpretation
}
save_log(entry)
log.info("Analysis saved.")
# تنسيق الخرج
output_message = (
f"**✅ اكتمل التحليل بنجاح!**\n\n"
f"**القناة:** {posts[0].get('title')}\n"
f"**عدد الأعضاء:** {posts[0].get('members')}\n"
f"**وقت التحليل:** {entry['time']}\n\n"
f"--- **توصيات الذكاء الاصطناعي** ---\n"
f"{interpretation}\n\n"
f"--- **إخراج Mamba الخام** ---\n"
f"`{mamba_out.strip()}`"
)
return {"status": "success", "message": output_message, "log_entry": entry}
# ---------------- Gradio Interface Functions ----------------
async def gradio_run_once(limit: int) -> str:
"""Gradio function to run the analysis manually."""
result = await run_analysis_pipeline(limit=limit)
if result.get("status") == "success":
return result["message"]
else:
return f"**⚠️ فشل التحليل:** {result['message']}"
def gradio_get_logs() -> str:
"""Gradio function to display logs."""
logs = []
if os.path.exists(LOG_PATH):
try:
with open(LOG_PATH, "r", encoding="utf-8") as f:
logs = json.load(f)
except Exception as e:
log.error(f"Error reading log file: {e}")
return "حدث خطأ أثناء قراءة ملف السجلات."
if not logs:
return "لا توجد سجلات تحليل متاحة."
output = "## 📄 سجلات التحليل الأخيرة\n"
for i, entry in enumerate(logs):
output += f"### {i+1}. تحليل بتاريخ {entry.get('time', 'N/A')}\n"
if entry.get('error'):
output += f"**❌ خطأ:** {entry['error']}\n"
else:
output += f"**✅ القناة:** {entry.get('channel_title', 'N/A')}\n"
output += f"**💡 النصيحة:**\n```\n{entry.get('advice', 'N/A')[:300]}...\n```\n"
output += "---\n"
return output
# ---------------- Scheduler ----------------
def daily_job_wrapper():
"""Synchronous wrapper to run the async job in the scheduler."""
log.info("Running scheduled analysis job via Gradio wrapper...")
try:
# تشغيل الدالة async في حلقة الحدث الخاصة بها
result = asyncio.run(run_analysis_pipeline())
log.info(f"Scheduled job completed. Status: {result.get('status')}")
except Exception as e:
log.error(f"Error in scheduled job: {e}")
# ---------------- Gradio Interface Definition ----------------
# تم وضع التعريف هنا لاستخدامه لاحقاً في خيط منفصل
with gr.Blocks(title="Telegram Channel Analyzer Agent") as demo:
gr.Markdown("# 🤖 وكيل تحليل قناة Telegram (Gradio & Scheduler)")
gr.Markdown(f"**حالة الخدمة:** {STATUS_MESSAGE}")
gr.Markdown(f"**القناة المستهدفة:** `{TG_CHANNEL}`")
gr.Markdown(f"**جسر Replit:** `{REPLIT_PROXY_URL}`")
gr.Markdown("---")
with gr.Tab("تشغيل التحليل يدوياً"):
gr.Markdown("## ⚙️ تشغيل لمرة واحدة - جلب إحصائيات القناة الأساسية")
limit_input = gr.Slider(
minimum=10,
maximum=300,
step=10,
value=POSTS_LIMIT,
label="الحد الأقصى للمنشورات (غير مستخدم مع Bot API)"
)
run_button = gr.Button("🚀 بدء تحليل معلومات القناة الآن")
output_textbox = gr.Markdown(label="نتيجة التحليل")
run_button.click(
fn=gradio_run_once,
inputs=[limit_input],
outputs=[output_textbox]
)
with gr.Tab("سجلات التحليل"):
gr.Markdown("## 📋 سجلات التحليل المحفوظة")
log_button = gr.Button("🔄 تحديث السجلات")
log_output = gr.Markdown(label="السجلات")
log_button.click(
fn=gradio_get_logs,
inputs=[],
outputs=[log_output]
)
demo.load(
fn=gradio_get_logs,
inputs=[],
outputs=[log_output]
)
# ---------------- Main Entry Point (Gradio/Scheduler) ----------------
if __name__ == "__main__":
if not IS_SERVICE_READY:
log.error("Service is not ready. Exiting.")
exit(1)
# <<< تم حذف كل ما يتعلق بـ Application و Handlers (Webhook/Polling) >>>
# 1. تشغيل Gradio في خيط منفصل
def start_gradio():
log.info("Starting Gradio Interface in a separate thread...")
# استخدام المنفذ الذي يحدده المضيف (عادةً Render)
PORT = int(os.environ.get("PORT", "7860"))
try:
demo.launch(server_name="0.0.0.0", server_port=PORT)
except Exception as e:
log.error(f"Failed to start Gradio: {e}")
threading.Thread(target=start_gradio, daemon=True).start()
# 2. بدء المجدول للمهمة الدورية
scheduler.add_job(daily_job_wrapper, "cron", hour=21, minute=10)
scheduler.start()
log.info("Scheduler started for daily analysis.")
# 3. إبقاء الخيط الرئيسي حياً لضمان استمرار عمل المجدول و Gradio
try:
log.info("Keeping main thread alive for Gradio and Scheduler...")
# حلقة بسيطة لإبقاء البرنامج قيد التشغيل (بديل لـ run_polling)
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
log.info("Exiting gracefully.")
except Exception as e:
log.error(f"Error in main loop: {e}")
log.info("Analyzer Agent stopped.")