feat: add create_folder, rename, copy, move, delete, upload tools

All path/name params are json.dumps-wrapped per confirmed DSM behaviour.
copy and move use async polling via a shared _poll_task helper
(exponential backoff 200ms→2s, 60s timeout). delete requires
confirmed=True; without it only a preview is returned and no DSM call
is made. upload decodes base64 and enforces a 50 MB cap.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-14 10:28:32 +02:00
parent 544cfb8b06
commit 80ac894165
3 changed files with 677 additions and 1 deletions
@@ -6,7 +6,7 @@ import asyncio
import contextlib
import json
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
@@ -59,6 +59,51 @@ def register_filestation(
client: FileStationClient for DSM API calls.
"""
# ── internal polling helper ───────────────────────────────────────────
async def _poll_task(api: str, version: int, taskid: str) -> tuple[bool, dict[str, Any] | str]:
"""Poll a DSM async task (CopyMove / Delete) until finished or timeout.
Args:
api: DSM API name (e.g. "SYNO.FileStation.CopyMove").
version: API version to use for the status call.
taskid: Task ID returned by the corresponding start method.
Returns:
``(True, status_dict)`` on success, or ``(False, "Error: …")`` on
DSM error or timeout.
"""
from mcp_synology_filestation.client import SynologyError as _SynologyError
delay = 0.2
elapsed = 0.0
timeout = 60.0
while True:
await asyncio.sleep(delay)
elapsed += delay
try:
status_data = await client.request(
api,
"status",
version=version,
params={"taskid": taskid},
)
except _SynologyError as e:
return False, f"Error: {e}"
if status_data.get("finished"):
return True, status_data
if elapsed >= timeout:
return (
False,
"Error: Operation timed out after 60 seconds — check NAS manually.",
)
delay = min(delay * 2, 2.0)
@mcp.tool()
async def list_shares() -> str:
"""List all shared folders visible to the authenticated user.
@@ -492,3 +537,262 @@ def register_filestation(
lines.append(f"\n{len(rows)} item(s).")
return "\n".join(lines)
# ── write tools ───────────────────────────────────────────────────────
@mcp.tool()
async def create_folder(
path: str,
name: str,
create_parents: bool = False,
) -> str:
"""Create a new folder on the NAS.
Args:
path: Parent directory path (e.g. "/docker").
name: New folder name — not a full path (e.g. "my-app").
create_parents: Create missing intermediate parent directories
if True (default False).
Returns:
Full path of the created folder, or an Error: message.
"""
from mcp_synology_filestation.client import SynologyError
try:
data = await client.request(
"SYNO.FileStation.CreateFolder",
"create",
params={
"folder_path": json.dumps(path),
"name": json.dumps(name),
"force_parent": "true" if create_parents else "false",
},
)
except SynologyError as e:
return f"Error: {e}"
folders = data.get("folders", [])
created_path = folders[0].get("path", f"{path}/{name}") if folders else f"{path}/{name}"
return f"Created: {created_path}"
@mcp.tool()
async def rename(path: str, new_name: str) -> str:
"""Rename a file or folder on the NAS.
Args:
path: Absolute share-relative path to the item
(e.g. "/docker/old-name.yaml").
new_name: New name — not a full path (e.g. "new-name.yaml").
Returns:
New absolute path after rename, or an Error: message.
"""
from mcp_synology_filestation.client import SynologyError
try:
data = await client.request(
"SYNO.FileStation.Rename",
"rename",
params={
"path": json.dumps(path),
"name": json.dumps(new_name),
},
)
except SynologyError as e:
return f"Error: {e}"
files = data.get("files", [])
parent = path.rsplit("/", 1)[0] or "/"
new_path = files[0].get("path", f"{parent}/{new_name}") if files else f"{parent}/{new_name}"
return f"Renamed to: {new_path}"
@mcp.tool()
async def copy(src: str, dst: str, overwrite: bool = False) -> str:
"""Copy a file or folder to a new location on the NAS.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing item at the destination.
Args:
src: Source absolute path (e.g. "/docker/app/compose.yaml").
dst: Destination directory path (e.g. "/backup/docker").
overwrite: Replace existing item at destination (default False).
Returns:
Destination path on success, or an Error: message.
"""
from mcp_synology_filestation.client import SynologyError
try:
start_data = await client.request(
"SYNO.FileStation.CopyMove",
"start",
version=3,
params={
"path": json.dumps(src),
"dest_folder_path": json.dumps(dst),
"overwrite": "true" if overwrite else "false",
"remove_src": "false",
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.CopyMove", 3, taskid)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
dest_folder = status.get("dest_folder_path", dst)
filename = src.rsplit("/", 1)[-1]
return f"Copied to: {dest_folder}/{filename}"
@mcp.tool()
async def move(src: str, dst: str, overwrite: bool = False) -> str:
"""Move a file or folder to a new location on the NAS.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing item at the destination.
Args:
src: Source absolute path (e.g. "/docker/app/old-compose.yaml").
dst: Destination directory path (e.g. "/backup/docker").
overwrite: Replace existing item at destination (default False).
Returns:
Destination path on success, or an Error: message.
"""
from mcp_synology_filestation.client import SynologyError
try:
start_data = await client.request(
"SYNO.FileStation.CopyMove",
"start",
version=3,
params={
"path": json.dumps(src),
"dest_folder_path": json.dumps(dst),
"overwrite": "true" if overwrite else "false",
"remove_src": "true",
},
)
except SynologyError as e:
return f"Error: {e}"
taskid = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.CopyMove", 3, taskid)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
dest_folder = status.get("dest_folder_path", dst)
filename = src.rsplit("/", 1)[-1]
return f"Moved to: {dest_folder}/{filename}"
@mcp.tool()
async def delete(path: str, confirmed: bool = False) -> str:
"""Delete a file or folder on the NAS.
WARNING: This operation is irreversible. Without confirmed=True,
returns only a preview — no changes are made.
Args:
path: Absolute share-relative path to delete.
confirmed: Must be True to actually delete. Defaults to False
(preview only — no DSM call).
Returns:
Preview message if confirmed=False; success or Error: message
if confirmed=True.
"""
from mcp_synology_filestation.client import SynologyError
if not confirmed:
return (
f"Would permanently delete: {path}\n"
"This cannot be undone. Pass confirmed=True to proceed."
)
try:
start_data = await client.request(
"SYNO.FileStation.Delete",
"start",
params={
"path": json.dumps(path),
"recursive": "true",
"accurate_progress": "false",
},
)
except SynologyError as e:
return f"Error: {e}"
taskid = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.Delete", 2, taskid)
if not ok:
return result # type: ignore[return-value]
return f"Deleted: {path}"
@mcp.tool()
async def upload(
path: str,
filename: str,
content_base64: str,
overwrite: bool = False,
create_parents: bool = True,
) -> str:
"""Upload a file to a directory on the NAS from base64-encoded content.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing file.
Args:
path: Destination directory path on the NAS (e.g. "/docker/app").
filename: Filename to create (e.g. "compose.yaml").
content_base64: Base64-encoded file content.
overwrite: Replace existing file at destination (default False).
create_parents: Create missing parent directories (default True).
Returns:
Full path of the uploaded file, or an Error: message.
"""
import base64
from mcp_synology_filestation.client import SynologyError
max_upload_bytes = 50 * 1024 * 1024 # 50 MB
try:
content = base64.b64decode(content_base64)
except Exception:
return "Error: content_base64 is not valid base64."
if len(content) > max_upload_bytes:
return (
f"Error: File is {_fmt_size(len(content))}, which exceeds the 50 MB limit "
"for MCP uploads. Use SFTP or another file-transfer method instead."
)
try:
await client.upload_bytes(
dest_folder=path,
filename=filename,
content=content,
overwrite=overwrite,
create_parents=create_parents,
)
except SynologyError as e:
return f"Error: {e}"
return f"Uploaded: {path}/{filename}"