"""FileStation MCP tool registrations."""

from __future__ import annotations

import asyncio
import base64
import contextlib
import datetime
import json
import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from collections.abc import Sequence

    from mcp.server.fastmcp import FastMCP

    from mcp_synology_filestation.client import FileStationClient
    from mcp_synology_filestation.config import AppConfig

logger = logging.getLogger(__name__)

# Valid sort_by values accepted by SYNO.FileStation.List::list
_VALID_SORT_BY = frozenset(
    {"name", "size", "user", "group", "mtime", "atime", "crtime", "posix", "type"}
)
_VALID_SORT_DIR = frozenset({"asc", "desc"})

# Cap on items returned by list_dir — DSM hard limit is 10000, we enforce lower.
_MAX_LIMIT = 500


def _fmt_size(size: int | None) -> str:
    """Format a byte count as a human-readable string.

    Args:
        size: Number of bytes, or ``None`` when the size is unknown.

    Returns:
        ``"-"`` for ``None``; otherwise the value scaled by powers of 1024 with
        a unit suffix (``B`` … ``TB`` without decimals, ``PB`` with one decimal).
    """
    if size is None:
        return "-"
    value = float(size)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if value < 1024:
            return f"{value:.0f} {unit}"
        value /= 1024
    return f"{value:.1f} PB"


def _fmt_time(epoch: int | None) -> str:
    """Format a Unix timestamp as a local date-time string.

    Args:
        epoch: Seconds since the Unix epoch, or ``None`` when unknown.

    Returns:
        ``"-"`` for ``None``; otherwise ``YYYY-MM-DD HH:MM`` in the local
        timezone of the process.
    """
    if epoch is None:
        return "-"
    return datetime.datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M")


def _join_path(parent: str, name: str) -> str:
    """Join *parent* and *name* with exactly one ``/`` between them.

    A trailing slash on *parent* (including the bare root ``"/"``) is dropped
    first, so the result never contains a doubled separator at the join.
    """
    return f"{parent.rstrip('/')}/{name}"


def _render_table(headers: Sequence[str], rows: Sequence[Sequence[str]]) -> list[str]:
    """Render an ASCII grid table as a list of lines.

    Args:
        headers: Column titles.
        rows: Table rows; every row must have one string cell per header.

    Returns:
        ``[rule, header, rule, *rows, rule]`` where every cell is left-aligned
        and padded to the widest entry of its column.
    """
    widths = [
        max([len(title), *(len(row[col]) for row in rows)])
        for col, title in enumerate(headers)
    ]
    rule = "+" + "+".join("-" * (width + 2) for width in widths) + "+"

    def _line(cells: Sequence[str]) -> str:
        padded = (f"{cell:<{width}}" for cell, width in zip(cells, widths))
        return "| " + " | ".join(padded) + " |"

    return [rule, _line(headers), rule, *(_line(row) for row in rows), rule]


