File size: 3,168 Bytes
bf292d9 7630510 0bf22fe bf292d9 552430d 0bf22fe 552430d 7630510 bf292d9 2c8368f 0bf22fe 2c8368f 552430d bf292d9 2c8368f bf292d9 2c8368f bf292d9 0bf22fe bf292d9 2c8368f 0bf22fe 552430d 66c4f69 bf292d9 2c8368f bf292d9 552430d 66c4f69 7630510 66c4f69 552430d 66c4f69 552430d 66c4f69 552430d 2c8368f bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
import json
import logging
from typing import Callable, Awaitable, Dict, Any, List
import aio_pika
from aiormq.exceptions import ChannelInvalidStateError
# Shape of a message handler: an async callable that receives the value found
# under the envelope's "data" key (or None) and returns nothing.
Handler = Callable[[Any], Awaitable[None]]
logger = logging.getLogger(__name__)


class RabbitListenerBase:
    """Attach named async handlers to RabbitMQ queues and dispatch messages.

    Each declaration passed to :meth:`start` results in one queue, declared and
    bound through ``base.declare_queue_bind`` and consumed with manual acks.
    Messages are acknowledged *before* the handler runs, so a failing handler
    never causes a redelivery; handlers own their idempotency.
    """

    def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
        """Store collaborators; no I/O happens here.

        Args:
            base: Object providing an awaitable ``declare_queue_bind(exchange=,
                queue_name=, routing_keys=, ttl_ms=)``.
                NOTE(review): its concrete type is not visible in this file.
            instance_name: Prefix used for every queue name built by this listener.
            handlers: Mapping from a declaration's ``"FuncName"`` to its handler.
        """
        self._base = base
        self._instance_name = instance_name
        self._handlers = handlers
        # Queues on which start() has registered a consumer.
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        """Build the queue name ``<instance>-<exchange>[-<rk1>-<rk2>...]``.

        Empty routing keys are ignored and the rest are sorted, so the name does
        not depend on the order in which the keys were listed. ``None`` or an
        empty list yields a name without a routing-key suffix.
        """
        rk_part = "-".join(sorted(rk for rk in routing_keys or () if rk))
        suffix = f"-{rk_part}" if rk_part else ""
        return f"{self._instance_name}-{exchange}{suffix}"

    async def start(self, declarations: List[dict]) -> None:
        """Declare, bind and start consuming one queue per declaration.

        Each declaration is a dict read with these keys:
            ``"ExchangeName"`` (required), ``"FuncName"`` (required; looked up in
            the handlers mapping), ``"RoutingKeys"`` (optional; falsy values
            become ``[""]``) and ``"MessageTimeout"`` (optional; falsy values
            become ``None`` and are passed on as ``ttl_ms``).

        Raises:
            KeyError: If a declaration lacks ``"ExchangeName"`` or ``"FuncName"``.
        """
        for d in declarations:
            exch = d["ExchangeName"]
            ttl = d.get("MessageTimeout") or None
            rks = d.get("RoutingKeys") or [""]
            qname = self._qname(exch, rks)
            q = await self._base.declare_queue_bind(
                exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
            )
            # manual ack, parity with .NET (autoAck: false)
            await q.consume(self._make_consumer(d["FuncName"]), no_ack=False)
            self._consumers.append(q)

    @staticmethod
    def _decode_data(func_name: str, body: bytes) -> Any:
        """Decode a message body and return the envelope's ``"data"`` value.

        Invalid JSON is logged and treated as an envelope without data, so the
        result is ``None``. A missing ``"data"`` key also yields ``None``.

        Raises:
            TypeError: If the body is valid JSON but not a JSON object (list,
                string, number, ``null``...), which has no ``"data"`` to read.
        """
        raw_body = body.decode("utf-8", errors="replace")
        logger.info("Received message for handler '%s': %s", func_name, raw_body)
        try:
            envelope = json.loads(raw_body)
        except Exception:
            logger.exception("Invalid JSON for '%s'", func_name)
            return None
        if not isinstance(envelope, dict):
            raise TypeError(
                f"envelope must be a JSON object, got {type(envelope).__name__}"
            )
        return envelope.get("data")

    def _make_consumer(self, func_name: str):
        """Return the async callback that consumes messages for ``func_name``.

        The handler is resolved once, when the callback is created. For every
        message the callback:

        1. decodes the body; a message that cannot be decoded is logged, acked
           and dropped without calling the handler;
        2. acks the message; if the ack fails the handler is NOT run;
        3. awaits the handler with the decoded data, logging (not raising) any
           handler error. A missing handler is logged as an error.
        """
        handler = self._handlers.get(func_name)

        async def _on_msg(msg: "aio_pika.IncomingMessage") -> None:
            # decode
            try:
                data = self._decode_data(func_name, msg.body)
            except Exception:
                # if we cannot decode, ack to drop (matches .NET non-requeue
                # behavior) -- but leave a trace instead of dropping silently
                logger.exception(
                    "Dropping undecodable message for '%s'", func_name
                )
                try:
                    await msg.ack()
                except Exception:
                    # best effort: nothing more can be done for this message
                    logger.warning(
                        "Ack of dropped message failed for '%s'",
                        func_name,
                        exc_info=True,
                    )
                return

            # ACK FIRST (like C#)
            try:
                await msg.ack()
            except ChannelInvalidStateError:
                # channel died; message may be redelivered; avoid loops
                logger.warning(
                    "Ack failed: channel invalid for '%s'. Skipping ack.", func_name
                )
                return
            except Exception:
                # swallow ack errors to avoid crash; mirrors resilient .NET behavior
                logger.exception("Ack error for '%s'", func_name)
                return

            # run handler after ack; if it fails, caller handles own idempotency
            if not handler:
                logger.error("No handler bound for '%s'", func_name)
                return
            try:
                await handler(data)
            except Exception:
                logger.exception("Handler error for '%s'", func_name)

        return _on_msg
|