"""DSM HTTP client with session management and auto-re-login. Thin async client wrapping Synology DSM Web API conventions: - GET-only requests (DSM APIs work with GET params) - Session ID injection (_sid parameter) - Automatic re-login on session errors (codes 106, 107, 119) - File upload via POST multipart (SYNO.FileStation.Upload only) - File download via streaming GET (SYNO.FileStation.Download) """ from __future__ import annotations import asyncio import logging import sys from typing import TYPE_CHECKING, Any import httpx if TYPE_CHECKING: from mcp_synology_container.auth import AuthManager logger = logging.getLogger(__name__) # Session error codes that trigger transparent re-auth _SESSION_ERROR_CODES = frozenset({106, 107, 119}) # Parameters to mask in debug logging _SENSITIVE_PARAMS = frozenset({"passwd", "_sid", "device_id", "otp_code", "device_token"}) class SynologyError(Exception): """Raised when DSM API returns a non-success response.""" def __init__(self, message: str, code: int | None = None) -> None: self.code = code super().__init__(message) def _error_message(code: int, api: str = "") -> str: """Map DSM error code to human-readable message.""" # Common codes common = { 100: "Unknown error", 101: "Invalid parameter", 102: "API does not exist on this NAS", 103: "Method does not exist", 104: "API version not supported", 105: "Permission denied — check DSM user permissions", 106: "Session timeout", 107: "Session displaced by another login", 119: "Session invalid", } # Auth codes auth = { 400: "Incorrect username or password", 401: "Account disabled", 402: "Permission denied for this service", 403: "2FA code required", 404: "2FA code incorrect or expired", 407: "Too many failed login attempts — account temporarily locked", 408: "IP blocked due to excessive failed attempts", } # Docker API codes docker = { 1: "Project not found", 2: "Container not found", } if "Auth" in api and code in auth: return auth[code] if code in common: return common[code] return f"DSM error code {code}" class DsmClient: """Async HTTP client for Synology DSM API. Usage: async with DsmClient(base_url, verify_ssl=True) as client: await client.query_api_info() auth_manager = AuthManager(config) await auth_manager.login(client) data = await client.request("SYNO.Docker.Project", "list") """ def __init__( self, base_url: str, verify_ssl: bool = True, timeout: int = 30, ) -> None: self._base_url = base_url.rstrip("/") self._verify_ssl = verify_ssl self._timeout = timeout self._http: httpx.AsyncClient | None = None self._api_cache: dict[str, dict[str, Any]] = {} self._sid: str | None = None self._auth_manager: AuthManager | None = None self._reauth_lock = asyncio.Lock() self._init_lock = asyncio.Lock() self._initialized = False self._initializing = False # True while inside _ensure_initialized logger.debug( "DsmClient: base_url=%s verify_ssl=%s timeout=%d", self._base_url, verify_ssl, timeout, ) @property def sid(self) -> str | None: """Current session ID.""" return self._sid @sid.setter def sid(self, value: str | None) -> None: self._sid = value def set_auth_manager(self, auth_manager: AuthManager) -> None: """Register the AuthManager for automatic re-login on session errors.""" self._auth_manager = auth_manager async def _ensure_initialized(self) -> None: """Connect to NAS and authenticate on first use (lazy init). Subsequent calls are no-ops. Thread-safe via asyncio.Lock. """ if self._initialized: return async with self._init_lock: if self._initialized: # re-check inside lock return self._initializing = True try: sys.stderr.write(f"[dsm] Connecting to {self._base_url}...\n") sys.stderr.flush() logger.debug("Lazy init: querying API info from %s", self._base_url) await self.query_api_info() sys.stderr.write(f"[dsm] API info OK ({len(self._api_cache)} APIs)\n") sys.stderr.flush() if self._auth_manager: sys.stderr.write("[dsm] Authenticating...\n") sys.stderr.flush() logger.debug("Lazy init: authenticating") self._sid = await self._auth_manager.login(self) sys.stderr.write("[dsm] Auth OK\n") sys.stderr.flush() self._initialized = True logger.debug("Lazy init complete") finally: self._initializing = False async def __aenter__(self) -> DsmClient: logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) self._http = httpx.AsyncClient( verify=self._verify_ssl, timeout=httpx.Timeout(connect=10.0, read=float(self._timeout), write=10.0, pool=5.0), ) return self async def __aexit__(self, *args: object) -> None: if self._http: await self._http.aclose() self._http = None def _get_http(self) -> httpx.AsyncClient: if self._http is None: msg = "DsmClient must be used as an async context manager." raise RuntimeError(msg) return self._http async def query_api_info(self) -> dict[str, dict[str, Any]]: """Query SYNO.API.Info to discover all available APIs and cache them. Must be called before any other API requests. Returns: Dict mapping API name -> {path, minVersion, maxVersion}. """ http = self._get_http() url = f"{self._base_url}/webapi/query.cgi" params = { "api": "SYNO.API.Info", "version": "1", "method": "query", "query": "ALL", } logger.debug("Querying API info from %s", url) resp = await http.get(url, params=params) resp.raise_for_status() body = resp.json() if not body.get("success"): code = body.get("error", {}).get("code", 0) raise SynologyError(_error_message(code, "SYNO.API.Info"), code=code) data: dict[str, Any] = body.get("data", {}) self._api_cache = { name: { "path": info["path"], "minVersion": info.get("minVersion", 1), "maxVersion": info.get("maxVersion", 1), } for name, info in data.items() } logger.debug("API info cached: %d APIs available", len(self._api_cache)) return self._api_cache async def request( self, api: str, method: str, version: int | None = None, params: dict[str, Any] | None = None, *, _is_retry: bool = False, ) -> dict[str, Any]: """Make a GET request to the DSM API. Resolves the API path from the cache, injects session ID, parses the response envelope, and handles errors. On session errors (106/107/119), re-authenticates and retries once. Args: api: DSM API name (e.g. "SYNO.Docker.Project"). method: API method (e.g. "list"). version: API version. Defaults to maxVersion from API info. params: Additional query parameters. Returns: Response data dict from the "data" field of the envelope. Raises: SynologyError: On API errors. """ sys.stderr.write(f"[dsm] request: {api}/{method}\n") sys.stderr.flush() # Skip init guard if we are already inside _ensure_initialized (e.g. login call). # The API cache is populated before login, so the cache is ready at this point. if not self._initializing: await self._ensure_initialized() http = self._get_http() if api not in self._api_cache: raise SynologyError( f"API '{api}' not found. Call query_api_info() first.", code=102, ) info = self._api_cache[api] resolved_version = version if version is not None else info["maxVersion"] url = f"{self._base_url}/webapi/{info['path']}" req_params: dict[str, Any] = { "api": api, "version": str(resolved_version), "method": method, } if params: req_params.update(params) if self._sid: req_params["_sid"] = self._sid # Log with sensitive fields masked log_params = {k: ("***" if k in _SENSITIVE_PARAMS else v) for k, v in req_params.items()} retry_tag = " (retry)" if _is_retry else "" logger.debug("DSM GET%s: %s/%s v%d — %s", retry_tag, api, method, resolved_version, log_params) resp = await http.get(url, params=req_params) resp.raise_for_status() body = resp.json() if body.get("success"): data: dict[str, Any] = body.get("data") or {} logger.debug("DSM response: %s/%s — success", api, method) return data code = body.get("error", {}).get("code", 0) logger.debug("DSM response: %s/%s — error code %d", api, method, code) # Transparent re-auth on session errors (one retry only) if code in _SESSION_ERROR_CODES and not _is_retry and self._auth_manager: logger.info("Session error %d on %s/%s, re-authenticating...", code, api, method) async with self._reauth_lock: self._sid = None try: self._sid = await self._auth_manager.login(self) except Exception as e: raise SynologyError(f"Re-authentication failed: {e}", code=code) from e return await self.request(api, method, version, params, _is_retry=True) raise SynologyError(_error_message(code, api), code=code) async def upload_text( self, dest_folder: str, filename: str, content: str, *, overwrite: bool = True, ) -> dict[str, Any]: """Upload text content as a file via SYNO.FileStation.Upload. Used for writing compose files to the NAS. Args: dest_folder: Target folder path on NAS (e.g. "/volume1/docker/myapp"). filename: Name for the uploaded file. content: Text content to upload. overwrite: Whether to overwrite existing file. Returns: Response data dict. """ api = "SYNO.FileStation.Upload" await self._ensure_initialized() http = self._get_http() if api not in self._api_cache: raise SynologyError(f"API '{api}' not found. Call query_api_info() first.", code=102) info = self._api_cache[api] resolved_version = min(info["maxVersion"], 2) # Pin to v2 url = f"{self._base_url}/webapi/{info['path']}" form_data: dict[str, str] = { "api": api, "version": str(resolved_version), "method": "upload", "path": dest_folder, "overwrite": str(overwrite).lower(), "create_parents": "true", } query_params: dict[str, str] = {} if self._sid: query_params["_sid"] = self._sid logger.debug("DSM POST: %s/upload v%d — path=%s filename=%s", api, resolved_version, dest_folder, filename) encoded = content.encode("utf-8") resp = await http.post( url, params=query_params, data=form_data, files={"file": (filename, encoded, "text/plain")}, timeout=httpx.Timeout(60.0), ) resp.raise_for_status() body = resp.json() if body.get("success"): return body.get("data") or {} code = body.get("error", {}).get("code", 0) raise SynologyError(_error_message(code, api), code=code) async def download_text(self, path: str) -> str: """Download a text file from the NAS via SYNO.FileStation.Download. Args: path: Full file path on NAS. Returns: File content as string. """ api = "SYNO.FileStation.Download" await self._ensure_initialized() http = self._get_http() if api not in self._api_cache: raise SynologyError(f"API '{api}' not found. Call query_api_info() first.", code=102) info = self._api_cache[api] resolved_version = info["maxVersion"] url = f"{self._base_url}/webapi/{info['path']}" params: dict[str, str] = { "api": api, "version": str(resolved_version), "method": "download", "path": path, "mode": "download", } if self._sid: params["_sid"] = self._sid log_params = {k: ("***" if k in _SENSITIVE_PARAMS else v) for k, v in params.items()} logger.debug("DSM GET: %s/download v%d — %s", api, resolved_version, log_params) resp = await http.get(url, params=params, timeout=httpx.Timeout(60.0)) resp.raise_for_status() content_type = resp.headers.get("content-type", "") if "application/json" in content_type: body = resp.json() code = body.get("error", {}).get("code", 0) raise SynologyError(_error_message(code, api), code=code) return resp.text