Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 30 additions & 24 deletions src/mcp/client/auth/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,46 +128,52 @@ def get_client_metadata_scopes(

def build_oauth_authorization_server_metadata_discovery_urls(auth_server_url: str | None, server_url: str) -> list[str]:
"""
Generate ordered list of (url, type) tuples for discovery attempts.
Generate an ordered list of discovery URLs for authorization server metadata.

Args:
auth_server_url: URL for the OAuth Authorization Metadata URL if found, otherwise None
server_url: URL for the MCP server, used as a fallback if auth_server_url is None
"""

if not auth_server_url:
# Legacy path using the 2025-03-26 spec:
# link: https://modelcontextprotocol.io/specification/2025-03-26/basic/authorization
parsed = urlparse(server_url)
return [f"{parsed.scheme}://{parsed.netloc}/.well-known/oauth-authorization-server"]

urls: list[str] = []
parsed = urlparse(auth_server_url)

# Prefer the advertised authorization server URL when available, otherwise fall back
# to the configured server URL from the client configuration.
source_url = auth_server_url or server_url
parsed = urlparse(source_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
path = (parsed.path or "").rstrip("/")

# RFC 8414: Path-aware OAuth discovery
if parsed.path and parsed.path != "/":
oauth_path = f"/.well-known/oauth-authorization-server{parsed.path.rstrip('/')}"
urls.append(urljoin(base_url, oauth_path))
# If a direct metadata URL was provided, use it first without modification
if auth_server_url and (
auth_server_url.endswith("/.well-known/openid-configuration")
or auth_server_url.endswith("/.well-known/oauth-authorization-server")
):
urls.append(auth_server_url)

if path and path != "":
# RFC 8414 section 5: Path-aware OIDC discovery
# See https://www.rfc-editor.org/rfc/rfc8414.html#section-5
oidc_path = f"/.well-known/openid-configuration{parsed.path.rstrip('/')}"
urls.append(urljoin(base_url, oidc_path))

urls.append(urljoin(base_url, f"/.well-known/oauth-authorization-server{path}"))
# RFC 8414: also allows using the same logic for OpenID discovery.
# https://openid.net/specs/openid-connect-discovery-1_0.html
oidc_path = f"{parsed.path.rstrip('/')}/.well-known/openid-configuration"
urls.append(urljoin(base_url, oidc_path))
return urls

# OAuth root
urls.append(urljoin(base_url, f"/.well-known/openid-configuration{path}"))
# Symmetric path-local OAuth discovery
urls.append(urljoin(base_url, f"{path}/.well-known/oauth-authorization-server"))
urls.append(urljoin(base_url, f"{path}/.well-known/openid-configuration"))
# Always include root-based fallbacks for servers that only expose metadata at the origin.
urls.append(urljoin(base_url, "/.well-known/oauth-authorization-server"))

# OIDC 1.0 fallback (appends to full URL per OIDC spec)
# https://openid.net/specs/openid-connect-discovery-1_0.html
urls.append(urljoin(base_url, "/.well-known/openid-configuration"))

return urls
# De-duplicate while preserving order
seen: set[str] = set()
deduped_urls: list[str] = []
for url in urls:
if url not in seen:
seen.add(url)
deduped_urls.append(url)

return deduped_urls


async def handle_protected_resource_response(
Expand Down
80 changes: 68 additions & 12 deletions tests/client/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,14 @@ async def test_oauth_discovery_legacy_fallback_when_no_prm(self):
# When auth_server_url is None (PRM failed), we use server_url and only try root
discovery_urls = build_oauth_authorization_server_metadata_discovery_urls(None, "https://mcp.linear.app/sse")

# Should only try the root URL (legacy behavior)
# Current behavior: try path-aware variants first, then root fallbacks
assert discovery_urls == [
"https://mcp.linear.app/.well-known/oauth-authorization-server/sse",
"https://mcp.linear.app/.well-known/openid-configuration/sse",
"https://mcp.linear.app/sse/.well-known/oauth-authorization-server",
"https://mcp.linear.app/sse/.well-known/openid-configuration",
"https://mcp.linear.app/.well-known/oauth-authorization-server",
"https://mcp.linear.app/.well-known/openid-configuration",
]

@pytest.mark.anyio
Expand All @@ -337,11 +342,14 @@ async def test_oauth_discovery_path_aware_when_auth_server_has_path(self):
"https://auth.example.com/tenant1", "https://api.example.com/mcp"
)

# Should try path-based URLs only (no root URLs)
# Current behavior: path-aware variants (including symmetric OAuth path-local), then root fallbacks
assert discovery_urls == [
"https://auth.example.com/.well-known/oauth-authorization-server/tenant1",
"https://auth.example.com/.well-known/openid-configuration/tenant1",
"https://auth.example.com/tenant1/.well-known/oauth-authorization-server",
"https://auth.example.com/tenant1/.well-known/openid-configuration",
"https://auth.example.com/.well-known/oauth-authorization-server",
"https://auth.example.com/.well-known/openid-configuration",
]

@pytest.mark.anyio
Expand All @@ -351,7 +359,7 @@ async def test_oauth_discovery_root_when_auth_server_has_no_path(self):
"https://auth.example.com", "https://api.example.com/mcp"
)

# Should try root URLs only
# Should try root URLs only (no path-aware when no path)
assert discovery_urls == [
"https://auth.example.com/.well-known/oauth-authorization-server",
"https://auth.example.com/.well-known/openid-configuration",
Expand All @@ -364,7 +372,7 @@ async def test_oauth_discovery_root_when_auth_server_has_only_slash(self):
"https://auth.example.com/", "https://api.example.com/mcp"
)

# Should try root URLs only
# Should try root URLs only (trailing slash treated as root)
assert discovery_urls == [
"https://auth.example.com/.well-known/oauth-authorization-server",
"https://auth.example.com/.well-known/openid-configuration",
Expand All @@ -383,7 +391,10 @@ async def test_oauth_discovery_fallback_order(self, oauth_provider: OAuthClientP
assert discovery_urls == [
"https://api.example.com/.well-known/oauth-authorization-server/v1/mcp",
"https://api.example.com/.well-known/openid-configuration/v1/mcp",
"https://api.example.com/v1/mcp/.well-known/oauth-authorization-server",
"https://api.example.com/v1/mcp/.well-known/openid-configuration",
"https://api.example.com/.well-known/oauth-authorization-server",
"https://api.example.com/.well-known/openid-configuration",
]

@pytest.mark.anyio
Expand Down Expand Up @@ -459,9 +470,12 @@ async def test_oauth_discovery_fallback_conditions(self, oauth_provider: OAuthCl
request=oauth_metadata_request_2,
)

# Next request should be OIDC path-appended URL
# Next request should be OAuth path-local URL (symmetric), then OIDC path-local
oauth_metadata_request_3 = await auth_flow.asend(oauth_metadata_response_2)
assert str(oauth_metadata_request_3.url) == "https://auth.example.com/v1/mcp/.well-known/openid-configuration"
assert (
str(oauth_metadata_request_3.url)
== "https://auth.example.com/v1/mcp/.well-known/oauth-authorization-server"
)
assert oauth_metadata_request_3.method == "GET"

# Send a 500 response
Expand All @@ -471,12 +485,12 @@ async def test_oauth_discovery_fallback_conditions(self, oauth_provider: OAuthCl
request=oauth_metadata_request_3,
)

# Mock the authorization process to minimize unnecessary state in this test
# Mock the authorization process to minimize unnecessary state before authorization triggers
oauth_provider._perform_authorization_code_grant = mock.AsyncMock(
return_value=("test_auth_code", "test_code_verifier")
)

# All path-based URLs failed, flow continues with default endpoints
# All path-based URLs failed; flow continues with default endpoints
# Next request should be token exchange using MCP server base URL (fallback when OAuth metadata not found)
token_request = await auth_flow.asend(oauth_metadata_response_3)
assert str(token_request.url) == "https://api.example.com/token"
Expand Down Expand Up @@ -505,6 +519,46 @@ async def test_oauth_discovery_fallback_conditions(self, oauth_provider: OAuthCl
except StopAsyncIteration:
pass # Expected - generator should complete

@pytest.mark.anyio
async def test_oauth_discovery_path_aware_issuer_with_origin_only_metadata(self):
"""Servers may publish metadata only at the origin even when issuer has a path;
ensure we include origin fallbacks."""
# Path-aware issuer URL
discovery_urls = build_oauth_authorization_server_metadata_discovery_urls(
"https://auth.example.com/tenant1", "https://api.example.com/v1/mcp"
)

# Must include origin-based fallbacks at the end, after path-aware variants
assert discovery_urls[-2:] == [
"https://auth.example.com/.well-known/oauth-authorization-server",
"https://auth.example.com/.well-known/openid-configuration",
]

@pytest.mark.anyio
async def test_oauth_discovery_direct_metadata_url_precedence(self):
"""If a direct metadata URL is provided, it should be tried first before derived well-known locations."""
# Simulate PRM providing a direct OIDC configuration URL
direct_metadata_url = "https://auth.example.com/.well-known/openid-configuration"
discovery_urls = build_oauth_authorization_server_metadata_discovery_urls(
direct_metadata_url, "https://api.example.com/v1/mcp"
)

# First entry should be the provided URL exactly
assert discovery_urls[0] == direct_metadata_url

@pytest.mark.anyio
async def test_oauth_discovery_oidc_only_metadata(self):
"""Some servers expose only OIDC metadata; ensure OIDC paths are included in order and allow fallback."""
discovery_urls = build_oauth_authorization_server_metadata_discovery_urls(
"https://auth.example.com/tenant1", "https://api.example.com/v1/mcp"
)

# Ensure OIDC path-aware and path-local are present early
assert "https://auth.example.com/.well-known/openid-configuration/tenant1" in discovery_urls[:3]
assert "https://auth.example.com/tenant1/.well-known/openid-configuration" in discovery_urls[:5]

# No flow needed here; presence in discovery list is sufficient

@pytest.mark.anyio
async def test_handle_metadata_response_success(self, oauth_provider: OAuthClientProvider):
"""Test successful metadata response handling."""
Expand Down Expand Up @@ -1311,9 +1365,9 @@ async def callback_handler() -> tuple[str, str | None]:
# PRM returns 404 again - all PRM URLs failed
prm_response_2 = httpx.Response(404, request=prm_request_2)

# Should fall back to root OAuth discovery (March 2025 spec behavior)
# Current behavior: fall back to path-aware OAuth discovery first using server path
oauth_metadata_request = await auth_flow.asend(prm_response_2)
assert str(oauth_metadata_request.url) == "https://mcp.linear.app/.well-known/oauth-authorization-server"
assert str(oauth_metadata_request.url) == "https://mcp.linear.app/.well-known/oauth-authorization-server/sse"
assert oauth_metadata_request.method == "GET"

# Send successful OAuth metadata response
Expand Down Expand Up @@ -1419,9 +1473,11 @@ async def callback_handler() -> tuple[str, str | None]:
# Also returns 404 - all PRM URLs failed
prm_response_3 = httpx.Response(404, request=prm_request_3)

# Should fall back to root OAuth discovery
# Current behavior: fall back to path-aware OAuth discovery based on server path
oauth_metadata_request = await auth_flow.asend(prm_response_3)
assert str(oauth_metadata_request.url) == "https://api.example.com/.well-known/oauth-authorization-server"
assert (
str(oauth_metadata_request.url) == "https://api.example.com/.well-known/oauth-authorization-server/v1/mcp"
)

# Complete the flow
oauth_metadata_response = httpx.Response(
Expand Down
Loading