MCPサーバにKeycloakを用いて認証を付ける

published
2025-05-25

はじめに

最近、Model Context Protocol(MCP)のドキュメントを読んでいたら、2025-3-26のSpecificationにで認証まわりの仕様が追加されているのを見つけました。
そこで丁度認証・認可系の技術を触っていたところだったので、軽くMCPサーバを作って、実際にどう動くのか試してみました。のメモになります。

環境

  • docker 27.4.0
  • python 3.11.12
  • uv 0.6.17
  • mcp 1.8.1
  • modelcontextprotocol/inspector 0.11.0
  • keycloak 26.1.0

MCPサーバ

まずは簡単なMCPサーバを立てていこうと思います。
環境構築手順はMCPドキュメントを参考に構築しました。

MCPサーバ自体は、前回個人的に運勢占いをを行う簡易的なtoolsのMCPサーバを実装したのでそちらを流用します。

from mcp.server.fastmcp import FastMCP
import random

mcp = FastMCP("fortune")

@mcp.tool()
def fortune():
    fortunes = [
        "大吉",
        "中吉",
        "小吉",
        "吉",
        "末吉",
        "凶",
        "大凶",
    ]
    fortune = random.choice(fortunes)

    return f"今日の運勢は: {fortune}"

if __name__ == "__main__":
    mcp.run(transport="sse")

transportはauthorizationのドキュメントに記載があるようにstdioは用いずにsseを指定します。

Implementations using an STDIO transport SHOULD NOT follow this specification, and instead retrieve credentials from the environment.

keycloakの設定

認証として利用するkeycloakの準備を行います。

1. keycloak構築

まずはdocker composeを用いてkeycloakを構築します。

version: "3.8"
services:
  keycloak:
    image: quay.io/keycloak/keycloak:26.1.0
    container_name: keycloak_mcp
    command: start-dev
    environment:
      - KC_BOOTSTRAP_ADMIN_USERNAME=admin
      - KC_BOOTSTRAP_ADMIN_PASSWORD=admin
    volumes:
      - keycloak_data:/opt/keycloak/data
    network_mode: "host"
volumes:
  keycloak_data:

以下コマンドで立ち上げておきます。

$ docker compose up -d

2. リソース作成

mcpサーバで使用するクライアントの作成を行います。

localhost:8080にアクセス後、ユーザーadminパスワードadminでログインを行います。
Clients >>> Create clientから新しくクライアントを作成します。

20250521-keycloak-clients

20250521-keycloak-clients-2

20250521-keycloak-clients-3

わかりやすくClient IDはmcpで作成しました。
また、Valid redirect URIsにはMCPサーバへのリダイレクト先となるhttp://localhost:8000/auth/callbackを設定しておきます。

認証機能を付ける

次に、MCPサーバに認証機能を実装していきます。
※ここでは、認証・認可の詳細なフローについての解説は省略します。詳しく知りたい方は別途調べていただければと思います。

認証の実装方法

MCPに認証機能を実装する方法として

の2つが紹介されていました。

今後MCPサーバ自体が認証認可を担うことはなくなっていくと思われますので、認可はサードパーティに委任するフローで実装をします。
(フローを見る限り、認証サーバは利用しますが、認可コードからMCPクライアントとMCPサーバ間のためのMCP Tokenを発行したりと、認証以外のトークン管理等は独自で行う必要があるみたいです)

Note

Draftバージョンでは、上記のサードパーティ認可フローに変更が加えられています(MCPサーバー自身が認可機能も担うフローが削除されています)。今後もフローが変更される可能性があるため、都度最新情報を確認することを推奨します。
https://modelcontextprotocol.io/specification/draft/basic/authorization#2-6-authorization-flow-steps

参考例

MCPサーバに認証機能を実装する方法として、python SDKでは以下READMEに認証実装についての記述があります。

Note

1.8.1までREADMEにtypoがあったのですが、修正されてました 🎉
https://github.com/modelcontextprotocol/python-sdk/pull/676

また、examplesにもGithubの認証サーバを用いた実装例があります。今回はこちらを参考に実装していきます。

実装

上記を参考に実装したものは以下になります。

import random
import secrets
import time

