# Build a custom MCP server with OAuth 2.1

### What you'll build

> A minimal, runnable Python MCP server that Blockbrain can connect to via OAuth 2.1 — so internal tools or data you expose can be safely consumed by a Blockbrain agent.
>
> **Audience:** a developer on your team. Copy the three files below into a folder, run five commands, point Blockbrain at the resulting URL, and you have a working authenticated integration.
>
> **Time required:** \~20 minutes for a working local server; \~45 minutes including hardening.
>
> See also: [MCP Server — admin UI walkthrough](https://docs.en.theblockbrain.ai/for-admins/mcp-server).

***

### Prerequisites

* Python **3.11+**
* A public HTTPS URL for your local server while testing. The easiest option is an HTTPS tunnelling tool such as `cloudflared tunnel` or `ngrok`
* **Tenant Administrator** access to your Blockbrain tenant
* Familiarity with OAuth 2.1 Authorization Code flow with PKCE (helpful but not required)

***

### When to use which authentication mode

Blockbrain's MCP-server registration UI offers four authentication methods. Choose based on how your MCP server should know who is calling:

| Method                       | Use when…                                                                                              | Trade-off                                                            |
| ---------------------------- | ------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------- |
| **None**                     | Internal/dev only and the server URL is not public                                                     | Anyone who reaches the URL can call the tools                        |
| **API Key** (Fixed Token)    | Service-to-service. One shared secret. No per-user identity needed.                                    | Cannot distinguish individual Blockbrain users on your side          |
| **OAuth 2.1** ← *this guide* | Production scenarios. Blockbrain's tenant admin authorizes the connection once via a consent flow.     | More moving parts, but the standard MCP-spec way                     |
| **User Token** (delegated)   | You want per-user authorization on your MCP server (e.g. enforce that User A only sees their own data) | Your server must validate Blockbrain's forwarded JWT (see the optional JWT-validation section below) |

***

### The OAuth 2.1 discovery flow Blockbrain uses

When you select **OAuth 2.1** in the admin UI and click *Discover OAuth*, Blockbrain follows [RFC 9728](https://datatracker.ietf.org/doc/html/rfc9728) + [RFC 8414](https://datatracker.ietf.org/doc/html/rfc8414) to find your authorization server, then runs Authorization Code + PKCE:

```mermaid
sequenceDiagram
    participant BB as Blockbrain
    participant MCP as Your MCP Server

    BB->>MCP: 1. GET /.well-known/oauth-protected-resource
    MCP-->>BB: { "authorization_servers": [...] }

    BB->>MCP: 2. GET /.well-known/oauth-authorization-server
    MCP-->>BB: { "authorization_endpoint": ..., "token_endpoint": ... }

    BB->>MCP: 3. Redirect admin → /authorize<br/>(response_type=code, code_challenge, ...)
    MCP-->>BB: Redirect to redirect_uri<br/>(code, state)

    BB->>MCP: 4. POST /token  (code + code_verifier)
    MCP-->>BB: { "access_token": "...", "token_type": "Bearer" }

    BB->>MCP: 5. Subsequent /mcp calls with<br/>Authorization: Bearer <access_token>
```
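Steps 3 and 4 hinge on PKCE: the client sends a `code_challenge` to `/authorize` and later proves possession of the matching `code_verifier` at `/token`. The following is a minimal sketch of both sides of that exchange using only the standard library. The helper names `make_pkce_pair` and `verify_pkce` are illustrative and are not part of Blockbrain or the MCP SDK.

```python
import base64
import hashlib
import secrets


def make_pkce_pair() -> tuple[str, str]:
    """Return (code_verifier, code_challenge) for the S256 method (RFC 7636)."""
    # token_urlsafe(64) yields 86 URL-safe characters, inside the 43-128 range.
    verifier = secrets.token_urlsafe(64)
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    # base64url without '=' padding, as RFC 7636 requires.
    challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return verifier, challenge


def verify_pkce(code_verifier: str, code_challenge: str) -> bool:
    """Server-side check: recompute the challenge and compare in constant time."""
    digest = hashlib.sha256(code_verifier.encode()).digest()
    expected = base64.urlsafe_b64encode(digest).rstrip(b"=")
    return secrets.compare_digest(expected, code_challenge.encode())
```

The client keeps `code_verifier` private until step 4, so an intercepted authorization code is useless without it.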

***

### The Python example

Three files in one folder. Copy each block verbatim.

#### `server.py`

```python
"""
MCP Server Starter — OAuth Integration with Blockbrain
A minimal example demonstrating the full OAuth 2.1 + PKCE flow.
Run: python server.py
"""
import os
import secrets
import hashlib
import base64
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import urlencode

import httpx
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Form, Request, Response
from fastapi.responses import JSONResponse, RedirectResponse
from jose import jwt
from mcp.server.fastmcp import FastMCP

load_dotenv()

# ---- Configuration ---------------------------------------------------------
SERVER_HOST          = os.getenv("SERVER_HOST", "http://localhost:8080")
JWKS_URL             = os.getenv("JWKS_URL", "https://auth.theblockbrain.ai/oauth/v2/keys")
AUDIENCE             = os.getenv("AUDIENCE", "your-api-audience")
OAUTH_CLIENT_ID      = os.getenv("OAUTH_CLIENT_ID", "demo-client-id")
OAUTH_CLIENT_SECRET  = os.getenv("OAUTH_CLIENT_SECRET", "demo-client-secret")
VALIDATE_USER_TOKEN  = os.getenv("VALIDATE_USER_TOKEN", "0") == "1"

# Scopes this server advertises in both .well-known documents and validates
# every /authorize request against.
SUPPORTED_SCOPES = {"mcp:read", "mcp:write"}

# ---- In-memory stores (DEMO ONLY — use Redis/DB in production) -------------
auth_codes: dict = {}
access_tokens: dict = {}

# ---- MCP server: tools + resources -----------------------------------------
mcp_server = FastMCP("blockbrain-oauth-example")

@mcp_server.tool()
def echo(message: str) -> str:
    """Echo back the provided message — verifies end-to-end connectivity."""
    return f"echo: {message}"

@mcp_server.resource("static://welcome")
def welcome() -> str:
    """A static welcome resource."""
    return "Hello from your custom MCP server!"

# ---- FastAPI app: OAuth endpoints + mounted MCP transport ------------------
app = FastAPI(title="MCP OAuth Example for Blockbrain")

# RFC 9728 — Protected Resource Metadata
@app.get("/.well-known/oauth-protected-resource")
async def oauth_protected_resource():
    return {
        "resource": SERVER_HOST,
        "authorization_servers": [SERVER_HOST],
        "scopes_supported": sorted(SUPPORTED_SCOPES),
        "bearer_methods_supported": ["header"],
    }

# RFC 8414 — Authorization Server Metadata
@app.get("/.well-known/oauth-authorization-server")
async def oauth_authorization_server():
    return {
        "issuer": SERVER_HOST,
        "authorization_endpoint": f"{SERVER_HOST}/authorize",
        "token_endpoint": f"{SERVER_HOST}/token",
        "response_types_supported": ["code"],
        "grant_types_supported": ["authorization_code"],
        "code_challenge_methods_supported": ["S256"],
        "scopes_supported": sorted(SUPPORTED_SCOPES),
        "token_endpoint_auth_methods_supported": ["client_secret_post"],
    }

# Authorization endpoint — issues an auth code bound to the PKCE challenge
@app.get("/authorize")
async def authorize(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    code_challenge: str,
    code_challenge_method: str = "S256",
    scope: str = "mcp:read",
    state: Optional[str] = None,
):
    if response_type != "code":
        raise HTTPException(400, "unsupported response_type")
    if client_id != OAUTH_CLIENT_ID:
        raise HTTPException(400, "unknown client_id")
    if code_challenge_method != "S256":
        raise HTTPException(400, "code_challenge_method must be S256")

    # RFC 6749 §4.1.2.1 — reject any scope the server doesn't advertise.
    requested_scopes = set(scope.split())
    unknown = requested_scopes - SUPPORTED_SCOPES
    if unknown:
        raise HTTPException(400, f"invalid_scope: {sorted(unknown)}")

    code = secrets.token_urlsafe(32)
    auth_codes[code] = {
        "client_id":      client_id,
        "redirect_uri":   redirect_uri,
        "code_challenge": code_challenge,
        "scope":          " ".join(sorted(requested_scopes)),
        "expires_at":     datetime.now(timezone.utc) + timedelta(minutes=10),
    }
    params = {"code": code}
    if state:
        params["state"] = state
    return RedirectResponse(f"{redirect_uri}?{urlencode(params)}")

# Token endpoint — exchanges code + verifier for an access token
@app.post("/token")
async def token(
    grant_type:    str = Form(...),
    code:          str = Form(...),
    redirect_uri:  str = Form(...),
    client_id:     str = Form(...),
    client_secret: str = Form(...),
    code_verifier: str = Form(...),
):
    if grant_type != "authorization_code":
        raise HTTPException(400, "unsupported grant_type")
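    # Compare the secret in constant time so response timing leaks nothing.
    secret_ok = secrets.compare_digest(
        client_secret.encode(), OAUTH_CLIENT_SECRET.encode()
    )
    if client_id != OAUTH_CLIENT_ID or not secret_ok:
        raise HTTPException(401, "invalid client credentials")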

    record = auth_codes.pop(code, None)
    if not record or record["expires_at"] < datetime.now(timezone.utc):
        raise HTTPException(400, "invalid or expired code")
    if record["redirect_uri"] != redirect_uri:
        raise HTTPException(400, "redirect_uri mismatch")

    # PKCE verification: base64url(SHA256(code_verifier)) == code_challenge
    expected = base64.urlsafe_b64encode(
        hashlib.sha256(code_verifier.encode()).digest()
    ).rstrip(b"=").decode()
    if expected != record["code_challenge"]:
        raise HTTPException(400, "invalid code_verifier")

    access_token = secrets.token_urlsafe(32)
    access_tokens[access_token] = {
        "client_id":  client_id,
        "scope":      record["scope"],
        "expires_at": datetime.now(timezone.utc) + timedelta(hours=1),
    }
    return {
        "access_token": access_token,
        "token_type":   "Bearer",
        "expires_in":   3600,
        "scope":        record["scope"],
    }

# ---- Optional: validate Blockbrain's forwarded user JWT (delegated mode) ---
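async def validate_user_jwt(authorization_header: str) -> dict:
    # Accept "Bearer <jwt>" with any capitalization of the scheme.
    scheme, _, token_str = authorization_header.partition(" ")
    token_str = token_str.strip()
    if scheme.lower() != "bearer" or not token_str:
        raise HTTPException(401, "missing bearer token")
    # Fetches the JWKS on every call (DEMO ONLY: cache the keys in production).
    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.get(JWKS_URL)
        resp.raise_for_status()
        jwks = resp.json()["keys"]
    header = jwt.get_unverified_header(token_str)
    key = next((k for k in jwks if k.get("kid") == header.get("kid")), None)
    if not key:
        raise HTTPException(401, "signing key not found in JWKs")
    # Verifies signature, expiry, and audience; raises jose's JWTError on failure.
    return jwt.decode(token_str, key, algorithms=["RS256"], audience=AUDIENCE)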

# ---- Bearer-token middleware for /mcp --------------------------------------
# RFC 6750: every MCP transport call must carry a valid access token.
# This runs BEFORE the request reaches the mounted MCP app.
@app.middleware("http")
async def require_bearer_for_mcp(request: Request, call_next):
    if request.url.path.startswith("/mcp"):
        auth = request.headers.get("authorization", "")
        if not auth.lower().startswith("bearer "):
            return JSONResponse(
                {"error": "invalid_token", "error_description": "missing bearer token"},
                status_code=401,
                headers={"WWW-Authenticate": 'Bearer realm="mcp"'},
            )
        token_str = auth.split(" ", 1)[1].strip()
        record = access_tokens.get(token_str)
        if not record or record["expires_at"] < datetime.now(timezone.utc):
            return JSONResponse(
                {"error": "invalid_token", "error_description": "expired or unknown token"},
                status_code=401,
                headers={"WWW-Authenticate": 'Bearer error="invalid_token"'},
            )
        # Optionally enforce per-call scope here, e.g. require "mcp:write"
        # for state-changing tools by inspecting record["scope"].
    return await call_next(request)

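# ---- Mount MCP server (streamable HTTP transport) at /mcp ------------------
# NOTE: depending on the `mcp` SDK version, the mounted app may serve its
# endpoint under its own sub-path and may need its session manager started
# from the FastAPI lifespan. Check the SDK's Streamable HTTP docs if requests
# to /mcp fail after authentication succeeds.
app.mount("/mcp", mcp_server.streamable_http_app())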

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
```
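The middleware above leaves per-call scope enforcement as a comment. The sketch below shows one way that check could look. It is not one of the three files to copy. The mapping from MCP JSON-RPC method to required scope (`tools/call` needs `mcp:write`, everything else needs `mcp:read`) is an assumption made for illustration, as are the helper names.

```python
# Hypothetical policy: which MCP JSON-RPC methods count as state-changing.
WRITE_METHODS = {"tools/call"}


def required_scope_for(method: str) -> str:
    """Return the scope a caller needs for the given MCP method name."""
    return "mcp:write" if method in WRITE_METHODS else "mcp:read"


def has_scope(granted: str, required: str) -> bool:
    """`granted` is the space-delimited scope string stored with the token."""
    return required in granted.split()
```

Inside the middleware, `record["scope"]` is the `granted` string. How you extract the method name from the request body is left out here.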

#### `requirements.txt`

```
mcp>=1.8.0
fastapi>=0.110.0
uvicorn[standard]>=0.27.0
python-jose[cryptography]>=3.3.0
httpx>=0.26.0
python-dotenv>=1.0.0
```

#### `.env.example`

> **Replace the demo client credentials with strong random values before deploying anywhere reachable from the public internet.**

```
# Public URL where your server is reachable from Blockbrain.
# When tunnelling locally with cloudflared/ngrok, set this to the tunnel URL.
SERVER_HOST=http://localhost:8080

# Blockbrain's JWKs endpoint — only used when VALIDATE_USER_TOKEN=1
JWKS_URL=https://auth.theblockbrain.ai/oauth/v2/keys
AUDIENCE=your-api-audience

# Static OAuth client credentials. Generate strong random values for production.
OAUTH_CLIENT_ID=demo-client-id
OAUTH_CLIENT_SECRET=demo-client-secret

# Set to 1 to additionally validate Blockbrain's forwarded user JWT
# on every /mcp request (delegated-access mode). Leave 0 for OAuth-only.
VALIDATE_USER_TOKEN=0
```
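One way to produce strong replacements for the demo credentials is Python's `secrets` module. This is a small sketch: the `bb-mcp-` prefix and the function name are arbitrary choices for illustration, not a Blockbrain convention.

```python
import secrets


def generate_client_credentials() -> dict[str, str]:
    """Return random values suitable for OAUTH_CLIENT_ID / OAUTH_CLIENT_SECRET."""
    return {
        # The prefix is illustrative; any unique identifier works.
        "OAUTH_CLIENT_ID": f"bb-mcp-{secrets.token_hex(8)}",
        # 32 random bytes (256 bits), URL-safe so it survives form encoding.
        "OAUTH_CLIENT_SECRET": secrets.token_urlsafe(32),
    }


if __name__ == "__main__":
    # Print in .env format so the lines can be pasted straight into the file.
    for name, value in generate_client_credentials().items():
        print(f"{name}={value}")
```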

***

### Run it locally

```
python -m venv .venv
source .venv/bin/activate          # Windows: .venv\Scripts\activate
pip install -r requirements.txt
cp .env.example .env               # then edit .env
python server.py                   # listens on :8080
```

***

### Verify the endpoints with curl

```
# Discovery — RFC 9728
curl -s http://localhost:8080/.well-known/oauth-protected-resource | jq
# → { "resource": "...", "authorization_servers": ["..."], ... }

# Discovery — RFC 8414
curl -s http://localhost:8080/.well-known/oauth-authorization-server | jq
# → { "authorization_endpoint": "...", "token_endpoint": "...",
#     "code_challenge_methods_supported": ["S256"], ... }

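# Bearer enforcement: /mcp rejects requests that carry no access token
curl -i http://localhost:8080/mcp
# → HTTP/1.1 401 Unauthorized with WWW-Authenticate: Bearer realm="mcp"

# MCP transport handshake
npx @modelcontextprotocol/inspector http://localhost:8080/mcp
# → once a valid access token is supplied, lists tool "echo" and resource "static://welcome"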
```
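Beyond eyeballing the curl output, the two discovery documents can be cross-checked programmatically. The sketch below assumes you have already fetched both JSON documents and parsed them into dicts. The function name and the specific checks are illustrative and do not reproduce Blockbrain's own validation.

```python
def check_discovery(resource_meta: dict, as_meta: dict) -> list[str]:
    """Return human-readable problems; an empty list means the documents agree."""
    problems = []
    # The issuer must be one of the servers the protected resource points to.
    issuer = as_meta.get("issuer")
    if issuer not in resource_meta.get("authorization_servers", []):
        problems.append("issuer is not listed in authorization_servers")
    # Endpoints Blockbrain calls must be reachable over HTTPS.
    for field in ("authorization_endpoint", "token_endpoint"):
        if not str(as_meta.get(field, "")).startswith("https://"):
            problems.append(f"{field} is missing or not HTTPS")
    if "S256" not in as_meta.get("code_challenge_methods_supported", []):
        problems.append("S256 is not advertised for PKCE")
    if "code" not in as_meta.get("response_types_supported", []):
        problems.append("response type 'code' is not advertised")
    return problems
```

Run against `http://localhost:8080`, this flags both endpoints as not HTTPS. That is the intended reminder to set `SERVER_HOST` to the tunnel URL before registering the server.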

***

### Register your MCP server in Blockbrain

1. Expose your local server publicly: `cloudflared tunnel --url http://localhost:8080` (or `ngrok http 8080`). Note the public HTTPS URL.
2. Set `SERVER_HOST` in `.env` to that public URL and restart the server.
3. Open Blockbrain → **Admin** → **Agents** → **MCP Servers** → **+ Add MCP Server**.
4. Fill in:
   * **Server Name:** e.g. *my-mcp-demo*
   * **Server URL:** `https://<your-tunnel>/mcp`
   * **Transport:** `HTTP`
   * **Authentication:** `OAuth 2.1`
5. Click **Discover OAuth**. Blockbrain reads your two `.well-known` endpoints and pre-fills the OAuth config.
6. Click **Configure**, complete the consent flow, and confirm the access token comes back.
7. Save. Assign the integration to a test agent. In a chat, ask the agent to call `echo "hello"` — you should see `echo: hello` come back.

Full admin-UI walkthrough with screenshots: [MCP Server — for admins](https://docs.en.theblockbrain.ai/for-admins/mcp-server).

***

### Optional — validate Blockbrain's forwarded user JWT

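If you also want per-user authorization (delegated-access mode), set `VALIDATE_USER_TOKEN=1`. The `validate_user_jwt` helper in `server.py` fetches Blockbrain's public JWKs from `https://auth.theblockbrain.ai/oauth/v2/keys`, verifies the token's signature, expiration, and audience, and returns the claims (including `external_user_id`, `urn:zitadel:iam:org:id`, etc.). As listed, `server.py` reads the flag and defines the helper but does not call it from the `/mcp` middleware. To have every incoming bearer token checked, call `validate_user_jwt` there when `VALIDATE_USER_TOKEN` is set.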
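Fetching the JWKs on every request adds a network round trip to each call. A minimal sketch of a time-based cache follows. The class name `JwksCache`, the 300-second TTL, and the injected `fetch` callable are all assumptions for illustration; `fetch` stands in for whatever HTTP call returns the `keys` list.

```python
import time
from typing import Callable, Optional


class JwksCache:
    """Cache the JWKS key list for `ttl` seconds; `fetch` performs the HTTP call."""

    def __init__(self, fetch: Callable[[], list], ttl: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._ttl = ttl
        self._clock = clock
        self._keys: list = []
        self._expires_at = float("-inf")  # forces a fetch on first use

    def _refresh(self) -> None:
        self._keys = self._fetch()
        self._expires_at = self._clock() + self._ttl

    def _find(self, kid: str) -> Optional[dict]:
        return next((k for k in self._keys if k.get("kid") == kid), None)

    def get_key(self, kid: str) -> Optional[dict]:
        refreshed = False
        if self._clock() >= self._expires_at:
            self._refresh()
            refreshed = True
        key = self._find(kid)
        if key is None and not refreshed:
            # An unknown kid may mean the keys were rotated: refetch once.
            self._refresh()
            key = self._find(kid)
        return key
```

The single refetch on an unknown `kid` lets the server pick up rotated signing keys without waiting for the TTL to expire.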

***

### Headers Blockbrain sends with every request

Your MCP server can read these to identify the calling user, tenant, and thread context:

| Header               | Description                                              | Example        |
| -------------------- | -------------------------------------------------------- | -------------- |
| `X-User-ID`          | End-user identifier                                      | `user_12345`   |
| `X-External-User-ID` | Your system's user identifier (if activated)             | `ext_user_abc` |
| `X-Tenant-ID`        | Tenant identifier                                        | `tenant_xyz`   |
| `X-Agent-ID`         | Agent making the request                                 | `agent_007`    |
| `X-Data-Room-ID`     | Associated data room                                     | `room_456`     |
| `X-Thread-ID`        | Conversation thread                                      | `thread_789`   |
| `Authorization`      | Bearer token (OAuth access token, or forwarded user JWT) | `Bearer ...`   |
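To make these headers convenient to use inside a tool, they can be collected into one typed object. The sketch below assumes the headers arrive as a plain mapping. `CallContext` and `call_context_from_headers` are hypothetical names, not part of the MCP SDK. Treat the values as trustworthy only after the bearer token has been validated.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class CallContext:
    user_id: Optional[str]
    external_user_id: Optional[str]
    tenant_id: Optional[str]
    agent_id: Optional[str]
    data_room_id: Optional[str]
    thread_id: Optional[str]


def call_context_from_headers(headers: Mapping[str, str]) -> CallContext:
    """Build a CallContext; HTTP header names are matched case-insensitively."""
    lowered = {name.lower(): value for name, value in headers.items()}
    return CallContext(
        user_id=lowered.get("x-user-id"),
        external_user_id=lowered.get("x-external-user-id"),
        tenant_id=lowered.get("x-tenant-id"),
        agent_id=lowered.get("x-agent-id"),
        data_room_id=lowered.get("x-data-room-id"),
        thread_id=lowered.get("x-thread-id"),
    )
```

Headers that Blockbrain does not send for a given request come back as `None` rather than raising.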

***

### Troubleshooting

| Symptom                                | Likely cause                                                            | Fix                                                                                       |
| -------------------------------------- | ----------------------------------------------------------------------- | ----------------------------------------------------------------------------------------- |
| "Discover OAuth" returns 404           | `.well-known` paths not exposed at the public URL                       | Confirm the tunnel forwards root paths; re-curl both well-known endpoints                 |
| Redirect-URI mismatch on consent       | IdP does not support Dynamic Client Registration (e.g. Microsoft Entra) | Use the static `OAUTH_CLIENT_ID` in this example rather than a dynamically registered one |
| Token returned but tools list is empty | Scopes not preserved during configure                                   | Make sure your authorization-server metadata includes `scopes_supported`                  |

***

### Production hardening checklist

Before shipping to a production environment, replace or add:

* The in-memory `auth_codes` / `access_tokens` dicts → a persistent store (e.g. Redis), so tokens survive restarts and can be shared across replicas.
* Refresh-token rotation. The example issues access tokens only.
* Structured logging and an audit trail for every `/authorize` and `/token` call.
* Move `OAUTH_CLIENT_SECRET` to a secrets manager — never check it into source control.
* Terminate HTTPS in front of the server (reverse proxy, load balancer, or your platform's ingress).
* Rate limiting on `/token` and `/authorize`.
* If multi-tenant, scope `OAUTH_CLIENT_ID` per tenant rather than reusing one shared value.
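For the refresh-token item, the core idea of rotation is that each refresh token works exactly once, and presenting a spent token revokes everything descended from it. The sketch below keeps state in memory purely to show the logic. The class and its "family" bookkeeping are illustrative assumptions, and a production version would use the persistent store from the first item.

```python
import secrets
from typing import Optional


class RefreshTokenStore:
    """In-memory sketch of one-time-use refresh tokens with reuse detection."""

    def __init__(self) -> None:
        self._active: dict[str, str] = {}  # live refresh token -> family id
        self._used: dict[str, str] = {}    # spent refresh token -> family id

    def issue(self, family: Optional[str] = None) -> str:
        """Create a refresh token, starting a new family unless one is given."""
        token = secrets.token_urlsafe(32)
        self._active[token] = family or secrets.token_hex(8)
        return token

    def rotate(self, token: str) -> Optional[str]:
        """Exchange `token` for a new one; return None if invalid or replayed."""
        family = self._active.pop(token, None)
        if family is None:
            replayed = self._used.get(token)
            if replayed is not None:
                # A spent token was presented again: revoke the whole family.
                self._active = {
                    t: f for t, f in self._active.items() if f != replayed
                }
            return None
        self._used[token] = family
        return self.issue(family)
```

Revoking the family on replay means a stolen refresh token stops working as soon as either party uses the old value a second time.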

***

### Next steps

* **SSE transport** — Blockbrain also supports SSE (`https://your-server/sse`). To switch, mount `mcp_server.sse_app()` in place of `mcp_server.streamable_http_app()` and extend the bearer-token middleware to cover the SSE paths.
* **Real tools** — replace the `echo` tool with calls into your domain (database queries, internal APIs, etc.).
* **JavaScript / TypeScript example** — a Node/Express equivalent of this guide using `@modelcontextprotocol/sdk` is being prepared as a follow-up.
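As a sketch of what a "real tool" might look like, the function below answers a lookup from a database. The `orders` table, its columns, and the function name are hypothetical. SQLite stands in for whatever data source you actually expose. To publish it, you would wrap it in a function decorated with `@mcp_server.tool()`, as `echo` is in `server.py`.

```python
import sqlite3


def lookup_order_status(conn: sqlite3.Connection, order_id: str) -> str:
    """Return the status of one order, or a not-found message."""
    # Parameterized query: never interpolate tool arguments into SQL.
    row = conn.execute(
        "SELECT status FROM orders WHERE id = ?", (order_id,)
    ).fetchone()
    return row[0] if row else f"order {order_id} not found"


if __name__ == "__main__":
    # Throwaway in-memory database with the hypothetical schema.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT)")
    conn.execute("INSERT INTO orders VALUES ('A-1', 'shipped')")
    print(lookup_order_status(conn, "A-1"))
```

Tool arguments originate from the agent's output, so treat them as untrusted input, which is why the query is parameterized.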

