feat: Gruppe 1 – Projektgerüst, Auth, CLI (v0.1.0)

- pyproject.toml: hatchling build, mcp-familywall entry-point, deps
- src/mcp_familywall/__init__.py: version 0.1.0
- src/mcp_familywall/config.py: YAML config loader (schema_version: 1)
- src/mcp_familywall/auth.py: keyring credential resolution (FW_EMAIL/FW_PASSWORD → keyring)
- src/mcp_familywall/fw_client.py: httpx client, login/logout/call, FW_DEBUG logging
- src/mcp_familywall/cli.py: click CLI with setup / check / serve commands
- .gitignore, README.md, CLAUDE.md, SPEC.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-15 12:18:37 +02:00
parent a58c776298
commit 38da31b0cb
10 changed files with 948 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
__version__ = "0.1.0"
+91
View File
@@ -0,0 +1,91 @@
"""Credential resolution for Family Wall.
Credential resolution order:
1. Environment variables FW_EMAIL, FW_PASSWORD
2. OS keyring (service: 'mcp-familywall', keys: 'email', 'password')
"""
from __future__ import annotations
import logging
import os
logger = logging.getLogger(__name__)
KEYRING_SERVICE = "mcp-familywall"
def get_credentials() -> tuple[str, str]:
"""Resolve Family Wall credentials.
Resolution order:
1. Environment variables FW_EMAIL / FW_PASSWORD
2. OS keyring
Returns:
Tuple of (email, password).
Raises:
RuntimeError: If credentials cannot be resolved from any source.
"""
email: str | None = None
password: str | None = None
# 1. Environment variables (highest priority)
email = os.environ.get("FW_EMAIL")
if email:
logger.debug("Email from env var FW_EMAIL: %s", email)
password = os.environ.get("FW_PASSWORD")
if password:
logger.debug("Password from env var FW_PASSWORD")
# 2. OS keyring
if not email or not password:
try:
import keyring
kr_email = keyring.get_password(KEYRING_SERVICE, "email")
kr_password = keyring.get_password(KEYRING_SERVICE, "password")
if kr_email and not email:
email = kr_email
logger.debug("Email from keyring: %s", email)
if kr_password and not password:
password = kr_password
logger.debug("Password from keyring")
except Exception:
logger.debug("Keyring not available or could not be read.")
if not email or not password:
msg = (
"No credentials found. Run 'mcp-familywall setup' to store credentials "
"in the OS keyring, or set FW_EMAIL and FW_PASSWORD environment variables."
)
raise RuntimeError(msg)
logger.debug("Credentials resolved: email=%s, has_password=yes", email)
return email, password
def store_credentials(email: str, password: str) -> bool:
"""Store credentials in the OS keyring.
Args:
email: Family Wall email address.
password: Family Wall password.
Returns:
True if stored successfully, False if keyring is unavailable.
"""
try:
import keyring
from keyring.errors import KeyringError
keyring.set_password(KEYRING_SERVICE, "email", email)
keyring.set_password(KEYRING_SERVICE, "password", password)
logger.debug("Credentials stored in keyring (service=%s)", KEYRING_SERVICE)
return True
except (ImportError, KeyringError, OSError) as exc:
logger.debug("Failed to store credentials in keyring: %s", exc)
return False
+151
View File
@@ -0,0 +1,151 @@
"""CLI entry point for mcp-familywall.
Commands:
- setup : Interactive credential setup; stores email+password in OS keyring.
- check : Validate stored credentials against the Family Wall API.
- serve : Start the MCP server (launched by Claude Desktop).
"""
from __future__ import annotations
import json
import shutil
import sys
import click
from mcp_familywall import __version__
@click.group(context_settings={"help_option_names": ["-h", "--help"]}, invoke_without_command=True)
@click.version_option(__version__, "-v", "--version", prog_name="mcp-familywall")
@click.pass_context
def app(ctx: click.Context) -> None:
"""mcp-familywall — MCP server for Family Wall (read-only)."""
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
# ---------------------------------------------------------------------------
# setup
# ---------------------------------------------------------------------------
@app.command()
def setup() -> None:
"""Interactive credential setup.
Prompts for email and password, verifies them against the Family Wall API,
stores them in the OS keyring, and prints a Claude Desktop config snippet.
"""
from mcp_familywall.auth import store_credentials
from mcp_familywall.config import save_config
from mcp_familywall.fw_client import FamilyWallClient, FamilyWallError
click.echo("mcp-familywall setup\n")
email = click.prompt("Family Wall email")
password = click.prompt("Family Wall password", hide_input=True)
# Verify credentials
click.echo("\nVerifying credentials...")
try:
with FamilyWallClient() as client:
client.login(email, password)
client.logout()
except FamilyWallError as exc:
click.echo(click.style(f"Login failed: {exc}", fg="red"), err=True)
sys.exit(1)
except Exception as exc:
click.echo(click.style(f"Connection error: {exc}", fg="red"), err=True)
sys.exit(1)
click.echo(click.style("Login successful!", fg="green"))
# Store credentials
ok = store_credentials(email, password)
if ok:
click.echo("Credentials stored in OS keyring.")
else:
click.echo(
click.style("Keyring not available.", fg="yellow")
+ " Set environment variables instead:\n"
f" FW_EMAIL={email}\n"
" FW_PASSWORD=<your-password>"
)
# Ensure config file exists
save_config()
# Print Claude Desktop snippet
_emit_claude_desktop_snippet()
# ---------------------------------------------------------------------------
# check
# ---------------------------------------------------------------------------
@app.command()
def check() -> None:
"""Validate stored credentials against the Family Wall API."""
from mcp_familywall.auth import get_credentials
from mcp_familywall.fw_client import FamilyWallClient, FamilyWallError
click.echo("Checking Family Wall credentials...")
try:
email, password = get_credentials()
except RuntimeError as exc:
click.echo(click.style(f"Error: {exc}", fg="red"), err=True)
sys.exit(1)
click.echo(f"Email: {email}")
try:
with FamilyWallClient() as client:
client.login(email, password)
client.logout()
except FamilyWallError as exc:
click.echo(click.style(f"Authentication failed: {exc}", fg="red"), err=True)
sys.exit(1)
except Exception as exc:
click.echo(click.style(f"Connection error: {exc}", fg="red"), err=True)
sys.exit(1)
click.echo(click.style("Authentication successful!", fg="green"))
# ---------------------------------------------------------------------------
# serve
# ---------------------------------------------------------------------------
@app.command()
def serve() -> None:
"""Start the MCP server (launched by Claude Desktop)."""
from mcp_familywall.server import create_server
server = create_server()
server.run(transport="stdio")
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _emit_claude_desktop_snippet() -> None:
"""Print a Claude Desktop JSON config snippet."""
uvx_path = shutil.which("uvx") or "<path-to-uvx>"
snippet = {
"mcpServers": {
"familywall": {
"command": uvx_path,
"args": ["mcp-familywall", "serve"],
}
}
}
click.echo("\nAdd this to your Claude Desktop config (claude_desktop_config.json):\n")
click.echo(json.dumps(snippet, indent=2))
+73
View File
@@ -0,0 +1,73 @@
"""YAML config loading and validation.
Config path: ~/.config/mcp-familywall/config.yaml
Schema: {schema_version: 1}
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
import yaml
logger = logging.getLogger(__name__)
CURRENT_SCHEMA_VERSION = 1
CONFIG_DIR = Path.home() / ".config" / "mcp-familywall"
CONFIG_PATH = CONFIG_DIR / "config.yaml"
_DEFAULT_CONFIG: dict[str, Any] = {"schema_version": CURRENT_SCHEMA_VERSION}
def load_config() -> dict[str, Any]:
"""Load and validate the config file.
Returns the config dict. Creates a default config if none exists.
Raises:
ValueError: If schema_version does not match the expected version.
"""
if not CONFIG_PATH.exists():
logger.debug("No config found at %s; using defaults", CONFIG_PATH)
return dict(_DEFAULT_CONFIG)
logger.debug("Loading config from %s", CONFIG_PATH)
raw_text = CONFIG_PATH.read_text(encoding="utf-8")
raw: dict[str, Any] = yaml.safe_load(raw_text) or {}
_validate(raw)
logger.debug("Config loaded: schema_version=%s", raw.get("schema_version"))
return raw
def save_config(config: dict[str, Any] | None = None) -> None:
"""Save a config dict to disk (or write defaults if None).
Args:
config: Config dict to save. Defaults to minimal default config.
"""
if config is None:
config = dict(_DEFAULT_CONFIG)
_validate(config)
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
header = "# Generated by mcp-familywall setup\n"
CONFIG_PATH.write_text(header + yaml.dump(config, default_flow_style=False), encoding="utf-8")
logger.debug("Config written to %s", CONFIG_PATH)
def _validate(config: dict[str, Any]) -> None:
"""Validate config dict.
Raises:
ValueError: If schema_version is missing or wrong.
"""
version = config.get("schema_version")
if version != CURRENT_SCHEMA_VERSION:
msg = (
f"Config schema_version is {version!r}, "
f"but this server expects {CURRENT_SCHEMA_VERSION}."
)
raise ValueError(msg)
+184
View File
@@ -0,0 +1,184 @@
"""Family Wall HTTP client.
Session strategy: Login → API call → Logout per MCP tool call. No session caching.
Debug logging is enabled when the environment variable FW_DEBUG=1 is set.
Passwords are always masked in debug output.
"""
from __future__ import annotations
import logging
import os
import sys
from typing import Any
import httpx
logger = logging.getLogger(__name__)
BASE_URL = "https://api.familywall.com/api"
_debug = os.environ.get("FW_DEBUG") == "1"
def _debug_log(label: str, data: Any) -> None:
"""Write debug info to stderr when FW_DEBUG=1."""
if _debug:
print(f"[FW_DEBUG] {label}: {data}", file=sys.stderr)
def _mask_password(data: dict[str, Any]) -> dict[str, Any]:
"""Return a copy of data with the password field masked."""
masked = dict(data)
if "password" in masked:
masked["password"] = "***"
return masked
class FamilyWallError(Exception):
"""Raised when the Family Wall API returns an error response."""
def __init__(self, message: str, response_data: Any = None) -> None:
super().__init__(message)
self.response_data = response_data
class FamilyWallClient:
"""Synchronous HTTP client for the Family Wall API.
Usage::
client = FamilyWallClient()
client.login(email, password)
data = client.call("accgetallfamily", {"a01call": "taskcategorysync"})
client.logout()
Or use as a context manager::
with FamilyWallClient() as client:
client.login(email, password)
data = client.call("famlistfamily")
"""
def __init__(self) -> None:
self._http: httpx.Client = httpx.Client(timeout=30)
self._session_id: str | None = None
def __enter__(self) -> FamilyWallClient:
return self
def __exit__(self, *args: object) -> None:
self._http.close()
def login(self, email: str, password: str) -> None:
"""Authenticate with Family Wall and store the session ID.
Args:
email: Family Wall account email.
password: Family Wall account password.
Raises:
FamilyWallError: If authentication fails.
httpx.HTTPError: On network-level errors.
"""
endpoint = f"{BASE_URL}/log2in"
payload = {
"identifier": email,
"password": password,
"type": "email",
}
_debug_log("LOGIN request", _mask_password(payload))
response = self._http.post(
endpoint,
data=payload,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
response.raise_for_status()
body: dict[str, Any] = response.json()
_debug_log("LOGIN response", body)
if "ex" in body or "un" in body:
error_data = body.get("ex", body.get("un"))
msg = f"Login failed: {error_data}"
raise FamilyWallError(msg, response_data=body)
if "r" not in body:
raise FamilyWallError("Login failed: unexpected response format", response_data=body)
# Extract JSESSIONID from the Set-Cookie header
session_id = response.cookies.get("JSESSIONID")
if not session_id:
raise FamilyWallError("Login succeeded but no JSESSIONID cookie returned.")
self._session_id = session_id
logger.debug("Logged in; session acquired")
# Configure the HTTP client to send the session cookie and CSRF token on all future calls
self._http.cookies.set("JSESSIONID", session_id)
self._http.headers.update({"Tokencsrf": session_id})
def logout(self) -> None:
"""Invalidate the current session server-side.
Silently ignores errors — the session will eventually expire anyway.
"""
if not self._session_id:
return
endpoint = f"{BASE_URL}/log2out"
_debug_log("LOGOUT request", {})
try:
response = self._http.post(
endpoint,
data={},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
_debug_log("LOGOUT response", response.text)
except httpx.HTTPError as exc:
logger.debug("Logout request failed (session may already be expired): %s", exc)
finally:
self._session_id = None
self._http.cookies.clear()
if "Tokencsrf" in self._http.headers:
del self._http.headers["Tokencsrf"]
def call(self, endpoint: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
"""Call a Family Wall API endpoint.
Must be called after a successful :meth:`login`.
Args:
endpoint: API endpoint name (e.g. ``"famlistfamily"``).
params: Optional form parameters to send.
Returns:
Parsed JSON response body.
Raises:
FamilyWallError: If not logged in, or if the API returns an error.
httpx.HTTPError: On network-level errors.
"""
if not self._session_id:
raise FamilyWallError("Not logged in. Call login() first.")
url = f"{BASE_URL}/{endpoint}"
payload = params or {}
_debug_log(f"CALL {endpoint} request", payload)
response = self._http.post(
url,
data=payload,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
response.raise_for_status()
body: dict[str, Any] = response.json()
_debug_log(f"CALL {endpoint} response", body)
if "ex" in body or "un" in body:
error_data = body.get("ex", body.get("un"))
msg = f"API error from {endpoint!r}: {error_data}"
raise FamilyWallError(msg, response_data=body)
return body