Files
mcp-synology-filestation/src/mcp_synology_filestation/auth.py
T
marcus 59301ae760 feat: implement auth infrastructure and first two FileStation tools
- auth.py: AuthManager with OS keyring, env var fallback, 2FA device token flow
- config.py: AppConfig/ConnectionConfig dataclasses, YAML load/save, env overrides
- client.py: FileStationClient with lazy init, session re-auth, upload/download
- cli.py: setup / check / serve subcommands (anyio.run throughout)
- server.py: create_server factory wiring FastMCP to FileStation tools
- tools/filestation.py: list_shares and list_dir with ASCII table output,
  pagination hints, input validation, DSM error mapping
- tests: 30 unit tests, all passing (auth, config, tools)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 08:23:34 +02:00

192 lines
6.4 KiB
Python

"""Authentication manager: credentials, keyring, login, 2FA device token.
Credential resolution order:
1. Environment variables (SYNOLOGY_USERNAME, SYNOLOGY_PASSWORD)
2. OS Keyring (set by 'setup' command)
Session tokens are held in memory only — never persisted to disk.
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mcp_synology_filestation.client import FileStationClient
from mcp_synology_filestation.config import AppConfig
logger = logging.getLogger(__name__)
# DSM Auth API error code for 2FA required
_ERROR_2FA_REQUIRED = 403
# Keyring keys
_KEY_USERNAME = "username"
_KEY_PASSWORD = "password"
_KEY_DEVICE_TOKEN = "device_token"
class AuthenticationError(Exception):
"""Raised when authentication fails."""
def __init__(self, message: str, code: int | None = None) -> None:
self.code = code
super().__init__(message)
class AuthManager:
"""Manages DSM credentials and session lifecycle."""
def __init__(self, config: AppConfig) -> None:
self._config = config
def resolve_credentials(self) -> tuple[str, str, str | None]:
"""Resolve username, password, and optional device token.
Returns:
Tuple of (username, password, device_token_or_None).
Raises:
AuthenticationError: If no credentials are available.
"""
username: str | None = None
password: str | None = None
device_token: str | None = None
# 1. Environment variables (highest priority)
username = os.environ.get("SYNOLOGY_USERNAME")
password = os.environ.get("SYNOLOGY_PASSWORD")
if username:
logger.debug("Username from env var SYNOLOGY_USERNAME")
if password:
logger.debug("Password from env var SYNOLOGY_PASSWORD")
# 2. OS Keyring
if not username or not password:
try:
import keyring
service = self._config.keyring_service
kr_user = keyring.get_password(service, _KEY_USERNAME)
kr_pass = keyring.get_password(service, _KEY_PASSWORD)
kr_device = keyring.get_password(service, _KEY_DEVICE_TOKEN)
if kr_user and not username:
username = kr_user
logger.debug("Username from keyring")
if kr_pass and not password:
password = kr_pass
logger.debug("Password from keyring")
if kr_device and not device_token:
device_token = kr_device
logger.debug("Device token from keyring")
except Exception:
logger.debug("Keyring not available or failed")
if not username or not password:
msg = (
"No credentials found. Run 'mcp-synology-filestation setup' to store "
"credentials in the OS keyring, or set SYNOLOGY_USERNAME and "
"SYNOLOGY_PASSWORD environment variables."
)
raise AuthenticationError(msg)
return username, password, device_token
def store_credentials(self, username: str, password: str) -> bool:
"""Store credentials in the OS keyring.
Returns:
True on success, False if keyring is unavailable.
"""
try:
import keyring
service = self._config.keyring_service
keyring.set_password(service, _KEY_USERNAME, username)
keyring.set_password(service, _KEY_PASSWORD, password)
logger.debug("Credentials stored in keyring: service=%s", service)
return True
except Exception as e:
logger.warning("Failed to store credentials in keyring: %s", e)
return False
def store_device_token(self, device_token: str) -> None:
"""Store 2FA device token in the OS keyring."""
import keyring
service = self._config.keyring_service
keyring.set_password(service, _KEY_DEVICE_TOKEN, device_token)
logger.debug("Device token stored in keyring: service=%s", service)
async def login(self, client: FileStationClient) -> str:
"""Authenticate against DSM API and return session ID.
Handles both simple login and 2FA device token flow.
The caller is responsible for storing the returned SID.
Args:
client: FileStationClient instance to use for the request.
Returns:
Session ID string.
Raises:
AuthenticationError: If login fails.
"""
username, password, device_token = self.resolve_credentials()
params: dict[str, Any] = {
"account": username,
"passwd": password,
"format": "sid",
}
if device_token:
logger.debug("Login: using 2FA device token")
params["device_id"] = device_token
params["device_name"] = "MCPSynologyFileStation"
else:
logger.debug("Login: simple (no 2FA device token)")
from mcp_synology_filestation.client import SynologyError
try:
data = await client.request("SYNO.API.Auth", "login", version=7, params=params)
except SynologyError as e:
if e.code == _ERROR_2FA_REQUIRED:
raise AuthenticationError(
"2FA is required but no device token is available. "
"Run 'mcp-synology-filestation setup' to complete 2FA setup.",
code=_ERROR_2FA_REQUIRED,
) from e
raise AuthenticationError(str(e), code=e.code) from e
sid: str | None = data.get("sid")
if not sid:
raise AuthenticationError("Login succeeded but no session ID was returned.")
logger.info("Authenticated as '%s'", username)
return sid
async def logout(self, client: FileStationClient) -> None:
"""Log out the current session.
Args:
client: FileStationClient instance with active session.
"""
if not client.sid:
return
logger.debug("Logging out session")
try:
from mcp_synology_filestation.client import SynologyError
await client.request("SYNO.API.Auth", "logout", version=7, params={})
except SynologyError:
logger.debug("Logout failed (session may have already expired)")
finally:
client.sid = None