import asyncio
import logging
from collections import defaultdict
from typing import Dict, Optional

from .config import settings
from .rabbit_repo import RabbitRepo
from .runners.base import ILLMRunner


class LLMService:
    """Dispatch LLM session requests to per-session runners.

    Sessions are stored in ``self._sessions`` keyed by
    ``"<RequestSessionId>_<LLMRunnerType>"``; each entry is a dict holding the
    runner created by ``runner_factory``. Outcomes are reported by writing
    ``ResultMessage`` / ``ResultSuccess`` / ``LlmMessage`` into the request
    dict and publishing it through the injected publisher.
    """

    # Used to report best-effort cleanup failures instead of hiding them.
    _log = logging.getLogger(__name__)

    # The annotation is quoted so it is not evaluated at definition time.
    def __init__(self, publisher: "RabbitRepo", runner_factory):
        """Store the collaborators and start with an empty session table.

        Args:
            publisher: Object exposing an awaitable ``publish(queue, obj)``.
            runner_factory: Awaitable callable that receives the request dict
                and returns a runner for it.
        """
        self._pub = publisher
        self._runner_factory = runner_factory
        self._sessions: Dict[str, dict] = {}
        # The event is set immediately, so StartProcess never actually waits
        # on it unless something clears it later.
        self._ready = asyncio.Event()
        self._ready.set()

    async def init(self):
        """Asynchronous initialisation hook; currently does nothing."""

    async def _set_result(
        self,
        obj: dict,
        message: str,
        success: bool,
        queue: str,
        check_system: bool = False,
    ):
        """Write the result fields into ``obj`` and publish it to ``queue``.

        Args:
            obj: Request dict; mutated in place.
            message: Text stored in ``ResultMessage`` and wrapped in
                ``<Success>`` / ``<Error>`` tags for ``LlmMessage``.
            success: Value stored in ``ResultSuccess``; also selects the tag.
            queue: Name of the queue to publish to.
            check_system: When true, publishing is skipped for objects whose
                ``IsSystemLlm`` entry is truthy (the fields are still set).
        """
        obj["ResultMessage"] = message
        obj["ResultSuccess"] = success
        tag = "Success" if success else "Error"
        obj["LlmMessage"] = f"<{tag}>{message}</{tag}>"

        if check_system and obj.get("IsSystemLlm"):
            return
        await self._pub.publish(queue, obj)

    async def StartProcess(self, llmServiceObj: dict):
        """Ensure a working runner exists for the request's session.

        The session id is derived from ``RequestSessionId`` and
        ``LLMRunnerType`` and written back to ``llmServiceObj["SessionId"]``.
        A new runner is created when the session has none or its runner
        reports ``IsStateFailed``. Unless the method returns early (readiness
        timeout or disabled runner), ``llmServiceStarted`` is published last.

        Raises:
            KeyError: If ``RequestSessionId`` or ``LLMRunnerType`` is missing.
        """
        session_id = (
            f"{llmServiceObj['RequestSessionId']}_{llmServiceObj['LLMRunnerType']}"
        )
        llmServiceObj["SessionId"] = session_id

        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._set_result(
                llmServiceObj,
                "Timed out waiting for initialization.",
                False,
                "llmServiceMessage",
                True,
            )
            return

        sess = self._sessions.get(session_id)
        old_runner = sess.get("Runner") if sess else None
        create_new = old_runner is None or old_runner.IsStateFailed

        if create_new:
            if old_runner:
                # Best-effort cleanup of the failed runner. Only ordinary
                # errors are suppressed, so task cancellation and interpreter
                # shutdown still propagate.
                try:
                    await old_runner.RemoveProcess(session_id)
                except Exception:
                    self._log.warning(
                        "Failed to remove stale runner for session %s",
                        session_id,
                        exc_info=True,
                    )

            runner: ILLMRunner = await self._runner_factory(llmServiceObj)
            if not runner.IsEnabled:
                await self._set_result(
                    llmServiceObj,
                    f"{llmServiceObj['LLMRunnerType']} {settings.SERVICE_ID} not started as it is disabled.",
                    True,
                    "llmServiceMessage",
                )
                return

            await self._set_result(
                llmServiceObj,
                f"Starting {runner.Type} {settings.SERVICE_ID} Expert",
                True,
                "llmServiceMessage",
                True,
            )
            await runner.StartProcess(llmServiceObj)

            self._sessions[session_id] = {
                "Runner": runner,
                "FullSessionId": session_id,
            }
            if settings.SERVICE_ID == "monitor":
                await self._set_result(
                    llmServiceObj,
                    f"Hi i'm {runner.Type} your Network Monitor Assistant. How can I help you.",
                    True,
                    "llmServiceMessage",
                    True,
                )

        await self._pub.publish("llmServiceStarted", llmServiceObj)

    async def RemoveSession(self, llmServiceObj: dict):
        """Stop and forget every session that shares the request's base id.

        The base id is the part of ``SessionId`` before the first ``_``; all
        stored sessions whose key starts with ``"<base>_"`` are targeted.
        Successfully removed sessions are deleted from the session table so it
        does not grow without bound. Sessions whose runner fails to stop are
        kept and reported in the error message.
        """
        # NOTE(review): assumes the request-session part contains no "_";
        # otherwise the base is truncated and matches more sessions - confirm.
        base = llmServiceObj.get("SessionId", "").split("_")[0]
        prefix = base + "_"
        targets = [key for key in self._sessions if key.startswith(prefix)]

        msgs = []
        ok = True
        for sid in targets:
            entry = self._sessions.get(sid)
            if entry is None:
                continue
            runner = entry.get("Runner")
            if not runner:
                # Entry without a runner: nothing to stop, just drop it.
                self._sessions.pop(sid, None)
                continue
            try:
                await runner.RemoveProcess(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")
            else:
                entry["Runner"] = None
                # Only delete the entry we just stopped; a new one may have
                # been registered under the same key while we were awaiting.
                if self._sessions.get(sid) is entry:
                    del self._sessions[sid]
                msgs.append(sid)

        if ok:
            # NOTE(review): success goes to "llmSessionMessage" while every
            # other result uses "llmServiceMessage" - confirm this is intended.
            await self._set_result(
                llmServiceObj,
                f"Success: Removed sessions for {' '.join(msgs)}",
                True,
                "llmSessionMessage",
                True,
            )
        else:
            await self._set_result(
                llmServiceObj, " ".join(msgs), False, "llmServiceMessage"
            )

    async def StopRequest(self, llmServiceObj: dict):
        """Ask the session's runner to stop its current request.

        Publishes an error result when the session has no runner; otherwise
        calls the runner's ``StopRequest`` and publishes a success result.
        """
        sid = llmServiceObj.get("SessionId", "")
        entry = self._sessions.get(sid)
        runner = entry.get("Runner") if entry else None
        if not runner:
            await self._set_result(
                llmServiceObj,
                f"Error: Runner missing for session {sid}.",
                False,
                "llmServiceMessage",
            )
            return

        await runner.StopRequest(sid)
        await self._set_result(
            llmServiceObj,
            f"Success {runner.Type} {settings.SERVICE_ID} Assistant output has been halted",
            True,
            "llmServiceMessage",
            True,
        )

    async def UserInput(self, llmServiceObj: dict):
        """Forward user input to the session's runner if it can accept it.

        An error result is published instead when the session has no runner,
        or the runner reports ``IsStateStarting`` or ``IsStateFailed``.
        """
        sid = llmServiceObj.get("SessionId", "")
        entry = self._sessions.get(sid)
        if not entry or not entry.get("Runner"):
            await self._set_result(
                llmServiceObj,
                f"Error: SessionId {sid} has no running process.",
                False,
                "llmServiceMessage",
            )
            return

        r: ILLMRunner = entry["Runner"]
        if r.IsStateStarting:
            await self._set_result(
                llmServiceObj,
                "Please wait, the assistant is starting...",
                False,
                "llmServiceMessage",
            )
            return
        if r.IsStateFailed:
            await self._set_result(
                llmServiceObj,
                "The Assistant is stopped. Try reloading.",
                False,
                "llmServiceMessage",
            )
            return

        await r.SendInputAndGetResponse(llmServiceObj)

    async def QueryIndexResult(self, queryIndexRequest: dict):
        """Publish the outputs of an index query as a single message.

        The ``Output`` values of ``QueryResults`` are joined with newlines
        into ``Data``. Any exception raised while building or publishing the
        message is reported as a failed result on the same queue.
        """
        try:
            results = queryIndexRequest.get("QueryResults") or []
            rag_data = "\n".join(qr.get("Output", "") for qr in results)

            await self._pub.publish(
                "llmServiceMessage",
                {
                    "ResultSuccess": queryIndexRequest.get("Success", False),
                    "ResultMessage": queryIndexRequest.get("Message", ""),
                    "Data": rag_data,
                },
            )
        except Exception as e:
            await self._pub.publish(
                "llmServiceMessage",
                {"ResultSuccess": False, "ResultMessage": str(e)},
            )

    async def GetFunctionRegistry(self, filtered: bool = False):
        """Publish a success message carrying the function catalog.

        The catalog is currently a fixed empty JSON object; ``filtered`` is
        only echoed back in the message.
        """
        data = {"FunctionCatalogJson": "{}", "Filtered": filtered}
        await self._pub.publish(
            "llmServiceMessage",
            {
                "ResultSuccess": True,
                "ResultMessage": f"Success : Got GetFunctionCatalogJson : {data}",
            },
        )