# app.py
import asyncio
import logging

import gradio as gr
from config import settings
from rabbit_base import RabbitBase
from listener import RabbitListenerBase
from rabbit_repo import RabbitRepo
from oa_server import OpenAIServers
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
log = logging.getLogger("app")
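# Optional GPU probe: on a Hugging Face ZeroGPU Space the `spaces` package is importable and
# @spaces.GPU requests a GPU allocation for the decorated call (duration hint: 60 s);
# anywhere else the import fails and a CPU-only stub is used instead.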
try:
    import spaces

    @spaces.GPU(duration=60)
    def gpu_entrypoint() -> str:
        return "gpu: ready"
except Exception:
    def gpu_entrypoint() -> str:
        return "gpu: not available (CPU only)"
# publisher + servers
publisher = RabbitRepo(external_source="openai.mq.server")
# Force oa.* exchanges to DIRECT using the built-in resolver hook (no raw API added)
def resolver(name: str) -> str:
    return "direct" if name.startswith("oa.") else settings.RABBIT_EXCHANGE_TYPE
base = RabbitBase(exchange_type_resolver=resolver)
servers = OpenAIServers(publisher)
# Existing handlers can stay; add our two:
handlers = {
    "oaChatCreate": servers.handle_chat_create,
    "oaImagesGenerate": servers.handle_images_generate,
}
# Declare listener queues using your proven pattern
DECLS = [
    # Chat Completions
    {"ExchangeName": "oa.chat.create", "FuncName": "oaChatCreate",
     "MessageTimeout": 600_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
    # Images (generations)
    {"ExchangeName": "oa.images.generate", "FuncName": "oaImagesGenerate",
     "MessageTimeout": 600_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
]
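# The listener maps each declaration's FuncName to the handler of the same name in `handlers`.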
listener = RabbitListenerBase(base, instance_name=settings.RABBIT_INSTANCE_NAME, handlers=handlers)
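# Invoked from demo.load() below: connect to RabbitMQ, then start consuming on DECLS.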
async def _startup_init():
    try:
        await base.connect()         # your TLS settings apply here
        await listener.start(DECLS)  # same listener pattern as before
        return "OpenAI MQ servers: ready"
    except Exception as e:
        log.exception("Startup init failed")
        return f"ERROR: {e}"
async def ping():
    return "ok"
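# Gradio UI: a Service tab (ping + startup status) and a @spaces.GPU probe tab.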
with gr.Blocks(title="OpenAI over RabbitMQ (proven API)", theme=gr.themes.Soft()) as demo:
    gr.Markdown("## OpenAI-compatible over RabbitMQ – using existing Rabbit API (CloudEvent envelopes)")
    with gr.Tabs():
        with gr.Tab("Service"):
            btn = gr.Button("Ping")
            out = gr.Textbox(label="Ping result")
            btn.click(ping, inputs=None, outputs=out)
            init_status = gr.Textbox(label="Startup status", interactive=False)
            demo.load(fn=_startup_init, inputs=None, outputs=init_status)
        with gr.Tab("@spaces.GPU Probe"):
            gpu_btn = gr.Button("GPU Ready Probe", variant="primary")
            gpu_out = gr.Textbox(label="GPU Probe Result", interactive=False)
            gpu_btn.click(gpu_entrypoint, inputs=None, outputs=gpu_out)
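# 0.0.0.0:7860 is the address/port a Hugging Face Space expects the app to listen on.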
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)