File size: 3,168 Bytes
bf292d9
7630510
0bf22fe
 
bf292d9
552430d
0bf22fe
552430d
7630510
 
bf292d9
2c8368f
0bf22fe
2c8368f
552430d
bf292d9
 
 
 
2c8368f
bf292d9
2c8368f
bf292d9
0bf22fe
bf292d9
 
 
2c8368f
 
0bf22fe
 
 
552430d
66c4f69
bf292d9
 
 
 
2c8368f
bf292d9
552430d
66c4f69
 
 
 
7630510
66c4f69
 
 
 
552430d
 
 
 
 
 
 
66c4f69
552430d
 
 
 
 
 
 
66c4f69
552430d
 
 
 
 
 
 
 
 
 
 
 
2c8368f
bf292d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import json
import logging
from typing import Callable, Awaitable, Dict, Any, List

import aio_pika
from aiormq.exceptions import ChannelInvalidStateError

# Signature of a bound handler: an async callable that receives the value of
# the envelope's "data" key and returns nothing.
Handler = Callable[[Any], Awaitable[None]]
logger = logging.getLogger(__name__)


class RabbitListenerBase:
    """Consume RabbitMQ queues and dispatch each message to a named async handler.

    Messages are expected to be UTF-8 JSON objects; the value under the
    ``"data"`` key is what the handler receives. Every message is acknowledged
    *before* its handler runs, so a failing handler never causes redelivery.
    """

    def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
        """Store the collaborators; no broker I/O happens here.

        Args:
            base: Object exposing an awaitable
                ``declare_queue_bind(exchange=, queue_name=, routing_keys=, ttl_ms=)``
                that returns a queue supporting ``consume``.
            instance_name: Prefix used for every queue name built by this listener.
            handlers: Mapping of function name (the ``"FuncName"`` of a
                declaration) to the async handler to invoke.
        """
        self._base = base
        self._instance_name = instance_name
        self._handlers = handlers
        # Queues that currently have a consumer registered by start().
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        """Return ``<instance>-<exchange>[-<rk1>-<rk2>...]``.

        Empty routing keys are ignored and the remaining ones are sorted, so
        the name does not depend on the order they were declared in. ``None``
        or a list with no non-empty keys yields no suffix.
        """
        keys = sorted(rk for rk in (routing_keys or ()) if rk)
        joined = "-".join(keys)
        suffix = f"-{joined}" if joined else ""
        return f"{self._instance_name}-{exchange}{suffix}"

    async def start(self, declarations: List[dict]) -> None:
        """Obtain one queue per declaration and start consuming from it.

        Each declaration is a dict with:
            ``"ExchangeName"`` (required), ``"FuncName"`` (required),
            ``"RoutingKeys"`` (optional; falsy means ``[""]``) and
            ``"MessageTimeout"`` (optional; falsy means no TTL, otherwise it is
            passed through as ``ttl_ms``).

        Raises:
            KeyError: If a declaration lacks ``"ExchangeName"`` or ``"FuncName"``.
        """
        for decl in declarations:
            # Read the required keys up front so a malformed declaration fails
            # before anything is requested from the broker for it.
            exch = decl["ExchangeName"]
            func_name = decl["FuncName"]
            ttl = decl.get("MessageTimeout") or None
            rks = decl.get("RoutingKeys") or [""]
            qname = self._qname(exch, rks)
            q = await self._base.declare_queue_bind(
                exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
            )
            # manual ack, parity with .NET (autoAck: false)
            await q.consume(self._make_consumer(func_name), no_ack=False)
            self._consumers.append(q)

    def _make_consumer(self, func_name: str):
        """Build the consume callback that routes messages to ``func_name``.

        The handler is looked up once, when the callback is built. The returned
        coroutine function never raises for decode or handler errors: they are
        logged and the message is not requeued.
        """
        handler = self._handlers.get(func_name)

        async def _drop(msg, reason: str) -> None:
            # Ack-to-drop (matches .NET non-requeue behavior), but leave a
            # trace so lost messages can be diagnosed.
            logger.error("Dropping message for '%s': %s", func_name, reason)
            try:
                await msg.ack()
            except Exception:
                logger.warning(
                    "Ack failed while dropping message for '%s'",
                    func_name,
                    exc_info=True,
                )

        # The hint is quoted so it is not evaluated each time a consumer is
        # built; aio_pika passes an AbstractIncomingMessage to callbacks.
        async def _on_msg(msg: "aio_pika.abc.AbstractIncomingMessage"):
            # decode
            try:
                raw_body = msg.body.decode("utf-8", errors="replace")
                logger.info("Received message for handler '%s': %s", func_name, raw_body)
                try:
                    envelope = json.loads(raw_body)
                except (ValueError, RecursionError):
                    # Invalid JSON is still delivered to the handler, as None.
                    logger.exception("Invalid JSON for '%s'", func_name)
                    envelope = {"data": None}
            except Exception as exc:
                await _drop(msg, f"cannot decode body ({exc!r})")
                return

            # Valid JSON that is not an object (list, string, number, null)
            # has no "data" key to extract; drop it explicitly instead of
            # failing on .get() and vanishing without a log entry.
            if not isinstance(envelope, dict):
                await _drop(
                    msg,
                    f"envelope is not a JSON object (got {type(envelope).__name__})",
                )
                return
            data = envelope.get("data")

            # ACK FIRST (like C#)
            try:
                await msg.ack()
            except ChannelInvalidStateError:
                # channel died; message may be redelivered; avoid loops
                logger.warning("Ack failed: channel invalid for '%s'. Skipping ack.", func_name)
                return
            except Exception:
                # swallow ack errors to avoid crash; mirrors resilient .NET behavior
                logger.exception("Ack error for '%s'", func_name)
                return

            # run handler after ack; if it fails, caller handles own idempotency
            if handler:
                try:
                    await handler(data)
                except Exception:
                    logger.exception("Handler error for '%s'", func_name)
            else:
                logger.error("No handler bound for '%s'", func_name)

        return _on_msg