datavid / app.py
datacipen's picture
Upload app.py
050a20c verified
raw
history blame
16.2 kB
"""
Application Chainlit pour l'Agent Collaboratif LangGraph
========================================================
Intégration complète avec:
- Chainlit 2.8.1
- Official Data Layer (PostgreSQL/Supabase)
- LangSmith monitoring
- Starters avec icônes
- Chain of Thought visible
- Style personnalisé (dark theme)
"""
import os
import json
import asyncio
from typing import Dict, Any, List, Optional
import chainlit as cl
from chainlit.types import ThreadDict, Starter
#from langsmith import Client
#from langsmith.run_helpers import traceable
from langsmith import traceable
# Import du module agent (votre code existant)
# On suppose que le code est dans agent_collaboratif_avid.py
from agent_collaboratif_avid import (
run_collaborative_agent,
retriever_manager,
PINECONE_INDEX_NAME,
OPENAI_MODEL_NAME,
SIMILARITY_TOP_K,
MAX_VALIDATION_LOOPS
)
# =============================================================================
# CONFIGURATION LANGSMITH
# =============================================================================
LANGCHAIN_API_KEY = os.environ.get("LANGCHAIN_API_KEY")
LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT")
if LANGCHAIN_API_KEY:
os.environ["LANGCHAIN_TRACING_V2"] = "true"
#os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = LANGCHAIN_API_KEY
os.environ["LANGCHAIN_PROJECT"] = LANGSMITH_PROJECT
#langsmith_client = Client()
print(f"✅ LangSmith activé - Projet: {LANGSMITH_PROJECT}")
else:
print("⚠️ LANGCHAIN_API_KEY non définie - Monitoring désactivé")
langsmith_client = None
# =============================================================================
# FONCTIONS AUXILIAIRES POUR L'AFFICHAGE
# =============================================================================
async def send_cot_step(step_name: str, content: str, status: str = "running"):
"""Envoie une étape du Chain of Thought."""
step = cl.Step(
name=step_name,
type="tool",
show_input=True
)
step.output = content
if status == "done":
step.is_error = False
elif status == "error":
step.is_error = True
await step.send()
return step
async def display_query_analysis(analysis: Dict[str, Any]):
"""Affiche l'analyse de la requête."""
content = f"""**Bases identifiées:** {', '.join(analysis.get('databases_to_query', []))}
**Priorités:**
{json.dumps(analysis.get('priorities', {}), indent=2, ensure_ascii=False)}
**Résumé:** {analysis.get('analysis_summary', 'N/A')}
"""
await send_cot_step("🔍 Analyse de la requête", content, "done")
async def display_collection(info_list: List[Dict[str, Any]]):
"""Affiche les informations collectées."""
content_parts = []
for info in info_list:
content_parts.append(f"""
**📦 Base:** {info['database']}
**Catégorie:** {info['category']}
**Priorité:** {info['priority']}
**Résultats:** {info['results_count']}
""")
content = "\n".join(content_parts)
await send_cot_step("📊 Collecte d'informations", content, "done")
async def display_validation(validation: Dict[str, Any], iteration: int):
"""Affiche les résultats de validation."""
content = f"""**Itération:** {iteration}
**Score de confiance:** {validation.get('confidence_score', 0)}%
**Validé:** {'✅ Oui' if validation.get('is_valid') else '❌ Non'}
**Hallucinations détectées:** {len(validation.get('hallucinations_detected', []))}
"""
if validation.get('hallucinations_detected'):
content += "\n**Problèmes:**\n"
for hall in validation['hallucinations_detected']:
content += f"- {hall}\n"
status = "done" if validation.get('is_valid') else "error"
await send_cot_step(f"✅ Validation (#{iteration})", content, status)
async def display_similar_info(similar_info: List[Dict[str, Any]]):
"""Affiche les informations similaires."""
if not similar_info:
return
# Regrouper par base
grouped = {}
for item in similar_info:
db = item['database']
if db not in grouped:
grouped[db] = []
grouped[db].append(item)
elements = []
for db_name, items in grouped.items():
content_parts = [f"### 📚 {db_name.upper()}\n"]
content_parts.append(f"**Catégorie:** {items[0]['category']}")
content_parts.append(f"**Résultats:** {len(items)}\n")
for idx, item in enumerate(items[:3], 1): # Limiter à 3 par base
score = item.get('score', 'N/A')
content_parts.append(f"**{idx}. Score:** {score}")
content_preview = item['content'][:200]
if len(item['content']) > 200:
content_preview += "..."
content_parts.append(f"**Contenu:** {content_preview}\n")
# Créer un élément Chainlit
element = cl.Text(
name=f"similar_{db_name}",
content="\n".join(content_parts),
display="side"
)
elements.append(element)
if elements:
await cl.Message(
content="💡 **Informations similaires trouvées dans d'autres bases**",
elements=elements
).send()
# =============================================================================
# FONCTION PRINCIPALE TRACÉE AVEC LANGSMITH
# =============================================================================
@traceable(name="agent_collaboratif_query", project_name=LANGSMITH_PROJECT)
async def process_query_with_tracing(query: str, thread_id: str) -> Dict[str, Any]:
"""Traite la requête avec traçage LangSmith."""
# Import du workflow
from agent_collaboratif_avid import AgentState, create_agent_workflow
from langchain_core.messages import HumanMessage
app = create_agent_workflow()
initial_state = {
"messages": [HumanMessage(content=query)],
"user_query": query,
"query_analysis": {},
"collected_information": [],
"validation_results": [],
"final_response": "",
"iteration_count": 0,
"errors": [],
"additional_information": []
}
# Analyse de la requête
await send_cot_step("🔄 Démarrage", "Initialisation du workflow...", "running")
final_state = await app.ainvoke(initial_state)
# Affichage progressif
if final_state.get("query_analysis"):
await display_query_analysis(final_state["query_analysis"])
if final_state.get("collected_information"):
await send_cot_step("📊 Collecte d'informations", "Collecte d'informations...", "running")
await display_collection(final_state["collected_information"])
if final_state.get("validation_results"):
for idx, validation in enumerate(final_state["validation_results"], 1):
await display_validation(validation, idx)
# Informations similaires
if final_state.get("additional_information"):
await display_similar_info(final_state["additional_information"])
result = {
"query": query,
"query_analysis": final_state.get("query_analysis", {}),
"collected_information": final_state.get("collected_information", []),
"validation_results": final_state.get("validation_results", []),
"final_response": final_state.get("final_response", ""),
"iteration_count": final_state.get("iteration_count", 0),
"errors": final_state.get("errors", []),
"additional_information": final_state.get("additional_information", []),
"sources_used": [
info["database"]
for info in final_state.get("collected_information", [])
],
"pinecone_index": PINECONE_INDEX_NAME
}
return result
# =============================================================================
# CALLBACKS CHAINLIT
# =============================================================================
#@cl.on_chat_start
#async def start():
# """Initialisation de la session chat."""
# Message de bienvenue avec style
# welcome_msg = f"""# 🎓 Agent Collaboratif - Université Gustave Eiffel
#Bienvenue ! Je suis votre assistant spécialisé en **Ville Durable**.
## 🔧 Configuration
#- **Index Pinecone:** `{PINECONE_INDEX_NAME}`
#- **Modèle:** `{OPENAI_MODEL_NAME}`
#- **Top K résultats:** `{SIMILARITY_TOP_K}`
#- **Max validations:** `{MAX_VALIDATION_LOOPS}`
## 💡 Fonctionnalités
#✅ Recherche multi-bases vectorielles
#✅ Validation anti-hallucination
#✅ Suggestions d'informations connexes
#✅ Traçage LangSmith actif
#**Choisissez un starter ou posez votre question !**
#"""
# await cl.Message(content=welcome_msg).send()
# Sauvegarder les métadonnées de session
# cl.user_session.set("session_started", True)
# cl.user_session.set("query_count", 0)
@cl.set_starters
async def set_starters():
"""Configure les starters avec icônes."""
#return [cl.Starter(label=s["label"], message=s["message"], icon=s["icon"]) for s in STARTERS]
return [
cl.Starter(
label= "🔬 Laboratoires & Mobilité",
message= "Quels sont les laboratoires de l'université Gustave Eiffel travaillant sur la mobilité urbaine durable?",
#icon= "/public/icons/lab.svg"
),
cl.Starter(
label= "🎓 Formations Master",
message= "Je cherche des formations en master sur l'aménagement urbain et le développement durable",
#icon= "/public/icons/education.svg"
),
cl.Starter(
label= "🤝 Collaborations Recherche",
message= "Quels laboratoires ont des axes de recherche similaires en énergie et pourraient collaborer?",
#icon= "/public/icons/collaboration.svg"
),
cl.Starter(
label= "⚙️ Équipements Lab",
message= "Liste les équipements disponibles dans les laboratoires travaillant sur la qualité de l'air",
#icon= "/public/icons/equipment.svg"
),
cl.Starter(
label= "📚 Publications Récentes",
message= "Trouve des publications récentes sur la transition énergétique dans les villes",
#icon= "/public/icons/publications.svg"
),
cl.Starter(
label= "👥 Auteurs & Labs",
message= "Qui sont les auteurs qui publient sur la mobilité douce et dans quels laboratoires?",
#icon= "/public/icons/authors.svg"
),
cl.Starter(
label= "📖 Urbanisme Durable",
message= "Quelles publications traitent de l'urbanisme durable et quand ont-elles été publiées?",
#icon= "/public/icons/urban.svg"
),
cl.Starter(
label= "🏙️ Ville Intelligente",
message= "Compare les formations et les laboratoires sur le thème de la ville intelligente",
#icon= "/public/icons/smart-city.svg"
),
cl.Starter(
label= "🌍 Résilience Urbaine",
message= "Identifie les opportunités de partenariats entre laboratoires sur la résilience urbaine",
#icon= "/public/icons/resilience.svg"
),
cl.Starter(
label= "♻️ Économie Circulaire",
message= "Quelles sont les compétences enseignées dans les formations liées à l'économie circulaire?",
#icon= "/public/icons/circular.svg"
)
]
@cl.on_message
async def main(message: cl.Message):
"""Traitement du message utilisateur."""
query = message.content
thread_id = cl.context.session.thread_id
# Incrémenter le compteur
query_count = cl.user_session.get("query_count", 0) + 1
cl.user_session.set("query_count", query_count)
# Message de traitement
processing_msg = cl.Message(content="")
await processing_msg.send()
try:
# Traitement avec affichage du COT
result = await process_query_with_tracing(query, thread_id)
# Réponse finale
final_response = result["final_response"]
# Métadonnées
metadata_parts = [
f"\n\n---\n### 📊 Métadonnées du traitement",
f"**Sources consultées:** {', '.join(result['sources_used']) if result['sources_used'] else 'Aucune'}",
f"**Itérations:** {result['iteration_count']}",
]
if result['validation_results']:
last_val = result['validation_results'][-1]
metadata_parts.append(f"**Confiance finale:** {last_val.get('confidence_score', 0)}%")
metadata_parts.append(f"**Requête n°:** {query_count}")
full_response = final_response + "\n".join(metadata_parts)
# Mise à jour du message
processing_msg.content = full_response
await processing_msg.update()
# Sauvegarder dans l'historique de session
cl.user_session.set(f"query_{query_count}", {
"query": query,
"response": final_response,
"sources": result['sources_used']
})
except Exception as e:
error_msg = f"❌ **Erreur lors du traitement:**\n\n```\n{str(e)}\n```"
processing_msg.content = error_msg
await processing_msg.update()
# Log dans LangSmith si disponible
#if langsmith_client:
# langsmith_client.create_feedback(
# run_id=thread_id,
# key="error",
# score=0,
# comment=str(e)
# )
@cl.on_chat_resume
async def on_chat_resume(thread: ThreadDict):
"""Reprise d'une conversation existante."""
thread_id = thread["id"]
resume_msg = f"""# 🔄 Conversation reprise
**Thread ID:** `{thread_id}`
Vous pouvez continuer votre conversation ou poser une nouvelle question.
"""
await cl.Message(content=resume_msg).send()
@cl.on_stop
async def on_stop():
"""Callback à l'arrêt de l'exécution."""
await cl.Message(content="⏹️ Traitement interrompu par l'utilisateur.").send()
@cl.on_chat_end
async def on_chat_end():
"""Callback à la fin de la session."""
query_count = cl.user_session.get("query_count", 0)
end_msg = f"""# 👋 Session terminée
Merci d'avoir utilisé l'agent collaboratif !
**Statistiques de session:**
- **Requêtes traitées:** {query_count}
- **Index Pinecone:** {PINECONE_INDEX_NAME}
"""
await cl.Message(content=end_msg).send()
# =============================================================================
# CONFIGURATION DE L'AUTHENTIFICATION (Optionnel)
# =============================================================================
@cl.password_auth_callback
def auth_callback(username: str, password: str) -> Optional[cl.User]:
"""
Callback d'authentification (optionnel).
À configurer selon vos besoins.
"""
# Exemple simple (à remplacer par votre logique)
if username == "admin" and password == "password":
return cl.User(
identifier=username,
metadata={"role": "admin", "provider": "credentials"}
)
return None
# =============================================================================
# CONFIGURATION DU DATA LAYER (Supabase/PostgreSQL)
# =============================================================================
"""
Pour activer le Data Layer avec Supabase, créez un fichier .env:
CHAINLIT_AUTH_SECRET=your-secret-key
LITERAL_API_KEY=your-literal-api-key
LITERAL_API_URL=https://cloud.getliteral.ai
Ou configurez PostgreSQL directement:
DATABASE_URL=postgresql://user:password@host:port/dbname
Le Data Layer sera automatiquement activé si ces variables sont définies.
"""