213 lines
7.1 KiB
Python
213 lines
7.1 KiB
Python
"""FileStation MCP tool registrations."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
from typing import TYPE_CHECKING
|
||
|
||
if TYPE_CHECKING:
|
||
from mcp.server.fastmcp import FastMCP
|
||
|
||
from mcp_synology_filestation.client import FileStationClient
|
||
from mcp_synology_filestation.config import AppConfig
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound applied to list_dir's ``limit`` argument. The DSM hard limit is
# 10000; a lower cap is enforced here.
_MAX_LIMIT = 500

# Sort directions accepted by list_dir.
_VALID_SORT_DIR = frozenset(("asc", "desc"))

# sort_by values accepted by SYNO.FileStation.List::list
_VALID_SORT_BY = frozenset(
    (
        "name",
        "size",
        "user",
        "group",
        "mtime",
        "atime",
        "crtime",
        "posix",
        "type",
    )
)
|
||
|
||
|
||
def _fmt_size(size: int | None) -> str:
    """Format a byte count as a human-readable string.

    Byte counts below 1024 are shown as whole bytes; larger values are scaled
    by powers of 1024 and shown with one decimal place (the same precision
    the PB case uses), so 1536 renders as "1.5 KB" instead of "2 KB".

    Args:
        size: Number of bytes, or None when the size is unknown.

    Returns:
        A string such as "512 B", "1.5 KB" or "3.2 TB"; "-" when *size* is None.
    """
    if size is None:
        return "-"
    if size < 1024:
        return f"{size:.0f} B"

    # Work on a separate float so the (int) parameter is never rebound.
    value = float(size)
    for unit in ("KB", "MB", "GB", "TB"):
        value /= 1024
        # Compare the rounded value so a figure that would display as
        # "1024.0 KB" is promoted to "1.0 MB" instead.
        if round(value, 1) < 1024:
            return f"{value:.1f} {unit}"
    return f"{value / 1024:.1f} PB"
|
||
|
||
|
||
def _fmt_time(epoch: int | None) -> str:
    """Format a Unix timestamp as a local date-time string.

    Args:
        epoch: Seconds since the Unix epoch, or None when unknown.

    Returns:
        ``YYYY-MM-DD HH:MM`` in the process's local timezone, or "-" when
        *epoch* is None or cannot be represented as a date.
    """
    if epoch is None:
        return "-"
    import datetime

    try:
        moment = datetime.datetime.fromtimestamp(epoch)
    except (OverflowError, OSError, ValueError):
        # fromtimestamp() rejects values outside the platform's supported
        # range; show the placeholder rather than failing the whole listing
        # because of one odd timestamp.
        return "-"
    return moment.strftime("%Y-%m-%d %H:%M")
|
||
|
||
|
||
def _render_table(headers: tuple[str, ...], rows: list[tuple[str, ...]]) -> list[str]:
    """Render string cells as a left-aligned ASCII grid.

    Args:
        headers: Column titles, one per column.
        rows: Data rows; every row must hold ``len(headers)`` string cells.

    Returns:
        The table as a list of lines: rule, header, rule, one line per row,
        and a closing rule.
    """
    # Each column is as wide as its widest cell, header included.
    widths = [
        max([len(title), *(len(row[col]) for row in rows)])
        for col, title in enumerate(headers)
    ]
    rule = "+" + "+".join("-" * (width + 2) for width in widths) + "+"

    def render_row(cells: tuple[str, ...]) -> str:
        padded = (f"{cell:<{width}}" for cell, width in zip(cells, widths))
        return "| " + " | ".join(padded) + " |"

    return [rule, render_row(headers), rule, *(render_row(row) for row in rows), rule]


def register_filestation(
    mcp: FastMCP,
    config: AppConfig,  # noqa: ARG001 — kept for consistency with other modules
    client: FileStationClient,
) -> None:
    """Register all FileStation tools with the MCP server.

    Two tools are registered: ``list_shares`` and ``list_dir``. Both return
    plain text; API failures raised as SynologyError are reported as an
    "Error: ..." string instead of propagating.

    Args:
        mcp: FastMCP server instance.
        config: Application configuration (reserved for future use).
        client: FileStationClient for DSM API calls.
    """
    # Imported inside the function, as the original tools did, so the
    # exception class is resolved at call time rather than at module import.
    from mcp_synology_filestation.client import SynologyError

    @mcp.tool()
    async def list_shares() -> str:
        """List all shared folders visible to the authenticated user.

        Returns a formatted table with share name, path, and volume status.
        """
        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "list_share",
                params={"additional": json.dumps(["real_path", "volume_status"])},
            )
        except SynologyError as e:
            return f"Error: {e}"

        shares: list[dict] = data.get("shares", [])
        if not shares:
            return "No shared folders found."

        rows = []
        for share in shares:
            # `or {}` also covers keys that are present but hold a null value.
            extra = share.get("additional") or {}
            volume = extra.get("volume_status") or {}
            total = volume.get("totalspace")
            used = volume.get("usedspace")
            # Only a missing or zero total makes the ratio meaningless; zero
            # bytes used is a legitimate value for an empty volume.
            if total and used is not None:
                pct = used / total * 100
                usage = f"{_fmt_size(used)} / {_fmt_size(total)} ({pct:.0f}%)"
            else:
                usage = "-"
            location = extra.get("real_path") or share.get("path", "")
            rows.append((share.get("name", ""), location, usage))

        lines = _render_table(("Share", "Path", "Usage"), rows)
        lines.append(f"\n{len(shares)} share(s) found.")
        return "\n".join(lines)

    @mcp.tool()
    async def list_dir(
        path: str,
        offset: int = 0,
        limit: int = 100,
        sort_by: str = "name",
        sort_direction: str = "asc",
    ) -> str:
        """List the contents of a directory on the NAS.

        Args:
            path: Absolute path on the NAS (e.g. "/volume1/data").
            offset: Number of items to skip (for pagination).
            limit: Maximum items to return (1-500, default 100).
            sort_by: Sort field — one of: name, size, user, group, mtime, atime,
                crtime, posix, type.
            sort_direction: "asc" or "desc".

        Returns:
            Formatted table with name, type, size, and modification time,
            plus the total item count for pagination context.
        """
        # Unknown sort options are rejected; out-of-range paging values are
        # clamped instead of rejected.
        if sort_by not in _VALID_SORT_BY:
            return f"Error: sort_by must be one of {sorted(_VALID_SORT_BY)}"
        if sort_direction not in _VALID_SORT_DIR:
            return "Error: sort_direction must be 'asc' or 'desc'"
        limit = max(1, min(limit, _MAX_LIMIT))
        offset = max(0, offset)

        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "list",
                params={
                    "folder_path": path,
                    "offset": offset,
                    "limit": limit,
                    "sort_by": sort_by,
                    "sort_direction": sort_direction,
                    # Request the size and timestamps shown in the table.
                    "additional": json.dumps(["size", "time"]),
                },
            )
        except SynologyError as e:
            return f"Error: {e}"

        files: list[dict] = data.get("files", [])
        reported_total = data.get("total")

        if not files:
            if offset > 0:
                # An empty page past the first one means the offset ran off
                # the end of the listing, not that the directory is empty.
                suffix = (
                    f" ({reported_total} item(s) total)"
                    if reported_total is not None
                    else ""
                )
                return f"No items at offset {offset} in '{path}'{suffix}."
            return f"Directory '{path}' is empty (or does not exist)."

        end = offset + len(files)
        # Without a reported total, everything seen so far is the best lower
        # bound; using len(files) alone would undercount on later pages.
        total: int = reported_total if reported_total is not None else end

        rows = []
        for entry in files:
            is_dir = entry.get("isdir", False)
            extra = entry.get("additional") or {}
            mtime = (extra.get("time") or {}).get("mtime")
            rows.append(
                (
                    entry.get("name", ""),
                    "dir" if is_dir else "file",
                    # Directories are shown without a size.
                    "-" if is_dir else _fmt_size(extra.get("size")),
                    _fmt_time(mtime),
                )
            )

        lines = [f"Path: {path}", *_render_table(("Name", "Type", "Size", "Modified"), rows)]
        lines.append(f"\nShowing {offset + 1}–{end} of {total} item(s).")
        if end < total:
            lines.append(f"Use offset={end} to fetch the next page.")

        return "\n".join(lines)
|