Files
mcp-synology-filestation/src/mcp_synology_filestation/tools/filestation.py
T
marcus 4bf655236d fix: treat DSM 599 as task-not-ready in _poll_task, poll until 60s timeout
DirSize/MD5 return error 599 while the async task is still initialising on
the NAS, not only after the task is gone. Remove the 5-consecutive-599 abort
limit and the debug stderr logging; instead pass on 599 and keep polling
until the existing 60 s timeout fires. Rename the test that checked the old
limit to reflect the new timeout-based behaviour.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:19:32 +02:00

934 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""FileStation MCP tool registrations."""
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_filestation.client import FileStationClient
from mcp_synology_filestation.config import AppConfig
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Valid sort_by values accepted by SYNO.FileStation.List::list
_VALID_SORT_BY = frozenset(
{"name", "size", "user", "group", "mtime", "atime", "crtime", "posix", "type"}
)
# Valid sort_direction values; list_dir rejects anything outside this set.
_VALID_SORT_DIR = frozenset({"asc", "desc"})
# Cap on items returned by list_dir — DSM hard limit is 10000, we enforce lower.
_MAX_LIMIT = 500
def _fmt_size(size: int | None) -> str:
    """Format a byte count as a human-readable string.

    Args:
        size: Number of bytes, or ``None`` when the size is unknown.

    Returns:
        ``"-"`` for ``None``; otherwise the value scaled by powers of 1024
        and suffixed with B/KB/MB/GB/TB (no decimals) or PB (one decimal).
    """
    if size is None:
        return "-"
    # Work on a float copy so the int-typed parameter is never rebound.
    value = float(size)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        # Compare the *rounded* value: 1023.999 KB would otherwise be printed
        # as "1024 KB" instead of rolling over to "1 MB".
        if round(value) < 1024:
            return f"{value:.0f} {unit}"
        value /= 1024
    return f"{value:.1f} PB"
def _fmt_time(epoch: int | None) -> str:
    """Render a Unix timestamp as ``YYYY-MM-DD HH:MM`` in local time.

    Returns ``"-"`` when *epoch* is ``None``.
    """
    from datetime import datetime as _datetime

    if epoch is not None:
        # fromtimestamp() without a tz argument yields naive local time.
        return _datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M")
    return "-"
def register_filestation(
mcp: FastMCP,
config: AppConfig, # noqa: ARG001 — kept for consistency with other modules
client: FileStationClient,
) -> None:
"""Register all FileStation tools with the MCP server.
Args:
mcp: FastMCP server instance.
config: Application configuration (reserved for future use).
client: FileStationClient for DSM API calls.
"""
# ── internal polling helper ───────────────────────────────────────────
async def _poll_task(
    api: str,
    version: int,
    taskid: str,
    initial_delay: float = 0.2,
) -> tuple[bool, dict[str, Any] | str]:
    """Wait for a DSM background task by repeatedly calling its ``status`` method.

    Args:
        api: DSM API name whose ``status`` method is polled
            (e.g. "SYNO.FileStation.CopyMove").
        version: API version passed to the ``status`` call.
        taskid: Task ID obtained from the matching ``start`` call.
        initial_delay: Seconds slept before the first poll; pass 0.0 to
            poll immediately.

    Returns:
        ``(True, status_dict)`` once the status reports ``finished``;
        ``(False, "Error: …")`` on a DSM error other than 599 or when the
        60 s budget is used up.
    """
    from mcp_synology_filestation.client import SynologyError as _SynologyError

    timeout = 60.0
    backoff = 0.2
    # Only time spent sleeping is accounted for; request latency is not.
    waited = initial_delay
    if initial_delay > 0:
        await asyncio.sleep(initial_delay)

    while True:
        try:
            status_data = await client.request(
                api,
                "status",
                version=version,
                params={"taskid": taskid},
            )
        except _SynologyError as e:
            # Code 599 is treated as "task not ready yet" and falls through
            # to the timeout check; every other DSM error aborts the poll.
            if e.code != 599:
                return False, f"Error: {e}"
        else:
            if status_data.get("finished"):
                return True, status_data

        if waited >= timeout:
            return (
                False,
                "Error: Operation timed out after 60 seconds — check NAS manually.",
            )
        await asyncio.sleep(backoff)
        waited += backoff
        # Exponential backoff, capped at 2 s between polls.
        backoff = min(backoff * 2, 2.0)
@mcp.tool()
async def list_shares():
    """List all shared folders. Returns name/path/volume-usage table."""
    from mcp_synology_filestation.client import SynologyError

    try:
        data = await client.request(
            "SYNO.FileStation.List",
            "list_share",
            params={"additional": json.dumps(["volume_status"])},
        )
    except SynologyError as e:
        return f"Error: {e}"
    shares: list[dict] = data.get("shares", [])
    if not shares:
        return "No shared folders found."

    # One (name, path, usage) tuple per share.
    rows = []
    for share in shares:
        name = share.get("name", "")
        path = share.get("path", f"/{name}")
        vol = (share.get("additional") or {}).get("volume_status") or {}
        total = vol.get("totalspace")
        used = vol.get("usedspace")
        free = vol.get("freespace")
        # The documented volume_status object carries freespace/totalspace;
        # derive the used amount from those when usedspace is not supplied.
        if used is None and total is not None and free is not None:
            used = total - free
        # ``used is not None`` (rather than truthiness) so an empty volume
        # reports 0% instead of "-"; a zero/missing total still yields "-".
        if total and used is not None:
            pct = used / total * 100
            usage = f"{_fmt_size(used)} / {_fmt_size(total)} ({pct:.0f}%)"
        else:
            usage = "-"
        rows.append((name, path, usage))

    # Column widths: widest of the header label and every cell.
    w_name = max(len("Share"), *(len(r[0]) for r in rows))
    w_path = max(len("Path"), *(len(r[1]) for r in rows))
    w_usage = max(len("Usage"), *(len(r[2]) for r in rows))
    sep = f"+{'-' * (w_name + 2)}+{'-' * (w_path + 2)}+{'-' * (w_usage + 2)}+"
    header = f"| {'Share':<{w_name}} | {'Path':<{w_path}} | {'Usage':<{w_usage}} |"
    lines = [sep, header, sep]
    for name, path, usage in rows:
        lines.append(f"| {name:<{w_name}} | {path:<{w_path}} | {usage:<{w_usage}} |")
    lines.append(sep)
    lines.append(f"\n{len(shares)} share(s) found.")
    return "\n".join(lines)
@mcp.tool()
async def list_dir(
    path: str,
    offset: int = 0,
    limit: int = 100,
    sort_by: str = "name",
    sort_direction: str = "asc",
):
    """List directory contents. path: share-relative (e.g. /docker).
    offset/limit for pagination, sort_by/sort_direction for ordering."""
    from mcp_synology_filestation.client import SynologyError

    # Validate inputs before talking to the NAS.
    if sort_by not in _VALID_SORT_BY:
        return f"Error: sort_by must be one of {sorted(_VALID_SORT_BY)}"
    if sort_direction not in _VALID_SORT_DIR:
        return "Error: sort_direction must be 'asc' or 'desc'"
    # Clamp paging arguments instead of rejecting them.
    limit = max(1, min(limit, _MAX_LIMIT))
    offset = max(0, offset)

    try:
        data = await client.request(
            "SYNO.FileStation.List",
            "list",
            params={
                "folder_path": path,
                "offset": offset,
                "limit": limit,
                "sort_by": sort_by,
                "sort_direction": sort_direction,
                "additional": json.dumps(["size", "time"]),
            },
        )
    except SynologyError as e:
        return f"Error: {e}"
    files: list[dict] = data.get("files", [])
    total: int = data.get("total", len(files))
    if not files:
        return f"Directory '{path}' is empty (or does not exist)."

    # Build table rows: (name, type, size, mtime).
    rows = []
    for f in files:
        name = f.get("name", "")
        is_dir = f.get("isdir", False)
        ftype = "dir" if is_dir else "file"
        # ``or {}`` guards against explicit nulls, matching search/get_info.
        add = f.get("additional") or {}
        size_str = "-" if is_dir else _fmt_size(add.get("size"))
        mtime_str = _fmt_time((add.get("time") or {}).get("mtime"))
        rows.append((name, ftype, size_str, mtime_str))

    w_name = max(len("Name"), *(len(r[0]) for r in rows))
    w_type = max(len("Type"), *(len(r[1]) for r in rows))
    w_size = max(len("Size"), *(len(r[2]) for r in rows))
    w_mtime = max(len("Modified"), *(len(r[3]) for r in rows))
    sep = (
        f"+{'-' * (w_name + 2)}"
        f"+{'-' * (w_type + 2)}"
        f"+{'-' * (w_size + 2)}"
        f"+{'-' * (w_mtime + 2)}+"
    )
    header = (
        f"| {'Name':<{w_name}} "
        f"| {'Type':<{w_type}} "
        f"| {'Size':<{w_size}} "
        f"| {'Modified':<{w_mtime}} |"
    )
    lines = [f"Path: {path}", sep, header, sep]
    for name, ftype, size_str, mtime_str in rows:
        lines.append(
            f"| {name:<{w_name}} "
            f"| {ftype:<{w_type}} "
            f"| {size_str:<{w_size}} "
            f"| {mtime_str:<{w_mtime}} |"
        )
    lines.append(sep)

    end = offset + len(files)
    # Range separator between first and last shown index; without it the two
    # numbers run together (e.g. "Showing 1100 of 250").
    lines.append(f"\nShowing {offset + 1}–{end} of {total} item(s).")
    if end < total:
        lines.append(f"Use offset={end} to fetch the next page.")
    return "\n".join(lines)
@mcp.tool()
async def search(
    path: str,
    pattern: str,
    recursive: bool = True,
    max_results: int = 200,
):
    """Search files by glob pattern under path. pattern: e.g. "*.yaml".
    recursive/max_results optional."""
    from mcp_synology_filestation.client import SynologyError

    limit = max(1, min(max_results, 1000))

    # 1. Start search task
    try:
        start_data = await client.request(
            "SYNO.FileStation.Search",
            "start",
            params={
                "folder_path": path,
                "recursive": "true" if recursive else "false",
                "pattern": pattern,
            },
        )
    except SynologyError as e:
        return f"Error: {e}"
    taskid: str = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a search task ID."

    # 2. Poll until finished=True (exponential backoff: 200 ms → 2 s, timeout 60 s)
    delay = 0.2
    elapsed = 0.0
    timeout = 60.0
    files: list[dict] = []
    try:
        while True:
            await asyncio.sleep(delay)
            elapsed += delay
            try:
                poll_data = await client.request(
                    "SYNO.FileStation.Search",
                    "list",
                    params={
                        "taskid": taskid,
                        "offset": 0,
                        "limit": limit,
                        "additional": json.dumps(["size", "time"]),
                    },
                )
            except SynologyError as e:
                return f"Error: {e}"
            current_files: list[dict] = poll_data.get("files", [])
            if current_files:
                files = current_files  # keep the last non-empty result set
            if poll_data.get("finished", False):
                break
            if elapsed >= timeout:
                return "Error: Search timed out after 60 seconds."
            delay = min(delay * 2, 2.0)
    finally:
        # 3. Best-effort cleanup of the search task. Running it from
        # ``finally`` covers success, DSM errors, timeout, and also
        # cancellation/unexpected exceptions, so the task is not left behind.
        with contextlib.suppress(SynologyError):
            await client.request(
                "SYNO.FileStation.Search", "clean", params={"taskid": taskid}
            )

    if not files:
        return f"No files matching '{pattern}' found under '{path}'."

    # 4. Format results as (path, type, size, mtime) rows
    rows = []
    for f in files:
        item_path = f.get("path") or f.get("name", "")
        is_dir = f.get("isdir", False)
        ftype = "dir" if is_dir else "file"
        add = f.get("additional") or {}
        size_str = "-" if is_dir else _fmt_size(add.get("size"))
        mtime_str = _fmt_time((add.get("time") or {}).get("mtime"))
        rows.append((item_path, ftype, size_str, mtime_str))

    w_path = max(len("Path"), *(len(r[0]) for r in rows))
    w_type = max(len("Type"), *(len(r[1]) for r in rows))
    w_size = max(len("Size"), *(len(r[2]) for r in rows))
    w_mtime = max(len("Modified"), *(len(r[3]) for r in rows))
    sep = (
        f"+{'-' * (w_path + 2)}"
        f"+{'-' * (w_type + 2)}"
        f"+{'-' * (w_size + 2)}"
        f"+{'-' * (w_mtime + 2)}+"
    )
    header = (
        f"| {'Path':<{w_path}} "
        f"| {'Type':<{w_type}} "
        f"| {'Size':<{w_size}} "
        f"| {'Modified':<{w_mtime}} |"
    )
    lines = [f"Search: '{pattern}' under '{path}'", sep, header, sep]
    for item_path, ftype, size_str, mtime_str in rows:
        lines.append(
            f"| {item_path:<{w_path}} "
            f"| {ftype:<{w_type}} "
            f"| {size_str:<{w_size}} "
            f"| {mtime_str:<{w_mtime}} |"
        )
    lines.append(sep)
    lines.append(f"\n{len(rows)} match(es) found.")
    return "\n".join(lines)
@mcp.tool()
async def download(path: str):
    """Download a file as base64 (max 10 MB). path: share-relative.
    Returns JSON {filename, size, content_base64}."""
    import base64

    from mcp_synology_filestation.client import SynologyError

    max_download_bytes = 10 * 1024 * 1024  # 10 MB

    try:
        filename, content = await client.download_bytes(path)
    except SynologyError as e:
        return f"Error: {e}"

    # The size limit is enforced on the bytes already fetched from the NAS.
    size = len(content)
    if size <= max_download_bytes:
        payload = {
            "filename": filename,
            "size": size,
            "content_base64": base64.b64encode(content).decode(),
        }
        return json.dumps(payload)

    return (
        f"Error: File '{filename}' is {_fmt_size(size)}, which exceeds the 10 MB limit "
        "for MCP downloads. Use SFTP or another file-transfer method instead."
    )
@mcp.tool()
async def get_info(path: str):
    """Get metadata (type/size/owner/permissions/timestamps) for one or more paths.
    path: comma-separated share-relative paths."""
    from mcp_synology_filestation.client import SynologyError

    # Split the comma-separated argument, dropping blanks.
    paths = [piece.strip() for piece in path.split(",") if piece.strip()]
    if not paths:
        return "Error: no path provided."

    wanted = ["real_path", "size", "time", "perm", "owner", "type"]
    try:
        data = await client.request(
            "SYNO.FileStation.List",
            "getinfo",
            params={
                "path": json.dumps(paths),
                "additional": json.dumps(wanted),
            },
        )
    except SynologyError as e:
        return f"Error: {e}"

    files: list[dict] = data.get("files", [])
    if not files:
        return "No information returned for the given path(s)."

    # One 9-tuple of display strings per returned entry, in header order.
    table = []
    for entry in files:
        extra = entry.get("additional", {})
        times = extra.get("time") or {}
        owner_info = extra.get("owner") or {}
        posix = (extra.get("perm") or {}).get("posix", 0)
        is_dir = entry.get("isdir", False)
        table.append(
            (
                entry.get("path") or entry.get("name", ""),
                "dir" if is_dir else "file",
                "-" if is_dir else _fmt_size(extra.get("size")),
                owner_info.get("user", "-"),
                owner_info.get("group", "-"),
                # NOTE(review): this assumes DSM sends the mode as a plain
                # integer to be rendered in octal; if DSM already sends
                # digits like 755 the output would be wrong — confirm.
                oct(posix)[2:] if posix else "-",
                _fmt_time(times.get("mtime")),
                _fmt_time(times.get("crtime")),
                (extra.get("real_path") or "").strip() or "-",
            )
        )

    headers = (
        "Path",
        "Type",
        "Size",
        "Owner",
        "Group",
        "Perm",
        "Modified",
        "Created",
        "Real path",
    )
    # Each column is as wide as its longest cell (header included).
    widths = []
    for idx, title in enumerate(headers):
        widest_cell = max(len(row[idx]) for row in table)
        widths.append(max(len(title), widest_cell))

    rule = "+" + "+".join("-" * (width + 2) for width in widths) + "+"

    def _render(cells: tuple[str, ...]) -> str:
        padded = (f"{cell:<{width}}" for cell, width in zip(cells, widths))
        return "| " + " | ".join(padded) + " |"

    out = [rule, _render(headers), rule]
    out.extend(_render(row) for row in table)
    out.append(rule)
    out.append(f"\n{len(table)} item(s).")
    return "\n".join(out)
@mcp.tool()
async def check_exist(path: str):
    """Check if one or more paths exist. path: comma-separated share-relative paths.
    Returns Yes/No table."""
    from mcp_synology_filestation.client import SynologyError

    paths = [piece.strip() for piece in path.split(",") if piece.strip()]
    if not paths:
        return "Error: no path provided."

    request_params = {
        "path": json.dumps(paths),
        "additional": json.dumps([]),
    }
    try:
        data = await client.request(
            "SYNO.FileStation.List",
            "getinfo",
            params=request_params,
        )
    except SynologyError as e:
        return f"Error: {e}"

    files: list[dict] = data.get("files", [])
    if not files:
        return "No information returned for the given path(s)."

    # A path that doesn't exist still gets an entry but with name=None
    rows = []
    for entry in files:
        verdict = "No" if entry.get("name") is None else "Yes"
        rows.append((entry.get("path", ""), verdict))

    w_path = max(len("Path"), max(len(shown) for shown, _ in rows))
    w_exists = len("Exists")  # "Yes" / "No" always shorter
    sep = "+" + "-" * (w_path + 2) + "+" + "-" * (w_exists + 2) + "+"

    out = [sep, f"| {'Path':<{w_path}} | {'Exists':<{w_exists}} |", sep]
    out.extend(
        f"| {shown:<{w_path}} | {verdict:<{w_exists}} |" for shown, verdict in rows
    )
    out.append(sep)
    out.append(f"\n{len(rows)} path(s) checked.")
    return "\n".join(out)
# ── write tools ───────────────────────────────────────────────────────
@mcp.tool()
async def create_folder(
    path: str,
    name: str,
    create_parents: bool = False,
):
    """Create a folder. path: parent dir, name: folder name (not full path).
    create_parents: make missing parents."""
    from mcp_synology_filestation.client import SynologyError

    request_params = {
        "folder_path": json.dumps(path),
        "name": json.dumps(name),
        "force_parent": "true" if create_parents else "false",
    }
    try:
        data = await client.request(
            "SYNO.FileStation.CreateFolder",
            "create",
            params=request_params,
        )
    except SynologyError as e:
        return f"Error: {e}"

    # Prefer the path DSM reports; otherwise assemble it from the arguments.
    fallback = f"{path}/{name}"
    folders = data.get("folders", [])
    if folders:
        created_path = folders[0].get("path", fallback)
    else:
        created_path = fallback
    return f"Created: {created_path}"
@mcp.tool()
async def rename(path: str, new_name: str):
    """Rename a file or folder. path: current share-relative path,
    new_name: bare name only (not full path)."""
    from mcp_synology_filestation.client import SynologyError

    try:
        data = await client.request(
            "SYNO.FileStation.Rename",
            "rename",
            params={
                "path": json.dumps(path),
                "name": json.dumps(new_name),
            },
        )
    except SynologyError as e:
        return f"Error: {e}"

    # Fallback used only when DSM does not report the new path. Strip a
    # trailing slash first and keep the parent *without* its final slash, so
    # "/a" yields "/new" (not "//new") and "/share/dir/" yields "/share/new".
    parent = path.rstrip("/").rpartition("/")[0]
    fallback = f"{parent}/{new_name}"

    files = data.get("files", [])
    new_path = files[0].get("path", fallback) if files else fallback
    return f"Renamed to: {new_path}"
@mcp.tool()
async def copy(src: str, dst: str, overwrite: bool = False):
    """Copy src to dst directory.
    WARNING: overwrite=True replaces existing items (default False)."""
    from mcp_synology_filestation.client import SynologyError

    api = "SYNO.FileStation.CopyMove"
    request_params = {
        "path": json.dumps(src),
        "dest_folder_path": json.dumps(dst),
        "overwrite": "true" if overwrite else "false",
        # "false" here is the only request difference from the move tool.
        "remove_src": "false",
    }
    try:
        start_data = await client.request(
            api,
            "start",
            version=3,
            params=request_params,
        )
    except SynologyError as e:
        return f"Error: {e}"

    taskid: str = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a task ID."

    # Block until the background task finishes (or the poll helper gives up).
    ok, result = await _poll_task(api, 3, taskid)
    if not ok:
        return result  # type: ignore[return-value]

    status: dict[str, Any] = result  # type: ignore[assignment]
    dest_folder = status.get("dest_folder_path", dst)
    # Last path component of the source is reused as the destination name.
    filename = src.rpartition("/")[2]
    return f"Copied to: {dest_folder}/{filename}"
@mcp.tool()
async def move(src: str, dst: str, overwrite: bool = False):
    """Move src to dst directory.
    WARNING: overwrite=True replaces existing items (default False)."""
    from mcp_synology_filestation.client import SynologyError

    api = "SYNO.FileStation.CopyMove"
    request_params = {
        "path": json.dumps(src),
        "dest_folder_path": json.dumps(dst),
        "overwrite": "true" if overwrite else "false",
        # "true" here is the only request difference from the copy tool.
        "remove_src": "true",
    }
    try:
        start_data = await client.request(
            api,
            "start",
            version=3,
            params=request_params,
        )
    except SynologyError as e:
        return f"Error: {e}"

    taskid = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a task ID."

    # Block until the background task finishes (or the poll helper gives up).
    ok, result = await _poll_task(api, 3, taskid)
    if not ok:
        return result  # type: ignore[return-value]

    status: dict[str, Any] = result  # type: ignore[assignment]
    dest_folder = status.get("dest_folder_path", dst)
    # Last path component of the source is reused as the destination name.
    filename = src.rpartition("/")[2]
    return f"Moved to: {dest_folder}/{filename}"
@mcp.tool()
async def delete(path: str, confirmed: bool = False):
    """Delete a file or folder. IRREVERSIBLE.
    confirmed=False (default) shows preview only; pass confirmed=True to actually delete."""
    from mcp_synology_filestation.client import SynologyError

    # Two-step safety: without explicit confirmation nothing is sent to DSM.
    if not confirmed:
        preview = f"Would permanently delete: {path}\n"
        return preview + "This cannot be undone. Pass confirmed=True to proceed."

    api = "SYNO.FileStation.Delete"
    request_params = {
        "path": json.dumps(path),
        "recursive": "true",
        "accurate_progress": "false",
    }
    try:
        start_data = await client.request(api, "start", params=request_params)
    except SynologyError as e:
        return f"Error: {e}"

    taskid = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a task ID."

    # The status payload itself is not needed, only success/failure.
    ok, result = await _poll_task(api, 2, taskid)
    if ok:
        return f"Deleted: {path}"
    return result  # type: ignore[return-value]
@mcp.tool()
async def compress(
    paths: list[str],
    dest_file_path: str,
    level: str = "moderate",
    mode: str = "add",
    format: str = "zip",
    password: str = "",
):
    """Compress paths into an archive. dest_file_path: full path incl. filename.
    level: store/fastest/fast/normal/moderate/maximum. format: zip/7z."""
    from mcp_synology_filestation.client import SynologyError

    # (argument name, supplied value, accepted values), checked in this order.
    checks = (
        ("level", level, {"store", "fastest", "fast", "normal", "moderate", "maximum"}),
        ("mode", mode, {"add", "update", "refreshen"}),
        ("format", format, {"zip", "7z"}),
    )
    for label, value, allowed in checks:
        if value not in allowed:
            return f"Error: {label} must be one of {sorted(allowed)}"
    if not paths:
        return "Error: paths list must not be empty."

    api = "SYNO.FileStation.Compress"
    request_params = {
        "path": json.dumps(paths),
        "dest_file_path": json.dumps(dest_file_path),
        "level": level,
        "mode": mode,
        "format": format,
        "compress_password": password,
    }
    try:
        start_data = await client.request(
            api,
            "start",
            version=3,
            params=request_params,
        )
    except SynologyError as e:
        return f"Error: {e}"

    taskid: str = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a task ID."

    ok, result = await _poll_task(api, 3, taskid)
    if ok:
        return f"Compressed to: {dest_file_path}"
    return result  # type: ignore[return-value]
@mcp.tool()
async def extract(
    file_path: str,
    dest_folder_path: str,
    overwrite: bool = False,
    keep_dir: bool = True,
    create_subfolder: bool = False,
    password: str = "",
):
    """Extract a ZIP or 7z archive to dest_folder_path.
    overwrite/keep_dir/create_subfolder/password optional."""
    from mcp_synology_filestation.client import SynologyError

    def _flag(value: bool) -> str:
        # Booleans are sent to DSM as the strings "true"/"false".
        return "true" if value else "false"

    api = "SYNO.FileStation.Extract"
    request_params = {
        "file_path": json.dumps(file_path),
        "dest_folder_path": json.dumps(dest_folder_path),
        "overwrite": _flag(overwrite),
        "keep_dir": _flag(keep_dir),
        "create_subfolder": _flag(create_subfolder),
        "codepage": "enu",
        "password": password,
    }
    try:
        start_data = await client.request(
            api,
            "start",
            version=2,
            params=request_params,
        )
    except SynologyError as e:
        return f"Error: {e}"

    taskid: str = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a task ID."

    ok, result = await _poll_task(api, 2, taskid)
    if not ok:
        return result  # type: ignore[return-value]

    status: dict[str, Any] = result  # type: ignore[assignment]
    # Report the destination DSM echoes back, else the one requested.
    dest = status.get("dest_folder_path", dest_folder_path)
    return f"Extracted to: {dest}"
@mcp.tool()
async def dir_size(path: str):
    """Get total size, file count and folder count for one or more directories.
    path: comma-separated share-relative paths."""
    from mcp_synology_filestation.client import SynologyError

    paths = [p.strip() for p in path.split(",") if p.strip()]
    if not paths:
        return "Error: no path provided."

    try:
        start_data = await client.request(
            "SYNO.FileStation.DirSize",
            "start",
            version=2,
            params={"path": json.dumps(paths)},
        )
    except SynologyError as e:
        return f"Error: {e}"
    taskid: str = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a task ID."

    # initial_delay=0.0: poll immediately, the task may already be finished.
    ok, result = await _poll_task("SYNO.FileStation.DirSize", 1, taskid, initial_delay=0.0)
    if not ok:
        return result  # type: ignore[return-value]
    status: dict[str, Any] = result  # type: ignore[assignment]

    # The status carries a single set of totals, so all requested paths
    # share one table row.
    path_label = ", ".join(paths)
    num_dir_str = str(status.get("num_dir", 0))
    num_file_str = str(status.get("num_file", 0))
    size_str = _fmt_size(status.get("total_size", 0))

    # Column widths: widest of header label and the single cell.
    w_path = max(len("Path"), len(path_label))
    w_dir = max(len("Folders"), len(num_dir_str))
    w_file = max(len("Files"), len(num_file_str))
    w_size = max(len("Total Size"), len(size_str))

    sep = (
        f"+{'-' * (w_path + 2)}+{'-' * (w_dir + 2)}+{'-' * (w_file + 2)}+{'-' * (w_size + 2)}+"
    )
    header = (
        f"| {'Path':<{w_path}} "
        f"| {'Folders':<{w_dir}} "
        f"| {'Files':<{w_file}} "
        f"| {'Total Size':<{w_size}} |"
    )
    row = (
        f"| {path_label:<{w_path}} "
        f"| {num_dir_str:<{w_dir}} "
        f"| {num_file_str:<{w_file}} "
        f"| {size_str:<{w_size}} |"
    )
    return "\n".join([sep, header, sep, row, sep])
@mcp.tool()
async def get_md5(path: str):
    """Compute the MD5 checksum of a file on the NAS. path: share-relative file path."""
    from mcp_synology_filestation.client import SynologyError

    api = "SYNO.FileStation.MD5"
    try:
        start_data = await client.request(
            api,
            "start",
            version=2,
            params={"file_path": json.dumps(path)},
        )
    except SynologyError as e:
        return f"Error: {e}"

    taskid: str = start_data.get("taskid", "")
    if not taskid:
        return "Error: DSM did not return a task ID."

    # initial_delay=0.0: poll immediately, the task may already be finished.
    ok, result = await _poll_task(api, 1, taskid, initial_delay=0.0)
    if not ok:
        return result  # type: ignore[return-value]

    status: dict[str, Any] = result  # type: ignore[assignment]
    md5 = status.get("md5", "")
    if md5:
        return f"MD5 of {path}: {md5}"
    # A finished task without a hash is reported as an error.
    return "Error: DSM returned no MD5 hash."
@mcp.tool()
async def upload(
    path: str,
    filename: str,
    content_base64: str,
    overwrite: bool = False,
    create_parents: bool = True,
):
    """Upload base64-encoded content as filename into path (max 50 MB).
    WARNING: overwrite=True replaces existing file (default False)."""
    import base64

    from mcp_synology_filestation.client import SynologyError

    max_upload_bytes = 50 * 1024 * 1024  # 50 MB

    try:
        content = base64.b64decode(content_base64)
    except (ValueError, TypeError):
        # b64decode raises binascii.Error (a ValueError subclass) for bad
        # padding, ValueError for non-ASCII text and TypeError for non-text
        # input; anything else is a real bug and should not be masked.
        return "Error: content_base64 is not valid base64."

    # The limit applies to the decoded payload, not the base64 text.
    if len(content) > max_upload_bytes:
        return (
            f"Error: File is {_fmt_size(len(content))}, which exceeds the 50 MB limit "
            "for MCP uploads. Use SFTP or another file-transfer method instead."
        )

    try:
        await client.upload_bytes(
            dest_folder=path,
            filename=filename,
            content=content,
            overwrite=overwrite,
            create_parents=create_parents,
        )
    except SynologyError as e:
        return f"Error: {e}"
    return f"Uploaded: {path}/{filename}"