Files
mcp-synology-filestation/src/mcp_synology_filestation/tools/filestation.py
T

213 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""FileStation MCP tool registrations."""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_filestation.client import FileStationClient
from mcp_synology_filestation.config import AppConfig
logger = logging.getLogger(__name__)
# Valid sort_by values accepted by SYNO.FileStation.List::list
_VALID_SORT_BY = frozenset(
{"name", "size", "user", "group", "mtime", "atime", "crtime", "posix", "type"}
)
_VALID_SORT_DIR = frozenset({"asc", "desc"})
# Cap on items returned by list_dir — DSM hard limit is 10000, we enforce lower.
_MAX_LIMIT = 500
def _fmt_size(size: int | None) -> str:
"""Format a byte count as a human-readable string."""
if size is None:
return "-"
for unit in ("B", "KB", "MB", "GB", "TB"):
if size < 1024:
return f"{size:.0f} {unit}"
size /= 1024 # type: ignore[assignment]
return f"{size:.1f} PB"
def _fmt_time(epoch: int | None) -> str:
"""Format a Unix timestamp as a local date-time string."""
if epoch is None:
return "-"
import datetime
return datetime.datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M")
def _render_table(
    headers: tuple[str, ...],
    rows: list[tuple[str, ...]],
) -> list[str]:
    """Render *rows* under *headers* as the lines of an ASCII box table.

    Args:
        headers: Column titles, one per column.
        rows: Table rows; every row has one string cell per header.

    Returns:
        The table as a list of lines: separator, header, separator, one line
        per row, and a closing separator. Cells are left-aligned and padded
        to the widest entry of their column.
    """
    widths = [
        max([len(title), *(len(row[col]) for row in rows)])
        for col, title in enumerate(headers)
    ]
    sep = "+" + "+".join("-" * (width + 2) for width in widths) + "+"

    def render_row(cells: tuple[str, ...]) -> str:
        padded = (cell.ljust(width) for cell, width in zip(cells, widths))
        return "| " + " | ".join(padded) + " |"

    return [sep, render_row(headers), sep, *(render_row(row) for row in rows), sep]


def register_filestation(
    mcp: FastMCP,
    config: AppConfig,  # noqa: ARG001 — kept for consistency with other modules
    client: FileStationClient,
) -> None:
    """Register all FileStation tools with the MCP server.

    Two tools are registered through ``mcp.tool()``: ``list_shares`` and
    ``list_dir``. Both return plain-text results and report a
    ``SynologyError`` as an ``"Error: ..."`` string instead of raising.

    Args:
        mcp: FastMCP server instance.
        config: Application configuration (reserved for future use).
        client: FileStationClient for DSM API calls.
    """

    @mcp.tool()
    async def list_shares() -> str:
        """List all shared folders visible to the authenticated user.

        Returns a formatted table with share name, path, and space usage.
        """
        # Imported inside the tool, as in the original code; this module only
        # imports the client package under TYPE_CHECKING.
        from mcp_synology_filestation.client import SynologyError

        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "list_share",
                params={"additional": json.dumps(["real_path", "volume_status"])},
            )
        except SynologyError as e:
            return f"Error: {e}"

        shares: list[dict] = data.get("shares", [])
        if not shares:
            return "No shared folders found."

        rows = []
        for share in shares:
            extra = share.get("additional", {})
            name = share.get("name", "")
            # Prefer the real on-disk path; fall back to the share path.
            path = extra.get("real_path") or share.get("path", "")
            vol = extra.get("volume_status", {})
            total = vol.get("totalspace")
            used = vol.get("usedspace")
            # total must be non-zero (it is the divisor); used may be 0 for
            # an empty volume and should still be reported.
            if total and used is not None:
                pct = used / total * 100
                usage = f"{_fmt_size(used)} / {_fmt_size(total)} ({pct:.0f}%)"
            else:
                usage = "-"
            rows.append((name, path, usage))

        lines = _render_table(("Share", "Path", "Usage"), rows)
        lines.append(f"\n{len(shares)} share(s) found.")
        return "\n".join(lines)

    @mcp.tool()
    async def list_dir(
        path: str,
        offset: int = 0,
        limit: int = 100,
        sort_by: str = "name",
        sort_direction: str = "asc",
    ) -> str:
        """List the contents of a directory on the NAS.

        Args:
            path: Absolute path on the NAS (e.g. "/volume1/data").
            offset: Number of items to skip (for pagination).
            limit: Maximum items to return (1-500, default 100).
            sort_by: Sort field — one of: name, size, user, group, mtime, atime,
                crtime, posix, type.
            sort_direction: "asc" or "desc".

        Returns:
            Formatted table with name, type, size, and modification time,
            plus the total item count for pagination context.
        """
        from mcp_synology_filestation.client import SynologyError

        # Reject unknown sort options up front; clamp paging values silently.
        if sort_by not in _VALID_SORT_BY:
            return f"Error: sort_by must be one of {sorted(_VALID_SORT_BY)}"
        if sort_direction not in _VALID_SORT_DIR:
            return "Error: sort_direction must be 'asc' or 'desc'"
        limit = max(1, min(limit, _MAX_LIMIT))
        offset = max(0, offset)

        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "list",
                params={
                    "folder_path": path,
                    "offset": offset,
                    "limit": limit,
                    "sort_by": sort_by,
                    "sort_direction": sort_direction,
                    # "size" is required because the Size column below reads
                    # additional["size"]; requesting only "time" (a debug
                    # leftover) made every file size render as "-".
                    "additional": json.dumps(["size", "time"]),
                },
            )
        except SynologyError as e:
            return f"Error: {e}"

        files: list[dict] = data.get("files", [])
        total: int = data.get("total", len(files))
        if not files:
            if offset and total:
                # The directory has entries, just none at or after this offset.
                return (
                    f"No items at offset {offset}: directory '{path}' "
                    f"contains {total} item(s)."
                )
            return f"Directory '{path}' is empty (or does not exist)."

        rows = []
        for entry in files:
            is_dir = entry.get("isdir", False)
            extra = entry.get("additional", {})
            rows.append(
                (
                    entry.get("name", ""),
                    "dir" if is_dir else "file",
                    # Directories are always shown without a size.
                    "-" if is_dir else _fmt_size(extra.get("size")),
                    _fmt_time(extra.get("time", {}).get("mtime")),
                )
            )

        lines = [
            f"Path: {path}",
            *_render_table(("Name", "Type", "Size", "Modified"), rows),
        ]
        end = offset + len(files)
        # Range is 1-based and inclusive: first shown item to last shown item.
        lines.append(f"\nShowing {offset + 1}-{end} of {total} item(s).")
        if end < total:
            lines.append(f"Use offset={end} to fetch the next page.")
        return "\n".join(lines)