feat: implement auth infrastructure and first two FileStation tools

- auth.py: AuthManager with OS keyring, env var fallback, 2FA device token flow
- config.py: AppConfig/ConnectionConfig dataclasses, YAML load/save, env overrides
- client.py: FileStationClient with lazy init, session re-auth, upload/download
- cli.py: setup / check / serve subcommands (anyio.run throughout)
- server.py: create_server factory wiring FastMCP to FileStation tools
- tools/filestation.py: list_shares and list_dir with ASCII table output,
  pagination hints, input validation, DSM error mapping
- tests: 30 unit tests, all passing (auth, config, tools)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-14 08:23:34 +02:00
parent 9fc5a3d68c
commit 59301ae760
10 changed files with 2945 additions and 8 deletions
@@ -1,2 +1,212 @@
"""FileStation MCP tool registrations."""
# Implementation pending approval — see SPEC.md
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_filestation.client import FileStationClient
from mcp_synology_filestation.config import AppConfig
logger = logging.getLogger(__name__)
# Valid sort_by values accepted by SYNO.FileStation.List::list
_VALID_SORT_BY = frozenset(
{"name", "size", "user", "group", "mtime", "atime", "crtime", "posix", "type"}
)
_VALID_SORT_DIR = frozenset({"asc", "desc"})
# Cap on items returned by list_dir — DSM hard limit is 10000, we enforce lower.
_MAX_LIMIT = 500
def _fmt_size(size: int | None) -> str:
"""Format a byte count as a human-readable string."""
if size is None:
return "-"
for unit in ("B", "KB", "MB", "GB", "TB"):
if size < 1024:
return f"{size:.0f} {unit}"
size /= 1024 # type: ignore[assignment]
return f"{size:.1f} PB"
def _fmt_time(epoch: int | None) -> str:
"""Format a Unix timestamp as a local date-time string."""
if epoch is None:
return "-"
import datetime
return datetime.datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M")
def register_filestation(
mcp: FastMCP,
config: AppConfig, # noqa: ARG001 — kept for consistency with other modules
client: FileStationClient,
) -> None:
"""Register all FileStation tools with the MCP server.
Args:
mcp: FastMCP server instance.
config: Application configuration (reserved for future use).
client: FileStationClient for DSM API calls.
"""
@mcp.tool()
async def list_shares() -> str:
"""List all shared folders visible to the authenticated user.
Returns a formatted table with share name, path, and volume status.
"""
from mcp_synology_filestation.client import SynologyError
try:
data = await client.request(
"SYNO.FileStation.List",
"list_share",
params={"additional": json.dumps(["real_path", "volume_status"])},
)
except SynologyError as e:
return f"Error: {e}"
shares: list[dict] = data.get("shares", [])
if not shares:
return "No shared folders found."
# Build table manually with consistent column widths
rows = []
for share in shares:
name = share.get("name", "")
path = share.get("additional", {}).get("real_path") or share.get("path", "")
vol = share.get("additional", {}).get("volume_status", {})
total = vol.get("totalspace")
used = vol.get("usedspace")
if total and used:
pct = used / total * 100
usage = f"{_fmt_size(used)} / {_fmt_size(total)} ({pct:.0f}%)"
else:
usage = "-"
rows.append((name, path, usage))
# Column widths
w_name = max(len("Share"), *(len(r[0]) for r in rows))
w_path = max(len("Path"), *(len(r[1]) for r in rows))
w_usage = max(len("Usage"), *(len(r[2]) for r in rows))
sep = f"+{'-' * (w_name + 2)}+{'-' * (w_path + 2)}+{'-' * (w_usage + 2)}+"
header = f"| {'Share':<{w_name}} | {'Path':<{w_path}} | {'Usage':<{w_usage}} |"
lines = [sep, header, sep]
for name, path, usage in rows:
lines.append(f"| {name:<{w_name}} | {path:<{w_path}} | {usage:<{w_usage}} |")
lines.append(sep)
lines.append(f"\n{len(shares)} share(s) found.")
return "\n".join(lines)
@mcp.tool()
async def list_dir(
path: str,
offset: int = 0,
limit: int = 100,
sort_by: str = "name",
sort_direction: str = "asc",
) -> str:
"""List the contents of a directory on the NAS.
Args:
path: Absolute path on the NAS (e.g. "/volume1/data").
offset: Number of items to skip (for pagination).
limit: Maximum items to return (1-500, default 100).
sort_by: Sort field — one of: name, size, user, group, mtime, atime,
crtime, posix, type.
sort_direction: "asc" or "desc".
Returns:
Formatted table with name, type, size, and modification time,
plus the total item count for pagination context.
"""
from mcp_synology_filestation.client import SynologyError
# Validate inputs
if sort_by not in _VALID_SORT_BY:
return f"Error: sort_by must be one of {sorted(_VALID_SORT_BY)}"
if sort_direction not in _VALID_SORT_DIR:
return "Error: sort_direction must be 'asc' or 'desc'"
limit = max(1, min(limit, _MAX_LIMIT))
offset = max(0, offset)
try:
data = await client.request(
"SYNO.FileStation.List",
"list",
params={
"folder_path": path,
"offset": offset,
"limit": limit,
"sort_by": sort_by,
"sort_direction": sort_direction,
"additional": json.dumps(["real_path", "size", "time", "perm", "type"]),
},
)
except SynologyError as e:
return f"Error: {e}"
files: list[dict] = data.get("files", [])
total: int = data.get("total", len(files))
if not files:
return f"Directory '{path}' is empty (or does not exist)."
# Build table
rows = []
for f in files:
name = f.get("name", "")
is_dir = f.get("isdir", False)
ftype = "dir" if is_dir else "file"
add = f.get("additional", {})
size_val = add.get("size")
size_str = "-" if is_dir else _fmt_size(size_val)
mtime = add.get("time", {}).get("mtime")
mtime_str = _fmt_time(mtime)
rows.append((name, ftype, size_str, mtime_str))
w_name = max(len("Name"), *(len(r[0]) for r in rows))
w_type = max(len("Type"), *(len(r[1]) for r in rows))
w_size = max(len("Size"), *(len(r[2]) for r in rows))
w_mtime = max(len("Modified"), *(len(r[3]) for r in rows))
sep = (
f"+{'-' * (w_name + 2)}"
f"+{'-' * (w_type + 2)}"
f"+{'-' * (w_size + 2)}"
f"+{'-' * (w_mtime + 2)}+"
)
header = (
f"| {'Name':<{w_name}} "
f"| {'Type':<{w_type}} "
f"| {'Size':<{w_size}} "
f"| {'Modified':<{w_mtime}} |"
)
lines = [f"Path: {path}", sep, header, sep]
for name, ftype, size_str, mtime_str in rows:
lines.append(
f"| {name:<{w_name}} "
f"| {ftype:<{w_type}} "
f"| {size_str:<{w_size}} "
f"| {mtime_str:<{w_mtime}} |"
)
lines.append(sep)
end = offset + len(files)
lines.append(f"\nShowing {offset + 1}{end} of {total} item(s).")
if end < total:
lines.append(f"Use offset={end} to fetch the next page.")
return "\n".join(lines)