最近、Model Context Protocol(MCP)のドキュメントを読んでいたら、2025-3-26のSpecificationにで認証まわりの仕様が追加されているのを見つけました。
そこで丁度認証・認可系の技術を触っていたところだったので、軽くMCPサーバを作って、実際にどう動くのか試してみました。のメモになります。
まずは簡単なMCPサーバを立てていこうと思います。
環境構築手順はMCPドキュメントを参考に構築しました。
MCPサーバ自体は、前回個人的に運勢占いをを行う簡易的なtoolsのMCPサーバを実装したのでそちらを流用します。
from mcp.server.fastmcp import FastMCP import random mcp = FastMCP("fortune") @mcp.tool() def fortune(): fortunes = [ "大吉", "中吉", "小吉", "吉", "末吉", "凶", "大凶", ] fortune = random.choice(fortunes) return f"今日の運勢は: {fortune}" if __name__ == "__main__": mcp.run(transport="sse")
transportはauthorizationのドキュメントに記載があるようにstdio
は用いずにsse
を指定します。
Implementations using an STDIO transport SHOULD NOT follow this specification, and instead retrieve credentials from the environment.
認証として利用するkeycloakの準備を行います。
まずはdocker composeを用いてkeycloakを構築します。
version: "3.8" services: keycloak: image: quay.io/keycloak/keycloak:26.1.0 container_name: keycloak_mcp command: start-dev environment: - KC_BOOTSTRAP_ADMIN_USERNAME=admin - KC_BOOTSTRAP_ADMIN_PASSWORD=admin volumes: - keycloak_data:/opt/keycloak/data network_mode: "host" volumes: keycloak_data:
以下コマンドで立ち上げておきます。
$ docker compose up -d
mcpサーバで使用するクライアントの作成を行います。
localhost:8080
にアクセス後、ユーザーadmin
パスワードadmin
でログインを行います。
Clients >>> Create client
から新しくクライアントを作成します。
わかりやすくClient IDはmcp
で作成しました。
また、Valid redirect URIsにはMCPサーバへのリダイレクト先となるhttp://localhost:8000/auth/callback
を設定しておきます。
次に、MCPサーバに認証機能を実装していきます。
※ここでは、認証・認可の詳細なフローについての解説は省略します。詳しく知りたい方は別途調べていただければと思います。
MCPに認証機能を実装する方法として
の2つが紹介されていました。
今後MCPサーバ自体が認証認可を担うことはなくなっていくと思われますので、認可はサードパーティに委任するフローで実装をします。
(フローを見る限り、認証サーバは利用しますが、認可コードからMCPクライアントとMCPサーバ間のためのMCP Token
を発行したりと、認証以外のトークン管理等は独自で行う必要があるみたいです)
Note
Draftバージョンでは、上記のサードパーティ認可フローに変更が加えられています(MCPサーバー自身が認可機能も担うフローが削除されています)。今後もフローが変更される可能性があるため、都度最新情報を確認することを推奨します。
https://modelcontextprotocol.io/specification/draft/basic/authorization#2-6-authorization-flow-steps
MCPサーバに認証機能を実装する方法として、python SDK
では以下READMEに認証実装についての記述があります。
Note
1.8.1までREADMEにtypoがあったのですが、修正されてました 🎉
https://github.com/modelcontextprotocol/python-sdk/pull/676
また、examples
にもGithubの認証サーバを用いた実装例があります。今回はこちらを参考に実装していきます。
上記を参考に実装したものは以下になります。
import random import secrets import time from keycloak import KeycloakOpenID from mcp.server.auth.provider import ( AccessToken, AuthorizationCode, AuthorizationParams, RefreshToken, construct_redirect_uri, ) from mcp.server.auth.settings import AuthSettings, ClientRegistrationOptions from mcp.server.fastmcp.server import FastMCP from mcp.shared.auth import OAuthClientInformationFull, OAuthToken from pydantic import AnyHttpUrl from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.responses import JSONResponse, RedirectResponse, Response KEYCLOAK_SERVER_URL = "http://localhost:8080" KEYCLOAK_REALM = "master" KEYCLOAK_CLIENT_ID = "mcp" KEYCLOAK_CLIENT_SECRET = "2moDkh5opHeDN4QZroovSrnzZhfmrWSR" class MyOAuthServerProvider: def __init__(self) -> None: self.keycloak_openid = KeycloakOpenID( server_url=KEYCLOAK_SERVER_URL, client_id=KEYCLOAK_CLIENT_ID, realm_name=KEYCLOAK_REALM, client_secret_key=KEYCLOAK_CLIENT_SECRET, ) self.tokens: dict[str, AccessToken] = {} self.clients: dict[str, OAuthClientInformationFull] = {} self.auth_codes: dict[str, AuthorizationCode] = {} self.state_mapping: dict[str, dict[str, str]] = {} self.token_mapping: dict[str, str] = {} async def get_client(self, client_id: str): # raise NotImplementedError return self.clients.get(client_id) async def register_client(self, client_info: OAuthClientInformationFull): # raise NotImplementedError self.clients[client_info.client_id] = client_info async def authorize( self, client: OAuthClientInformationFull, params: AuthorizationParams ): state = params.state or secrets.token_hex(16) self.state_mapping[state] = { "redirect_uri": str(params.redirect_uri), "code_challenge": params.code_challenge, "redirect_uri_provided_explicitly": str( params.redirect_uri_provided_explicitly ), "client_id": client.client_id, } auth_url = self.keycloak_openid.auth_url( redirect_uri="http://localhost:8000/auth/callback", state=state, ) return auth_url async def handle_auth_callback(self, code: str, state: str) -> str: """Handle GitHub OAuth callback.""" state_data = self.state_mapping.get(state) if not state_data: raise HTTPException(400, "Invalid state parameter") redirect_uri = state_data["redirect_uri"] code_challenge = state_data["code_challenge"] redirect_uri_provided_explicitly = ( state_data["redirect_uri_provided_explicitly"] == "True" ) client_id = state_data["client_id"] keycloak_token = await self.keycloak_openid.a_token( grant_type="authorization_code", code=code, redirect_uri="http://localhost:8000/auth/callback", ) keycloak_token = keycloak_token["access_token"] new_code = f"mcp_{secrets.token_hex(16)}" auth_code = AuthorizationCode( code=new_code, client_id=client_id, redirect_uri=AnyHttpUrl(redirect_uri), redirect_uri_provided_explicitly=redirect_uri_provided_explicitly, expires_at=time.time() + 300, scopes=["fortune"], code_challenge=code_challenge, ) self.auth_codes[new_code] = auth_code self.tokens[keycloak_token] = AccessToken( token=keycloak_token, client_id=client_id, scopes=["email", "profile"], expires_at=None, ) del self.state_mapping[state] return construct_redirect_uri(redirect_uri, code=new_code, state=state) async def load_authorization_code( self, client: OAuthClientInformationFull, authorization_code: str ) -> AuthorizationCode | None: return self.auth_codes.get(authorization_code) async def exchange_authorization_code( self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode ): if authorization_code.code not in self.auth_codes: raise ValueError("Invalid authorization code") mcp_token = f"mcp_{secrets.token_hex(32)}" self.tokens[mcp_token] = AccessToken( token=mcp_token, client_id=client.client_id, scopes=authorization_code.scopes, expires_at=int(time.time()) + 3600, ) del self.auth_codes[authorization_code.code] return OAuthToken( access_token=mcp_token, token_type="bearer", expires_in=3600, scope=" ".join(authorization_code.scopes), ) async def load_refresh_token(self, client, refresh_token): """Load a refresh token - not supported.""" return None async def exchange_refresh_token( self, client: OAuthClientInformationFull, refresh_token: RefreshToken, scopes: list[str], ) -> OAuthToken: """Exchange refresh token""" raise NotImplementedError("Not supported") async def load_access_token(self, token: str) -> AccessToken | None: access_token = self.tokens.get(token) if not access_token: return None if access_token.expires_at and access_token.expires_at < time.time(): del self.tokens[token] return None return access_token async def revoke_token(self, token: AccessToken | RefreshToken) -> None: # Tokenを無効化する処理 if token.token in self.tokens: del self.tokens[token.token] oauth_provider = MyOAuthServerProvider() mcp = FastMCP( "auth app", auth_server_provider=oauth_provider, auth=AuthSettings( issuer_url=AnyHttpUrl("http://localhost:8000"), client_registration_options=ClientRegistrationOptions( enabled=True, client_secret_expiry_seconds=3600, valid_scopes=["fortune"], default_scopes=["fortune"], ), required_scopes=["fortune"], ), host="localhost", port=8000, debug=True, ) @mcp.custom_route("/auth/callback", methods=["GET"]) async def auth_callback_handler(request: Request) -> Response: code = request.query_params.get("code") state = request.query_params.get("state") if not code or not state: raise HTTPException(400, "Missing code or state parameter") try: redirect_uri = await oauth_provider.handle_auth_callback(code, state) return RedirectResponse(status_code=302, url=redirect_uri) except HTTPException: raise except Exception: return JSONResponse( status_code=500, content={ "error": "server_error", "error_description": "Unexpected error", }, ) @mcp.tool() def fortune(): fortunes = [ "大吉", "中吉", "小吉", "吉", "末吉", "凶", "大凶", ] fortune = random.choice(fortunes) return f"今日の運勢は: {fortune}" if __name__ == "__main__": mcp.run(transport="sse")
中身はほぼ参考例と同様のコードとなっています。
またMyOAuthServerProvider
に実装が必要な各メソッドは、各メソッドの説明もdocstringに載っている為以下OAuthAuthorizationServerProvider
クラスを参考にすることをお勧めします。
inspectorを用いて実際に認証フローが行われるか確認します。
以下コマンドにてinspectorを起動します。
$ npx @modelcontextprotocol/[email protected] mcp run main.py npm WARN cli npm v10.5.1 does not support Node.js v18.16.0. This version of npm supports the following node versions: `^18.17.0 || >=20.5.0`. You can find the latest version at https://nodejs.org/. Starting MCP inspector... 🔍 MCP Inspector is up and running at http://127.0.0.1:6274 🚀 ⚙️ Proxy server listening on port 6277
Caution
現在の最新バージョン(0.12.0)のinspectorには、認証フローが正常に完了しないバグがありますのでご注意ください。
https://github.com/modelcontextprotocol/inspector/issues/390
次にMCPサーバを以下コマンドで立ち上げます。
$ uv run main.py INFO: Started server process [29468] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
localhost:6274
にアクセスして、Transport TypeをSSE
にします。
Connect
をクリックすると、認証フローが開始され、Keycloak
に認可リダイレクトします。
認証に成功すると、MCP Inspectorにリダイレクトされ、認証が必要なToolsの利用が可能になっています。
また、実際にToolsを用いて運勢を取得できることを確認しました。
正常にサードパーティ認可を必要とするMCPサーバが実装出来ました。
以下GitHubに上げてあります。