Skip to content

Write your own LLM client

A custom LLM-side client comes in two flavors. Style A is what most clients should do: let an MCP library handle auth and call dispatch tools, get JSON back. Style B is for clients that need to observe bus traffic the server didn’t originate on their behalf — progress events, dispatches from other LLMs, event broadcasts.

NeedStyle A (library-mediated)Style B (custom subscription)
Read scene info, run code, download assetsYesYes
Listen for progress on long-running bakesNoYes
Observe dispatches you didn’t originateNoYes
Approval-workflow inboxNoYes
Multi-LLM coordination via broadcastsNoYes
Auth boilerplate to writeNone (library handles)Same library handles it
Bus boilerplate to writeNoneHandler + JobWaiter + filter

Start with Style A. Only fall back to Style B when the pattern requires it.

Style A — library-mediated dispatch tool calls

Section titled “Style A — library-mediated dispatch tool calls”

The minimal happy path. Three things: connect, call, parse.

fastmcp.Client discovers /.well-known/oauth-protected-resource, registers via DCR, runs the PKCE flow in your browser, and caches the token to ~/.config/fastmcp/. Subsequent runs reuse the cached token until it expires.

from fastmcp import Client
SERVER = "https://mcp.blender.bet/"
async with Client(SERVER) as client:
# First call pops your browser. Subsequent calls are silent.
...

If you want to drive the OAuth wire yourself, see the Authentication reference for discovery URLs, DCR, and PKCE shapes. Most clients should not.

import asyncio, json
from fastmcp import Client
SERVER = "https://mcp.blender.bet/"
async def main():
async with Client(SERVER) as client:
# Read the active scene. Returns a JSON string; parse it.
raw = await client.call_tool("blender_get_scene_info", {})
envelope = json.loads(raw.content[0].text)
print(envelope)
# {"status": "completed", "command": "get_scene_info",
# "target_uuid": "blender-demo01", "job_id": "j-...",
# "result": "{...scene info as JSON string...}", "error": ""}
# Run arbitrary Python in Blender.
raw = await client.call_tool("blender_execute_code", {
"code": "import bpy; print(len(bpy.data.objects))"
})
print(json.loads(raw.content[0].text)["result"]) # "3\n"
# Download a PolyHaven HDRI. Status probe first.
status = json.loads(
(await client.call_tool("blender_get_polyhaven_status", {})).content[0].text
)
if status["status"] == "completed":
await client.call_tool("blender_download_polyhaven_asset", {
"asset_id": "kloofendal_43d_clear_puresky",
"asset_type": "hdris",
"resolution": "2k",
})
asyncio.run(main())

That’s a complete Style A client. No registration, no from_uuid, no job_id. The server’s BlenderDispatchComponent handles all of it.

Auto-pick works when exactly one blender client is connected to YOUR bus. With zero or more than one, dispatch tools return structured errors:

{"status": "no_client", "command": "get_scene_info",
"hint": "No Blender client connected to your bus. Open Blender, enable the BlenderMCP addon, click Login then Connect."}
{"status": "ambiguous_target", "command": "get_scene_info",
"candidates": ["blender-abc", "blender-def"],
"hint": "Multiple Blender clients connected; pass target_uuid=<one of candidates> to disambiguate."}

Pass target_uuid in the arguments dict to pick one:

await client.call_tool("blender_get_scene_info", {"target_uuid": "blender-abc"})

Every dispatch tool accepts _timeout (underscore prefix avoids collision with handler kwargs):

await client.call_tool("blender_download_polyhaven_asset", {
"asset_id": "...", "asset_type": "models",
"_timeout": 600.0, # default for downloads is 180s; bump for big models
})

Per-tool defaults: status probes 15s, most reads 30s, code exec / Rodin jobs 60s, downloads 180s.

Style B — custom _message_bus subscription

Section titled “Style B — custom _message_bus subscription”

For long-running progress streams, observing other LLMs’ dispatches, or receiving broadcasts. Same auth as Style A — the library still handles the OAuth flow; you just add a message handler and bus registration on top.

Stage 2 — Message handler that filters bus traffic

Section titled “Stage 2 — Message handler that filters bus traffic”
import json
MESSAGE_BUS_LOGGER = "_message_bus"
async def on_message(message):
inner = getattr(message, "root", message)
if getattr(inner, "method", None) != "notifications/message":
return
params = getattr(inner, "params", None)
if params is None:
return
logger = getattr(params, "logger", None) or (
params.get("logger") if isinstance(params, dict) else None
)
if logger != MESSAGE_BUS_LOGGER:
return
data = getattr(params, "data", None) or (
params.get("data") if isinstance(params, dict) else None
)
if isinstance(data, str):
data = json.loads(data)
# data shape:
# {user_id, from_uuid, target_uuid, routing, payload,
# job_id, message_id, priority, timestamp}
handle_bus_record(data)

The same shape arrives at every bus subscriber. The filter on logger == "_message_bus" is what keeps other MCP log traffic from being misinterpreted.

import asyncio
class JobWaiter:
def __init__(self):
self._futures: dict[str, asyncio.Future] = {}
def register(self, job_id: str) -> asyncio.Future:
loop = asyncio.get_event_loop()
fut = loop.create_future()
self._futures[job_id] = fut
return fut
def resolve(self, data: dict):
# called from on_message when a job_update arrives
payload = data.get("payload", {})
if payload.get("kind") != "job_update":
return
fut = self._futures.pop(payload["job_id"], None)
if fut and not fut.done():
fut.set_result(payload)

Wire waiter.resolve(data) into your on_message.

async def main():
waiter = JobWaiter()
async def handler(message):
# reuse on_message from stage 2, plus:
# waiter.resolve(decoded_data)
...
async with Client(SERVER, message_handler=handler) as client:
# Subscribe to every priority level (default is "warning").
await client.set_logging_level("debug")
# Join the bus.
my_uuid = "llm-myclient-001"
await client.call_tool("blender_register_client", {
"client_uuid": my_uuid,
"client_type": "llm",
"is_persistent": False,
"capabilities": ["chat"],
})
# Dispatch a script.
job_id = "job-abc123"
fut = waiter.register(job_id)
await client.call_tool("blender_send_message", {
"target_uuid": "blender-demo01",
"from_uuid": my_uuid,
"priority": "info",
"payload": {
"message_type": "job_dispatch",
"job_id": job_id,
"script": "import bpy\nprint(len(bpy.data.objects))",
},
})
# Wait for the addon's job_update reply.
result = await asyncio.wait_for(fut, timeout=30)
print(result) # {kind:'job_update', status:'completed', result:'3\n', ...}
asyncio.run(main())
await client.call_tool("blender_unregister_client", {"client_uuid": my_uuid})

Ephemeral clients (is_persistent=False) age out of the bus when the session drops, but an explicit unregister keeps the client list clean and saves the next list_available_clients consumer from seeing stale entries.

Nothing stops you from doing both. A Style A client that occasionally needs to observe broadcasts can wire a Style B message_handler onto its Client and keep using dispatch tools for the request/response paths. The server doesn’t care which style produced a given tool call.