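# Provenance (Hugging Face file-view residue, kept as a comment):
#   author: owenkaplinsky
#   commit: f7a31b9 - "Change to Gradio 6 MCP format"
#   viewer links: raw / history / blame
#   file size: 43.2 kB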
import os
import re
import requests
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from openai import OpenAI
import gradio as gr
import asyncio
import queue
import json
import uuid
import time
from colorama import Fore, Style
from huggingface_hub import HfApi
# OpenAI client; stays None until chat_with_context builds it from an API key
client = None
# API keys received through /set_api_key_chat, held in memory for this process
stored_api_key = ""
stored_hf_key = ""
# Latest workspace description posted to /update_chat (the "code" field)
latest_blockly_chat_code = ""
# Latest workspace variable listing posted to /update_chat (the "varString" field)
latest_blockly_vars = ""
# Pending deletion requests (drained by /delete_stream) and their results,
# keyed by block ID (filled by /deletion_result)
deletion_queue = queue.Queue()
deletion_results = {}
# Pending block-creation requests (drained by /create_stream) and their
# results, keyed by request ID (filled by /creation_result)
creation_queue = queue.Queue()
creation_results = {}
# Pending variable-creation requests (drained by /variable_stream) and their
# results, keyed by request ID (filled by /variable_result)
variable_queue = queue.Queue()
variable_results = {}
# URL of the Hugging Face Space set by deploy_to_huggingface (None until a deploy succeeds)
current_mcp_server_url = None
# Set to True by deploy_to_huggingface; cleared once the Space is seen RUNNING
deployment_just_happened = False
# Status text appended to the model instructions while the Space is still building
deployment_message = ""
# Contents of blocks.txt (next to this file), interpolated into the system prompt
blocks_context = ""
try:
    file_path = os.path.join(os.path.dirname(__file__), "blocks.txt")
    with open(file_path, "r", encoding="utf-8") as f:
        blocks_context = f.read().strip()
except Exception as e:
    # Best effort: the chat still works without the block catalogue, so fall
    # back to a placeholder instead of failing at import time.
    print(f"[WARN] Could not read blocks.txt: {e}")
    blocks_context = "(No external block data available.)"
# FastAPI App
app = FastAPI()
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.post("/update_chat")
async def update_chat(request: Request):
    """Record the workspace snapshot pushed by the Blockly frontend.

    Reads the ``code`` and ``varString`` fields of the JSON body (each
    defaulting to an empty string), stores them in the module-level snapshot
    variables, logs both, and echoes the stored code back to the caller.
    """
    global latest_blockly_chat_code, latest_blockly_vars

    payload = await request.json()
    latest_blockly_chat_code, latest_blockly_vars = (
        payload.get("code", ""),
        payload.get("varString", ""),
    )

    print("\n[FASTAPI] Updated Blockly chat code:\n", latest_blockly_chat_code)
    print("\n[FASTAPI] Updated Blockly variables:\n", latest_blockly_vars)
    return {"code": latest_blockly_chat_code}
@app.post("/set_api_key_chat")
async def set_api_key_chat(request: Request):
    """Receive API keys from the frontend and store them for this process.

    The JSON body may carry ``api_key`` (OpenAI) and/or ``hf_key``
    (Hugging Face). Each non-empty key is kept in the matching module-level
    variable and exported as an environment variable. Missing, empty or
    ``null`` keys leave the previously stored value untouched.

    Returns:
        ``{"success": True}`` in all cases.
    """
    global stored_api_key, stored_hf_key

    data = await request.json()
    # `or ""` covers an explicit JSON null, which .get() returns as None and
    # which would otherwise crash on .strip().
    api_key = str(data.get("api_key") or "").strip()
    hf_key = str(data.get("hf_key") or "").strip()

    # Store in memory and set environment variables for this process
    if api_key:
        stored_api_key = api_key
        os.environ["OPENAI_API_KEY"] = api_key
        print("[CHAT API KEY] Set OPENAI_API_KEY in chat.py environment")
    if hf_key:
        stored_hf_key = hf_key
        os.environ["HUGGINGFACE_API_KEY"] = hf_key
        print("[CHAT HF KEY] Set HUGGINGFACE_API_KEY in chat.py environment")
    return {"success": True}
def delete_block(block_id):
    """Delete a block from the Blockly workspace.

    Puts a deletion request on ``deletion_queue`` (delivered to the frontend
    by the ``/delete_stream`` endpoint) and then polls ``deletion_results``
    until the frontend reports back through ``/deletion_result`` or the
    timeout expires. This call blocks the calling thread while it waits.

    Args:
        block_id: ID of the block to delete.

    Returns:
        A status string: success, failure with the reported error, a timeout
        notice, or an error description if something unexpected was raised.
    """
    try:
        print(f"[DELETE REQUEST] Attempting to delete block: {block_id}")
        # Drop any stale result left over from an earlier request for this ID
        deletion_results.pop(block_id, None)

        deletion_queue.put({"block_id": block_id})
        print(f"[DELETE REQUEST] Added to queue: {block_id}")

        timeout = 8  # seconds to wait for the frontend
        check_interval = 0.05  # polling period in seconds
        # A monotonic clock keeps the timeout correct across wall-clock changes
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Single pop instead of check-then-pop: the result is written by
            # the /deletion_result handler, outside this function.
            result = deletion_results.pop(block_id, None)
            if result is not None:
                print(f"[DELETE RESULT] Received result for {block_id}: success={result.get('success')}, error={result.get('error')}")
                # .get() treats a payload without "success" as a failure
                # rather than raising KeyError.
                if result.get("success"):
                    return f"[TOOL] Successfully deleted block {block_id}"
                return f"[TOOL] Failed to delete block {block_id}: {result.get('error', 'Unknown error')}"
            time.sleep(check_interval)

        print(f"[DELETE TIMEOUT] No response received for block {block_id} after {timeout} seconds")
        return f"Timeout waiting for deletion confirmation for block {block_id}"
    except Exception as e:
        print(f"[DELETE ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"Error deleting block: {str(e)}"
def create_block(block_spec, under_block_id=None):
    """Create a block in the Blockly workspace.

    Puts a creation request (tagged with a fresh UUID) on ``creation_queue``
    for delivery through ``/create_stream``, then polls ``creation_results``
    until the frontend answers via ``/creation_result`` or the timeout
    expires. This call blocks the calling thread while it waits.

    Args:
        block_spec: The block description in the project's block DSL.
        under_block_id: Optional ID of an existing block to place the new
            block under. Omitted from the request when falsy.

    Returns:
        A status string: success with the new block ID, failure with the
        reported error, a timeout notice, or an error description if
        something unexpected was raised.
    """
    try:
        print(f"[CREATE REQUEST] Attempting to create block: {block_spec}")
        if under_block_id:
            print(f"[CREATE REQUEST] Under block ID: {under_block_id}")

        # A fresh UUID cannot collide with a stored result, so there is
        # nothing stale to clear before queueing.
        request_id = str(uuid.uuid4())

        queue_data = {"request_id": request_id, "block_spec": block_spec}
        if under_block_id:
            queue_data["under_block_id"] = under_block_id
        creation_queue.put(queue_data)
        print(f"[CREATE REQUEST] Added to queue with ID: {request_id}")

        timeout = 8  # seconds to wait for the frontend
        check_interval = 0.05  # polling period in seconds
        # A monotonic clock keeps the timeout correct across wall-clock changes
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            result = creation_results.pop(request_id, None)
            if result is not None:
                print(f"[CREATE RESULT] Received result for {request_id}: success={result.get('success')}, error={result.get('error')}")
                # .get() treats a payload without "success" as a failure
                # rather than raising KeyError.
                if result.get("success"):
                    return f"[TOOL] Successfully created block: {result.get('block_id', 'unknown')}"
                return f"[TOOL] Failed to create block: {result.get('error', 'Unknown error')}"
            time.sleep(check_interval)

        print(f"[CREATE TIMEOUT] No response received for request {request_id} after {timeout} seconds")
        return "Timeout waiting for block creation confirmation"
    except Exception as e:
        print(f"[CREATE ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"Error creating block: {str(e)}"
def create_variable(var_name):
    """Create a variable in the Blockly workspace.

    Puts a variable-creation request (tagged with a fresh UUID) on
    ``variable_queue`` for delivery through ``/variable_stream``, then polls
    ``variable_results`` until the frontend answers via ``/variable_result``
    or the timeout expires. This call blocks the calling thread while it waits.

    Args:
        var_name: Name of the variable to create.

    Returns:
        A status string: success with the variable ID (falling back to the
        name), failure with the reported error, a timeout notice, or an error
        description if something unexpected was raised.
    """
    try:
        print(f"[VARIABLE REQUEST] Attempting to create variable: {var_name}")

        # A fresh UUID cannot collide with a stored result, so there is
        # nothing stale to clear before queueing.
        request_id = str(uuid.uuid4())

        variable_queue.put({"request_id": request_id, "variable_name": var_name})
        print(f"[VARIABLE REQUEST] Added to queue with ID: {request_id}")

        timeout = 8  # seconds to wait for the frontend
        check_interval = 0.05  # polling period in seconds
        # A monotonic clock keeps the timeout correct across wall-clock changes
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            result = variable_results.pop(request_id, None)
            if result is not None:
                print(f"[VARIABLE RESULT] Received result for {request_id}: success={result.get('success')}, error={result.get('error')}")
                # .get() treats a payload without "success" as a failure
                # rather than raising KeyError.
                if result.get("success"):
                    return f"[TOOL] Successfully created variable: {result.get('variable_id', var_name)}"
                return f"[TOOL] Failed to create variable: {result.get('error', 'Unknown error')}"
            time.sleep(check_interval)

        print(f"[VARIABLE TIMEOUT] No response received for request {request_id} after {timeout} seconds")
        return "Timeout waiting for variable creation confirmation"
    except Exception as e:
        print(f"[VARIABLE ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"Error creating variable: {str(e)}"
# Server-Sent Events endpoint for creation requests
@app.get("/create_stream")
async def create_stream():
    """Stream creation requests to the frontend using Server-Sent Events.

    Every request queued by ``create_block`` is emitted as one ``data:``
    event holding the JSON-encoded request. While the queue is idle a
    heartbeat event is emitted roughly every 30 seconds.
    """

    async def event_generator():
        sent_requests = set()  # IDs sent recently, used to suppress duplicates
        heartbeat_counter = 0
        loop = asyncio.get_running_loop()
        while True:
            try:
                # EAFP: take an item if there is one. This avoids the gap
                # between empty() and get_nowait() when several streams
                # drain the same queue.
                try:
                    creation_request = creation_queue.get_nowait()
                except queue.Empty:
                    creation_request = None

                if creation_request is not None:
                    request_id = creation_request.get("request_id")
                    # Avoid sending duplicate requests too quickly
                    if request_id not in sent_requests:
                        sent_requests.add(request_id)
                        print(f"[SSE CREATE SEND] Sending creation request with ID: {request_id}")
                        yield f"data: {json.dumps(creation_request)}\n\n"
                        # Forget the ID after 10 seconds. A loop timer is used
                        # instead of a fire-and-forget task, whose reference
                        # would otherwise have to be retained by hand.
                        loop.call_later(10, sent_requests.discard, request_id)
                    else:
                        print(f"[SSE CREATE SKIP] Skipping duplicate request for ID: {request_id}")
                    await asyncio.sleep(0.1)  # Small delay between messages
                else:
                    # Send a heartbeat every 30 seconds to keep connection alive
                    heartbeat_counter += 1
                    if heartbeat_counter >= 300:  # 300 * 0.1 = 30 seconds
                        yield f"data: {json.dumps({'heartbeat': True})}\n\n"
                        heartbeat_counter = 0
                    await asyncio.sleep(0.1)
            except Exception as e:
                print(f"[SSE CREATE ERROR] {e}")
                await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
# Endpoint to receive creation results from frontend
@app.post("/creation_result")
async def creation_result(request: Request):
    """Accept the frontend's outcome report for a block-creation request.

    The JSON body is logged and, when it carries a truthy ``request_id``,
    stored whole in ``creation_results`` under that ID so the waiting
    ``create_block`` call can pick it up.

    Returns:
        ``{"received": True}`` in all cases.
    """
    payload = await request.json()
    request_id, success = payload.get("request_id"), payload.get("success")
    error, block_id = payload.get("error"), payload.get("block_id")
    print(f"[CREATION RESULT RECEIVED] request_id={request_id}, success={success}, error={error}, block_id={block_id}")

    if not request_id:
        return {"received": True}

    # Hand the full payload to the create_block poller waiting on this ID
    creation_results[request_id] = payload
    print(f"[CREATION RESULT STORED] Results dict now has {len(creation_results)} items")
    return {"received": True}
# Server-Sent Events endpoint for deletion requests
@app.get("/delete_stream")
async def delete_stream():
    """Stream deletion requests to the frontend using Server-Sent Events.

    Every request queued by ``delete_block`` is emitted as one ``data:``
    event holding the JSON-encoded request. While the queue is idle a
    heartbeat event is emitted roughly every 30 seconds.
    """

    async def event_generator():
        sent_requests = set()  # Block IDs sent recently, used to suppress duplicates
        heartbeat_counter = 0
        loop = asyncio.get_running_loop()
        while True:
            try:
                # EAFP: take an item if there is one. This avoids the gap
                # between empty() and get_nowait() when several streams
                # drain the same queue.
                try:
                    deletion_request = deletion_queue.get_nowait()
                except queue.Empty:
                    deletion_request = None

                if deletion_request is not None:
                    block_id = deletion_request.get("block_id")
                    # Avoid sending duplicate requests too quickly
                    if block_id not in sent_requests:
                        sent_requests.add(block_id)
                        print(f"[SSE SEND] Sending deletion request for block: {block_id}")
                        yield f"data: {json.dumps(deletion_request)}\n\n"
                        # Forget the ID after 10 seconds. A loop timer is used
                        # instead of a fire-and-forget task, whose reference
                        # would otherwise have to be retained by hand.
                        loop.call_later(10, sent_requests.discard, block_id)
                    else:
                        print(f"[SSE SKIP] Skipping duplicate request for block: {block_id}")
                    await asyncio.sleep(0.1)  # Small delay between messages
                else:
                    # Send a heartbeat every 30 seconds to keep connection alive
                    heartbeat_counter += 1
                    if heartbeat_counter >= 300:  # 300 * 0.1 = 30 seconds
                        yield f"data: {json.dumps({'heartbeat': True})}\n\n"
                        heartbeat_counter = 0
                    await asyncio.sleep(0.1)
            except Exception as e:
                print(f"[SSE ERROR] {e}")
                await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
# Endpoint to receive deletion results from frontend
@app.post("/deletion_result")
async def deletion_result(request: Request):
    """Accept the frontend's outcome report for a block-deletion request.

    The JSON body is logged and, when it carries a truthy ``block_id``,
    stored whole in ``deletion_results`` under that ID so the waiting
    ``delete_block`` call can pick it up.

    Returns:
        ``{"received": True}`` in all cases.
    """
    payload = await request.json()
    block_id, success, error = (
        payload.get("block_id"),
        payload.get("success"),
        payload.get("error"),
    )
    print(f"[DELETION RESULT RECEIVED] block_id={block_id}, success={success}, error={error}")

    if not block_id:
        return {"received": True}

    # Hand the full payload to the delete_block poller waiting on this ID
    deletion_results[block_id] = payload
    print(f"[DELETION RESULT STORED] Results dict now has {len(deletion_results)} items")
    return {"received": True}
# Server-Sent Events endpoint for variable creation requests
@app.get("/variable_stream")
async def variable_stream():
    """Stream variable creation requests to the frontend using Server-Sent Events.

    Every request queued by ``create_variable`` is emitted as one ``data:``
    event holding the JSON-encoded request. While the queue is idle a
    heartbeat event is emitted roughly every 30 seconds.
    """

    async def event_generator():
        sent_requests = set()  # IDs sent recently, used to suppress duplicates
        heartbeat_counter = 0
        loop = asyncio.get_running_loop()
        while True:
            try:
                # EAFP: take an item if there is one. This avoids the gap
                # between empty() and get_nowait() when several streams
                # drain the same queue.
                try:
                    var_request = variable_queue.get_nowait()
                except queue.Empty:
                    var_request = None

                if var_request is not None:
                    request_id = var_request.get("request_id")
                    # Avoid sending duplicate requests too quickly
                    if request_id not in sent_requests:
                        sent_requests.add(request_id)
                        print(f"[SSE VARIABLE SEND] Sending variable creation request with ID: {request_id}")
                        yield f"data: {json.dumps(var_request)}\n\n"
                        # Forget the ID after 10 seconds. A loop timer is used
                        # instead of a fire-and-forget task, whose reference
                        # would otherwise have to be retained by hand.
                        loop.call_later(10, sent_requests.discard, request_id)
                    else:
                        print(f"[SSE VARIABLE SKIP] Skipping duplicate request for ID: {request_id}")
                    await asyncio.sleep(0.1)  # Small delay between messages
                else:
                    # Send a heartbeat every 30 seconds to keep connection alive
                    heartbeat_counter += 1
                    if heartbeat_counter >= 300:  # 300 * 0.1 = 30 seconds
                        yield f"data: {json.dumps({'heartbeat': True})}\n\n"
                        heartbeat_counter = 0
                    await asyncio.sleep(0.1)
            except Exception as e:
                print(f"[SSE VARIABLE ERROR] {e}")
                await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
# Endpoint to receive variable creation results from frontend
@app.post("/variable_result")
async def variable_result(request: Request):
    """Accept the frontend's outcome report for a variable-creation request.

    The JSON body is logged and, when it carries a truthy ``request_id``,
    stored whole in ``variable_results`` under that ID so the waiting
    ``create_variable`` call can pick it up.

    Returns:
        ``{"received": True}`` in all cases.
    """
    payload = await request.json()
    request_id, success = payload.get("request_id"), payload.get("success")
    error, variable_id = payload.get("error"), payload.get("variable_id")
    print(f"[VARIABLE RESULT RECEIVED] request_id={request_id}, success={success}, error={error}, variable_id={variable_id}")

    if not request_id:
        return {"received": True}

    # Hand the full payload to the create_variable poller waiting on this ID
    variable_results[request_id] = payload
    print(f"[VARIABLE RESULT STORED] Results dict now has {len(variable_results)} items")
    return {"received": True}
def deploy_to_huggingface(space_name):
    """Deploy the generated MCP code to a Hugging Face Space.

    Fetches the generated Python code from the local ``/get_latest_code``
    endpoint, creates a public Gradio Space named ``<username>/<space_name>``
    with the stored Hugging Face key, and uploads ``app.py``,
    ``requirements.txt`` and ``README.md``. On success the Space URL is
    recorded in the module-level MCP state so the chat can attach the
    deployed server.

    The code is fetched *before* the Space is created, so a missing or empty
    program no longer leaves an empty Space behind.

    Args:
        space_name: Name of the Space to create under the token's account.

    Returns:
        A status string: the success message with the Space URL, or a
        ``[DEPLOY ERROR]`` message describing what went wrong.
    """
    global stored_hf_key
    global current_mcp_server_url, deployment_just_happened, deployment_message

    if not stored_hf_key:
        return "[DEPLOY ERROR] No Hugging Face API key configured. Please set it in File > Keys."
    try:
        from huggingface_hub import HfApi
    except ImportError:
        return "[DEPLOY ERROR] huggingface_hub not installed. Run: pip install huggingface_hub"

    try:
        # Get the actual generated Python code from test.py (not the Blockly DSL)
        python_code = ""
        try:
            # The timeout keeps a hung local server from blocking the chat forever
            resp = requests.get(
                f"http://127.0.0.1:{os.getenv('PORT', 8080)}/get_latest_code",
                timeout=10,
            )
            if resp.ok:
                # `or ""` covers a JSON null "code" value
                python_code = resp.json().get("code", "") or ""
        except Exception as e:
            print(f"[DEPLOY WARN] Could not fetch Python code from test.py: {e}")
        if not python_code.strip():
            return "[DEPLOY ERROR] No generated Python code available. Create and test your tool first."

        api = HfApi(token=stored_hf_key)
        # Get username from token
        user_info = api.whoami()
        username = user_info["name"]
        repo_id = f"{username}/{space_name}"
        print(f"[DEPLOY] Creating HF Space: {repo_id}")

        # Create the Space
        api.create_repo(
            repo_id=repo_id,
            repo_type="space",
            space_sdk="gradio",
            private=False,
        )
        print("[DEPLOY] Space created. Uploading files...")

        # Upload app.py with actual Python code
        api.upload_file(
            path_or_fileobj=python_code.encode(),
            path_in_repo="app.py",
            repo_id=repo_id,
            repo_type="space",
        )

        # Create requirements.txt
        requirements_content = """gradio
openai
requests
huggingface_hub
"""
        api.upload_file(
            path_or_fileobj=requirements_content.encode(),
            path_in_repo="requirements.txt",
            repo_id=repo_id,
            repo_type="space",
        )

        # Create README.md with proper YAML front matter
        readme_content = f"""---
title: {space_name.replace('-', ' ').title()}
emoji: 🚀
colorFrom: purple
colorTo: blue
sdk: gradio
app_file: app.py
pinned: false
---
# {space_name}
This is a MCP server created with [MCP Blockly](https://github.com/owenkaplinsky/mcp-blockly): a visual programming environment for building AI tools.
The tool has been automatically deployed to Hugging Face Spaces and is ready to use!
"""
        api.upload_file(
            path_or_fileobj=readme_content.encode("utf-8"),
            path_in_repo="README.md",
            repo_id=repo_id,
            repo_type="space",
        )

        space_url = f"https://huggingface.co/spaces/{repo_id}"
        print(f"[DEPLOY SUCCESS] Space deployed: {space_url}")

        # Store the MCP server URL globally for native MCP support
        current_mcp_server_url = space_url
        deployment_just_happened = True
        deployment_message = "Your MCP tool is being built on Hugging Face Spaces. This usually takes 1-2 minutes. Once it's ready, you'll be able to use the MCP tools defined in your blocks."
        print(f"[MCP] Registered MCP server: {current_mcp_server_url}")
        return f"[TOOL] Successfully deployed to Hugging Face Space!\n\n**Space URL:** {space_url}"
    except Exception as e:
        print(f"[DEPLOY ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"[DEPLOY ERROR] Failed to deploy: {str(e)}"
def create_gradio_interface():
    """Build the Gradio chat interface for the Blockly MCP assistant.

    Defines the system prompt (including the block catalogue held in
    ``blocks_context``), the function tools exposed to the model, and the
    streaming chat handler, then wraps the handler in a ``gr.ChatInterface``.

    Returns:
        The configured ``gr.ChatInterface`` instance.
    """
    # Hardcoded system prompt
    SYSTEM_PROMPT = f"""You are an AI assistant that helps users build **MCP servers** using Blockly blocks.
MCP lets AI systems define tools with specific inputs and outputs that any LLM can call.
You’ll receive the workspace state in this format:
`blockId | block_name(inputs(input_name: value))`
**Special cases:**
- `create_mcp` and `func_def` use `blockId | block_name(inputs(input_name: type), outputs(output_name: value))`
- Indentation or nesting shows logic hierarchy (like loops or conditionals).
- The `blockId` before the pipe `|` is each block’s unique identifier.
---
### Your job
- Help users understand or fix their MCP logic in natural, human language.
- You may reference the internal block syntax for your own understanding, but never show or explain it to the
user unless they explicitly ask.
- Focus on what the code *does* and what the user is trying to achieve, not on the raw block format.
- In your first message, you may either respond normally or call a tool. If you call a tool, you must first
explain your intended plan and the steps you will take, then perform the tool call in the same message.
---
### Using Your MCP
Once you deploy your MCP to a Hugging Face Space, the model will automatically have access to all the tools you defined. Simply ask the model to use your MCP tools, and it will call them natively without manual intervention.
**Deployment workflow:**
1. Create and test your MCP using Blockly blocks
2. Deploy to a Hugging Face Space using the `deploy_to_huggingface` tool
3. After deployment, the MCP tool becomes immediately available in this chat
4. You may call this tool afterwards as needed. Do not immediately run the MCP
server after deploying it. The user must ask (you can ask if they want it)
---
### Deleting Blocks
- Each block starts with its ID, like `blockId | block_name(...)`.
- To delete a block, specify its block ID. Each block ID is a unique random alphanumeric string shown to the left of the block.
- You can delete any block except the main `create_mcp` block.
`blockId | code`
Each block has its own ID, and you need to use the ID specifically from
the correct block.
---
### Creating Blocks
List of blocks:
{blocks_context}
---
You can create new blocks in the workspace by specifying the block type and its input parameters, if it has any.
You cannot create a MCP block or edit its inputs or outputs.
There are two kinds of nesting in Blockly:
1. **Statement-level nesting (main-level blocks)**
These are blocks that represent actions or structures, such as loops or conditionals, which can contain other blocks *under* them.
To create this kind of nesting, use **two separate `create_block` commands**:
- First, create the outer block (for example, a `repeat` or `if` block).
- Then, create the inner block *under* it using the `under` parameter.
Example: putting an `if` block inside a `repeat` block.
2. **Value-level nesting (output blocks)**
These are blocks that produce a value (like a number, text, or expression). They can’t exist alone in the workspace - they must
be nested inside another block’s input. To create these, you can nest them directly in a single command, for example:
math_arithmetic(inputs(A: math_number(inputs(NUM: 1)), B: math_number(inputs(NUM: 1))))
Here, the two `math_number` blocks are nested inside the `math_arithmetic` block in one call.
When creating blocks, you are never allowed to insert raw text or numbers directly into a block's inputs.
Every value must be enclosed inside the correct block type that represents that value.
Failing to do this will cause the block to be invalid and unusable.
Example of what you must NOT do:
`text_isEmpty(inputs(VALUE: "text"))`
This is invalid because "text" is a raw string and not a block.
The correct and required form wraps the string inside a text block:
`text_isEmpty(inputs(VALUE: text(inputs(TEXT: "text"))))`
This is valid because it uses a text block as the value.
This rule is absolute and applies to all value types:
- Strings must always use a text block.
- Numbers must always use a math_number block.
- Booleans, lists, colors, and every other type must always use their correct block type.
If a block has a value input, that input must always contain another block.
You are not permitted to use raw values under any circumstance.
For blocks that allow infinite things (like ...N) you do not need to provide any inputs
if you want it to be blank.
When creating blocks, you are unable to put an outputting block inside of another block
which already exists. If you are trying to nest input blocks, you must create them all
in one call.
But, for blocks that you want to stack that connect above or below to other blocks, you cannot
create both blocks in the same response. You must create one, wait, then create the other. You
need to wait and not do both in the same response because you need the ID of the first block.
### Variables
You will be given the current variables that are in the workspace. Like the blocks, you will see:
`varId | varName`
---
### Deploying to Hugging Face Spaces
Once the user has tested and is happy with their MCP tool, you can deploy it to a live Hugging Face Space using the `deploy_to_huggingface` tool.
**To deploy:**
1. Ask the user for a name for their Space (e.g., "my-tool")
2. Call the `deploy_to_huggingface` tool with that name
3. The tool will create a new Space, upload the code, and return a live URL
The deployed Space will be public and shareable with others.
You NEVER need to deploy it more than once. If you deployed it, you can run it as many times as you want WITHOUT deploying again.
---
Note: Users can see tool response outputs verbatim. You don't have to repeat the tool response unless you want to.
"""

    tools = [
        {
            "type": "function",
            "name": "delete_block",
            "description": "Delete a single block using its ID.",
            "parameters": {
                "type": "object",
                "properties": {
                    "id": {
                        "type": "string",
                        "description": "The ID of the block you're trying to delete.",
                    },
                },
                "required": ["id"],
            }
        },
        {
            "type": "function",
            "name": "create_block",
            "description": "Creates a single block that allows recursive nested blocks.",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "The create block command using the custom DSL format.",
                    },
                    "under": {
                        "type": "string",
                        "description": "The ID of the block that you want to place this under.",
                    },
                },
                "required": ["command"],
            }
        },
        {
            "type": "function",
            "name": "create_variable",
            "description": "Creates a variable.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the variable you want to create.",
                    },
                },
                "required": ["name"],
            }
        },
        {
            "type": "function",
            "name": "deploy_to_huggingface",
            "description": "Deploy the generated MCP tool to a Hugging Face Space. Requires a Hugging Face API key to be set.",
            "parameters": {
                "type": "object",
                "properties": {
                    "space_name": {
                        "type": "string",
                        "description": "The name of the Hugging Face Space to create (e.g., 'my-tool')",
                    },
                },
                "required": ["space_name"],
            }
        },
    ]

    def content_to_text(content):
        """Flatten one chat message's content into plain text."""
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            # Structured content: keep the textual parts, skip everything else.
            # NOTE(review): assumes text parts look like {"type": "text",
            # "text": ...} - confirm against the installed Gradio version.
            pieces = []
            for part in content:
                if isinstance(part, str):
                    pieces.append(part)
                elif isinstance(part, dict) and isinstance(part.get("text"), str):
                    pieces.append(part["text"])
            return "\n".join(pieces)
        if isinstance(content, dict):
            text = content.get("text")
            return text if isinstance(text, str) else ""
        return str(content)

    def history_to_input_items(history):
        """Convert Gradio chat history into Responses API input items.

        Accepts both history layouts: legacy ``(user, assistant)`` pairs and
        role/content message dicts. Blindly unpacking a dict as a pair would
        yield its *keys*, so the two layouts are told apart explicitly.
        """
        items = []
        for entry in history or []:
            if isinstance(entry, dict):
                role = entry.get("role")
                text = content_to_text(entry.get("content"))
                if role in ("user", "assistant") and text:
                    items.append({"role": role, "content": text})
            else:
                human, ai = entry
                items.append({"role": "user", "content": human})
                items.append({"role": "assistant", "content": ai})
        return items

    def convert_repo_to_live_mcp(url):
        """Map a Space repo URL to the MCP endpoint of the running Space."""
        # input: https://huggingface.co/spaces/user/space
        # output: https://user-space.hf.space/gradio_api/mcp
        parts = url.split("/spaces/")
        user, space = parts[1].split("/")
        # Hostnames cannot carry characters such as "_" or "."; names that are
        # already lowercase alphanumerics and dashes pass through unchanged.
        # NOTE(review): assumes the Space subdomain is the lowercased
        # "user-space" with other characters replaced by "-" - confirm.
        subdomain = re.sub(r"[^a-z0-9-]", "-", f"{user}-{space}".lower())
        return f"https://{subdomain}.hf.space/gradio_api/mcp"

    def resolve_mcp_tool():
        """Return ``(mcp_tool, building_stage)`` for the deployed Space.

        ``mcp_tool`` is the tool entry to add to the request when the Space
        is verified RUNNING, otherwise None. ``building_stage`` is the
        reported stage when the Space is not running, otherwise None.
        Failures are logged and never propagate, so the chat keeps working
        without MCP tools.
        """
        global deployment_just_happened

        if not current_mcp_server_url:
            return None, None

        space_building_status = None
        try:
            # Try to verify the MCP server is available before injecting
            space_is_running = False
            try:
                # URL format: https://huggingface.co/spaces/username/space_name
                url_parts = current_mcp_server_url.split("/spaces/")
                if len(url_parts) == 2:
                    space_id = url_parts[1]
                    api = HfApi()
                    runtime_info = api.get_space_runtime(space_id)
                    print(f"[MCP] Space runtime status: {runtime_info}")
                    if runtime_info and runtime_info.stage == "RUNNING":
                        space_is_running = True
                        # Space is running - deployment is complete
                        deployment_just_happened = False
                        print("[MCP] Space is RUNNING")
                    else:
                        # Space is not running - it's likely building
                        space_building_status = runtime_info.stage if runtime_info else "unknown"
                        print(f"[MCP] Space is not running yet (stage: {space_building_status})")
            except Exception as check_error:
                print(f"[MCP] Could not verify space runtime: {check_error}")

            # Only inject the MCP tool if the space is verified running
            if space_is_running:
                mcp_tool = {
                    "type": "mcp",
                    "server_url": convert_repo_to_live_mcp(current_mcp_server_url),
                    "server_label": "user_mcp_server",
                    "require_approval": "never"
                }
                print(f"[MCP] Injected MCP tool for server: {current_mcp_server_url}")
                return mcp_tool, space_building_status
            print("[MCP] Skipping MCP tool injection - space not running yet")
        except Exception as mcp_error:
            # Continue without MCP - don't crash
            print(f"[MCP ERROR] Failed during MCP injection: {mcp_error}")
            print("[MCP] Continuing without MCP tools...")
        return None, space_building_status

    def chat_with_context(message, history):
        """Answer one chat message, running tool calls until the model is done.

        This is a generator: it yields the accumulated response text after
        each assistant message and each tool result. At most 10 model calls
        are made per user message.

        Args:
            message: The user's new message.
            history: Prior conversation as supplied by Gradio.
        """
        global client

        # Use stored key or environment key, and (re)build the client when
        # there is none yet or the key has changed
        api_key = stored_api_key or os.environ.get("OPENAI_API_KEY")
        if api_key and (not client or (hasattr(client, 'api_key') and client.api_key != api_key)):
            try:
                client = OpenAI(api_key=api_key)
            except Exception as e:
                yield f"Error initializing OpenAI client: {str(e)}"
                return
        if not client or not api_key:
            yield "OpenAI API key not configured. Please set it in File > Settings in the Blockly interface."
            return

        # Snapshot the workspace state pushed by the frontend
        context = latest_blockly_chat_code
        workspace_vars = latest_blockly_vars

        # Convert history to OpenAI format
        input_items = history_to_input_items(history)

        # Debug
        print(f"[DEBUG] Context received: {context if context else 'No context available'}")

        # Build instructions
        instructions = SYSTEM_PROMPT
        if context:
            instructions += f"\n\nCurrent Blockly workspace state:\n{context}"
        else:
            instructions += "\n\nNote: No Blockly workspace context is currently available."
        if workspace_vars:
            instructions += f"\n\nCurrent Blockly variables:\n{workspace_vars}"
        else:
            instructions += "\n\nNote: No Blockly variables are currently available."

        # Iteration control
        accumulated_response = ""
        max_iterations = 10
        current_iteration = 0
        # Start with original user message
        current_prompt = message
        temp_input_items = input_items.copy()

        # MAIN LOOP
        while current_iteration < max_iterations:
            current_iteration += 1
            try:
                # Build dynamic tools list, adding the MCP server if one is live
                dynamic_tools = list(tools)
                mcp_tool, space_building_status = resolve_mcp_tool()
                if mcp_tool is not None:
                    dynamic_tools.append(mcp_tool)

                # Add deployment status message to instructions if deployment
                # just happened and space is not running
                deployment_instructions = instructions
                if deployment_just_happened and space_building_status and space_building_status != "RUNNING":
                    deployment_instructions = instructions + f"\n\n**MCP DEPLOYMENT STATUS:** {deployment_message}"

                # Create Responses API call
                response = client.responses.create(
                    model="gpt-4o",
                    instructions=deployment_instructions,
                    input=temp_input_items + [{"role": "user", "content": current_prompt}],
                    tools=dynamic_tools,
                    tool_choice="auto"
                )

                # Extract outputs
                ai_response = ""
                tool_calls = []
                for item in response.output:
                    if item.type == "message":
                        for content in item.content:
                            if content.type == "output_text":
                                ai_response = content.text
                    elif item.type == "function_call":
                        tool_calls.append(item)

                if not tool_calls:
                    # Plain answer: show it and stop
                    if ai_response:
                        if accumulated_response:
                            accumulated_response += "\n\n"
                        accumulated_response += ai_response
                        yield accumulated_response
                    break

                # PROCESSING TOOL CALLS
                # Show assistant text FIRST if it exists
                if ai_response:
                    if accumulated_response:
                        accumulated_response += "\n\n"
                    accumulated_response += ai_response
                    yield accumulated_response

                # Record this turn's prompt and assistant text exactly once,
                # even when the model requested several tool calls.
                temp_input_items.append({"role": "user", "content": current_prompt})
                if ai_response:
                    temp_input_items.append({"role": "assistant", "content": ai_response})

                # Now process each tool call, one by one
                for tool_call in tool_calls:
                    function_name = tool_call.name
                    function_args = json.loads(tool_call.arguments)
                    call_id = tool_call.call_id
                    temp_input_items.append({
                        "type": "function_call",
                        "call_id": call_id,
                        "name": function_name,
                        "arguments": tool_call.arguments
                    })

                    # Execute the tool
                    tool_result = None
                    result_label = ""
                    if function_name == "delete_block":
                        block_id = function_args.get("id", "")
                        print(Fore.YELLOW + f"Agent deleted block with ID `{block_id}`." + Style.RESET_ALL)
                        tool_result = delete_block(block_id)
                        result_label = "Delete Operation"
                    elif function_name == "create_block":
                        command = function_args.get("command", "")
                        under_block_id = function_args.get("under", None)
                        if under_block_id is None:
                            print(Fore.YELLOW + f"Agent created block with command `{command}`." + Style.RESET_ALL)
                        else:
                            print(Fore.YELLOW + f"Agent created block with command `{command}`, under block ID `{under_block_id}`." + Style.RESET_ALL)
                        tool_result = create_block(command, under_block_id)
                        result_label = "Create Operation"
                    elif function_name == "create_variable":
                        name = function_args.get("name", "")
                        print(Fore.YELLOW + f"Agent created variable with name `{name}`." + Style.RESET_ALL)
                        tool_result = create_variable(name)
                        result_label = "Create Var Operation"
                    elif function_name == "deploy_to_huggingface":
                        space_name = function_args.get("space_name", "")
                        print(Fore.YELLOW + f"Agent deploying to Hugging Face Space `{space_name}`." + Style.RESET_ALL)
                        tool_result = deploy_to_huggingface(space_name)
                        result_label = "Deployment Result"

                    # SHOW TOOL RESULT IMMEDIATELY
                    if tool_result is not None:
                        print(Fore.YELLOW + f"[TOOL RESULT] {tool_result}" + Style.RESET_ALL)
                        if accumulated_response:
                            accumulated_response += "\n\n"
                        accumulated_response += f"**{result_label}:** {tool_result}"
                        yield accumulated_response

                    # Append the tool result into the conversation for the model
                    temp_input_items.append({
                        "type": "function_call_output",
                        "call_id": call_id,
                        "output": str(tool_result)
                    })

                # Tell model to respond to tool result
                current_prompt = "The tool has been executed with the result shown above. Please respond appropriately."
            except Exception as e:
                if accumulated_response:
                    yield f"{accumulated_response}\n\nError in iteration {current_iteration}: {str(e)}"
                else:
                    yield f"Error: {str(e)}"
                return
        else:
            # Runs only when the loop used up every iteration without a final
            # answer; a normal answer on the last iteration leaves via `break`
            # and must not get this notice.
            accumulated_response += f"\n\n*(Reached maximum of {max_iterations} consecutive responses)*"
            yield accumulated_response

    # Attach to Gradio ChatInterface
    demo = gr.ChatInterface(
        fn=chat_with_context,
        title="AI Assistant",
    )
    return demo
def get_chat_gradio_interface():
    """Return a newly built chat interface (alias for create_gradio_interface)."""
    interface = create_gradio_interface()
    return interface
if __name__ == "__main__":
    # Build the chat UI and mount it at the root path of the FastAPI app.
    # NOTE(review): nothing visible here starts an ASGI server after the
    # mount - confirm how this module is actually served.
    demo = create_gradio_interface()
    app = gr.mount_gradio_app(app, demo, path="/")