GradLLM / listener.py
johnbridges's picture
.
552430d
import json
import logging
from typing import Callable, Awaitable, Dict, Any, List
import aio_pika
from aiormq.exceptions import ChannelInvalidStateError
Handler = Callable[[Any], Awaitable[None]]
logger = logging.getLogger(__name__)
class RabbitListenerBase:
def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
self._base = base
self._instance_name = instance_name
self._handlers = handlers
self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []
def _qname(self, exchange: str, routing_keys: List[str]) -> str:
rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk])) or ""
suffix = f"-{rk_part}" if rk_part else ""
return f"{self._instance_name}-{exchange}{suffix}"
async def start(self, declarations: List[dict]) -> None:
for d in declarations:
exch = d["ExchangeName"]
ttl = d.get("MessageTimeout") or None
rks = d.get("RoutingKeys") or [""]
qname = self._qname(exch, rks)
q = await self._base.declare_queue_bind(
exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
)
# manual ack, parity with .NET (autoAck: false)
await q.consume(self._make_consumer(d["FuncName"]), no_ack=False)
self._consumers.append(q)
def _make_consumer(self, func_name: str):
handler = self._handlers.get(func_name)
async def _on_msg(msg: aio_pika.IncomingMessage):
# decode
try:
raw_body = msg.body.decode("utf-8", errors="replace")
logger.info("Received message for handler '%s': %s", func_name, raw_body)
try:
envelope = json.loads(raw_body)
except Exception:
logger.exception("Invalid JSON for '%s'", func_name)
envelope = {"data": None}
data = envelope.get("data", None)
except Exception:
# if we cannot decode, ack to drop (matches .NET non-requeue behavior)
try:
await msg.ack()
except Exception:
pass
return
# ACK FIRST (like C#)
try:
await msg.ack()
except ChannelInvalidStateError:
# channel died; message may be redelivered; avoid loops
logger.warning("Ack failed: channel invalid for '%s'. Skipping ack.", func_name)
return
except Exception:
# swallow ack errors to avoid crash; mirrors resilient .NET behavior
logger.exception("Ack error for '%s'", func_name)
return
# run handler after ack; if it fails, caller handles own idempotency
if handler:
try:
await handler(data)
except Exception:
logger.exception("Handler error for '%s'", func_name)
else:
logger.error("No handler bound for '%s'", func_name)
return _on_msg