""" Application Chainlit pour l'Agent Collaboratif LangGraph ======================================================== Intégration complète avec: - Chainlit 2.8.1 - Official Data Layer (PostgreSQL/Supabase) - LangSmith monitoring - Starters avec icônes - Chain of Thought visible - Style personnalisé (dark theme) """ import os import json import asyncio from typing import Dict, Any, List, Optional import chainlit as cl from chainlit.types import ThreadDict, Starter #from langsmith import Client #from langsmith.run_helpers import traceable from langsmith import traceable # Import du module agent (votre code existant) # On suppose que le code est dans agent_collaboratif_avid.py from agent_collaboratif_avid import ( run_collaborative_agent, retriever_manager, PINECONE_INDEX_NAME, OPENAI_MODEL_NAME, SIMILARITY_TOP_K, MAX_VALIDATION_LOOPS ) import bcrypt #from chainlit.data.sql_alchemy import SQLAlchemyDataLayer from sql_alchemy import SQLAlchemyDataLayer from supabase import create_client, Client from supabase.lib.client_options import ClientOptions #from langfuse.langchain import CallbackHandler #import getpass #os.environ['LANGFUSE_SECRET_KEY'] = getpass.getpass("Enter your Secret key: ") #os.environ['LANGFUSE_PUBLIC_KEY'] = getpass.getpass("Enter your Public key: ") #os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com" #langfuse_handler = CallbackHandler() SUPABASE_URL = os.environ.get("SUPABASE_URL") SUPABASE_ANON_KEY = os.environ.get("SUPABASE_ANON_KEY") CONNINFO = os.environ.get("CONNINFO") url: str = SUPABASE_URL key: str = SUPABASE_ANON_KEY supabase: Client = create_client(url, key) #url: str = "https://urtvfyesitnmwrouarze.supabase.co" #key: str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InVydHZmeWVzaXRubXdyb3VhcnplIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NTk5NTgyNzIsImV4cCI6MjA3NTUzNDI3Mn0.HvZVdlyyck6iI6L5XiPupPZ8voLt-u4NCKAqgwHl6dE" #supabase: Client = create_client(url, key, options=ClientOptions(auto_refresh_token=False,persist_session=True)) #@cl.data_layer #def get_data_layer(): # return SQLAlchemyDataLayer(conninfo="postgresql+asyncpg://postgres.urtvfyesitnmwrouarze:2VlIHUmI3qVhJpcb@aws-1-eu-north-1.pooler.supabase.com:5432/postgres", storage_provider=supabase) @cl.data_layer def get_data_layer(): return SQLAlchemyDataLayer(conninfo=CONNINFO, storage_provider=supabase) # ============================================================================= # CONFIGURATION DE L'AUTHENTIFICATION (Optionnel) # ============================================================================= @cl.password_auth_callback def auth_callback(username: str, password: str) -> Optional[cl.User]: """ Callback d'authentification (optionnel). À configurer selon vos besoins. """ # Exemple simple (à remplacer par votre logique) auth = json.loads(os.environ.get("CHAINLIT_AUTH_LOGIN")) auth_iter = iter(auth) while True: # item will be "end" if iteration is complete connexion = next(auth_iter, "end") if bcrypt.checkpw(username.encode('utf-8'), bcrypt.hashpw(connexion['ident'].encode('utf-8'), bcrypt.gensalt())) == True and bcrypt.checkpw(password.encode('utf-8'), bcrypt.hashpw(connexion['pwd'].encode('utf-8'), bcrypt.gensalt())) == True: print("OK") return cl.User( identifier=connexion['ident'], metadata={"role": connexion['role'], "provider": "credentials"} ) if connexion == "end": break return None # ============================================================================= # CONFIGURATION LANGSMITH # ============================================================================= LANGCHAIN_API_KEY = os.environ.get("LANGCHAIN_API_KEY") LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT") if LANGCHAIN_API_KEY: os.environ["LANGCHAIN_TRACING_V2"] = "true" #os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com" os.environ["LANGCHAIN_API_KEY"] = LANGCHAIN_API_KEY os.environ["LANGCHAIN_PROJECT"] = LANGSMITH_PROJECT #langsmith_client = Client() print(f"✅ LangSmith activé - Projet: {LANGSMITH_PROJECT}") else: print("⚠️ LANGCHAIN_API_KEY non définie - Monitoring désactivé") langsmith_client = None # ============================================================================= # FONCTIONS AUXILIAIRES POUR L'AFFICHAGE # ============================================================================= async def send_cot_step(step_name: str, content: str, status: str = "running"): """Envoie une étape du Chain of Thought.""" step = cl.Step( name=step_name, type="tool", show_input=True ) step.output = content if status == "done": step.is_error = False elif status == "error": step.is_error = True await step.send() return step async def display_query_analysis(analysis: Dict[str, Any]): """Affiche l'analyse de la requête.""" content = f"""**Bases identifiées:** {', '.join(analysis.get('databases_to_query', []))} **Priorités:** {json.dumps(analysis.get('priorities', {}), indent=2, ensure_ascii=False)} **Résumé:** {analysis.get('analysis_summary', 'N/A')} """ await send_cot_step("🔍 Analyse de la requête", content, "done") async def display_collection(info_list: List[Dict[str, Any]]): """Affiche les informations collectées.""" content_parts = [] for info in info_list: content_parts.append(f""" **📦 Base:** {info['database']} **Catégorie:** {info['category']} **Priorité:** {info['priority']} **Résultats:** {info['results_count']} """) content = "\n".join(content_parts) await send_cot_step("📊 Collecte d'informations", content, "done") async def display_validation(validation: Dict[str, Any], iteration: int): """Affiche les résultats de validation.""" content = f"""**Itération:** {iteration} **Score de confiance:** {validation.get('confidence_score', 0)}% **Validé:** {'✅ Oui' if validation.get('is_valid') else '❌ Non'} **Hallucinations détectées:** {len(validation.get('hallucinations_detected', []))} """ if validation.get('hallucinations_detected'): content += "\n**Problèmes:**\n" for hall in validation['hallucinations_detected']: content += f"- {hall}\n" status = "done" if validation.get('is_valid') else "error" await send_cot_step(f"✅ Validation (#{iteration})", content, status) async def display_similar_info(similar_info: List[Dict[str, Any]]): """Affiche les informations similaires.""" if not similar_info: return # Regrouper par base grouped = {} for item in similar_info: db = item['database'] if db not in grouped: grouped[db] = [] grouped[db].append(item) elements = [] for db_name, items in grouped.items(): content_parts = [f"### 📚 {db_name.upper()}\n"] content_parts.append(f"**Catégorie:** {items[0]['category']}") content_parts.append(f"**Résultats:** {len(items)}\n") for idx, item in enumerate(items[:3], 1): # Limiter à 3 par base score = item.get('score', 'N/A') content_parts.append(f"**{idx}. Score:** {score}") content_preview = item['content'][:200] if len(item['content']) > 200: content_preview += "..." content_parts.append(f"**Contenu:** {content_preview}\n") # Créer un élément Chainlit element = cl.Text( content="\n".join(content_parts), display="side" ) elements.append(element) if elements: await cl.Message( content="💡 **Informations similaires trouvées dans d'autres bases**", elements=elements ).send() async def display_web_search_results(web_search_results: List[Dict[str, Any]]): """Affiche les résultats de recherche web.""" if not web_search_results: return elements = [] content_parts = [] content_parts.append(f"### 🌐 Résultats de la recherche web\n") content_parts.append(f"**Nombre de résultats:** {len(web_search_results)}\n") for idx, item in enumerate(web_search_results[:5], 1): # Limiter à 5 résultats content_parts.append(f"**{idx}. Titre:** {item['title']}") content_parts.append(f"**Lien:** {item['markdown_link']}") content_parts.append(f"**Résumé:** {item['summary']}\n") element = cl.Text( content="\n".join(content_parts), display="side" ) elements.append(element) if elements: await cl.Message( content="🌐 **Informations trouvées sur le web**", elements=elements ).send() # ============================================================================= # FONCTIONS D'AFFICHAGE STREAMING PAR NŒUD # ============================================================================= async def stream_response(content: str, msg: cl.Message, chunk_size: int = 50): """Stream du contenu progressivement dans un message.""" for i in range(0, len(content), chunk_size): chunk = content[i:i + chunk_size] msg.content += chunk await msg.update() await asyncio.sleep(0.25) # Petit délai pour un effet visuel async def display_node_update(node_name: str, state: Dict[str, Any]): """Affiche les mises à jour d'état après l'exécution d'un nœud.""" if node_name == "analyze_query": if state.get("query_analysis"): await display_query_analysis(state["query_analysis"]) elif node_name == "collect_information": if state.get("collected_information"): await display_collection(state["collected_information"]) elif node_name == "generate_response": if state.get("final_response"): content = f"**Réponse générée** ({len(state['final_response'])} caractères)\n\nLa réponse complète sera affichée à la fin du workflow." await send_cot_step("✏️ Génération de la réponse", content, "done") elif node_name == "validate_response": if state.get("validation_results"): iteration = state.get("iteration_count", len(state["validation_results"])) last_validation = state["validation_results"][-1] await display_validation(last_validation, iteration) elif node_name == "refine_response": content = f"**Itération:** {state.get('iteration_count', 0)}\n**Correction en cours...**" await send_cot_step("⚙️ Refinement", content, "done") elif node_name == "collect_similar_information": if state.get("additional_information"): await display_similar_info(state["additional_information"]) # ============================================================================= # FONCTION PRINCIPALE TRACÉE AVEC LANGSMITH # ============================================================================= @traceable(name="agent_collaboratif_query", project_name=LANGSMITH_PROJECT) async def process_query_with_tracing(query: str, thread_id: str) -> Dict[str, Any]: """Traite la requête avec traçage LangSmith et streaming en temps réel.""" # Import du workflow from agent_collaboratif_avid import AgentState, create_agent_workflow from langchain_core.messages import HumanMessage app = create_agent_workflow() initial_state = { "messages": [HumanMessage(content=query)], "user_query": query, "query_analysis": {}, "collected_information": [], "validation_results": [], "final_response": "", "iteration_count": 0, "errors": [], "additional_information": [], "similar_info_response":"", "web_search_results": [] } # Message de démarrage await send_cot_step("🔄 Démarrage", "Initialisation du workflow LangGraph...", "done") # Variables pour suivre l'état final_state = None # STREAMING: Utilisation de app.astream() pour obtenir les mises à jour après chaque nœud try: #async for event in app.astream(initial_state, {"callbacks": [langfuse_handler]}): #app.invoke(intial_state, config={"callbacks": [langfuse_handler]}) async for event in app.astream(initial_state): # event est un dictionnaire avec les nœuds comme clés for node_name, node_state in event.items(): # Ignorer le nœud spécial __start__ if node_name == "__start__": continue # Afficher un message de progression pour le nœud actuel node_display_names = { "analyze_query": "🔍 Analyse de la requête", "collect_information": "📊 Collecte d'informations", "generate_response": "✏️ Génération de la réponse", "validate_response": "✅ Validation anti-hallucination", "refine_response": "⚙️ Refinement de la réponse", "collect_similar_information": "🔗 Collecte d'informations similaires" } display_name = node_display_names.get(node_name, f"⚙️ {node_name}") # Message de progression await send_cot_step( f"🔄 {display_name}", f"Nœud exécuté avec succès", "done" ) # Afficher les détails spécifiques du nœud await display_node_update(node_name, node_state) # Sauvegarder l'état final final_state = node_state except Exception as e: error_msg = f"Erreur lors du streaming: {str(e)}" await send_cot_step("❌ Erreur", error_msg, "error") raise # Si le streaming n'a pas retourné d'état final, utiliser la méthode classique if final_state is None: final_state = initial_state result = { "query": query, "query_analysis": final_state.get("query_analysis", {}), "collected_information": final_state.get("collected_information", []), "validation_results": final_state.get("validation_results", []), "final_response": final_state.get("final_response", ""), "iteration_count": final_state.get("iteration_count", 0), "errors": final_state.get("errors", []), "additional_information": final_state.get("additional_information", []), "similar_info_response": final_state.get("similar_info_response", ""), "web_search_results": final_state.get("web_search_results", []), "sources_used": [ info["database"] for info in final_state.get("collected_information", []) ], "pinecone_index": PINECONE_INDEX_NAME } return result # ============================================================================= # CALLBACKS CHAINLIT # ============================================================================= @cl.set_chat_profiles async def chat_profile(current_user: cl.User): return [ cl.ChatProfile( name="Avid Agent", markdown_description="🎓 Avid Agent permet de converser avec un agent collaboratif entre 4 bases de données pour extraire les informations pertinentes afin de générer une réponse en réduisant les hallucations, par relecture et redéfinition des éléments.", icon="/public/sparkles-gustaveia.png", starters=[ cl.Starter( label= "🔬 Laboratoires & Mobilité", message= "Quels sont les laboratoires de l'université Gustave Eiffel travaillant sur la mobilité urbaine durable?", #icon= "/public/icons/lab.svg" ), cl.Starter( label= "🎓 Formations Master", message= "Je cherche des formations en master sur l'aménagement urbain et le développement durable", #icon= "/public/icons/education.svg" ), cl.Starter( label= "🤝 Collaborations Recherche", message= "Quels laboratoires ont des axes de recherche similaires en énergie et pourraient collaborer?", #icon= "/public/icons/collaboration.svg" ), cl.Starter( label= "⚙️ Équipements Lab", message= "Liste les équipements disponibles dans les laboratoires travaillant sur la qualité de l'air", #icon= "/public/icons/equipment.svg" ), cl.Starter( label= "📚 Publications Récentes", message= "Trouve des publications récentes sur la transition énergétique dans les villes", #icon= "/public/icons/publications.svg" ), cl.Starter( label= "👥 Auteurs & Labs", message= "Qui sont les auteurs qui publient sur la mobilité douce et dans quels laboratoires?", #icon= "/public/icons/authors.svg" ), cl.Starter( label= "📖 Urbanisme Durable", message= "Quelles publications traitent de l'urbanisme durable et quand ont-elles été publiées?", #icon= "/public/icons/urban.svg" ), cl.Starter( label= "🏙️ Ville Intelligente", message= "Compare les formations et les laboratoires sur le thème de la ville intelligente", #icon= "/public/icons/smart-city.svg" ), cl.Starter( label= "🌍 Résilience Urbaine", message= "Identifie les opportunités de partenariats entre laboratoires sur la résilience urbaine", #icon= "/public/icons/resilience.svg" ), cl.Starter( label= "♻️ Économie Circulaire", message= "Quelles sont les compétences enseignées dans les formations liées à l'économie circulaire?", #icon= "/public/icons/circular.svg" ) ] ),cl.ChatProfile( name="Avid Dataviz", markdown_description="💡 Avid Dataviz permet d'avoir recours à des éléments statistiques et de corrélation entre les données laboratoires et les thématiques Ville Durable", ) ] @cl.on_chat_start async def start(): """Initialisation de la session chat.""" user = cl.user_session.get("user") chat_profile = cl.user_session.get("chat_profile") if chat_profile == "Avid Dataviz": await cl.Message( content=f"Bienvenue {user.identifier}!\n\nL'environnement {chat_profile} vous restitue les données sous forme d'objets statistiques." ).send() # Message de bienvenue avec style # welcome_msg = f"""# 🎓 Agent Collaboratif - Université Gustave Eiffel #Bienvenue ! Je suis votre assistant spécialisé en **Ville Durable**. ## 🔧 Configuration #- **Index Pinecone:** `{PINECONE_INDEX_NAME}` #- **Modèle:** `{OPENAI_MODEL_NAME}` #- **Top K résultats:** `{SIMILARITY_TOP_K}` #- **Max validations:** `{MAX_VALIDATION_LOOPS}` ## 💡 Fonctionnalités #✅ Recherche multi-bases vectorielles #✅ Validation anti-hallucination #✅ Suggestions d'informations connexes #✅ Traçage LangSmith actif #**Choisissez un starter ou posez votre question !** #""" # await cl.Message(content=welcome_msg).send() # Sauvegarder les métadonnées de session # cl.user_session.set("session_started", True) # cl.user_session.set("query_count", 0) @cl.on_message async def main(message: cl.Message): """Traitement du message utilisateur.""" query = message.content thread_id = cl.context.session.thread_id # Incrémenter le compteur query_count = cl.user_session.get("query_count", 0) + 1 cl.user_session.set("query_count", query_count) # Message de traitement processing_msg = cl.Message(content="") await processing_msg.send() try: # Traitement avec affichage du COT result = await process_query_with_tracing(query, thread_id) # Réponse finale en streaming final_response = result["final_response"] # Afficher un séparateur await send_cot_step("📝 Réponse finale", "Affichage de la réponse complète en streaming...", "done") # Créer un nouveau message pour la réponse finale response_msg = cl.Message(content="") await response_msg.send() # Streamer la réponse complète await stream_response(final_response, response_msg, chunk_size=50) # Afficher les informations similaires collectées par le nœud 6 if result.get("similar_info_response"): similar_msg = cl.Message(content="") await similar_msg.send() # Streamer la réponse similaire await stream_response(result["similar_info_response"], similar_msg, chunk_size=50) #await display_similar_info(result["similar_info_response"]) # Afficher les résultats de recherche web collectés par le nœud 7 web_msg = cl.Message(content="Résultats de recherche complémentaires sur le web : \n\n") await web_msg.send() for result_web in result["web_search_results"]: web_search = "- " + result_web['markdown_link'] + " : " + result_web['summary'] + "\n\n" await stream_response(web_search, web_msg, chunk_size=50) #await display_web_search_results(result["web_search_results"]) # Métadonnées metadata_parts = [ f"\n\n---\n### 📊 Métadonnées du traitement", f"**Sources consultées:** {', '.join(result['sources_used']) if result['sources_used'] else 'Aucune'}", f"**Itérations:** {result['iteration_count']}", ] if result['validation_results']: last_val = result['validation_results'][-1] metadata_parts.append(f"**Confiance finale:** {last_val.get('confidence_score', 0)}%") metadata_parts.append(f"**Requête n°:** {query_count}") # Ajouter les métadonnées en streaming metadata_text = "\n".join(metadata_parts) await stream_response(metadata_text, response_msg, chunk_size=100) # Supprimer le message de traitement initial vide processing_msg.content = "✅ Traitement terminé" await processing_msg.update() # Sauvegarder dans l'historique de session cl.user_session.set(f"query_{query_count}", { "query": query, "response": final_response, "sources": result['sources_used'] }) except Exception as e: error_msg = f"❌ **Erreur lors du traitement:**\n\n```\n{str(e)}\n```" processing_msg.content = error_msg await processing_msg.update() # Log dans LangSmith si disponible #if langsmith_client: # langsmith_client.create_feedback( # run_id=thread_id, # key="error", # score=0, # comment=str(e) # ) @cl.on_shared_thread_view async def on_shared_thread_view(thread: ThreadDict, viewer: Optional[cl.User]) -> bool: return True @cl.on_chat_resume async def on_chat_resume(thread: ThreadDict): """Reprise d'une conversation existante.""" thread_id = thread["id"] resume_msg = f"""# 🔄 Conversation reprise **Thread ID:** `{thread_id}` Vous pouvez continuer votre conversation ou poser une nouvelle question. """ await cl.Message(content=resume_msg).send() @cl.on_stop async def on_stop(): """Callback à l'arrêt de l'exécution.""" await cl.Message(content="⏹️ Traitement interrompu par l'utilisateur.").send() @cl.on_chat_end async def on_chat_end(): """Callback à la fin de la session.""" query_count = cl.user_session.get("query_count", 0) end_msg = f"""# 👋 Session terminée Merci d'avoir utilisé l'agent collaboratif ! **Statistiques de session:** - **Requêtes traitées:** {query_count} - **Index Pinecone:** {PINECONE_INDEX_NAME} """ await cl.Message(content=end_msg).send() # ============================================================================= # CONFIGURATION DU DATA LAYER (Supabase/PostgreSQL) # ============================================================================= """ Pour activer le Data Layer avec Supabase, créez un fichier .env: CHAINLIT_AUTH_SECRET=your-secret-key LITERAL_API_KEY=your-literal-api-key LITERAL_API_URL=https://cloud.getliteral.ai Ou configurez PostgreSQL directement: DATABASE_URL=postgresql://user:password@host:port/dbname Le Data Layer sera automatiquement activé si ces variables sont définies. """