from keycloak import KeycloakOpenID
from mcp.server.auth.provider import (
    AccessToken,
    AuthorizationCode,
    AuthorizationParams,
    RefreshToken,
    construct_redirect_uri,
)
from mcp.server.auth.settings import AuthSettings, ClientRegistrationOptions
from mcp.server.fastmcp.server import FastMCP
from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
from pydantic import AnyHttpUrl
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse, Response

KEYCLOAK_SERVER_URL = "http://localhost:8080"
KEYCLOAK_REALM = "master"
KEYCLOAK_CLIENT_ID = "mcp"
KEYCLOAK_CLIENT_SECRET = "2moDkh5opHeDN4QZroovSrnzZhfmrWSR"

class MyOAuthServerProvider:
    def __init__(self) -> None:
        self.keycloak_openid = KeycloakOpenID(
            server_url=KEYCLOAK_SERVER_URL,
            client_id=KEYCLOAK_CLIENT_ID,
            realm_name=KEYCLOAK_REALM,
            client_secret_key=KEYCLOAK_CLIENT_SECRET,
        )
        self.tokens: dict[str, AccessToken] = {}
        self.clients: dict[str, OAuthClientInformationFull] = {}
        self.auth_codes: dict[str, AuthorizationCode] = {}
        self.state_mapping: dict[str, dict[str, str]] = {}
        self.token_mapping: dict[str, str] = {}

    async def get_client(self, client_id: str):
        # raise NotImplementedError
        return self.clients.get(client_id)

    async def register_client(self, client_info: OAuthClientInformationFull):
        # raise NotImplementedError
        self.clients[client_info.client_id] = client_info

    async def authorize(
        self, client: OAuthClientInformationFull, params: AuthorizationParams
    ):
        state = params.state or secrets.token_hex(16)

        self.state_mapping[state] = {
            "redirect_uri": str(params.redirect_uri),
            "code_challenge": params.code_challenge,
            "redirect_uri_provided_explicitly": str(
                params.redirect_uri_provided_explicitly
            ),
            "client_id": client.client_id,
        }

        auth_url = self.keycloak_openid.auth_url(
            redirect_uri="http://localhost:8000/auth/callback",
            state=state,
        )

        return auth_url

    async def handle_auth_callback(self, code: str, state: str) -> str:
        """Handle GitHub OAuth callback."""
        state_data = self.state_mapping.get(state)
        if not state_data:
            raise HTTPException(400, "Invalid state parameter")

        redirect_uri = state_data["redirect_uri"]
        code_challenge = state_data["code_challenge"]
        redirect_uri_provided_explicitly = (
            state_data["redirect_uri_provided_explicitly"] == "True"
        )
        client_id = state_data["client_id"]
        keycloak_token = await self.keycloak_openid.a_token(
            grant_type="authorization_code",
            code=code,
            redirect_uri="http://localhost:8000/auth/callback",
        )

        keycloak_token = keycloak_token["access_token"]

        new_code = f"mcp_{secrets.token_hex(16)}"
        auth_code = AuthorizationCode(
            code=new_code,
            client_id=client_id,
            redirect_uri=AnyHttpUrl(redirect_uri),
            redirect_uri_provided_explicitly=redirect_uri_provided_explicitly,
            expires_at=time.time() + 300,
            scopes=["fortune"],
            code_challenge=code_challenge,
        )
        self.auth_codes[new_code] = auth_code

        self.tokens[keycloak_token] = AccessToken(
            token=keycloak_token,
            client_id=client_id,
            scopes=["email", "profile"],
            expires_at=None,
        )
        del self.state_mapping[state]
        return construct_redirect_uri(redirect_uri, code=new_code, state=state)

    async def load_authorization_code(
        self, client: OAuthClientInformationFull, authorization_code: str
    ) -> AuthorizationCode | None:
        return self.auth_codes.get(authorization_code)

    async def exchange_authorization_code(
        self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode
    ):
        if authorization_code.code not in self.auth_codes:
            raise ValueError("Invalid authorization code")

        mcp_token = f"mcp_{secrets.token_hex(32)}"

        self.tokens[mcp_token] = AccessToken(
            token=mcp_token,
            client_id=client.client_id,
            scopes=authorization_code.scopes,
            expires_at=int(time.time()) + 3600,
        )

        del self.auth_codes[authorization_code.code]

        return OAuthToken(
            access_token=mcp_token,
            token_type="bearer",
            expires_in=3600,
            scope=" ".join(authorization_code.scopes),
        )

    async def load_refresh_token(self, client, refresh_token):
        """Load a refresh token - not supported."""
        return None

    async def exchange_refresh_token(
        self,
        client: OAuthClientInformationFull,
        refresh_token: RefreshToken,
        scopes: list[str],
    ) -> OAuthToken:
        """Exchange refresh token"""
        raise NotImplementedError("Not supported")

    async def load_access_token(self, token: str) -> AccessToken | None:
        access_token = self.tokens.get(token)
        if not access_token:
            return None

        if access_token.expires_at and access_token.expires_at < time.time():
            del self.tokens[token]
            return None

        return access_token

    async def revoke_token(self, token: AccessToken | RefreshToken) -> None:
        # Tokenを無効化する処理
        if token.token in self.tokens:
            del self.tokens[token.token]

