feat: initial v0.1.0 of mcp-sonarqube-proxy

Stdio MCP server that proxies tools from an upstream SonarQube MCP server
over streamable HTTP. Tools are forwarded 1:1 with full schema preservation
(inputSchema, outputSchema, annotations, title, _meta); CallToolResult is
forwarded including isError and structuredContent.

- proxy.py: persistent upstream ClientSession, low-level Server with
  @list_tools and @call_tool(validate_input=False) handlers — the upstream
  is the sole schema authority.
- cli.py: Click-based 'serve' (stdio) and 'check' (probe) commands;
  logging strictly on stderr (stdout reserved for JSON-RPC).
- Targets mcp 1.27.x decorator API (pinned <2 to guard against the
  unreleased constructor-API rewrite on main).
- pytest suite (14 tests) covering env-var resolution, schema passthrough,
  CallToolResult forwarding, registration, dispatch end-to-end, and CLI
  success/error paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-06 20:26:38 +02:00
commit a6fd188c14
10 changed files with 647 additions and 0 deletions
+49
View File
@@ -0,0 +1,49 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
# Distribution / packaging
build/
dist/
*.egg-info/
*.egg
wheels/
.eggs/
# Virtual environments
.venv/
venv/
env/
ENV/
# uv
.uv/
uv.lock
# Testing
.pytest_cache/
.coverage
.coverage.*
htmlcov/
.tox/
.nox/
# Type checkers / linters
.mypy_cache/
.ruff_cache/
.pyright/
# Editors
.vscode/
.idea/
*.swp
*.swo
.DS_Store
Thumbs.db
# Environment files
.env
.env.local
+90
View File
@@ -0,0 +1,90 @@
# CLAUDE.md
Entwicklungshinweise fuer `mcp-sonarqube-proxy`.
## Projekt
Stdio-MCP-Server, der beim Start eine Streamable-HTTP-Verbindung zum Upstream
SonarQube-MCP-Server oeffnet, dessen Tools 1:1 weiterreicht und Aufrufe an
diesen weiterleitet. Claude Desktop / Claude App spawnt diesen Prozess via stdio.
## Architektur
- `src/mcp_sonarqube_proxy/proxy.py` — eine `upstream_session()` (async context
manager) haelt die persistente `ClientSession` zum Upstream. `build_proxy_server()`
registriert zwei Handler auf einem `mcp.server.lowlevel.Server`, die Aufrufe
transparent weiterleiten.
- `src/mcp_sonarqube_proxy/cli.py` — Click-CLI mit `serve` (stdio) und `check`
(einmaliger Verbindungstest).
### Zentrale Architekturentscheidungen
1. **Low-Level `Server`, nicht `FastMCP`.** FastMCP introspiziert die
Handler-Signatur, um ein `inputSchema` zu generieren. Bei einem generischen
`**kwargs`-Handler ist das Schema leer und der Client weiss nicht, wie er
das Tool aufrufen soll. Der Low-Level-Server uebernimmt dagegen das
`Tool`-Objekt mit allen Feldern (`inputSchema`, `outputSchema`,
`annotations`, `title`, `_meta`) unveraendert vom Upstream.
2. **Eine persistente Upstream-Session, kein Per-Call-Reconnect.** Die fruehere
Variante hat fuer jeden Tool-Call eine neue HTTP-Session geoeffnet. Das
bricht die Streamable-HTTP-Session-Id, multipliziert die Latenz und macht
`initialize()` zur teuersten Operation im Hot Path. Jetzt: einmal beim Start
verbinden, bei `serve`-Lifetime offenhalten, beim Beenden sauber schliessen.
3. **`validate_input=False` auf dem `@call_tool`-Decorator.** Der Upstream ist
die einzige Wahrheit ueber Schemata. Lokale Validierung wuerde nur
duplizieren und koennte bei Schema-Drift zu falschen Ablehnungen fuehren.
4. **`server.create_initialization_options()` statt manueller Capabilities.**
Die Helper-Methode liefert die korrekten `NotificationOptions()` und
`experimental_capabilities={}` und ueberlebt SDK-Updates besser als ein
handgeschriebener Aufruf.
5. **Stateless Tool-Liste — kein Caching.** Jeder `tools/list`-Request
re-fetcht den Upstream. Weniger Code, immer aktuell, kein Cache-Coherency-
Problem.
### Logging und stdout/stderr
`stdout` ist fuer JSON-RPC reserviert. Jegliche Ausgabe (Logging, Startmeldungen,
Fehlermeldungen) muss auf `stderr`. `_configure_logging` setzt das fuer das
Logging-Modul, `_stderr()` ist der Helper fuer direkte Meldungen.
## Lokale Entwicklung
```bash
# Editable install + Dev-Group
uv sync
# Upstream-Verbindung testen
SONARQUBE_MCP_URL=http://192.168.0.2:8765/mcp uv run mcp-sonarqube-proxy check
# Server lokal starten (stdio — z.B. via MCP Inspector)
SONARQUBE_MCP_URL=http://192.168.0.2:8765/mcp uv run mcp-sonarqube-proxy serve
# Tests
uv run pytest
# Lint / Format
uv run ruff check .
uv run ruff format .
```
## Installation als Tool
```bash
uv tool install git+https://gitea.gecheckt.de/marcus/mcp-sonarqube-proxy.git
```
## Bekannte offene Punkte
- **Reconnect bei Verbindungsabbruch:** Wenn der Upstream waehrend `serve`
weggeht, scheitern nachfolgende Tool-Calls bis zum Neustart des Proxies.
Eine Reconnect-Logik mit Retry on `anyio.ClosedResourceError`/`httpx.ReadError`
waere ein sinnvoller naechster Schritt, ist fuer v0.1 aber bewusst weggelassen.
- **Concurrent Tool-Calls:** Die `ClientSession` korreliert Antworten ueber
Request-IDs und sollte parallele Calls verkraften. Nicht explizit getestet.
- **SDK-Versionssprung:** Auf `main` der python-sdk wurden die Decorators
durch Konstruktor-Kwargs ersetzt. Bei Migration auf eine neue Major-Version
muss `build_proxy_server()` angepasst werden.
+83
View File
@@ -0,0 +1,83 @@
# mcp-sonarqube-proxy
Stdio-MCP-Server, der einen Upstream-SonarQube-MCP-Server (Streamable HTTP)
in Claude Desktop / Claude App einbindet. Der Proxy wird via stdio gespawnt,
verbindet sich beim Start mit dem Upstream und reicht alle Tools 1:1 weiter —
inklusive `inputSchema`, `outputSchema` und `annotations`, sodass Claude die
Tools korrekt aufrufen kann.
## Installation
```bash
uv tool install git+https://gitea.gecheckt.de/marcus/mcp-sonarqube-proxy.git
```
## Verwendung
### Verbindung pruefen
```bash
SONARQUBE_MCP_URL=http://192.168.0.2:8765/mcp mcp-sonarqube-proxy check
```
Listet alle Tools auf, die der Upstream bereitstellt. Exit-Code 0 bei Erfolg,
1 wenn der Upstream nicht erreichbar ist.
### Claude Desktop / Claude App Konfiguration
In `claude_desktop_config.json`:
```json
{
"mcpServers": {
"sonarqube": {
"command": "mcp-sonarqube-proxy",
"args": ["serve"],
"env": {
"SONARQUBE_MCP_URL": "http://192.168.0.2:8765/mcp"
}
}
}
}
```
Auf Windows ist `command` typischerweise der absolute Pfad zur per
`uv tool install` installierten EXE, z.B.
`%USERPROFILE%\.local\bin\mcp-sonarqube-proxy.exe`.
## Umgebungsvariablen
| Variable | Default | Beschreibung |
|----------------------|----------------------------------|-------------------------------------------|
| `SONARQUBE_MCP_URL` | `http://192.168.0.2:8765/mcp` | Upstream-MCP-Endpoint (Streamable HTTP) |
## Kommandos
| Kommando | Beschreibung |
|--------------------------------|----------------------------------------------------|
| `mcp-sonarqube-proxy serve` | Startet den MCP-Server auf stdio. |
| `mcp-sonarqube-proxy check` | Testet Upstream-Verbindung und listet Tools. |
| `mcp-sonarqube-proxy --version`| Gibt die Proxy-Version aus. |
## Funktionsweise
1. Beim Start oeffnet `serve` eine Streamable-HTTP-Session zum Upstream und
ruft `initialize()` auf. Schlaegt das fehl, beendet sich der Prozess mit
Exit-Code 1 und schreibt die Fehlermeldung auf stderr.
2. Diese Session bleibt fuer die Lebensdauer des Proxies offen.
3. `tools/list`-Requests vom Client werden 1:1 an den Upstream weitergeleitet —
die `Tool`-Objekte (mit allen Schemata und Annotations) kommen unveraendert
beim Client an.
4. `tools/call`-Requests werden ebenfalls weitergeleitet. Das vollstaendige
`CallToolResult` (inklusive `isError`, `structuredContent` und allen
Content-Bloecken) wird an den Client zurueckgegeben.
5. `stdout` ist ausschliesslich fuer JSON-RPC reserviert. Logging und
Statusmeldungen gehen auf `stderr`.
Der Proxy validiert Tool-Argumente bewusst nicht lokal — der Upstream ist die
einzige Schema-Autoritaet und uebernimmt die Validierung.
## Anforderungen
- Python `>= 3.12`
- `mcp >= 1.27, < 2`
+43
View File
@@ -0,0 +1,43 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mcp-sonarqube-proxy"
version = "0.1.0"
description = "Stdio MCP server that proxies tools from an upstream SonarQube MCP server over streamable HTTP."
readme = "README.md"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Marcus" },
]
dependencies = [
"mcp>=1.27.0,<2",
"click>=8.1",
"anyio>=4.0",
]
[project.scripts]
mcp-sonarqube-proxy = "mcp_sonarqube_proxy.cli:main"
[dependency-groups]
dev = [
"ruff>=0.1",
"pytest>=8.0",
"pytest-asyncio>=0.23",
]
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_sonarqube_proxy"]
[tool.ruff]
line-length = 100
target-version = "py312"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
+1
View File
@@ -0,0 +1 @@
__version__ = "0.1.0"
+87
View File
@@ -0,0 +1,87 @@
"""CLI entry point: ``serve`` (run the stdio proxy) and ``check`` (probe upstream)."""
from __future__ import annotations
import asyncio
import logging
import sys
import click
from mcp import types
from mcp_sonarqube_proxy import __version__
from mcp_sonarqube_proxy.proxy import (
SERVER_NAME,
build_proxy_server,
list_tools_via,
upstream_session,
upstream_url,
)
logger = logging.getLogger(__name__)
def _configure_logging(level: str = "warning") -> None:
"""Send all logging to stderr — stdout is reserved for JSON-RPC."""
logging.basicConfig(
level=getattr(logging, level.upper(), logging.WARNING),
format="%(levelname)s %(name)s: %(message)s",
stream=sys.stderr,
)
def _stderr(msg: str) -> None:
print(msg, file=sys.stderr, flush=True)
@click.group()
@click.version_option(__version__, prog_name=SERVER_NAME)
def main() -> None:
"""MCP-Proxy fuer SonarQube."""
@main.command()
def serve() -> None:
"""Start the MCP proxy on stdio."""
_configure_logging("warning")
_stderr(f"{SERVER_NAME} {__version__}: connecting to {upstream_url()}")
try:
asyncio.run(_run_serve())
except KeyboardInterrupt:
sys.exit(130)
except Exception as e:
_stderr(f"{SERVER_NAME}: fatal: {e}")
sys.exit(1)
async def _run_serve() -> None:
from mcp.server.stdio import stdio_server
async with upstream_session() as upstream:
server = build_proxy_server(upstream)
_stderr(f"{SERVER_NAME}: upstream connected, serving on stdio")
async with stdio_server() as (read, write):
await server.run(read, write, server.create_initialization_options())
@main.command()
def check() -> None:
"""Probe the upstream connection and list its tools."""
_configure_logging("warning")
try:
tools = asyncio.run(_run_check())
except Exception as e:
click.echo(click.style(f"Upstream nicht erreichbar: {e}", fg="red"), err=True)
sys.exit(1)
click.echo(click.style(f"Upstream erreichbar — {len(tools)} Tools:", fg="green"))
for t in tools:
title = getattr(t, "title", None) or t.name
suffix = f" ({t.name})" if title != t.name else ""
click.echo(f" - {title}{suffix}: {t.description or '-'}")
sys.exit(0)
async def _run_check() -> list[types.Tool]:
async with upstream_session() as upstream:
return await list_tools_via(upstream)
+73
View File
@@ -0,0 +1,73 @@
"""Transparent stdio MCP proxy: forwards list_tools and call_tool to an upstream server.
The proxy preserves every Tool field that the upstream advertises (inputSchema,
outputSchema, annotations, title, _meta, ...) by passing the Tool objects through
unchanged. Tool calls are forwarded as full CallToolResult instances, preserving
isError, structuredContent and content blocks.
"""
from __future__ import annotations
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any
from mcp import ClientSession, types
from mcp.client.streamable_http import streamablehttp_client
from mcp.server.lowlevel import Server
DEFAULT_UPSTREAM_URL = "http://192.168.0.2:8765/mcp"
SERVER_NAME = "mcp-sonarqube-proxy"
def upstream_url() -> str:
"""Resolve the upstream URL at call time (not at import time)."""
return os.environ.get("SONARQUBE_MCP_URL", DEFAULT_UPSTREAM_URL)
@asynccontextmanager
async def upstream_session(url: str | None = None) -> AsyncIterator[ClientSession]:
"""Open a streamable-HTTP MCP client session to the upstream and yield it.
The session is initialized before yielding. The HTTP transport and session are
torn down when the context exits.
"""
target = url or upstream_url()
async with streamablehttp_client(target) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
yield session
async def list_tools_via(upstream: ClientSession) -> list[types.Tool]:
"""Fetch the upstream tool list and return it 1:1."""
result = await upstream.list_tools()
return list(result.tools)
async def call_tool_via(
upstream: ClientSession, name: str, arguments: dict[str, Any]
) -> types.CallToolResult:
"""Forward a tool call to the upstream and return the full CallToolResult."""
return await upstream.call_tool(name, arguments)
def build_proxy_server(upstream: ClientSession) -> Server:
"""Build a low-level MCP Server whose tool handlers forward to ``upstream``.
Input validation is disabled (validate_input=False): the upstream is the
authoritative schema owner, so we let it reject malformed calls rather than
duplicate validation locally.
"""
server: Server = Server(SERVER_NAME)
@server.list_tools()
async def _list_tools() -> list[types.Tool]:
return await list_tools_via(upstream)
@server.call_tool(validate_input=False)
async def _call_tool(name: str, arguments: dict[str, Any]) -> types.CallToolResult:
return await call_tool_via(upstream, name, arguments)
return server
View File
+71
View File
@@ -0,0 +1,71 @@
"""Tests for the Click CLI: serve startup errors, check success/failure."""
from __future__ import annotations
import pytest
from click.testing import CliRunner
from mcp import types
from mcp_sonarqube_proxy import cli
def test_check_success_lists_tools(monkeypatch: pytest.MonkeyPatch) -> None:
fake_tools = [
types.Tool(
name="search",
description="Search issues.",
inputSchema={"type": "object"},
),
types.Tool(
name="get_metric",
title="Project Metric",
description=None,
inputSchema={"type": "object"},
),
]
async def fake_run_check() -> list[types.Tool]:
return fake_tools
monkeypatch.setattr(cli, "_run_check", fake_run_check)
result = CliRunner().invoke(cli.main, ["check"])
assert result.exit_code == 0
assert "2 Tools" in result.output
assert "search" in result.output
assert "Project Metric" in result.output
def test_check_failure_exits_one_with_stderr(monkeypatch: pytest.MonkeyPatch) -> None:
async def fake_run_check() -> list[types.Tool]:
raise ConnectionError("connection refused")
monkeypatch.setattr(cli, "_run_check", fake_run_check)
result = CliRunner().invoke(cli.main, ["check"])
assert result.exit_code == 1
assert "Upstream nicht erreichbar" in result.stderr
assert "connection refused" in result.stderr
def test_serve_startup_error_exits_one(monkeypatch: pytest.MonkeyPatch) -> None:
async def fake_run_serve() -> None:
raise RuntimeError("upstream initialize failed")
monkeypatch.setattr(cli, "_run_serve", fake_run_serve)
result = CliRunner().invoke(cli.main, ["serve"])
assert result.exit_code == 1
assert "fatal" in result.stderr
assert "upstream initialize failed" in result.stderr
def test_version_flag() -> None:
from mcp_sonarqube_proxy import __version__
result = CliRunner().invoke(cli.main, ["--version"])
assert result.exit_code == 0
assert __version__ in result.output
+150
View File
@@ -0,0 +1,150 @@
"""Tests for the proxy core: discovery, schema preservation, call forwarding."""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock
import pytest
from mcp import types
from mcp_sonarqube_proxy.proxy import (
DEFAULT_UPSTREAM_URL,
build_proxy_server,
call_tool_via,
list_tools_via,
upstream_url,
)
def _full_tool() -> types.Tool:
"""A Tool with every passthrough field populated."""
return types.Tool(
name="rich_tool",
title="Rich Tool",
description="A tool with full metadata.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "search query"},
"limit": {"type": "integer", "minimum": 1, "default": 10},
},
"required": ["query"],
"additionalProperties": False,
},
outputSchema={
"type": "object",
"properties": {"hits": {"type": "array", "items": {"type": "string"}}},
},
annotations=types.ToolAnnotations(
title="Rich Tool",
readOnlyHint=True,
idempotentHint=True,
openWorldHint=False,
),
)
def _fake_upstream(*, list_tools_return: Any = None, call_tool_return: Any = None) -> AsyncMock:
upstream = AsyncMock()
if list_tools_return is not None:
upstream.list_tools.return_value = list_tools_return
if call_tool_return is not None:
upstream.call_tool.return_value = call_tool_return
return upstream
def test_upstream_url_default(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("SONARQUBE_MCP_URL", raising=False)
assert upstream_url() == DEFAULT_UPSTREAM_URL
def test_upstream_url_env_override(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("SONARQUBE_MCP_URL", "http://example.test:9000/mcp")
assert upstream_url() == "http://example.test:9000/mcp"
async def test_list_tools_via_returns_full_tool_objects() -> None:
tool = _full_tool()
upstream = _fake_upstream(list_tools_return=types.ListToolsResult(tools=[tool]))
tools = await list_tools_via(upstream)
assert len(tools) == 1
# Round-trip dump must match exactly: schemas, annotations, title all survive.
assert tools[0].model_dump(by_alias=True) == tool.model_dump(by_alias=True)
upstream.list_tools.assert_awaited_once()
async def test_list_tools_via_returns_empty_list() -> None:
upstream = _fake_upstream(list_tools_return=types.ListToolsResult(tools=[]))
tools = await list_tools_via(upstream)
assert tools == []
async def test_list_tools_via_propagates_errors() -> None:
upstream = AsyncMock()
upstream.list_tools.side_effect = RuntimeError("upstream gone")
with pytest.raises(RuntimeError, match="upstream gone"):
await list_tools_via(upstream)
async def test_call_tool_via_forwards_full_result() -> None:
expected = types.CallToolResult(
content=[types.TextContent(type="text", text="ok")],
structuredContent={"score": 42},
isError=False,
)
upstream = _fake_upstream(call_tool_return=expected)
result = await call_tool_via(upstream, "do_thing", {"x": 1})
assert result is expected
upstream.call_tool.assert_awaited_once_with("do_thing", {"x": 1})
async def test_call_tool_via_preserves_is_error_results() -> None:
expected = types.CallToolResult(
content=[types.TextContent(type="text", text="boom")],
isError=True,
)
upstream = _fake_upstream(call_tool_return=expected)
result = await call_tool_via(upstream, "broken", {})
assert result.isError is True
assert result.content[0].text == "boom"
async def test_call_tool_via_propagates_errors() -> None:
upstream = AsyncMock()
upstream.call_tool.side_effect = ConnectionError("link down")
with pytest.raises(ConnectionError, match="link down"):
await call_tool_via(upstream, "anything", {})
def test_build_proxy_server_registers_handlers() -> None:
upstream = AsyncMock()
server = build_proxy_server(upstream)
assert types.ListToolsRequest in server.request_handlers
assert types.CallToolRequest in server.request_handlers
async def test_build_proxy_server_handler_forwards_tools_through_dispatch() -> None:
"""End-to-end: dispatch a ListToolsRequest through the registered handler."""
tool = _full_tool()
upstream = _fake_upstream(list_tools_return=types.ListToolsResult(tools=[tool]))
server = build_proxy_server(upstream)
handler = server.request_handlers[types.ListToolsRequest]
server_result = await handler(types.ListToolsRequest(method="tools/list"))
inner = server_result.root # ServerResult wraps a ListToolsResult
assert isinstance(inner, types.ListToolsResult)
assert len(inner.tools) == 1
assert inner.tools[0].model_dump(by_alias=True) == tool.model_dump(by_alias=True)