feat(auth): forward SONARQUBE_TOKEN to upstream as Bearer header

The upstream MCP container requires a SonarQube user token in the
Authorization header. Without one, every call returns 401.

- proxy: read SONARQUBE_TOKEN via sonarqube_token() at session-open
  time; raise TokenMissingError when unset/blank. upstream_session()
  attaches the token as "Authorization: Bearer <token>" via
  streamablehttp_client(headers=...).
- cli: fail fast in serve and check with a clear stderr message and
  exit 1 when the token is missing, before any network attempt. All
  exception text written to stderr passes through _redact() so an
  accidentally-leaked token from a third-party exception is replaced
  with [REDACTED] before display.
- The token is never stored on any object, never logged, and the
  TokenMissingError message contains no token material (it only
  describes how to generate one in SonarQube).
- Tests: header forwarding via mocked streamablehttp_client, missing-
  token exit code, redaction in CLI error paths, whitespace stripping
  on the token. Total: 25 tests.
- Docs: README/CLAUDE updated with the new env-var, Claude Desktop
  config snippet, and the security guarantees. CHANGELOG added.

Bumps version to 0.2.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-06 20:42:51 +02:00
parent a6fd188c14
commit d76c16d9a7
10 changed files with 307 additions and 16 deletions
+15
View File
@@ -0,0 +1,15 @@
"""Shared test fixtures."""
from __future__ import annotations
import pytest
@pytest.fixture(autouse=True)
def _default_token(monkeypatch: pytest.MonkeyPatch) -> None:
"""Provide a non-empty SONARQUBE_TOKEN by default.
Tests that exercise the missing-token path explicitly call
``monkeypatch.delenv("SONARQUBE_TOKEN", raising=False)``.
"""
monkeypatch.setenv("SONARQUBE_TOKEN", "test-token-default")
+63
View File
@@ -69,3 +69,66 @@ def test_version_flag() -> None:
result = CliRunner().invoke(cli.main, ["--version"])
assert result.exit_code == 0
assert __version__ in result.output
# --- SONARQUBE_TOKEN handling ------------------------------------------------
def test_check_missing_token_exits_one_with_clean_message(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delenv("SONARQUBE_TOKEN", raising=False)
result = CliRunner().invoke(cli.main, ["check"])
assert result.exit_code == 1
assert "SONARQUBE_TOKEN" in result.stderr
assert "Security" in result.stderr
def test_serve_missing_token_exits_one_with_clean_message(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delenv("SONARQUBE_TOKEN", raising=False)
result = CliRunner().invoke(cli.main, ["serve"])
assert result.exit_code == 1
assert "SONARQUBE_TOKEN" in result.stderr
def test_check_redacts_token_when_error_includes_it(
monkeypatch: pytest.MonkeyPatch,
) -> None:
secret = "secret-token-do-not-leak"
monkeypatch.setenv("SONARQUBE_TOKEN", secret)
async def fake_run_check() -> list[types.Tool]:
# Simulate a worst-case dependency that leaks the token in its message.
raise RuntimeError(f"401 Unauthorized; sent header Bearer {secret}")
monkeypatch.setattr(cli, "_run_check", fake_run_check)
result = CliRunner().invoke(cli.main, ["check"])
assert result.exit_code == 1
assert secret not in result.stderr
assert "[REDACTED]" in result.stderr
def test_serve_redacts_token_when_error_includes_it(
monkeypatch: pytest.MonkeyPatch,
) -> None:
secret = "another-secret-token-leak"
monkeypatch.setenv("SONARQUBE_TOKEN", secret)
async def fake_run_serve() -> None:
raise RuntimeError(f"upstream said Bearer {secret} is invalid")
monkeypatch.setattr(cli, "_run_serve", fake_run_serve)
result = CliRunner().invoke(cli.main, ["serve"])
assert result.exit_code == 1
assert secret not in result.stderr
assert "[REDACTED]" in result.stderr
+82
View File
@@ -2,6 +2,7 @@
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Any
from unittest.mock import AsyncMock
@@ -10,9 +11,12 @@ from mcp import types
from mcp_sonarqube_proxy.proxy import (
DEFAULT_UPSTREAM_URL,
TokenMissingError,
build_proxy_server,
call_tool_via,
list_tools_via,
sonarqube_token,
upstream_session,
upstream_url,
)
@@ -148,3 +152,81 @@ async def test_build_proxy_server_handler_forwards_tools_through_dispatch() -> N
assert isinstance(inner, types.ListToolsResult)
assert len(inner.tools) == 1
assert inner.tools[0].model_dump(by_alias=True) == tool.model_dump(by_alias=True)
# --- SONARQUBE_TOKEN handling ------------------------------------------------
def test_sonarqube_token_returns_value(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("SONARQUBE_TOKEN", "my-token-xyz")
assert sonarqube_token() == "my-token-xyz"
def test_sonarqube_token_strips_whitespace(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("SONARQUBE_TOKEN", " trimmed-token ")
assert sonarqube_token() == "trimmed-token"
def test_sonarqube_token_raises_when_missing(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("SONARQUBE_TOKEN", raising=False)
with pytest.raises(TokenMissingError):
sonarqube_token()
def test_sonarqube_token_raises_when_blank(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("SONARQUBE_TOKEN", " ")
with pytest.raises(TokenMissingError):
sonarqube_token()
def test_token_missing_error_message_carries_no_token() -> None:
msg = str(TokenMissingError())
assert "SONARQUBE_TOKEN" in msg
assert "Security" in msg
# No token material is ever in this message — there is no token to leak.
async def test_upstream_session_attaches_bearer_header(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""The token must be forwarded as Authorization: Bearer <token>."""
monkeypatch.setenv("SONARQUBE_TOKEN", "header-test-token")
captured: dict[str, Any] = {}
fake_session = AsyncMock()
@asynccontextmanager
async def fake_streamable(url: str, headers: dict[str, str] | None = None, **_: Any):
captured["url"] = url
captured["headers"] = headers
yield ("READ", "WRITE", lambda: None)
class FakeClientSession:
def __init__(self, read: Any, write: Any) -> None:
self.read = read
self.write = write
async def __aenter__(self) -> AsyncMock:
return fake_session
async def __aexit__(self, *exc: Any) -> None:
return None
monkeypatch.setattr("mcp_sonarqube_proxy.proxy.streamablehttp_client", fake_streamable)
monkeypatch.setattr("mcp_sonarqube_proxy.proxy.ClientSession", FakeClientSession)
async with upstream_session(url="http://x.example/mcp") as session:
assert session is fake_session
assert captured["url"] == "http://x.example/mcp"
assert captured["headers"] == {"Authorization": "Bearer header-test-token"}
fake_session.initialize.assert_awaited_once()
async def test_upstream_session_aborts_when_token_missing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delenv("SONARQUBE_TOKEN", raising=False)
with pytest.raises(TokenMissingError):
async with upstream_session(url="http://x.example/mcp"):
pass