oauth_provider = MyOAuthServerProvider()
mcp = FastMCP(
    "auth app",
    auth_server_provider=oauth_provider,
    auth=AuthSettings(
        issuer_url=AnyHttpUrl("http://localhost:8000"),
        client_registration_options=ClientRegistrationOptions(
            enabled=True,
            client_secret_expiry_seconds=3600,
            valid_scopes=["fortune"],
            default_scopes=["fortune"],
        ),
        required_scopes=["fortune"],
    ),
    host="localhost",
    port=8000,
    debug=True,
)

@mcp.custom_route("/auth/callback", methods=["GET"])
async def auth_callback_handler(request: Request) -> Response:
    code = request.query_params.get("code")
    state = request.query_params.get("state")

    if not code or not state:
        raise HTTPException(400, "Missing code or state parameter")

    try:
        redirect_uri = await oauth_provider.handle_auth_callback(code, state)
        return RedirectResponse(status_code=302, url=redirect_uri)
    except HTTPException:
        raise
    except Exception:
        return JSONResponse(
            status_code=500,
            content={
                "error": "server_error",
                "error_description": "Unexpected error",
            },
        )

@mcp.tool()
def fortune():
    fortunes = [
        "大吉",
        "中吉",
        "小吉",
        "吉",
        "末吉",
        "凶",
        "大凶",
    ]
    fortune = random.choice(fortunes)

    return f"今日の運勢は: {fortune}"

if __name__ == "__main__":
    mcp.run(transport="sse")

中身はほぼ参考例と同様のコードとなっています。

またMyOAuthServerProviderに実装が必要な各メソッドは、各メソッドの説明もdocstringに載っている為以下OAuthAuthorizationServerProviderクラスを参考にすることをお勧めします。

動作確認

inspectorを用いて実際に認証フローが行われるか確認します。
以下コマンドにてinspectorを起動します。

$ npx @modelcontextprotocol/[email protected] mcp run main.py
npm WARN cli npm v10.5.1 does not support Node.js v18.16.0. This version of npm supports the following node versions: 
`^18.17.0 || >=20.5.0`. You can find the latest version at https://nodejs.org/.
Starting MCP inspector...
🔍 MCP Inspector is up and running at http://127.0.0.1:6274 🚀
⚙️ Proxy server listening on port 6277

Caution

現在の最新バージョン(0.12.0)のinspectorには、認証フローが正常に完了しないバグがありますのでご注意ください。
https://github.com/modelcontextprotocol/inspector/issues/390

次にMCPサーバを以下コマンドで立ち上げます。

$ uv run main.py
INFO:     Started server process [29468]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)

localhost:6274にアクセスして、Transport TypeをSSEにします。

20250521-inspector-1

Connectをクリックすると、認証フローが開始され、Keycloakに認可リダイレクトします。

20250521-inspector-2

認証に成功すると、MCP Inspectorにリダイレクトされ、認証が必要なToolsの利用が可能になっています。
また、実際にToolsを用いて運勢を取得できることを確認しました。

20250521-inspector-3

正常にサードパーティ認可を必要とするMCPサーバが実装出来ました。

コード

以下GitHubに上げてあります。