"""
LangGraph Collaborative Agent for Université Gustave Eiffel
===========================================================

This script implements a multi-database collaborative agent that uses LangGraph
to orchestrate searches across four Pinecone vector collections (four category
filters over a single index), all related to the Sustainable City (Ville
Durable) themes.

Architecture:
- LangGraph workflow with specialized nodes
- LangChain-Pinecone retrievers using similarity search with a score threshold
- Per-base category filtering
- Looped anti-hallucination validation
- Intelligent orchestration of the searches

Prerequisites:
- pip install langgraph langchain langchain-pinecone langchain-openai pinecone sentence-transformers
- Environment variables: PINECONE_API_KEY, OPENAI_API_KEY
"""
|
import os
import json
import asyncio
from typing import TypedDict, List, Dict, Any

from langchain_openai import ChatOpenAI
from langchain_pinecone import PineconeVectorStore
from langchain_core.embeddings import Embeddings
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

from langgraph.graph import StateGraph, END

from pinecone import Pinecone
|
|
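# =============================================================================
# Configuration
# =============================================================================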
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL")
# Default taken from the entry-point docstring, which documents this variable as optional
OPENAI_MODEL_NAME = os.environ.get("OPENAI_MODEL_NAME", "gpt-4")

HUGGINGFACE_MODEL = os.environ.get("HUGGINGFACE_MODEL", "sentence-transformers/all-mpnet-base-v2")
PINECONE_INDEX_NAME = "all-jdlp"

MAX_VALIDATION_LOOPS = 1        # with 1, validation never routes to "refine" (the cap is hit first); use >= 2 to enable refinement
SIMILARITY_TOP_K = 10           # documents retrieved per base
SIMILARITY_SCORE_THRESHOLD = 0.5
|
|
if not PINECONE_API_KEY:
    raise ValueError("❌ PINECONE_API_KEY is not set. Run: export PINECONE_API_KEY='your-key'")
if not OPENAI_API_KEY:
    raise ValueError("❌ OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='your-key'")
|
|
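# =============================================================================
# Embeddings
# =============================================================================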
class HuggingFaceEmbeddings(Embeddings):
    """
    Embeddings class backed by a HuggingFace sentence-transformers model.

    The model's output dimension (768 for all-mpnet-base-v2) must match the
    dimension of the Pinecone index.
    """

    def __init__(self, model_name: str = HUGGINGFACE_MODEL):
        """
        Initialize the HuggingFace embeddings.

        Args:
            model_name: Name of the HuggingFace model to use
        """
        from sentence_transformers import SentenceTransformer

        self.model_name = model_name
        print(f"🤗 Loading HuggingFace model: {model_name}")
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()
        print(f"✅ Model loaded (dimension: {self.dimension})")

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a list of documents.

        Args:
            texts: List of texts to vectorize

        Returns:
            List of embedding vectors
        """
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        return embeddings.tolist()

    def embed_query(self, text: str) -> List[float]:
        """
        Generate an embedding for a single query.

        Args:
            text: Query text

        Returns:
            Embedding vector
        """
        embedding = self.model.encode(text, convert_to_numpy=True)
        return embedding.tolist()
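
# Quick sanity check for the embeddings wrapper (a sketch, not executed by the
# workflow; the model is downloaded on first use):
#
#   emb = HuggingFaceEmbeddings()
#   vec = emb.embed_query("mobilité urbaine durable")
#   print(len(vec))  # 768 for all-mpnet-base-v2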
|
|
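# =============================================================================
# Workflow state
# =============================================================================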
class AgentState(TypedDict):
    """Global state of the LangGraph workflow."""
    # Plain list (last write wins): every node mutates the state in place and
    # returns it whole, so an additive reducer here would duplicate the
    # messages on each step.
    messages: List[BaseMessage]
    user_query: str
    query_analysis: Dict[str, Any]
    collected_information: List[Dict[str, Any]]
    validation_results: List[Dict[str, Any]]
    final_response: str
    iteration_count: int
    errors: List[str]
    additional_information: List[Dict[str, Any]]
|
|
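# =============================================================================
# Pinecone retrievers
# =============================================================================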
class PineconeRetrieverManager:
    """Centralized manager for the Pinecone retrievers."""

    def __init__(self):
        """Initialize the manager and create the 4 specialized retrievers."""
        print("🔧 Initializing the Pinecone manager...")

        self.pc = Pinecone(api_key=PINECONE_API_KEY)
        self.index = self.pc.Index(PINECONE_INDEX_NAME)

        self.embeddings = HuggingFaceEmbeddings()

        self.retrievers = {
            "laboratoires": self._create_retriever(
                category="FICHELABOTHEMATIQUEAVID",
                description="Laboratories and Sustainable City themes"
            ),
            "formations": self._create_retriever(
                category="FORMATIONTHEMATIQUEAVID",
                description="Training programs related to the Sustainable City"
            ),
            "recherche": self._create_retriever(
                category="RECHERCHETHEMATIQUEAVID",
                description="Research axes and partnerships"
            ),
            "publications": self._create_retriever(
                category="PUBLICATIONTHEMATIQUEAVID",
                description="Scientific publications"
            )
        }

        print("✅ Pinecone manager initialized with 4 retrievers\n")

    def _create_retriever(self, category: str, description: str):
        """Create a Pinecone retriever filtered on one category."""
        vectorstore = PineconeVectorStore(
            index=self.index,
            embedding=self.embeddings,
            text_key="text",
            namespace=""
        )

        retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": SIMILARITY_TOP_K,
                "score_threshold": SIMILARITY_SCORE_THRESHOLD,
                "filter": {"categorie": {"$eq": category}}
            }
        )

        retriever.metadata = {
            "category": category,
            "description": description
        }

        return retriever

    def get_retriever(self, retriever_name: str):
        """Look up a retriever by name."""
        return self.retrievers.get(retriever_name)

    def search_all_databases(self, query: str, exclude_categories: List[str] = None) -> List[Dict[str, Any]]:
        """
        Search every base for related information.

        Args:
            query: Search query
            exclude_categories: Categories to exclude from the search

        Returns:
            List of the related pieces of information found
        """
        exclude_categories = exclude_categories or []
        similar_info = []

        for db_name, retriever in self.retrievers.items():
            if retriever.metadata["category"] in exclude_categories:
                continue

            try:
                documents = retriever.get_relevant_documents(query)

                for doc in documents:
                    similar_info.append({
                        "database": db_name,
                        "category": retriever.metadata["category"],
                        "content": doc.page_content,
                        "metadata": doc.metadata,
                        # as_retriever() does not attach scores; this stays None
                        "score": getattr(doc, 'score', None)
                    })
            except Exception as e:
                print(f"⚠️ Error while searching for related items in '{db_name}': {str(e)}")

        return similar_info
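
# Note: LangChain's retriever interface returns bare Documents, so the "score"
# fields stored by the code above (and by the nodes below) are always None.
# If real similarity scores are needed, a minimal sketch reusing the category
# filter from _create_retriever could query the vector store directly:
#
#   docs_and_scores = vectorstore.similarity_search_with_score(
#       query,
#       k=SIMILARITY_TOP_K,
#       filter={"categorie": {"$eq": category}},
#   )
#   for doc, score in docs_and_scores:
#       print(score, doc.page_content[:80])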
|
|
# Shared manager, instantiated at import time (connects to Pinecone and loads
# the embedding model).
retriever_manager = PineconeRetrieverManager()
|
|
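# =============================================================================
# Workflow nodes
# =============================================================================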
def analyze_query_node(state: AgentState) -> AgentState:
    """Node that analyzes the user query."""
    print(f"\n{'='*80}")
    print(f"📊 NODE 1: QUERY ANALYSIS")
    print(f"{'='*80}")
    print(f"🔍 Query: {state['user_query']}\n")

    llm = ChatOpenAI(
        model=OPENAI_MODEL_NAME,
        base_url=OPENAI_BASE_URL,
        api_key=OPENAI_API_KEY,
        temperature=0
    )

    analysis_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert on Université Gustave Eiffel, specialized in Sustainable City themes.

Analyze the query and decide which database(s) to query among:

1. **laboratoires** (FICHELABOTHEMATIQUEAVID)
2. **formations** (FORMATIONTHEMATIQUEAVID)
3. **recherche** (RECHERCHETHEMATIQUEAVID)
4. **publications** (PUBLICATIONTHEMATIQUEAVID)

Answer ONLY with valid JSON."""),
        ("human", """{user_query}

Expected response format:
{{
    "databases_to_query": ["laboratoires", "formations", "recherche", "publications"],
    "priorities": {{
        "laboratoires": "high",
        "formations": "medium",
        "recherche": "low",
        "publications": "high"
    }},
    "optimized_queries": {{
        "laboratoires": "optimized query",
        "formations": "optimized query",
        "recherche": "optimized query",
        "publications": "optimized query"
    }},
    "analysis_summary": "summary of the analysis"
}}""")
    ])

    json_parser = JsonOutputParser()
    analysis_chain = analysis_prompt | llm | json_parser

    try:
        query_analysis = analysis_chain.invoke({"user_query": state["user_query"]})

        print(f"✅ Databases identified: {', '.join(query_analysis['databases_to_query'])}")
        print(f"✅ {query_analysis['analysis_summary']}\n")

        state["query_analysis"] = query_analysis
        state["messages"].append(AIMessage(content=f"Analysis done: {query_analysis['analysis_summary']}"))

    except Exception as e:
        error_msg = f"Error during analysis: {str(e)}"
        print(f"❌ {error_msg}")
        state["errors"].append(error_msg)
        # Fallback: query the "laboratoires" base with the raw user query
        state["query_analysis"] = {
            "databases_to_query": ["laboratoires"],
            "priorities": {"laboratoires": "high"},
            "optimized_queries": {"laboratoires": state["user_query"]},
            "analysis_summary": "Default analysis after an error"
        }

    return state
|
|
def collect_information_node(state: AgentState) -> AgentState:
    """Node that collects information from the Pinecone bases."""
    print(f"\n{'='*80}")
    print(f"🔎 NODE 2: INFORMATION COLLECTION FROM PINECONE")
    print(f"{'='*80}\n")

    query_analysis = state["query_analysis"]
    collected_info = []

    # Query the highest-priority bases first
    priorities_order = {"high": 0, "medium": 1, "low": 2}
    databases = sorted(
        query_analysis["databases_to_query"],
        key=lambda db: priorities_order.get(query_analysis["priorities"].get(db, "low"), 2)
    )

    for db_name in databases:
        retriever = retriever_manager.get_retriever(db_name)
        if not retriever:
            print(f"⚠️ Retriever '{db_name}' not found, skipped.")
            continue

        query = query_analysis["optimized_queries"].get(db_name, state["user_query"])
        priority = query_analysis["priorities"].get(db_name, "low")

        print(f"🔍 Searching '{db_name}' (priority: {priority})")
        print(f"   Query: {query[:80]}...")

        try:
            documents = retriever.get_relevant_documents(query)

            if documents:
                print(f"   ✅ {len(documents)} result(s) found")

                results = []
                for doc in documents:
                    results.append({
                        "content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": getattr(doc, 'score', None)
                    })

                collected_info.append({
                    "database": db_name,
                    "category": retriever.metadata["category"],
                    "query": query,
                    "priority": priority,
                    "results_count": len(results),
                    "results": results
                })
            else:
                print(f"   ℹ️ No results")

        except Exception as e:
            error_msg = f"Error while searching '{db_name}': {str(e)}"
            print(f"   ❌ {error_msg}")
            state["errors"].append(error_msg)

    print(f"\n✅ Collection finished: {len(collected_info)} base(s) queried\n")

    state["collected_information"] = collected_info
    state["messages"].append(AIMessage(
        content=f"Collection finished across {len(collected_info)} Pinecone bases"
    ))

    return state
|
|
def generate_response_node(state: AgentState) -> AgentState:
    """Node that generates the final answer."""
    print(f"\n{'='*80}")
    print(f"✏️ NODE 3: RESPONSE GENERATION")
    print(f"{'='*80}\n")

    llm = ChatOpenAI(
        model=OPENAI_MODEL_NAME,
        base_url=OPENAI_BASE_URL,
        api_key=OPENAI_API_KEY,
        temperature=0.3
    )

    # Flatten the collected results into a textual context for the LLM
    context_parts = []
    for info in state["collected_information"]:
        context_parts.append(f"\n### Base: {info['database']} (Category: {info['category']})")
        context_parts.append(f"Query: {info['query']}")
        context_parts.append(f"Results: {info['results_count']}")

        for idx, result in enumerate(info['results'], 1):
            context_parts.append(f"\nResult {idx}:")
            context_parts.append(f"Score: {result.get('score', 'N/A')}")
            context_parts.append(f"Content: {result['content'][:500]}...")
            if result['metadata']:
                context_parts.append(f"Metadata: {json.dumps(result['metadata'], ensure_ascii=False)}")

    context = "\n".join(context_parts)

    generation_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert assistant of Université Gustave Eiffel, specialized in the Sustainable City.

STRICT RULES:
1. Base your answer EXCLUSIVELY on the information provided in the Pinecone context
2. NEVER invent or extrapolate information
3. Cite the sources precisely (base name, Pinecone category)
4. If a piece of information is not in the sources, say so clearly
5. Structure your answer in a clear, professional way
6. Mention the relevant metadata (laboratories, programs, authors, etc.)"""),
        ("human", """USER QUERY:
{user_query}

PINECONE CONTEXT (VERIFIED SOURCES):
{context}

Generate a professional answer based solely on these sources.""")
    ])

    generation_chain = generation_prompt | llm

    try:
        response = generation_chain.invoke({
            "user_query": state["user_query"],
            "context": context
        })

        final_response = response.content
        print(f"✅ Response generated ({len(final_response)} characters)\n")

        state["final_response"] = final_response
        state["messages"].append(AIMessage(content=final_response))

    except Exception as e:
        error_msg = f"Error during generation: {str(e)}"
        print(f"❌ {error_msg}")
        state["errors"].append(error_msg)
        state["final_response"] = f"Error while generating the response: {str(e)}"

    return state
|
|
def validate_response_node(state: AgentState) -> AgentState:
    """Anti-hallucination validation node."""
    print(f"\n{'='*80}")
    print(f"✅ NODE 4: ANTI-HALLUCINATION VALIDATION")
    print(f"{'='*80}")

    iteration = state["iteration_count"] + 1
    print(f"🔄 Iteration {iteration}/{MAX_VALIDATION_LOOPS}\n")

    llm = ChatOpenAI(
        model=OPENAI_MODEL_NAME,
        base_url=OPENAI_BASE_URL,
        api_key=OPENAI_API_KEY,
        temperature=0
    )

    validation_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a strict validator for Université Gustave Eiffel.

Check that EVERY element of the answer is STRICTLY based on the Pinecone sources provided.

Be RUTHLESS: better to reject a good answer than to let a hallucination through."""),
        ("human", """ANSWER TO VALIDATE:
{response}

PINECONE SOURCES (GROUND TRUTH):
{sources}

Answer with valid JSON:
{{
    "is_valid": true/false,
    "confidence_score": 0-100,
    "hallucinations_detected": ["precise list of hallucinations"],
    "missing_information": ["information present in the sources but missing"],
    "incorrect_facts": ["incorrect or misattributed facts"],
    "validation_message": "detailed message with recommendations"
}}""")
    ])

    json_parser = JsonOutputParser()
    validation_chain = validation_prompt | llm | json_parser

    try:
        sources_json = json.dumps(
            state["collected_information"],
            ensure_ascii=False,
            indent=2
        )

        validation_result = validation_chain.invoke({
            "response": state["final_response"],
            "sources": sources_json
        })

        print(f"📊 Confidence: {validation_result['confidence_score']}%")
        print(f"📊 Valid: {validation_result['is_valid']}")

        if validation_result['hallucinations_detected']:
            print(f"⚠️ Hallucinations detected: {len(validation_result['hallucinations_detected'])}")
            for hall in validation_result['hallucinations_detected']:
                print(f"   - {hall}")
        else:
            print(f"✅ No hallucination detected")

        state["validation_results"].append(validation_result)
        state["iteration_count"] = iteration

    except Exception as e:
        error_msg = f"Error during validation: {str(e)}"
        print(f"❌ {error_msg}")
        state["errors"].append(error_msg)

        # Conservative fallback: treat the response as invalid
        validation_result = {
            "is_valid": False,
            "confidence_score": 0,
            "hallucinations_detected": [f"Validation error: {str(e)}"],
            "missing_information": [],
            "incorrect_facts": [],
            "validation_message": "Error during validation"
        }
        state["validation_results"].append(validation_result)
        state["iteration_count"] = iteration

    print()
    return state
|
|
def refine_response_node(state: AgentState) -> AgentState:
    """Node that refines (corrects) the answer."""
    print(f"\n{'='*80}")
    print(f"⚙️ NODE 5: REFINEMENT (CORRECTION)")
    print(f"{'='*80}\n")

    last_validation = state["validation_results"][-1]

    print(f"🔧 Fixing the detected problems:")
    print(f"   - Hallucinations: {len(last_validation['hallucinations_detected'])}")
    print(f"   - Incorrect facts: {len(last_validation['incorrect_facts'])}")
    print(f"   - Missing information: {len(last_validation['missing_information'])}\n")

    llm = ChatOpenAI(
        model=OPENAI_MODEL_NAME,
        base_url=OPENAI_BASE_URL,
        api_key=OPENAI_API_KEY,
        temperature=0.2
    )

    refinement_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert corrector for Université Gustave Eiffel.

Fix the previous answer by removing ALL hallucinations and errors."""),
        ("human", """PREVIOUS ANSWER (WITH ERRORS):
{previous_response}

DETECTED PROBLEMS:
{validation_issues}

PINECONE SOURCES (GROUND TRUTH):
{sources}

Generate a corrected, precise, verifiable answer.""")
    ])

    refinement_chain = refinement_prompt | llm

    try:
        validation_issues = json.dumps({
            "hallucinations": last_validation['hallucinations_detected'],
            "incorrect_facts": last_validation['incorrect_facts'],
            "missing_information": last_validation['missing_information'],
            "validation_message": last_validation['validation_message']
        }, ensure_ascii=False, indent=2)

        sources_json = json.dumps(
            state["collected_information"],
            ensure_ascii=False,
            indent=2
        )

        response = refinement_chain.invoke({
            "previous_response": state["final_response"],
            "validation_issues": validation_issues,
            "sources": sources_json
        })

        refined_response = response.content
        print(f"✅ Corrected response generated ({len(refined_response)} characters)\n")

        state["final_response"] = refined_response
        state["messages"].append(AIMessage(
            content=f"Response corrected (iteration {state['iteration_count']})"
        ))

    except Exception as e:
        error_msg = f"Error during refinement: {str(e)}"
        print(f"❌ {error_msg}")
        state["errors"].append(error_msg)

    return state
|
|
def collect_similar_information_node(state: AgentState) -> AgentState:
    """Node that collects related information from the other bases."""
    print(f"\n{'='*80}")
    print(f"🔗 NODE 6: RELATED INFORMATION COLLECTION")
    print(f"{'='*80}\n")

    # Skip the categories that were already used to answer the query
    used_categories = [info["category"] for info in state["collected_information"]]

    print(f"🔍 Searching the unused bases for related information...")
    similar_info = retriever_manager.search_all_databases(
        query=state["user_query"],
        exclude_categories=used_categories
    )

    # Also search using the final response itself, to surface related content
    if state.get("final_response"):
        print(f"🔍 Searching based on the final response...")
        response_based_info = retriever_manager.search_all_databases(
            query=state["final_response"][:500],
            exclude_categories=used_categories
        )

        for info in response_based_info:
            if info not in similar_info:
                similar_info.append(info)

    print(f"✅ {len(similar_info)} related item(s) found\n")

    state["additional_information"] = similar_info

    return state
|
|
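# =============================================================================
# Conditional routing
# =============================================================================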
def should_collect_information(state: AgentState) -> str:
    """Route to collection when the analysis selected at least one database."""
    if state.get("query_analysis") and state["query_analysis"].get("databases_to_query"):
        return "collect"
    return "end"


def should_generate_response(state: AgentState) -> str:
    """Route to generation only when something was collected."""
    if state.get("collected_information"):
        return "generate"
    return "end"


def should_validate(state: AgentState) -> str:
    """Route to validation only when a response was produced."""
    if state.get("final_response"):
        return "validate"
    return "end"


def should_refine_or_collect_similar(state: AgentState) -> str:
    """Decide between another refinement pass and moving on to related info."""
    if not state.get("validation_results"):
        return "collect_similar"

    last_validation = state["validation_results"][-1]
    iteration = state["iteration_count"]

    is_valid = last_validation.get("is_valid", False)
    confidence = last_validation.get("confidence_score", 0)

    if is_valid and confidence >= 85:
        print(f"✅ Validation passed (confidence: {confidence}%) - collecting related info\n")
        return "collect_similar"

    if iteration >= MAX_VALIDATION_LOOPS:
        print(f"⚠️ Maximum number of iterations reached ({MAX_VALIDATION_LOOPS}) - collecting related info\n")
        return "collect_similar"

    print(f"🔄 Refinement needed (confidence: {confidence}%, iteration {iteration}/{MAX_VALIDATION_LOOPS})\n")
    return "refine"
|
|
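# =============================================================================
# Workflow construction
# =============================================================================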
def create_agent_workflow():
    """Build, wire and compile the full LangGraph workflow."""
    print("\n🗺️ Building the LangGraph workflow...")

    workflow = StateGraph(AgentState)

    workflow.add_node("analyze_query", analyze_query_node)
    workflow.add_node("collect_information", collect_information_node)
    workflow.add_node("generate_response", generate_response_node)
    workflow.add_node("validate_response", validate_response_node)
    workflow.add_node("refine_response", refine_response_node)
    workflow.add_node("collect_similar_information", collect_similar_information_node)

    workflow.set_entry_point("analyze_query")

    workflow.add_conditional_edges(
        "analyze_query",
        should_collect_information,
        {
            "collect": "collect_information",
            "end": END
        }
    )

    workflow.add_conditional_edges(
        "collect_information",
        should_generate_response,
        {
            "generate": "generate_response",
            "end": END
        }
    )

    workflow.add_conditional_edges(
        "generate_response",
        should_validate,
        {
            "validate": "validate_response",
            "end": END
        }
    )

    workflow.add_conditional_edges(
        "validate_response",
        should_refine_or_collect_similar,
        {
            "refine": "refine_response",
            "collect_similar": "collect_similar_information"
        }
    )

    workflow.add_edge("refine_response", "validate_response")
    workflow.add_edge("collect_similar_information", END)

    app = workflow.compile()

    print("✅ LangGraph workflow built successfully\n")

    return app
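
# Workflow shape (as wired above):
#
#   analyze_query -> collect_information -> generate_response -> validate_response
#   validate_response -> refine_response -> validate_response   (loop capped by MAX_VALIDATION_LOOPS)
#   validate_response -> collect_similar_information -> END
#
# Each of the first three nodes routes straight to END when it has nothing to
# pass on.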
|
|
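# =============================================================================
# Execution
# =============================================================================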
async def run_collaborative_agent(user_query: str) -> Dict[str, Any]:
    """Run the collaborative agent's full workflow."""
    print(f"\n{'='*80}")
    print(f"🚀 COLLABORATIVE AGENT - UNIVERSITÉ GUSTAVE EIFFEL")
    print(f"{'='*80}")
    print(f"🔍 Query: {user_query}\n")

    app = create_agent_workflow()

    initial_state = {
        "messages": [HumanMessage(content=user_query)],
        "user_query": user_query,
        "query_analysis": {},
        "collected_information": [],
        "validation_results": [],
        "final_response": "",
        "iteration_count": 0,
        "errors": [],
        "additional_information": []
    }

    print(f"{'='*80}")
    print(f"⚙️ RUNNING THE WORKFLOW")
    print(f"{'='*80}\n")

    try:
        final_state = await app.ainvoke(initial_state)

        print(f"\n{'='*80}")
        print(f"✨ PROCESS FINISHED")
        print(f"{'='*80}\n")

        result = {
            "query": user_query,
            "query_analysis": final_state.get("query_analysis", {}),
            "collected_information": final_state.get("collected_information", []),
            "validation_results": final_state.get("validation_results", []),
            "final_response": final_state.get("final_response", ""),
            "iteration_count": final_state.get("iteration_count", 0),
            "errors": final_state.get("errors", []),
            "additional_information": final_state.get("additional_information", []),
            "sources_used": [
                info["database"]
                for info in final_state.get("collected_information", [])
            ],
            "pinecone_index": PINECONE_INDEX_NAME
        }

        return result

    except Exception as e:
        error_msg = f"Error while running the workflow: {str(e)}"
        print(f"\n❌ {error_msg}\n")

        return {
            "query": user_query,
            "query_analysis": {},
            "collected_information": [],
            "validation_results": [],
            "final_response": f"Error: {error_msg}",
            "iteration_count": 0,
            "errors": [error_msg],
            "additional_information": [],
            "sources_used": [],
            "pinecone_index": PINECONE_INDEX_NAME
        }
|
|
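# =============================================================================
# Results display
# =============================================================================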
def display_results(result: Dict[str, Any]) -> None:
    """
    Display the results in a formatted, readable way.

    Args:
        result: Dictionary of workflow results
    """
    print(f"\n{'='*80}")
    print(f"📋 FINAL RESPONSE")
    print(f"{'='*80}")
    print(result["final_response"])

    print(f"\n{'='*80}")
    print(f"📊 PROCESSING METADATA")
    print(f"{'='*80}")
    print(f"🗄️ Pinecone index: {result['pinecone_index']}")
    print(f"📚 Sources consulted: {', '.join(result['sources_used']) if result['sources_used'] else 'None'}")
    print(f"🔄 Validation iterations: {result['iteration_count']}")

    if result['validation_results']:
        last_validation = result['validation_results'][-1]
        print(f"✅ Final confidence score: {last_validation.get('confidence_score', 0)}%")
        print(f"✅ Final validation: {'Passed' if last_validation.get('is_valid') else 'Failed'}")

        hallucinations = last_validation.get('hallucinations_detected', [])
        print(f"⚠️ Hallucinations detected: {len(hallucinations)}")

        if hallucinations:
            print(f"\n⚠️ CORRECTED HALLUCINATIONS:")
            for i, hall in enumerate(hallucinations, 1):
                print(f"   {i}. {hall}")

    if result['errors']:
        print(f"\n❌ ERRORS ENCOUNTERED:")
        for i, error in enumerate(result['errors'], 1):
            print(f"   {i}. {error}")

    print(f"\n{'='*80}")
    print(f"📈 COLLECTION DETAILS")
    print(f"{'='*80}")
    for info in result['collected_information']:
        print(f"\n📦 Base: {info['database']}")
        print(f"   Category: {info['category']}")
        print(f"   Priority: {info['priority']}")
        print(f"   Results: {info['results_count']}")
        print(f"   Query: {info['query'][:80]}...")

    if result.get('additional_information'):
        print(f"\n{'='*80}")
        print(f"💡 INFORMATION THAT MIGHT ALSO HAVE INTERESTED YOU")
        print(f"{'='*80}")
        print(f"\nRelated or similar information found in the other bases:\n")

        # Group the related results by source base
        grouped_info = {}
        for info in result['additional_information']:
            db_name = info['database']
            if db_name not in grouped_info:
                grouped_info[db_name] = []
            grouped_info[db_name].append(info)

        for db_name, items in grouped_info.items():
            print(f"\n{'─'*80}")
            print(f"📚 Base: {db_name.upper()}")
            print(f"   Pinecone category: {items[0]['category']}")
            print(f"   Number of results: {len(items)}")
            print(f"{'─'*80}\n")

            for idx, item in enumerate(items, 1):
                print(f"   Result {idx}:")
                score = item.get('score')
                if score is not None:
                    print(f"      ├─ Similarity score: {score:.4f}")
                else:
                    print(f"      ├─ Score: N/A")

                content_preview = item['content'][:300]
                if len(item['content']) > 300:
                    content_preview += "..."
                print(f"      ├─ Content: {content_preview}")

                if item.get('metadata'):
                    metadata = item['metadata']
                    print(f"      └─ Full sources:")

                    if 'titre' in metadata or 'title' in metadata:
                        titre = metadata.get('titre') or metadata.get('title')
                        print(f"          • Title: {titre}")

                    if 'laboratoire' in metadata:
                        print(f"          • Laboratory: {metadata['laboratoire']}")

                    if 'formation' in metadata:
                        print(f"          • Program: {metadata['formation']}")

                    if 'auteur' in metadata or 'auteurs' in metadata or 'authors' in metadata:
                        auteurs = metadata.get('auteur') or metadata.get('auteurs') or metadata.get('authors')
                        print(f"          • Author(s): {auteurs}")

                    if 'date' in metadata or 'annee' in metadata or 'year' in metadata:
                        date = metadata.get('date') or metadata.get('annee') or metadata.get('year')
                        print(f"          • Date/Year: {date}")

                    if 'thematique' in metadata or 'thematiques' in metadata:
                        them = metadata.get('thematique') or metadata.get('thematiques')
                        print(f"          • Theme(s): {them}")

                    if 'niveau' in metadata:
                        print(f"          • Level: {metadata['niveau']}")

                    if 'competences' in metadata:
                        print(f"          • Skills: {metadata['competences']}")

                    if 'equipements' in metadata:
                        print(f"          • Equipment: {metadata['equipements']}")

                    if 'axe_recherche' in metadata:
                        print(f"          • Research axis: {metadata['axe_recherche']}")

                    if 'partenaires' in metadata or 'collaborations' in metadata:
                        part = metadata.get('partenaires') or metadata.get('collaborations')
                        print(f"          • Partners/Collaborations: {part}")

                    if 'url' in metadata or 'lien' in metadata:
                        url = metadata.get('url') or metadata.get('lien')
                        print(f"          • Link: {url}")

                    if 'doi' in metadata:
                        print(f"          • DOI: {metadata['doi']}")

                    if 'source' in metadata:
                        print(f"          • Source document: {metadata['source']}")

                    # Dump any metadata keys not explicitly handled above
                    displayed_keys = ['titre', 'title', 'laboratoire', 'formation', 'auteur', 'auteurs',
                                      'authors', 'date', 'annee', 'year', 'thematique', 'thematiques',
                                      'niveau', 'competences', 'equipements', 'axe_recherche',
                                      'partenaires', 'collaborations', 'url', 'lien', 'doi', 'source',
                                      'categorie', 'text']

                    other_metadata = {k: v for k, v in metadata.items() if k not in displayed_keys}
                    if other_metadata:
                        print(f"          • Other information: {json.dumps(other_metadata, ensure_ascii=False, indent=8)}")

                print()

        print(f"\n{'='*80}")
        print(f"💬 INTERPRETING THE RELATED RESULTS")
        print(f"{'='*80}")
        print("This information comes from bases that were not prioritized for your")
        print("initial query but contain related material; it may enrich your")
        print("understanding of the topic or point you to interesting neighboring areas.\n")
|
|
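# =============================================================================
# Entry point
# =============================================================================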
async def main():
    """Main entry point of the application."""

    # Example queries (kept in French, matching the indexed corpus)
    exemples_requetes = [
        "Quels sont les laboratoires de l'université Gustave Eiffel travaillant sur la mobilité urbaine durable?",
        "Je cherche des formations en master sur l'aménagement urbain et le développement durable",
        "Quels laboratoires ont des axes de recherche similaires en énergie et pourraient collaborer?",
        "Liste les équipements disponibles dans les laboratoires travaillant sur la qualité de l'air",
        "Trouve des publications récentes sur la transition énergétique dans les villes",
        "Qui sont les auteurs qui publient sur la mobilité douce et dans quels laboratoires?",
        "Quelles publications traitent de l'urbanisme durable et quand ont-elles été publiées?",
        "Compare les formations et les laboratoires sur le thème de la ville intelligente",
        "Identifie les opportunités de partenariats entre laboratoires sur la résilience urbaine",
        "Quelles sont les compétences enseignées dans les formations liées à l'économie circulaire?"
    ]

    print(f"\n{'='*80}")
    print(f"🎓 COLLABORATIVE AGENT - UNIVERSITÉ GUSTAVE EIFFEL")
    print(f"{'='*80}")
    print(f"🗄️ Pinecone index: {PINECONE_INDEX_NAME}")
    print(f"🤖 Model: {OPENAI_MODEL_NAME}")
    print(f"🌐 Base URL: {OPENAI_BASE_URL}")
    print(f"🤗 Embeddings: {HUGGINGFACE_MODEL}")
    print(f"🔄 Max iterations: {MAX_VALIDATION_LOOPS}")
    print(f"🎯 Top K results: {SIMILARITY_TOP_K}")
    print(f"📊 Similarity threshold: {SIMILARITY_SCORE_THRESHOLD}")
    print(f"{'='*80}\n")

    print("📚 AVAILABLE EXAMPLE QUERIES:")
    print("="*80)
    for i, req in enumerate(exemples_requetes, 1):
        print(f"{i:2d}. {req}")
    print("="*80 + "\n")

    # Run the first example query; change the index to try another one
    selected_query = exemples_requetes[0]

    print(f"🎯 Selected query: {selected_query}\n")

    result = await run_collaborative_agent(selected_query)

    display_results(result)

    print(f"\n{'='*80}")
    print(f"✅ PROCESSING FINISHED SUCCESSFULLY")
    print(f"{'='*80}\n")

    return result
|
|
if __name__ == "__main__":
    """
    Script entry point.

    Required configuration:

    1. Environment variables:
       export PINECONE_API_KEY="your-pinecone-key"
       export OPENAI_API_KEY="your-openai-key"
       export OPENAI_BASE_URL="https://your-endpoint.com/v1"  # Optional
       export OPENAI_MODEL_NAME="gpt-4"  # Optional
       export HUGGINGFACE_MODEL="sentence-transformers/all-mpnet-base-v2"  # Optional

    2. Dependencies:
       pip install langgraph langchain langchain-pinecone langchain-openai pinecone sentence-transformers

    3. Pinecone structure:
       - Index: "all-jdlp"
       - Dimension: must match the HuggingFace model (e.g. 768 for all-mpnet-base-v2)
       - Metric: cosine
       - Categories: FICHELABOTHEMATIQUEAVID, FORMATIONTHEMATIQUEAVID,
         RECHERCHETHEMATIQUEAVID, PUBLICATIONTHEMATIQUEAVID

    Usage:
    - Development: python script.py
    - Production: integrate into a FastAPI/Flask API
    - Tests: pytest script.py --asyncio-mode=auto
    """

    asyncio.run(main())