import os
import re
import requests
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from openai import OpenAI
import gradio as gr
import asyncio
import queue
import json
import uuid
import time
from colorama import Fore, Style
from huggingface_hub import HfApi

# OpenAI client (created once an API key is available)
client = None

# API keys held in memory for this process
stored_api_key = ""
stored_hf_key = ""

# Latest chat context pushed from the Blockly frontend
latest_blockly_chat_code = ""
# Latest workspace variables pushed from the Blockly frontend
latest_blockly_vars = ""

# Queues for frontend round trips, with dicts holding their results
deletion_queue = queue.Queue()
deletion_results = {}
creation_queue = queue.Queue()
creation_results = {}
variable_queue = queue.Queue()
variable_results = {}

# URL of the deployed HF MCP server, if any
current_mcp_server_url = None
# Flags tracking whether a deployment just happened
deployment_just_happened = False
deployment_message = ""

blocks_context = ""
try:
    file_path = os.path.join(os.path.dirname(__file__), "blocks.txt")
    with open(file_path, "r", encoding="utf-8") as f:
        blocks_context = f.read().strip()
except Exception as e:
    print(f"[WARN] Could not read blocks.txt: {e}")
    blocks_context = "(No external block data available.)"

# FastAPI app
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
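
# NOTE: allow_origins=["*"] keeps local development with the Blockly frontend
# simple; tighten it before exposing this server publicly.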

# The @app.* route paths below are assumptions chosen to mirror the function
# names; the Blockly frontend must call these same paths.
@app.post("/update_chat")
async def update_chat(request: Request):
    global latest_blockly_chat_code, latest_blockly_vars
    data = await request.json()
    latest_blockly_chat_code = data.get("code", "")
    latest_blockly_vars = data.get("varString", "")
    print("\n[FASTAPI] Updated Blockly chat code:\n", latest_blockly_chat_code)
    print("\n[FASTAPI] Updated Blockly variables:\n", latest_blockly_vars)
    return {"code": latest_blockly_chat_code}

@app.post("/set_api_key")  # assumed path
async def set_api_key_chat(request: Request):
    """Receive API keys from the frontend and store them."""
    global stored_api_key, stored_hf_key
    data = await request.json()
    api_key = data.get("api_key", "").strip()
    hf_key = data.get("hf_key", "").strip()
    # Store in memory and set environment variables for this process
    if api_key:
        stored_api_key = api_key
        os.environ["OPENAI_API_KEY"] = api_key
        print("[CHAT API KEY] Set OPENAI_API_KEY in chat.py environment")
    if hf_key:
        stored_hf_key = hf_key
        os.environ["HUGGINGFACE_API_KEY"] = hf_key
        print("[CHAT HF KEY] Set HUGGINGFACE_API_KEY in chat.py environment")
    return {"success": True}

def delete_block(block_id):
    """Delete a block from the Blockly workspace."""
    try:
        print(f"[DELETE REQUEST] Attempting to delete block: {block_id}")
        # Clear any stale result for this block ID first
        if block_id in deletion_results:
            deletion_results.pop(block_id)
        # Add to the deletion queue
        deletion_queue.put({"block_id": block_id})
        print(f"[DELETE REQUEST] Added to queue: {block_id}")
        # Poll for the result with a timeout
        timeout = 8  # seconds
        start_time = time.time()
        check_interval = 0.05
        while time.time() - start_time < timeout:
            if block_id in deletion_results:
                result = deletion_results.pop(block_id)
                print(f"[DELETE RESULT] Received result for {block_id}: success={result.get('success')}, error={result.get('error')}")
                if result["success"]:
                    return f"[TOOL] Successfully deleted block {block_id}"
                else:
                    return f"[TOOL] Failed to delete block {block_id}: {result.get('error', 'Unknown error')}"
            time.sleep(check_interval)
        print(f"[DELETE TIMEOUT] No response received for block {block_id} after {timeout} seconds")
        return f"Timeout waiting for deletion confirmation for block {block_id}"
    except Exception as e:
        print(f"[DELETE ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"Error deleting block: {str(e)}"

def create_block(block_spec, under_block_id=None):
    """Create a block in the Blockly workspace."""
    try:
        print(f"[CREATE REQUEST] Attempting to create block: {block_spec}")
        if under_block_id:
            print(f"[CREATE REQUEST] Under block ID: {under_block_id}")
        # Generate a unique request ID
        request_id = str(uuid.uuid4())
        # Clear any stale result for this request ID first
        if request_id in creation_results:
            creation_results.pop(request_id)
        # Add to the creation queue with the optional under_block_id
        queue_data = {"request_id": request_id, "block_spec": block_spec}
        if under_block_id:
            queue_data["under_block_id"] = under_block_id
        creation_queue.put(queue_data)
        print(f"[CREATE REQUEST] Added to queue with ID: {request_id}")
        # Poll for the result with a timeout
        timeout = 8  # seconds
        start_time = time.time()
        check_interval = 0.05
        while time.time() - start_time < timeout:
            if request_id in creation_results:
                result = creation_results.pop(request_id)
                print(f"[CREATE RESULT] Received result for {request_id}: success={result.get('success')}, error={result.get('error')}")
                if result["success"]:
                    return f"[TOOL] Successfully created block: {result.get('block_id', 'unknown')}"
                else:
                    return f"[TOOL] Failed to create block: {result.get('error', 'Unknown error')}"
            time.sleep(check_interval)
        print(f"[CREATE TIMEOUT] No response received for request {request_id} after {timeout} seconds")
        return "Timeout waiting for block creation confirmation"
    except Exception as e:
        print(f"[CREATE ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"Error creating block: {str(e)}"

def create_variable(var_name):
    """Create a variable in the Blockly workspace."""
    try:
        print(f"[VARIABLE REQUEST] Attempting to create variable: {var_name}")
        # Generate a unique request ID
        request_id = str(uuid.uuid4())
        # Clear any stale result for this request ID first
        if request_id in variable_results:
            variable_results.pop(request_id)
        # Add to the variable creation queue
        queue_data = {"request_id": request_id, "variable_name": var_name}
        variable_queue.put(queue_data)
        print(f"[VARIABLE REQUEST] Added to queue with ID: {request_id}")
        # Poll for the result with a timeout
        timeout = 8  # seconds
        start_time = time.time()
        check_interval = 0.05
        while time.time() - start_time < timeout:
            if request_id in variable_results:
                result = variable_results.pop(request_id)
                print(f"[VARIABLE RESULT] Received result for {request_id}: success={result.get('success')}, error={result.get('error')}")
                if result["success"]:
                    return f"[TOOL] Successfully created variable: {result.get('variable_id', var_name)}"
                else:
                    return f"[TOOL] Failed to create variable: {result.get('error', 'Unknown error')}"
            time.sleep(check_interval)
        print(f"[VARIABLE TIMEOUT] No response received for request {request_id} after {timeout} seconds")
        return "Timeout waiting for variable creation confirmation"
    except Exception as e:
        print(f"[VARIABLE ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"Error creating variable: {str(e)}"

# Server-Sent Events endpoint for creation requests
@app.get("/create_stream")  # assumed path
async def create_stream():
    """Stream creation requests to the frontend using Server-Sent Events."""
    async def clear_sent_request(sent_requests, request_id, delay):
        """Remove request_id from sent_requests after delay seconds."""
        await asyncio.sleep(delay)
        sent_requests.discard(request_id)

    async def event_generator():
        sent_requests = set()  # Track sent requests to avoid duplicates
        heartbeat_counter = 0
        while True:
            try:
                # Check for creation requests (non-blocking)
                if not creation_queue.empty():
                    creation_request = creation_queue.get_nowait()
                    request_id = creation_request.get("request_id")
                    # Avoid re-sending the same request in quick succession
                    if request_id not in sent_requests:
                        sent_requests.add(request_id)
                        print(f"[SSE CREATE SEND] Sending creation request with ID: {request_id}")
                        yield f"data: {json.dumps(creation_request)}\n\n"
                        # Clear from sent_requests after 10 seconds
                        asyncio.create_task(clear_sent_request(sent_requests, request_id, 10))
                    else:
                        print(f"[SSE CREATE SKIP] Skipping duplicate request for ID: {request_id}")
                    await asyncio.sleep(0.1)  # Small delay between messages
                else:
                    # Send a heartbeat every 30 seconds to keep the connection alive
                    heartbeat_counter += 1
                    if heartbeat_counter >= 300:  # 300 * 0.1s = 30 seconds
                        yield f"data: {json.dumps({'heartbeat': True})}\n\n"
                        heartbeat_counter = 0
                    await asyncio.sleep(0.1)
            except queue.Empty:
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"[SSE CREATE ERROR] {e}")
                await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
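
# Illustrative SSE wire format for the streams in this file, derived from the
# payloads placed on the queues above (field values are examples only):
#   data: {"request_id": "<uuid>", "block_spec": "math_number(inputs(NUM: 1))"}
#
#   data: {"heartbeat": true}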

# Endpoint to receive creation results from the frontend
@app.post("/creation_result")  # assumed path
async def creation_result(request: Request):
    """Receive creation results from the frontend."""
    data = await request.json()
    request_id = data.get("request_id")
    success = data.get("success")
    error = data.get("error")
    block_id = data.get("block_id")
    print(f"[CREATION RESULT RECEIVED] request_id={request_id}, success={success}, error={error}, block_id={block_id}")
    if request_id:
        # Store the result for create_block to retrieve
        creation_results[request_id] = data
        print(f"[CREATION RESULT STORED] Results dict now has {len(creation_results)} items")
    return {"received": True}

# Server-Sent Events endpoint for deletion requests
@app.get("/delete_stream")  # assumed path
async def delete_stream():
    """Stream deletion requests to the frontend using Server-Sent Events."""
    async def clear_sent_request(sent_requests, block_id, delay):
        """Remove block_id from sent_requests after delay seconds."""
        await asyncio.sleep(delay)
        sent_requests.discard(block_id)

    async def event_generator():
        sent_requests = set()  # Track sent requests to avoid duplicates
        heartbeat_counter = 0
        while True:
            try:
                # Check for deletion requests (non-blocking)
                if not deletion_queue.empty():
                    deletion_request = deletion_queue.get_nowait()
                    block_id = deletion_request.get("block_id")
                    # Avoid re-sending the same request in quick succession
                    if block_id not in sent_requests:
                        sent_requests.add(block_id)
                        print(f"[SSE SEND] Sending deletion request for block: {block_id}")
                        yield f"data: {json.dumps(deletion_request)}\n\n"
                        # Clear from sent_requests after 10 seconds
                        asyncio.create_task(clear_sent_request(sent_requests, block_id, 10))
                    else:
                        print(f"[SSE SKIP] Skipping duplicate request for block: {block_id}")
                    await asyncio.sleep(0.1)  # Small delay between messages
                else:
                    # Send a heartbeat every 30 seconds to keep the connection alive
                    heartbeat_counter += 1
                    if heartbeat_counter >= 300:  # 300 * 0.1s = 30 seconds
                        yield f"data: {json.dumps({'heartbeat': True})}\n\n"
                        heartbeat_counter = 0
                    await asyncio.sleep(0.1)
            except queue.Empty:
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"[SSE ERROR] {e}")
                await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

# Endpoint to receive deletion results from the frontend
@app.post("/deletion_result")  # assumed path
async def deletion_result(request: Request):
    """Receive deletion results from the frontend."""
    data = await request.json()
    block_id = data.get("block_id")
    success = data.get("success")
    error = data.get("error")
    print(f"[DELETION RESULT RECEIVED] block_id={block_id}, success={success}, error={error}")
    if block_id:
        # Store the result for delete_block to retrieve
        deletion_results[block_id] = data
        print(f"[DELETION RESULT STORED] Results dict now has {len(deletion_results)} items")
    return {"received": True}

# Server-Sent Events endpoint for variable creation requests
@app.get("/variable_stream")  # assumed path
async def variable_stream():
    """Stream variable creation requests to the frontend using Server-Sent Events."""
    async def clear_sent_request(sent_requests, request_id, delay):
        """Remove request_id from sent_requests after delay seconds."""
        await asyncio.sleep(delay)
        sent_requests.discard(request_id)

    async def event_generator():
        sent_requests = set()  # Track sent requests to avoid duplicates
        heartbeat_counter = 0
        while True:
            try:
                # Check for variable creation requests (non-blocking)
                if not variable_queue.empty():
                    var_request = variable_queue.get_nowait()
                    request_id = var_request.get("request_id")
                    # Avoid re-sending the same request in quick succession
                    if request_id not in sent_requests:
                        sent_requests.add(request_id)
                        print(f"[SSE VARIABLE SEND] Sending variable creation request with ID: {request_id}")
                        yield f"data: {json.dumps(var_request)}\n\n"
                        # Clear from sent_requests after 10 seconds
                        asyncio.create_task(clear_sent_request(sent_requests, request_id, 10))
                    else:
                        print(f"[SSE VARIABLE SKIP] Skipping duplicate request for ID: {request_id}")
                    await asyncio.sleep(0.1)  # Small delay between messages
                else:
                    # Send a heartbeat every 30 seconds to keep the connection alive
                    heartbeat_counter += 1
                    if heartbeat_counter >= 300:  # 300 * 0.1s = 30 seconds
                        yield f"data: {json.dumps({'heartbeat': True})}\n\n"
                        heartbeat_counter = 0
                    await asyncio.sleep(0.1)
            except queue.Empty:
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"[SSE VARIABLE ERROR] {e}")
                await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

# Endpoint to receive variable creation results from the frontend
@app.post("/variable_result")  # assumed path
async def variable_result(request: Request):
    """Receive variable creation results from the frontend."""
    data = await request.json()
    request_id = data.get("request_id")
    success = data.get("success")
    error = data.get("error")
    variable_id = data.get("variable_id")
    print(f"[VARIABLE RESULT RECEIVED] request_id={request_id}, success={success}, error={error}, variable_id={variable_id}")
    if request_id:
        # Store the result for create_variable to retrieve
        variable_results[request_id] = data
        print(f"[VARIABLE RESULT STORED] Results dict now has {len(variable_results)} items")
    return {"received": True}

def deploy_to_huggingface(space_name):
    """Deploy the generated MCP code to a Hugging Face Space."""
    global stored_hf_key
    if not stored_hf_key:
        return "[DEPLOY ERROR] No Hugging Face API key configured. Please set it in File > Keys."
    try:
        from huggingface_hub import HfApi
    except ImportError:
        return "[DEPLOY ERROR] huggingface_hub not installed. Run: pip install huggingface_hub"
    try:
        api = HfApi(token=stored_hf_key)
        # Get the username from the token
        user_info = api.whoami()
        username = user_info["name"]
        repo_id = f"{username}/{space_name}"
        print(f"[DEPLOY] Creating HF Space: {repo_id}")
        # Create the Space
        api.create_repo(
            repo_id=repo_id,
            repo_type="space",
            space_sdk="gradio",
            private=False,
        )
        print("[DEPLOY] Space created. Uploading files...")
        # Fetch the actual generated Python code from test.py (not the Blockly DSL)
        python_code = ""
        try:
            resp = requests.get(f"http://127.0.0.1:{os.getenv('PORT', 8080)}/get_latest_code")
            if resp.ok:
                python_code = resp.json().get("code", "")
        except Exception as e:
            print(f"[DEPLOY WARN] Could not fetch Python code from test.py: {e}")
        if not python_code.strip():
            return "[DEPLOY ERROR] No generated Python code available. Create and test your tool first."
        # Upload app.py with the generated Python code
        api.upload_file(
            path_or_fileobj=python_code.encode(),
            path_in_repo="app.py",
            repo_id=repo_id,
            repo_type="space",
        )
        # Create requirements.txt
        requirements_content = """gradio
openai
requests
huggingface_hub
"""
        api.upload_file(
            path_or_fileobj=requirements_content.encode(),
            path_in_repo="requirements.txt",
            repo_id=repo_id,
            repo_type="space",
        )
        # Create README.md with the YAML front matter Spaces require
        readme_content = f"""---
title: {space_name.replace('-', ' ').title()}
emoji: 🚀
colorFrom: purple
colorTo: blue
sdk: gradio
app_file: app.py
pinned: false
---

# {space_name}

This is an MCP server created with [MCP Blockly](https://github.com/owenkaplinsky/mcp-blockly): a visual programming environment for building AI tools.

The tool has been automatically deployed to Hugging Face Spaces and is ready to use!
"""
        api.upload_file(
            path_or_fileobj=readme_content.encode("utf-8"),
            path_in_repo="README.md",
            repo_id=repo_id,
            repo_type="space",
        )
        space_url = f"https://huggingface.co/spaces/{repo_id}"
        print(f"[DEPLOY SUCCESS] Space deployed: {space_url}")
        # Store the MCP server URL globally for native MCP support
        global current_mcp_server_url, deployment_just_happened, deployment_message
        current_mcp_server_url = space_url
        deployment_just_happened = True
        deployment_message = (
            "Your MCP tool is being built on Hugging Face Spaces. This usually takes "
            "1-2 minutes. Once it's ready, you'll be able to use the MCP tools defined "
            "in your blocks."
        )
        print(f"[MCP] Registered MCP server: {current_mcp_server_url}")
        return f"[TOOL] Successfully deployed to Hugging Face Space!\n\n**Space URL:** {space_url}"
    except Exception as e:
        print(f"[DEPLOY ERROR] {e}")
        import traceback
        traceback.print_exc()
        return f"[DEPLOY ERROR] Failed to deploy: {str(e)}"

def create_gradio_interface():
    # Hardcoded system prompt
    SYSTEM_PROMPT = f"""You are an AI assistant that helps users build **MCP servers** using Blockly blocks.

MCP lets AI systems define tools with specific inputs and outputs that any LLM can call.

You'll receive the workspace state in this format:

`blockId | block_name(inputs(input_name: value))`

**Special cases:**
- `create_mcp` and `func_def` use `blockId | block_name(inputs(input_name: type), outputs(output_name: value))`
- Indentation or nesting shows logic hierarchy (like loops or conditionals).
- The `blockId` before the pipe `|` is each block's unique identifier.

---

### Your job
- Help users understand or fix their MCP logic in natural, human language.
- You may reference the internal block syntax for your own understanding, but never show or explain it to the user unless they explicitly ask.
- Focus on what the code *does* and what the user is trying to achieve, not on the raw block format.
- In your first message, you may either respond normally or call a tool. If you call a tool, you must first explain your intended plan and the steps you will take, then perform the tool call in the same message.

---

### Using Your MCP
Once you deploy your MCP to a Hugging Face Space, the model will automatically have access to all the tools you defined. Simply ask the model to use your MCP tools, and it will call them natively without manual intervention.

**Deployment workflow:**
1. Create and test your MCP using Blockly blocks
2. Deploy to a Hugging Face Space using the `deploy_to_huggingface` tool
3. After deployment, the MCP tool becomes immediately available in this chat
4. You may call this tool afterwards as needed, but do not run the MCP server immediately after deploying it. The user must ask first (you can offer).

---

### Deleting Blocks
- Each block starts with its ID, like `blockId | block_name(...)`.
- To delete a block, specify its block ID. Each block ID is a unique random alphanumeric string shown to the left of the block, in the form `blockId | code`. Make sure you use the ID from the correct block.
- You can delete any block except the main `create_mcp` block.

---

### Creating Blocks
List of blocks:

{blocks_context}

---

You can create new blocks in the workspace by specifying the block type and its input parameters, if it has any.
You cannot create an MCP block or edit its inputs or outputs.

There are two kinds of nesting in Blockly:

1. **Statement-level nesting (main-level blocks)**
   These blocks represent actions or structures, such as loops or conditionals, which can contain other blocks *under* them.
   To create this kind of nesting, use **two separate `create_block` commands**:
   - First, create the outer block (for example, a `repeat` or `if` block).
   - Then, create the inner block *under* it using the `under` parameter.
   Example: putting an `if` block inside a `repeat` block.

2. **Value-level nesting (output blocks)**
   These blocks produce a value (like a number, text, or expression). They can't exist alone in the workspace - they must be nested inside another block's input. To create these, nest them directly in a single command, for example:

   math_arithmetic(inputs(A: math_number(inputs(NUM: 1)), B: math_number(inputs(NUM: 1))))

   Here, the two `math_number` blocks are nested inside the `math_arithmetic` block in one call.

When creating blocks, you are never allowed to insert raw text or numbers directly into a block's inputs.
Every value must be enclosed inside the correct block type that represents that value.
Failing to do this will cause the block to be invalid and unusable.

Example of what you must NOT do:
`text_isEmpty(inputs(VALUE: "text"))`
This is invalid because "text" is a raw string and not a block.

The correct and required form wraps the string inside a text block:
`text_isEmpty(inputs(VALUE: text(inputs(TEXT: "text"))))`
This is valid because it uses a text block as the value.

This rule is absolute and applies to all value types:
- Strings must always use a text block.
- Numbers must always use a math_number block.
- Booleans, lists, colors, and every other type must always use their correct block type.

If a block has a value input, that input must always contain another block.
You are not permitted to use raw values under any circumstance.

For blocks that accept an unlimited number of items (like ...N), you do not need to provide any inputs if you want them blank.

You cannot put an output block inside a block that already exists in the workspace. If you want nested input blocks, you must create them all in one call.

However, for blocks that stack (connecting above or below other blocks), you cannot create both in the same response. Create one, wait for the result, then create the other: you need the ID of the first block before placing the second.

### Variables
You will be given the current variables in the workspace. Like the blocks, each one appears as:
`varId | varName`

---

### Deploying to Hugging Face Spaces
Once the user has tested and is happy with their MCP tool, you can deploy it to a live Hugging Face Space using the `deploy_to_huggingface` tool.

**To deploy:**
1. Ask the user for a name for their Space (e.g., "my-tool")
2. Call the `deploy_to_huggingface` tool with that name
3. The tool will create a new Space, upload the code, and return a live URL

The deployed Space will be public and shareable with others.

You NEVER need to deploy more than once. After deploying, you can run the tool as many times as you want WITHOUT deploying again.

---

Note: Users can see tool response outputs verbatim. You don't have to repeat the tool response unless you want to.
"""

    tools = [
        {
            "type": "function",
            "name": "delete_block",
            "description": "Delete a single block using its ID.",
            "parameters": {
                "type": "object",
                "properties": {
                    "id": {
                        "type": "string",
                        "description": "The ID of the block you're trying to delete.",
                    },
                },
                "required": ["id"],
            },
        },
        {
            "type": "function",
            "name": "create_block",
            "description": "Creates a single block; the command may contain recursively nested value blocks.",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "The create-block command in the custom DSL format.",
                    },
                    "under": {
                        "type": "string",
                        "description": "The ID of the block that you want to place this under.",
                    },
                },
                "required": ["command"],
            },
        },
        {
            "type": "function",
            "name": "create_variable",
            "description": "Creates a variable.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the variable you want to create.",
                    },
                },
                "required": ["name"],
            },
        },
        {
            "type": "function",
            "name": "deploy_to_huggingface",
            "description": "Deploy the generated MCP tool to a Hugging Face Space. Requires a Hugging Face API key to be set.",
            "parameters": {
                "type": "object",
                "properties": {
                    "space_name": {
                        "type": "string",
                        "description": "The name of the Hugging Face Space to create (e.g., 'my-tool')",
                    },
                },
                "required": ["space_name"],
            },
        },
    ]
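
    # For reference, the Responses API surfaces tool invocations as output items
    # that are read further down via item.name, item.arguments, and item.call_id,
    # shaped roughly like:
    #   {"type": "function_call", "name": "create_block",
    #    "call_id": "call_...", "arguments": "{\"command\": \"...\"}"}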

    def chat_with_context(message, history):
        # Make sure an API key is set, and create/update the client if needed
        global client, stored_api_key
        # Use the stored key or the environment key
        api_key = stored_api_key or os.environ.get("OPENAI_API_KEY")
        if api_key and (not client or (hasattr(client, 'api_key') and client.api_key != api_key)):
            try:
                client = OpenAI(api_key=api_key)
            except Exception as e:
                yield f"Error initializing OpenAI client: {str(e)}"
                return
        if not client or not api_key:
            yield "OpenAI API key not configured. Please set it in File > Keys in the Blockly interface."
            return
        # Get the chat context
        global latest_blockly_chat_code, latest_blockly_vars
        context = latest_blockly_chat_code
        workspace_vars = latest_blockly_vars
        # Convert history to the OpenAI input format
        input_items = []
        for human, ai in history:
            input_items.append({"role": "user", "content": human})
            input_items.append({"role": "assistant", "content": ai})
        # Debug
        print(f"[DEBUG] Context received: {context if context else 'No context available'}")
        # Build instructions
        instructions = SYSTEM_PROMPT
        if context:
            instructions += f"\n\nCurrent Blockly workspace state:\n{context}"
        else:
            instructions += "\n\nNote: No Blockly workspace context is currently available."
        if workspace_vars:
            instructions += f"\n\nCurrent Blockly variables:\n{workspace_vars}"
        else:
            instructions += "\n\nNote: No Blockly variables are currently available."
        # Iteration control
        accumulated_response = ""
        max_iterations = 10
        current_iteration = 0
        # Start with the original user message
        current_prompt = message
        temp_input_items = input_items.copy()
| # MAIN LOOP | |
| while current_iteration < max_iterations: | |
| current_iteration += 1 | |
| try: | |
| # Build dynamic tools list with MCP support | |
| dynamic_tools = tools.copy() if tools else [] | |
| # Inject MCP tool if a server is registered | |
| global current_mcp_server_url, deployment_just_happened, deployment_message | |
| space_building_status = None # Track if space is building | |
| if current_mcp_server_url: | |
| mcp_injection_successful = False | |
| try: | |
| # Try to verify the MCP server is available before injecting | |
| space_is_running = False | |
| try: | |
| # Extract username and space name from URL | |
| # URL format: https://huggingface.co/spaces/username/space_name | |
| url_parts = current_mcp_server_url.split("/spaces/") | |
| if len(url_parts) == 2: | |
| space_id = url_parts[1] | |
| api = HfApi() | |
| runtime_info = api.get_space_runtime(space_id) | |
| print(f"[MCP] Space runtime status: {runtime_info}") | |
| # Check if space is running | |
| if runtime_info and runtime_info.stage == "RUNNING": | |
| space_is_running = True | |
| # Space is running - deployment is complete | |
| deployment_just_happened = False | |
| print(f"[MCP] Space is RUNNING") | |
| else: | |
| # Space is not running - it's likely building | |
| space_building_status = runtime_info.stage if runtime_info else "unknown" | |
| print(f"[MCP] Space is not running yet (stage: {space_building_status})") | |
| except Exception as check_error: | |
| print(f"[MCP] Could not verify space runtime: {check_error}") | |
| # Only inject the MCP tool if the space is verified running | |
| if space_is_running: | |
| def convert_repo_to_live_mcp(url): | |
| # input: https://huggingface.co/spaces/user/space | |
| # output: https://user-space.hf.space/gradio_api/mcp | |
| parts = url.split("/spaces/") | |
| user, space = parts[1].split("/") | |
| return f"https://{user}-{space}.hf.space/gradio_api/mcp" | |
| live_mcp_url = convert_repo_to_live_mcp(current_mcp_server_url) | |
| mcp_tool = { | |
| "type": "mcp", | |
| "server_url": live_mcp_url, | |
| "server_label": "user_mcp_server", | |
| "require_approval": "never" | |
| } | |
| dynamic_tools.append(mcp_tool) | |
| print(f"[MCP] Injected MCP tool for server: {current_mcp_server_url}") | |
| else: | |
| print(f"[MCP] Skipping MCP tool injection - space not running yet") | |
| except Exception as mcp_error: | |
| print(f"[MCP ERROR] Failed during MCP injection: {mcp_error}") | |
| print(f"[MCP] Continuing without MCP tools...") | |
| # Continue without MCP - don't crash | |
| # Add deployment status message to instructions if deployment just happened and space is not running | |
| deployment_instructions = instructions | |
| if deployment_just_happened and space_building_status and space_building_status != "RUNNING": | |
| deployment_instructions = instructions + f"\n\n**MCP DEPLOYMENT STATUS:** {deployment_message}" | |
| # Create Responses API call | |
| response = client.responses.create( | |
| model="gpt-4o", | |
| instructions=deployment_instructions, | |
| input=temp_input_items + [{"role": "user", "content": current_prompt}], | |
| tools=dynamic_tools, | |
| tool_choice="auto" | |
| ) | |
                # Extract outputs
                ai_response = ""
                tool_calls = []
                for item in response.output:
                    if item.type == "message":
                        # Extract the assistant text
                        for content in item.content:
                            if content.type == "output_text":
                                ai_response = content.text
                    elif item.type == "function_call":
                        # Collect tool calls
                        tool_calls.append(item)
                # PROCESS TOOL CALLS
                if tool_calls:
                    # Show the assistant text FIRST if it exists
                    if ai_response:
                        if accumulated_response:
                            accumulated_response += "\n\n"
                        accumulated_response += ai_response
                        yield accumulated_response
                    # Record the user turn and assistant text once, before the
                    # tool-call items (not once per tool call, which would
                    # duplicate them when the model makes several calls)
                    temp_input_items.append({"role": "user", "content": current_prompt})
                    temp_input_items.append({"role": "assistant", "content": ai_response})
                    # Now process each tool call, one by one
                    for tool_call in tool_calls:
                        function_name = tool_call.name
                        function_args = json.loads(tool_call.arguments)
                        call_id = tool_call.call_id
                        temp_input_items.append({
                            "type": "function_call",
                            "call_id": call_id,
                            "name": function_name,
                            "arguments": tool_call.arguments,
                        })
                        # Execute the tool
                        tool_result = None
                        result_label = ""
                        if function_name == "delete_block":
                            block_id = function_args.get("id", "")
                            print(Fore.YELLOW + f"Agent deleted block with ID `{block_id}`." + Style.RESET_ALL)
                            tool_result = delete_block(block_id)
                            result_label = "Delete Operation"
                        elif function_name == "create_block":
                            command = function_args.get("command", "")
                            under_block_id = function_args.get("under", None)
                            if under_block_id is None:
                                print(Fore.YELLOW + f"Agent created block with command `{command}`." + Style.RESET_ALL)
                            else:
                                print(Fore.YELLOW + f"Agent created block with command `{command}`, under block ID `{under_block_id}`." + Style.RESET_ALL)
                            tool_result = create_block(command, under_block_id)
                            result_label = "Create Operation"
                        elif function_name == "create_variable":
                            name = function_args.get("name", "")
                            print(Fore.YELLOW + f"Agent created variable with name `{name}`." + Style.RESET_ALL)
                            tool_result = create_variable(name)
                            result_label = "Create Var Operation"
                        elif function_name == "deploy_to_huggingface":
                            space_name = function_args.get("space_name", "")
                            print(Fore.YELLOW + f"Agent deploying to Hugging Face Space `{space_name}`." + Style.RESET_ALL)
                            tool_result = deploy_to_huggingface(space_name)
                            result_label = "Deployment Result"
                        # Show the tool result immediately
                        if tool_result is not None:
                            print(Fore.YELLOW + f"[TOOL RESULT] {tool_result}" + Style.RESET_ALL)
                            if accumulated_response:
                                accumulated_response += "\n\n"
                            accumulated_response += f"**{result_label}:** {tool_result}"
                            yield accumulated_response
                        # Feed the tool result back into the conversation
                        temp_input_items.append({
                            "type": "function_call_output",
                            "call_id": tool_call.call_id,
                            "output": str(tool_result),
                        })
                    # Ask the model to respond to the tool result(s)
                    current_prompt = "The tool has been executed with the result shown above. Please respond appropriately."
                    continue  # Continue the main loop
                else:
                    # No tool calls: emit the final assistant text and stop
                    if ai_response:
                        if accumulated_response:
                            accumulated_response += "\n\n"
                        accumulated_response += ai_response
                        yield accumulated_response
                    break
            except Exception as e:
                if accumulated_response:
                    yield f"{accumulated_response}\n\nError in iteration {current_iteration}: {str(e)}"
                else:
                    yield f"Error: {str(e)}"
                return
        # Max iterations reached
        if current_iteration >= max_iterations:
            accumulated_response += f"\n\n*(Reached maximum of {max_iterations} consecutive responses)*"
            yield accumulated_response

    # Attach the chat function to a Gradio ChatInterface
    demo = gr.ChatInterface(
        fn=chat_with_context,
        title="AI Assistant",
    )
    return demo


def get_chat_gradio_interface():
    return create_gradio_interface()


if __name__ == "__main__":
    demo = create_gradio_interface()
    app = gr.mount_gradio_app(app, demo, path="/")
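    # Serving the mounted app here is an addition (the original stopped after
    # mounting, presumably relying on an external `uvicorn chat:app` command);
    # the port default mirrors the PORT fallback used in deploy_to_huggingface.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 8080)))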