def register_filestation(
    mcp: FastMCP,
    config: AppConfig,  # noqa: ARG001 — kept for consistency with other modules
    client: FileStationClient,
) -> None:
    """Register all FileStation tools with the MCP server.

    Every tool returns a plain string: either a formatted result or a message
    starting with ``"Error: "`` (DSM errors are caught, never raised).

    Args:
        mcp: FastMCP server instance.
        config: Application configuration (reserved for future use).
        client: FileStationClient for DSM API calls.
    """
    # The client module is only imported under TYPE_CHECKING at module level,
    # so the exception type is resolved at call time, once for all tools.
    from mcp_synology_filestation.client import SynologyError

    # ── internal polling helpers ──────────────────────────────────────────

    async def _poll_task(
        api: str,
        version: int,
        taskid: str,
        initial_delay: float = 0.2,
    ) -> tuple[bool, dict[str, Any] | str]:
        """Poll a DSM async task until finished or timeout.

        For tasks that return intermediate ``finished=False`` status while
        running (CopyMove, Delete, Compress, Extract, Search). Use
        ``_poll_oneshot`` for DirSize and MD5.

        Args:
            api: DSM API name (e.g. "SYNO.FileStation.CopyMove").
            version: API version to use for the status call.
            taskid: Task ID returned by the corresponding start method.
            initial_delay: Seconds to wait before the first status poll.

        Returns:
            ``(True, status_dict)`` on success, or ``(False, "Error: …")``
            on DSM error or timeout.
        """
        delay = 0.2
        elapsed = initial_delay  # only sleep time is counted, not request time
        timeout = 60.0
        if initial_delay > 0:
            await asyncio.sleep(initial_delay)
        while True:
            try:
                status_data = await client.request(
                    api,
                    "status",
                    version=version,
                    params={"taskid": taskid},
                )
            except SynologyError as exc:
                if exc.code != 599:
                    return False, f"Error: {exc}"
                # 599: task not yet visible — keep polling
            else:
                if status_data.get("finished"):
                    return True, status_data
            if elapsed >= timeout:
                return (
                    False,
                    "Error: Operation timed out after 60 seconds — check NAS manually.",
                )
            await asyncio.sleep(delay)
            elapsed += delay
            delay = min(delay * 2, 2.0)  # exponential backoff capped at 2 s

    async def _poll_oneshot(
        api: str,
        version: int,
        taskid: str,
        first_status: dict[str, Any] | None,
    ) -> tuple[bool, dict[str, Any] | str]:
        """Continue polling a one-shot DSM task after the first status poll.

        Called after ``client.start_and_poll_immediately`` has already made
        the first status request. Handles three outcomes for ``first_status``:

        * ``finished=True``  — return immediately (task done on first poll).
        * ``finished=False`` — task confirmed running; enter Phase 2
          (exponential backoff until ``finished=True`` or 60 s timeout).
        * ``None`` (first poll returned 599) — burst-retry 10× at 10 ms,
          then enter Phase 2 regardless (large directories will eventually
          return ``finished=False``; a 599 after the task was seen alive
          means the window closed — fail fast with a retry message).

        Returns:
            ``(True, status_dict)`` on success, or ``(False, "Error: …")``
            on DSM error or timeout.
        """
        status_params = {"taskid": taskid}
        seen_alive = False
        if first_status is not None:
            if first_status.get("finished"):
                return True, first_status
            seen_alive = True  # finished=False: task is running
        else:
            # 599 on the immediate poll: burst-retry (10×, 10 ms apart)
            for _ in range(10):
                await asyncio.sleep(0.01)
                try:
                    status = await client.request(
                        api, "status", version=version, params=status_params
                    )
                except SynologyError as exc:
                    if exc.code == 599:
                        continue
                    return False, f"Error: {exc}"
                if status.get("finished"):
                    return True, status
                seen_alive = True
                break  # finished=False: enter Phase 2

        # ── Phase 2: exponential backoff until finished or 60 s timeout ──
        delay = 0.2
        elapsed = 0.0
        timeout = 60.0
        while True:
            await asyncio.sleep(delay)
            elapsed += delay
            delay = min(delay * 2, 2.0)
            try:
                status = await client.request(
                    api, "status", version=version, params=status_params
                )
            except SynologyError as exc:
                if exc.code != 599:
                    return False, f"Error: {exc}"
                if seen_alive:
                    # Task was running but the one-shot window closed before we read it
                    return (
                        False,
                        "Error: Could not read task result — the operation finished"
                        " before the result was polled. Please retry.",
                    )
                # Not yet seen alive: large dir still initialising, keep polling
            else:
                seen_alive = True
                if status.get("finished"):
                    return True, status
            if elapsed >= timeout:
                return (
                    False,
                    "Error: Operation timed out after 60 seconds — check NAS manually.",
                )

    async def _copy_move(src: str, dst: str, overwrite: bool, *, remove_src: bool) -> str:
        """Run a CopyMove task and report where the item ended up.

        Shared by the ``copy`` and ``move`` tools, which differ only in the
        ``remove_src`` flag and the verb used in the success message.
        """
        try:
            start_data = await client.request(
                "SYNO.FileStation.CopyMove",
                "start",
                version=3,
                params={
                    "path": json.dumps(src),
                    "dest_folder_path": json.dumps(dst),
                    "overwrite": "true" if overwrite else "false",
                    "remove_src": "true" if remove_src else "false",
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        taskid: str = start_data.get("taskid", "")
        if not taskid:
            return "Error: DSM did not return a task ID."

        ok, result = await _poll_task("SYNO.FileStation.CopyMove", 3, taskid)
        if not ok:
            return result  # type: ignore[return-value]

        status: dict[str, Any] = result  # type: ignore[assignment]
        dest_folder = status.get("dest_folder_path") or dst
        # Strip a trailing slash so a folder source still yields its own name.
        filename = src.rstrip("/").rsplit("/", 1)[-1]
        verb = "Moved" if remove_src else "Copied"
        return f"{verb} to: {_join_path(dest_folder, filename)}"

    # ── read tools ────────────────────────────────────────────────────────

    # Lists shared folders with volume usage via SYNO.FileStation.List::list_share.
    @mcp.tool()
    async def list_shares():
        """List all shared folders. Returns name/path/volume-usage table."""
        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "list_share",
                params={"additional": json.dumps(["volume_status"])},
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        shares: list[dict] = data.get("shares", [])
        if not shares:
            return "No shared folders found."

        rows = []
        for share in shares:
            name = share.get("name", "")
            path = share.get("path", f"/{name}")
            vol = (share.get("additional") or {}).get("volume_status") or {}
            total = vol.get("totalspace")
            used = vol.get("usedspace")
            # A volume with 0 bytes used is still a valid reading; only a
            # missing value or a zero total (division) falls back to "-".
            if total and used is not None:
                pct = used / total * 100
                usage = f"{_fmt_size(used)} / {_fmt_size(total)} ({pct:.0f}%)"
            else:
                usage = "-"
            rows.append((name, path, usage))

        lines = _render_table(("Share", "Path", "Usage"), rows)
        lines.append(f"\n{len(shares)} share(s) found.")
        return "\n".join(lines)

    # Paginated directory listing; limit is clamped to 1.._MAX_LIMIT and
    # offset to >= 0 before the request is sent.
    @mcp.tool()
    async def list_dir(
        path: str,
        offset: int = 0,
        limit: int = 100,
        sort_by: str = "name",
        sort_direction: str = "asc",
    ):
        """List directory contents. path: share-relative (e.g. /docker).
        offset/limit for pagination, sort_by/sort_direction for ordering."""
        # Validate inputs
        if sort_by not in _VALID_SORT_BY:
            return f"Error: sort_by must be one of {sorted(_VALID_SORT_BY)}"
        if sort_direction not in _VALID_SORT_DIR:
            return "Error: sort_direction must be 'asc' or 'desc'"
        limit = max(1, min(limit, _MAX_LIMIT))
        offset = max(0, offset)

        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "list",
                params={
                    "folder_path": path,
                    "offset": offset,
                    "limit": limit,
                    "sort_by": sort_by,
                    "sort_direction": sort_direction,
                    "additional": json.dumps(["size", "time"]),
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        files: list[dict] = data.get("files", [])
        total: int = data.get("total", len(files))
        if not files:
            if offset and total:
                # Paging past the end is not the same as an empty directory.
                return f"No items at offset {offset}: directory '{path}' has {total} item(s)."
            return f"Directory '{path}' is empty (or does not exist)."

        rows = []
        for entry in files:
            is_dir = entry.get("isdir", False)
            extra = entry.get("additional") or {}
            rows.append(
                (
                    entry.get("name", ""),
                    "dir" if is_dir else "file",
                    "-" if is_dir else _fmt_size(extra.get("size")),
                    # "time" may be present but null; treat it like a missing key.
                    _fmt_time((extra.get("time") or {}).get("mtime")),
                )
            )

        lines = [f"Path: {path}", *_render_table(("Name", "Type", "Size", "Modified"), rows)]
        end = offset + len(files)
        lines.append(f"\nShowing {offset + 1}–{end} of {total} item(s).")
        if end < total:
            lines.append(f"Use offset={end} to fetch the next page.")
        return "\n".join(lines)

    # Starts a DSM search task, polls its result list until finished, then
    # cleans the task up on every exit path (best effort).
    @mcp.tool()
    async def search(
        path: str,
        pattern: str,
        recursive: bool = True,
        max_results: int = 200,
    ):
        """Search files by glob pattern under path. pattern: e.g. "*.yaml".
        recursive/max_results optional."""
        limit = max(1, min(max_results, 1000))

        # 1. Start search task
        try:
            start_data = await client.request(
                "SYNO.FileStation.Search",
                "start",
                params={
                    "folder_path": path,
                    "recursive": "true" if recursive else "false",
                    "pattern": pattern,
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        taskid: str = start_data.get("taskid", "")
        if not taskid:
            return "Error: DSM did not return a search task ID."

        async def _clean_task() -> None:
            """Best-effort removal of the search task; DSM errors are ignored."""
            with contextlib.suppress(SynologyError):
                await client.request(
                    "SYNO.FileStation.Search", "clean", params={"taskid": taskid}
                )

        # 2. Poll until finished=True (exponential backoff: 200 ms → 2 s, timeout 60 s)
        delay = 0.2
        elapsed = 0.0
        timeout = 60.0
        files: list[dict] = []
        while True:
            await asyncio.sleep(delay)
            elapsed += delay
            try:
                poll_data = await client.request(
                    "SYNO.FileStation.Search",
                    "list",
                    params={
                        "taskid": taskid,
                        "offset": 0,
                        "limit": limit,
                        "additional": json.dumps(["size", "time"]),
                    },
                )
            except SynologyError as exc:
                # Best-effort cleanup before surfacing the error
                await _clean_task()
                return f"Error: {exc}"

            current_files: list[dict] = poll_data.get("files", [])
            if current_files:
                files = current_files  # keep the last non-empty result set
            if poll_data.get("finished", False):
                break
            if elapsed >= timeout:
                await _clean_task()
                return "Error: Search timed out after 60 seconds."
            delay = min(delay * 2, 2.0)

        # 3. Clean up the search task
        await _clean_task()

        if not files:
            return f"No files matching '{pattern}' found under '{path}'."

        # 4. Format results
        rows = []
        for entry in files:
            is_dir = entry.get("isdir", False)
            extra = entry.get("additional") or {}
            rows.append(
                (
                    entry.get("path") or entry.get("name", ""),
                    "dir" if is_dir else "file",
                    "-" if is_dir else _fmt_size(extra.get("size")),
                    _fmt_time((extra.get("time") or {}).get("mtime")),
                )
            )

        lines = [
            f"Search: '{pattern}' under '{path}'",
            *_render_table(("Path", "Type", "Size", "Modified"), rows),
        ]
        lines.append(f"\n{len(rows)} match(es) found.")
        return "\n".join(lines)

    # NOTE: the size limit is enforced after client.download_bytes has
    # returned the full content, so it bounds the response size only.
    @mcp.tool()
    async def download(path: str):
        """Download a file as base64 (max 10 MB). path: share-relative.
        Returns JSON {filename, size, content_base64}."""
        max_download_bytes = 10 * 1024 * 1024  # 10 MB

        try:
            filename, content = await client.download_bytes(path)
        except SynologyError as exc:
            return f"Error: {exc}"

        size = len(content)
        if size > max_download_bytes:
            return (
                f"Error: File '{filename}' is {_fmt_size(size)}, which exceeds the 10 MB limit "
                "for MCP downloads. Use SFTP or another file-transfer method instead."
            )

        return json.dumps(
            {
                "filename": filename,
                "size": size,
                "content_base64": base64.b64encode(content).decode(),
            }
        )

    # Metadata lookup via SYNO.FileStation.List::getinfo, rendered as a table.
    @mcp.tool()
    async def get_info(path: str):
        """Get metadata (type/size/owner/permissions/timestamps) for one or more paths.
        path: comma-separated share-relative paths."""
        paths = [p.strip() for p in path.split(",") if p.strip()]
        if not paths:
            return "Error: no path provided."

        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "getinfo",
                params={
                    "path": json.dumps(paths),
                    "additional": json.dumps(
                        ["real_path", "size", "time", "perm", "owner", "type"]
                    ),
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        files: list[dict] = data.get("files", [])
        if not files:
            return "No information returned for the given path(s)."

        rows = []
        for entry in files:
            is_dir = entry.get("isdir", False)
            extra = entry.get("additional") or {}
            times = extra.get("time") or {}
            owner_info = extra.get("owner") or {}
            posix = (extra.get("perm") or {}).get("posix", 0)
            rows.append(
                (
                    entry.get("path") or entry.get("name", ""),
                    "dir" if is_dir else "file",
                    "-" if is_dir else _fmt_size(extra.get("size")),
                    # `or "-"` also covers a null owner/group, which would
                    # otherwise break the column-width calculation.
                    owner_info.get("user") or "-",
                    owner_info.get("group") or "-",
                    oct(posix)[2:] if posix else "-",  # e.g. 0o755 -> "755"
                    _fmt_time(times.get("mtime")),
                    _fmt_time(times.get("crtime")),
                    (extra.get("real_path") or "").strip() or "-",
                )
            )

        headers = (
            "Path",
            "Type",
            "Size",
            "Owner",
            "Group",
            "Perm",
            "Modified",
            "Created",
            "Real path",
        )
        lines = _render_table(headers, rows)
        lines.append(f"\n{len(rows)} item(s).")
        return "\n".join(lines)

    # Existence check built on getinfo with no additional fields requested.
    @mcp.tool()
    async def check_exist(path: str):
        """Check if one or more paths exist. path: comma-separated share-relative paths.
        Returns Yes/No table."""
        paths = [p.strip() for p in path.split(",") if p.strip()]
        if not paths:
            return "Error: no path provided."

        try:
            data = await client.request(
                "SYNO.FileStation.List",
                "getinfo",
                params={
                    "path": json.dumps(paths),
                    "additional": json.dumps([]),
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        files: list[dict] = data.get("files", [])
        if not files:
            return "No information returned for the given path(s)."

        # A path that doesn't exist still gets an entry but with name=None
        rows = [
            (entry.get("path") or "", "Yes" if entry.get("name") is not None else "No")
            for entry in files
        ]
        lines = _render_table(("Path", "Exists"), rows)
        lines.append(f"\n{len(rows)} path(s) checked.")
        return "\n".join(lines)

    # ── write tools ───────────────────────────────────────────────────────

    @mcp.tool()
    async def create_folder(
        path: str,
        name: str,
        create_parents: bool = False,
    ):
        """Create a folder. path: parent dir, name: folder name (not full path).
        create_parents: make missing parents."""
        try:
            data = await client.request(
                "SYNO.FileStation.CreateFolder",
                "create",
                params={
                    "folder_path": json.dumps(path),
                    "name": json.dumps(name),
                    "force_parent": "true" if create_parents else "false",
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        # Prefer the path DSM reports; fall back to the one we asked for.
        fallback = _join_path(path, name)
        folders = data.get("folders", [])
        created_path = folders[0].get("path", fallback) if folders else fallback
        return f"Created: {created_path}"

    @mcp.tool()
    async def rename(path: str, new_name: str):
        """Rename a file or folder. path: current share-relative path,
        new_name: bare name only (not full path)."""
        try:
            data = await client.request(
                "SYNO.FileStation.Rename",
                "rename",
                params={
                    "path": json.dumps(path),
                    "name": json.dumps(new_name),
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        # Prefer the path DSM reports; otherwise derive it from the old path.
        # _join_path keeps a root-level parent from producing "//new_name".
        parent = path.rsplit("/", 1)[0] or "/"
        fallback = _join_path(parent, new_name)
        files = data.get("files", [])
        new_path = files[0].get("path", fallback) if files else fallback
        return f"Renamed to: {new_path}"

    @mcp.tool()
    async def copy(src: str, dst: str, overwrite: bool = False):
        """Copy src to dst directory.
        WARNING: overwrite=True replaces existing items (default False)."""
        return await _copy_move(src, dst, overwrite, remove_src=False)

    @mcp.tool()
    async def move(src: str, dst: str, overwrite: bool = False):
        """Move src to dst directory.
        WARNING: overwrite=True replaces existing items (default False)."""
        return await _copy_move(src, dst, overwrite, remove_src=True)

    # Two-step delete: without confirmed=True no request is sent to DSM.
    @mcp.tool()
    async def delete(path: str, confirmed: bool = False):
        """Delete a file or folder. IRREVERSIBLE. confirmed=False (default) shows
        preview only; pass confirmed=True to actually delete."""
        if not confirmed:
            return (
                f"Would permanently delete: {path}\n"
                "This cannot be undone. Pass confirmed=True to proceed."
            )

        try:
            start_data = await client.request(
                "SYNO.FileStation.Delete",
                "start",
                params={
                    "path": json.dumps(path),
                    "recursive": "true",
                    "accurate_progress": "false",
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        taskid = start_data.get("taskid", "")
        if not taskid:
            return "Error: DSM did not return a task ID."

        ok, result = await _poll_task("SYNO.FileStation.Delete", 2, taskid)
        if not ok:
            return result  # type: ignore[return-value]
        return f"Deleted: {path}"

    # `format` shadows the builtin but is part of the published tool schema.
    @mcp.tool()
    async def compress(
        paths: list[str],
        dest_file_path: str,
        level: str = "moderate",
        mode: str = "add",
        format: str = "zip",
        password: str = "",
    ):
        """Compress paths into an archive. dest_file_path: full path incl. filename.
        level: store/fastest/fast/normal/moderate/maximum. format: zip/7z."""
        _valid_levels = {"store", "fastest", "fast", "normal", "moderate", "maximum"}
        _valid_modes = {"add", "update", "refreshen"}
        _valid_formats = {"zip", "7z"}

        if level not in _valid_levels:
            return f"Error: level must be one of {sorted(_valid_levels)}"
        if mode not in _valid_modes:
            return f"Error: mode must be one of {sorted(_valid_modes)}"
        if format not in _valid_formats:
            return f"Error: format must be one of {sorted(_valid_formats)}"
        if not paths:
            return "Error: paths list must not be empty."

        try:
            start_data = await client.request(
                "SYNO.FileStation.Compress",
                "start",
                version=3,
                params={
                    "path": json.dumps(paths),
                    "dest_file_path": json.dumps(dest_file_path),
                    "level": level,
                    "mode": mode,
                    "format": format,
                    "compress_password": password,
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        taskid: str = start_data.get("taskid", "")
        if not taskid:
            return "Error: DSM did not return a task ID."

        ok, result = await _poll_task("SYNO.FileStation.Compress", 3, taskid)
        if not ok:
            return result  # type: ignore[return-value]
        return f"Compressed to: {dest_file_path}"

    @mcp.tool()
    async def extract(
        file_path: str,
        dest_folder_path: str,
        overwrite: bool = False,
        keep_dir: bool = True,
        create_subfolder: bool = False,
        password: str = "",
    ):
        """Extract a ZIP or 7z archive to dest_folder_path.
        overwrite/keep_dir/create_subfolder/password optional."""
        try:
            start_data = await client.request(
                "SYNO.FileStation.Extract",
                "start",
                version=2,
                params={
                    "file_path": json.dumps(file_path),
                    "dest_folder_path": json.dumps(dest_folder_path),
                    "overwrite": "true" if overwrite else "false",
                    "keep_dir": "true" if keep_dir else "false",
                    "create_subfolder": "true" if create_subfolder else "false",
                    "codepage": "enu",
                    "password": password,
                },
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        taskid: str = start_data.get("taskid", "")
        if not taskid:
            return "Error: DSM did not return a task ID."

        ok, result = await _poll_task("SYNO.FileStation.Extract", 2, taskid)
        if not ok:
            return result  # type: ignore[return-value]

        status: dict[str, Any] = result  # type: ignore[assignment]
        dest = status.get("dest_folder_path", dest_folder_path)
        return f"Extracted to: {dest}"

    # One DirSize task covers all requested paths; the status carries a single
    # set of totals, so the table has exactly one row.
    @mcp.tool()
    async def dir_size(path: str):
        """Get total size, file count and folder count for one or more directories.
        path: comma-separated share-relative paths."""
        paths = [p.strip() for p in path.split(",") if p.strip()]
        if not paths:
            return "Error: no path provided."

        try:
            taskid, first_status = await client.start_and_poll_immediately(
                "SYNO.FileStation.DirSize",
                start_params={"path": json.dumps(paths)},
                poll_version=1,
                start_version=2,
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        ok, result = await _poll_oneshot("SYNO.FileStation.DirSize", 1, taskid, first_status)
        if not ok:
            return result  # type: ignore[return-value]

        status: dict[str, Any] = result  # type: ignore[assignment]
        row = (
            ", ".join(paths),
            str(status.get("num_dir", 0)),
            str(status.get("num_file", 0)),
            _fmt_size(status.get("total_size", 0)),
        )
        return "\n".join(_render_table(("Path", "Folders", "Files", "Total Size"), [row]))

    @mcp.tool()
    async def get_md5(path: str):
        """Compute the MD5 checksum of a file on the NAS.
        path: share-relative file path."""
        try:
            taskid, first_status = await client.start_and_poll_immediately(
                "SYNO.FileStation.MD5",
                start_params={"file_path": json.dumps(path)},
                poll_version=1,
                start_version=2,
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        ok, result = await _poll_oneshot("SYNO.FileStation.MD5", 1, taskid, first_status)
        if not ok:
            return result  # type: ignore[return-value]

        status: dict[str, Any] = result  # type: ignore[assignment]
        md5 = status.get("md5", "")
        if not md5:
            return "Error: DSM returned no MD5 hash."
        return f"MD5 of {path}: {md5}"

    # The size limit applies to the decoded payload, not the base64 text.
    @mcp.tool()
    async def upload(
        path: str,
        filename: str,
        content_base64: str,
        overwrite: bool = False,
        create_parents: bool = True,
    ):
        """Upload base64-encoded content as filename into path (max 50 MB).
        WARNING: overwrite=True replaces existing file (default False)."""
        max_upload_bytes = 50 * 1024 * 1024  # 50 MB

        try:
            content = base64.b64decode(content_base64)
        except (ValueError, TypeError):
            # binascii.Error (bad padding) is a ValueError subclass; TypeError
            # covers a non-string argument slipping through.
            return "Error: content_base64 is not valid base64."

        if len(content) > max_upload_bytes:
            return (
                f"Error: File is {_fmt_size(len(content))}, which exceeds the 50 MB limit "
                "for MCP uploads. Use SFTP or another file-transfer method instead."
            )

        try:
            await client.upload_bytes(
                dest_folder=path,
                filename=filename,
                content=content,
                overwrite=overwrite,
                create_parents=create_parents,
            )
        except SynologyError as exc:
            return f"Error: {exc}"

        return f"Uploaded: {_join_path(path, filename)}"