Write your own LLM client
A custom LLM-side client comes in two flavors. Style A is what most clients should do: let an MCP library handle auth and call dispatch tools, get JSON back. Style B is for clients that need to observe bus traffic the server didn’t originate on their behalf — progress events, dispatches from other LLMs, event broadcasts.
Pick a style
Section titled “Pick a style”| Need | Style A (library-mediated) | Style B (custom subscription) |
|---|---|---|
| Read scene info, run code, download assets | Yes | Yes |
| Listen for progress on long-running bakes | No | Yes |
| Observe dispatches you didn’t originate | No | Yes |
| Approval-workflow inbox | No | Yes |
| Multi-LLM coordination via broadcasts | No | Yes |
| Auth boilerplate to write | None (library handles) | Same library handles it |
| Bus boilerplate to write | None | Handler + JobWaiter + filter |
Start with Style A. Only fall back to Style B when the pattern requires it.
Style A — library-mediated dispatch tool calls
Section titled “Style A — library-mediated dispatch tool calls”The minimal happy path. Three things: connect, call, parse.
Stage 1 — Connect
Section titled “Stage 1 — Connect”fastmcp.Client discovers /.well-known/oauth-protected-resource, registers via DCR, runs the PKCE flow in your browser, and caches the token to ~/.config/fastmcp/. Subsequent runs reuse the cached token until it expires.
from fastmcp import Client
SERVER = "https://mcp.blender.bet/"
async with Client(SERVER) as client: # First call pops your browser. Subsequent calls are silent. ...If you want to drive the OAuth wire yourself, see the Authentication reference for discovery URLs, DCR, and PKCE shapes. Most clients should not.
Stage 2 — Call dispatch tools
Section titled “Stage 2 — Call dispatch tools”import asyncio, jsonfrom fastmcp import Client
SERVER = "https://mcp.blender.bet/"
async def main(): async with Client(SERVER) as client: # Read the active scene. Returns a JSON string; parse it. raw = await client.call_tool("blender_get_scene_info", {}) envelope = json.loads(raw.content[0].text) print(envelope) # {"status": "completed", "command": "get_scene_info", # "target_uuid": "blender-demo01", "job_id": "j-...", # "result": "{...scene info as JSON string...}", "error": ""}
# Run arbitrary Python in Blender. raw = await client.call_tool("blender_execute_code", { "code": "import bpy; print(len(bpy.data.objects))" }) print(json.loads(raw.content[0].text)["result"]) # "3\n"
# Download a PolyHaven HDRI. Status probe first. status = json.loads( (await client.call_tool("blender_get_polyhaven_status", {})).content[0].text ) if status["status"] == "completed": await client.call_tool("blender_download_polyhaven_asset", { "asset_id": "kloofendal_43d_clear_puresky", "asset_type": "hdris", "resolution": "2k", })
asyncio.run(main())That’s a complete Style A client. No registration, no from_uuid, no job_id. The server’s BlenderDispatchComponent handles all of it.
When target_uuid matters
Section titled “When target_uuid matters”Auto-pick works when exactly one blender client is connected to YOUR bus. With zero or more than one, dispatch tools return structured errors:
{"status": "no_client", "command": "get_scene_info", "hint": "No Blender client connected to your bus. Open Blender, enable the BlenderMCP addon, click Login then Connect."}{"status": "ambiguous_target", "command": "get_scene_info", "candidates": ["blender-abc", "blender-def"], "hint": "Multiple Blender clients connected; pass target_uuid=<one of candidates> to disambiguate."}Pass target_uuid in the arguments dict to pick one:
await client.call_tool("blender_get_scene_info", {"target_uuid": "blender-abc"})Tuning timeouts
Section titled “Tuning timeouts”Every dispatch tool accepts _timeout (underscore prefix avoids collision with handler kwargs):
await client.call_tool("blender_download_polyhaven_asset", { "asset_id": "...", "asset_type": "models", "_timeout": 600.0, # default for downloads is 180s; bump for big models})Per-tool defaults: status probes 15s, most reads 30s, code exec / Rodin jobs 60s, downloads 180s.
Style B — custom _message_bus subscription
Section titled “Style B — custom _message_bus subscription”For long-running progress streams, observing other LLMs’ dispatches, or receiving broadcasts. Same auth as Style A — the library still handles the OAuth flow; you just add a message handler and bus registration on top.
Stage 1 — Connect (same as Style A)
Section titled “Stage 1 — Connect (same as Style A)”Stage 2 — Message handler that filters bus traffic
Section titled “Stage 2 — Message handler that filters bus traffic”import json
MESSAGE_BUS_LOGGER = "_message_bus"
async def on_message(message): inner = getattr(message, "root", message) if getattr(inner, "method", None) != "notifications/message": return params = getattr(inner, "params", None) if params is None: return
logger = getattr(params, "logger", None) or ( params.get("logger") if isinstance(params, dict) else None ) if logger != MESSAGE_BUS_LOGGER: return
data = getattr(params, "data", None) or ( params.get("data") if isinstance(params, dict) else None ) if isinstance(data, str): data = json.loads(data)
# data shape: # {user_id, from_uuid, target_uuid, routing, payload, # job_id, message_id, priority, timestamp} handle_bus_record(data)The same shape arrives at every bus subscriber. The filter on logger == "_message_bus" is what keeps other MCP log traffic from being misinterpreted.
Stage 3 — Client-side JobWaiter
Section titled “Stage 3 — Client-side JobWaiter”import asyncio
class JobWaiter: def __init__(self): self._futures: dict[str, asyncio.Future] = {}
def register(self, job_id: str) -> asyncio.Future: loop = asyncio.get_event_loop() fut = loop.create_future() self._futures[job_id] = fut return fut
def resolve(self, data: dict): # called from on_message when a job_update arrives payload = data.get("payload", {}) if payload.get("kind") != "job_update": return fut = self._futures.pop(payload["job_id"], None) if fut and not fut.done(): fut.set_result(payload)Wire waiter.resolve(data) into your on_message.
Stage 4 — Register, dispatch, await
Section titled “Stage 4 — Register, dispatch, await”async def main(): waiter = JobWaiter()
async def handler(message): # reuse on_message from stage 2, plus: # waiter.resolve(decoded_data) ...
async with Client(SERVER, message_handler=handler) as client: # Subscribe to every priority level (default is "warning"). await client.set_logging_level("debug")
# Join the bus. my_uuid = "llm-myclient-001" await client.call_tool("blender_register_client", { "client_uuid": my_uuid, "client_type": "llm", "is_persistent": False, "capabilities": ["chat"], })
# Dispatch a script. job_id = "job-abc123" fut = waiter.register(job_id)
await client.call_tool("blender_send_message", { "target_uuid": "blender-demo01", "from_uuid": my_uuid, "priority": "info", "payload": { "message_type": "job_dispatch", "job_id": job_id, "script": "import bpy\nprint(len(bpy.data.objects))", }, })
# Wait for the addon's job_update reply. result = await asyncio.wait_for(fut, timeout=30) print(result) # {kind:'job_update', status:'completed', result:'3\n', ...}
asyncio.run(main())Cleanup
Section titled “Cleanup”await client.call_tool("blender_unregister_client", {"client_uuid": my_uuid})Ephemeral clients (is_persistent=False) age out of the bus when the session drops, but an explicit unregister keeps the client list clean and saves the next list_available_clients consumer from seeing stale entries.
Mixing styles
Section titled “Mixing styles”Nothing stops you from doing both. A Style A client that occasionally needs to observe broadcasts can wire a Style B message_handler onto its Client and keep using dispatch tools for the request/response paths. The server doesn’t care which style produced a given tool call.
Reference
Section titled “Reference”- Authentication — the OAuth flow if you want to implement it yourself
- Dispatch tools — all 24 signatures, return shapes, timeouts
- Bus tools —
register_client,send_message,job_update,list_available_clients,unregister_client - Priority levels — what
priority="info"actually means - Routing modes — direct, group, type-filter, broadcast
- ClientInfo — what
register_clientwrites - Command dispatch vs script dispatch — the underlying design