15 Commits

Author SHA1 Message Date
marcus 62f8e41931 fix: _poll_oneshot for DirSize/MD5 with burst-retry on early 599
Add FileStationClient.start_and_poll_immediately: starts the async task and
immediately makes the first status poll within the same method, with no
intermediate awaits other than the two HTTP calls. This minimises scheduler
latency between start and first poll for one-shot tasks.

_poll_oneshot now accepts the first_status from start_and_poll_immediately:
- finished=True on first poll → return immediately
- finished=False → Phase 2 (exponential backoff, 60 s timeout)
- None (first poll was 599) → burst-retry 10× at 10 ms, then Phase 2
  (Phase 2 keeps polling through 599 until seen_alive, then fails fast)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:50:28 +02:00
marcus 0e8ffaa6df fix: _poll_oneshot for DirSize/MD5 with burst-retry on early 599
Replace _poll_task for one-shot tasks with _poll_oneshot, which uses two
phases: (1) a burst of up to 11 immediate polls at 50ms intervals to catch
tasks that complete in <500ms, and (2) exponential-backoff polling once
finished=False is observed. A 599 during burst → window missed (fail fast).
A 599 during Phase 2 (task was seen running) → same. _poll_task is
simplified back to a plain long-poll with no window_timeout logic.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:38:42 +02:00
marcus 6510493930 fix: window_timeout for one-shot tasks, drop debug stderr logging
_poll_task now accepts window_timeout. For DirSize and MD5 (one-shot result
windows), if only 599 errors arrive for window_timeout seconds without ever
seeing the task alive (finished=False), return a fast "result window missed —
please retry" error instead of waiting the full 60 s. Tasks that return
finished=False at least once (large dirs, large files) are unaffected.

Also removes the stale [dsm] debug stderr.write left in client.request().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:30:20 +02:00
marcus 4bf655236d fix: treat DSM 599 as task-not-ready in _poll_task, poll until 60s timeout
DirSize/MD5 return error 599 while the async task is still initialising on
the NAS, not only after the task is gone. Remove the 5-consecutive-599 abort
limit and the debug stderr logging; instead pass on 599 and keep polling
until the existing 60 s timeout fires. Rename the test that checked the old
limit to reflect the new timeout-based behaviour.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:19:32 +02:00
marcus c0d4c347c5 debug: add stderr logging to _poll_task for every status poll attempt
Logs [poll] lines to stderr so Claude Desktop's MCP log shows exactly
what DSM returns on each status call: attempt number, elapsed time,
finished flag, data keys (on success) or error code + message (on 599).

Version 0.2.3 — remove this logging once the 599 root cause is confirmed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:05:37 +02:00
marcus e3fa71b458 fix: retry _poll_task on transient 599 instead of aborting immediately
DirSize for large directories (e.g. /docker, 8441 folders, 46832 files)
takes ~800ms to compute. While running, status returns intermediate
progress (finished=false). But on the very first poll the task can return
599 transiently (task just started, not yet available). Previously
_poll_task caught any SynologyError and returned immediately, making
dir_size always fail on the first 599.

Fix: treat 599 as a transient condition and continue polling. Give up
only after 5 consecutive 599 responses. All other error codes remain
immediately fatal.

Investigation confirmed with test_dirsize_md5.py:
- /test-mcp (2937 B): finished=true at 0ms
- /docker (3.9 GB, 46832 files): finished=false at 35ms, finished=true at 789ms

Tests: 2 new cases (retry-succeeds, 5x-599-gives-up) → 95 total

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:01:27 +02:00
marcus 4d8eae752d chore: bump version to 0.2.1
Includes all DirSize/MD5 polling fixes:
- initial_delay=0.0 for DirSize and MD5 (poll immediately after start)
- MD5 status uses version=1 (v2 always returns 599 on this NAS)
- __version__ kept in sync with pyproject.toml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:52:42 +02:00
marcus c923da6f6a fix: sync __version__ to 0.2.0 to match pyproject.toml
__init__.py was still at 0.1.0 after the pyproject.toml bump in an
earlier commit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:50:59 +02:00
marcus d8d7c6fd47 test: update dirsize/md5 probe with path-variant comparison results
Three path formats tested against NAS:
  a) plain string        -> 599 (wrong, task fails silently)
  b) json.dumps([path])  -> works (JSON array is the correct format)
  c) json.dumps(path)    -> 599 (wrong, double-encoded string)

Confirms current implementation (json.dumps(paths)) is correct.
MD5 probe simplified to final confirmed settings (status v1, 0ms).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:39:37 +02:00
marcus 4145d929a6 fix: correct DirSize/MD5 polling — initial_delay=0 and MD5 status v1
Live NAS investigation (test_dirsize_md5.py) revealed two bugs:

1. _poll_task() always slept 200ms before the first status call.
   DirSize and MD5 complete near-instantly on small data, so the result
   window closes before the first poll. Fix: add initial_delay parameter
   (default 0.2s for CopyMove/Delete/Compress/Extract); DirSize and MD5
   pass initial_delay=0.0 to poll immediately after start.

2. get_md5 used status version=2, but the NAS only serves the result on
   status v1 (v2 always returns 599 regardless of timing). Fix: change
   _poll_task version to 1 for SYNO.FileStation.MD5.

MD5 is one-shot: the result is consumed on the first successful status
read. Polling at 0ms ensures we catch it before it expires.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:35:43 +02:00
marcus 04caaef003 fix: use correct status API versions for DirSize and MD5
- DirSize: _poll_task now uses status version=1 (v2 always returns 599)
- MD5: _poll_task keeps status version=2 (confirmed working via live NAS test)

Investigation notes documented in test_dirsize_md5.py:
both APIs use start v2; DirSize status needs v1, MD5 status needs v2;
tiny data causes one-shot race condition (no issue with real-world data).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:24:02 +02:00
marcus 1d0cf940b4 feat: add dir_size and get_md5 tools
dir_size: SYNO.FileStation.DirSize v2 — async scan of one or more
directories, returns folder/file count and total size as a table.
get_md5: SYNO.FileStation.MD5 v2 — async MD5 checksum of a file.
Both follow the existing _poll_task pattern. 9 new unit tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:10:51 +02:00
marcus fc706fb809 perf: shorten all tool docstrings to reduce tools/list payload
Reduced tools/list JSON payload from ~45 KB to 5.7 KB by replacing
verbose multi-paragraph docstrings with 1-2 line summaries on all
14 @mcp.tool() functions. Fixes Claude Desktop truncation of tools/list.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 11:58:11 +02:00
marcus 500dc73324 fix: remove -> str return annotations from all mcp.tool() functions
FastMCP generates an outputSchema from the return type annotation, which
inflates the tools/list payload and triggers truncation in Claude Desktop.
Dropping the annotations suppresses outputSchema generation entirely.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 11:49:04 +02:00
marcus 473c771c20 feat: add compress and extract tools
Implements SYNO.FileStation.Compress (v3) and SYNO.FileStation.Extract (v2)
with async polling identical to copy/move. Includes input validation for
compress (level, mode, format, empty paths) and 11 new unit tests.
Bumps version to 0.2.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 11:26:08 +02:00
7 changed files with 1059 additions and 178 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "mcp-synology-filestation"
version = "0.1.0"
version = "0.2.7"
description = "MCP server for Synology FileStation"
requires-python = ">=3.12"
dependencies = [
+1 -1
View File
@@ -1,3 +1,3 @@
"""MCP server for Synology FileStation."""
__version__ = "0.1.0"
__version__ = "0.2.7"
+46 -2
View File
@@ -247,8 +247,6 @@ class FileStationClient:
Raises:
SynologyError: On API errors.
"""
sys.stderr.write(f"[dsm] request: {api}/{method}\n")
sys.stderr.flush()
if not self._initializing:
await self._ensure_initialized()
http = self._get_http()
@@ -309,6 +307,52 @@ class FileStationClient:
raise SynologyError(_error_message(code, api), code=code)
async def start_and_poll_immediately(
self,
api: str,
start_params: dict[str, Any],
poll_version: int,
*,
start_version: int | None = None,
) -> tuple[str, dict[str, Any] | None]:
"""Start a DSM async task and immediately make the first status poll.
Designed for one-shot tasks (DirSize, MD5) where the result window
may close quickly. Both the ``start`` and the first ``status`` request
are issued inside this single method with no intermediate awaits other
than the HTTP calls themselves, minimising scheduler latency.
Args:
api: DSM API name (e.g. "SYNO.FileStation.DirSize").
start_params: Query parameters for the ``start`` call.
poll_version: API version to use for the ``status`` call.
start_version: API version for the ``start`` call (defaults to
``maxVersion`` from the API info cache).
Returns:
``(taskid, status_data)`` where ``status_data`` is ``None`` if
the first status poll returned 599 (task not yet visible).
Raises:
SynologyError: If the ``start`` call fails, the response contains
no task ID, or the ``status`` call fails with a non-599 error.
"""
start_data = await self.request(api, "start", version=start_version, params=start_params)
taskid: str = start_data.get("taskid", "")
if not taskid:
raise SynologyError("DSM did not return a task ID.", code=0)
try:
status_data = await self.request(
api, "status", version=poll_version, params={"taskid": taskid}
)
except SynologyError as e:
if e.code == 599:
return taskid, None
raise
return taskid, status_data
async def download_bytes(self, path: str) -> tuple[str, bytes]:
"""Download a file from the NAS via SYNO.FileStation.Download.
+325 -173
View File
@@ -61,13 +61,23 @@ def register_filestation(
# ── internal polling helper ───────────────────────────────────────────
async def _poll_task(api: str, version: int, taskid: str) -> tuple[bool, dict[str, Any] | str]:
"""Poll a DSM async task (CopyMove / Delete) until finished or timeout.
async def _poll_task(
api: str,
version: int,
taskid: str,
initial_delay: float = 0.2,
) -> tuple[bool, dict[str, Any] | str]:
"""Poll a DSM async task until finished or timeout.
For tasks that return intermediate ``finished=False`` status while
running (CopyMove, Delete, Compress, Extract, Search). Use
``_poll_oneshot`` for DirSize and MD5.
Args:
api: DSM API name (e.g. "SYNO.FileStation.CopyMove").
version: API version to use for the status call.
taskid: Task ID returned by the corresponding start method.
initial_delay: Seconds to wait before the first status poll.
Returns:
``(True, status_dict)`` on success, or ``(False, "Error: …")`` on
@@ -76,13 +86,13 @@ def register_filestation(
from mcp_synology_filestation.client import SynologyError as _SynologyError
delay = 0.2
elapsed = 0.0
elapsed = initial_delay
timeout = 60.0
while True:
await asyncio.sleep(delay)
elapsed += delay
if initial_delay > 0:
await asyncio.sleep(initial_delay)
while True:
try:
status_data = await client.request(
api,
@@ -91,10 +101,13 @@ def register_filestation(
params={"taskid": taskid},
)
except _SynologyError as e:
return False, f"Error: {e}"
if status_data.get("finished"):
return True, status_data
if e.code == 599:
pass # task not yet visible — keep polling
else:
return False, f"Error: {e}"
else:
if status_data.get("finished"):
return True, status_data
if elapsed >= timeout:
return (
@@ -102,14 +115,96 @@ def register_filestation(
"Error: Operation timed out after 60 seconds — check NAS manually.",
)
await asyncio.sleep(delay)
elapsed += delay
delay = min(delay * 2, 2.0)
@mcp.tool()
async def list_shares() -> str:
"""List all shared folders visible to the authenticated user.
async def _poll_oneshot(
api: str,
version: int,
taskid: str,
first_status: dict[str, Any] | None,
) -> tuple[bool, dict[str, Any] | str]:
"""Continue polling a one-shot DSM task after the first status poll.
Returns a formatted table with share name, path, and volume status.
Called after ``client.start_and_poll_immediately`` has already made
the first status request. Handles three outcomes for ``first_status``:
* ``finished=True`` — return immediately (task done on first poll).
* ``finished=False`` — task confirmed running; enter Phase 2
(exponential backoff until ``finished=True`` or 60 s timeout).
* ``None`` (first poll returned 599) — burst-retry 10× at 10 ms,
then enter Phase 2 regardless (large directories will eventually
return ``finished=False``; a 599 after the task was seen alive
means the window closed — fail fast with a retry message).
Returns:
``(True, status_dict)`` on success, or ``(False, "Error: …")``
on DSM error or timeout.
"""
from mcp_synology_filestation.client import SynologyError as _SynologyError
seen_alive = False
if first_status is not None:
if first_status.get("finished"):
return True, first_status
seen_alive = True # finished=False: task is running
else:
# 599 on the immediate poll: burst-retry (10×, 10 ms apart)
for _ in range(10):
await asyncio.sleep(0.01)
try:
s = await client.request(
api, "status", version=version, params={"taskid": taskid}
)
except _SynologyError as e:
if e.code == 599:
continue
return False, f"Error: {e}"
if s.get("finished"):
return True, s
seen_alive = True
break # finished=False: enter Phase 2
# ── Phase 2: exponential backoff until finished or 60 s timeout ──
delay = 0.2
elapsed = 0.0
timeout = 60.0
while True:
await asyncio.sleep(delay)
elapsed += delay
delay = min(delay * 2, 2.0)
try:
s = await client.request(api, "status", version=version, params={"taskid": taskid})
except _SynologyError as e:
if e.code == 599:
if seen_alive:
# Task was running but the one-shot window closed before we read it
return (
False,
"Error: Could not read task result — the operation finished"
" before the result was polled. Please retry.",
)
# Not yet seen alive: large dir still initialising, keep polling
else:
return False, f"Error: {e}"
else:
seen_alive = True
if s.get("finished"):
return True, s
if elapsed >= timeout:
return (
False,
"Error: Operation timed out after 60 seconds — check NAS manually.",
)
@mcp.tool()
async def list_shares():
"""List all shared folders. Returns name/path/volume-usage table."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -163,24 +258,9 @@ def register_filestation(
limit: int = 100,
sort_by: str = "name",
sort_direction: str = "asc",
) -> str:
"""List the contents of a directory on the NAS.
Use share paths as returned by list_shares (e.g. "/dev", "/data"),
not volume paths (e.g. "/volume1/dev" will not work).
Args:
path: Share-relative path on the NAS (e.g. "/dev" or "/data/photos").
offset: Number of items to skip (for pagination).
limit: Maximum items to return (1-500, default 100).
sort_by: Sort field — one of: name, size, user, group, mtime, atime,
crtime, posix, type.
sort_direction: "asc" or "desc".
Returns:
Formatted table with name and type, plus the total item count
for pagination context.
"""
):
"""List directory contents. path: share-relative (e.g. /docker).
offset/limit for pagination, sort_by/sort_direction for ordering."""
from mcp_synology_filestation.client import SynologyError
# Validate inputs
@@ -265,22 +345,9 @@ def register_filestation(
pattern: str,
recursive: bool = True,
max_results: int = 200,
) -> str:
"""Search for files matching a glob pattern within a directory.
Starts an async DSM search task, polls until complete, then cleans up.
Use share paths as returned by list_shares (e.g. "/docker").
Args:
path: Root directory to search from (e.g. "/docker").
pattern: Filename glob pattern (e.g. "*.yaml", "report*.pdf").
recursive: Search subdirectories (default True).
max_results: Maximum number of matches to return (default 200, max 1000).
Returns:
Formatted table with path, type, size, and modification time,
plus total match count.
"""
):
"""Search files by glob pattern under path. pattern: e.g. "*.yaml".
recursive/max_results optional."""
from mcp_synology_filestation.client import SynologyError
limit = max(1, min(max_results, 1000))
@@ -399,18 +466,9 @@ def register_filestation(
return "\n".join(lines)
@mcp.tool()
async def download(path: str) -> str:
"""Download a single file from the NAS and return its content as base64.
Files larger than 10 MB are rejected — use SFTP or another method instead.
Use share paths as returned by list_shares (e.g. "/docker/app/config.yaml").
Args:
path: Absolute share-relative path to the file on the NAS.
Returns:
JSON object with "filename", "size" (bytes), and "content_base64".
"""
async def download(path: str):
"""Download a file as base64 (max 10 MB). path: share-relative.
Returns JSON {filename, size, content_base64}."""
import base64
from mcp_synology_filestation.client import SynologyError
@@ -438,20 +496,9 @@ def register_filestation(
)
@mcp.tool()
async def get_info(path: str) -> str:
"""Get detailed metadata for one or more files or folders on the NAS.
Accepts a single path or a comma-separated list of paths.
Use share paths as returned by list_shares (e.g. "/dev/file.txt").
Args:
path: One or more share-relative paths, comma-separated
(e.g. "/dev/notes.txt" or "/dev/notes.txt,/data/photo.jpg").
Returns:
Formatted table with type, size, owner, permissions, and timestamps
for each requested path.
"""
async def get_info(path: str):
"""Get metadata (type/size/owner/permissions/timestamps) for one or more paths.
path: comma-separated share-relative paths."""
from mcp_synology_filestation.client import SynologyError
paths = [p.strip() for p in path.split(",") if p.strip()]
@@ -539,23 +586,9 @@ def register_filestation(
return "\n".join(lines)
@mcp.tool()
async def check_exist(path: str) -> str:
"""Check whether one or more files or folders exist on the NAS.
Accepts a single path or a comma-separated list of paths.
Use share paths as returned by list_shares (e.g. "/dev/file.txt").
Note: SYNO.FileStation.CheckExist returns error 400 on this firmware for all
parameter formats. This tool falls back to SYNO.FileStation.List::getinfo, which
returns an entry per path with name=None when the path does not exist.
Args:
path: One or more share-relative paths, comma-separated
(e.g. "/dev/notes.txt" or "/dev/notes.txt,/data/photo.jpg").
Returns:
Formatted table with each path and whether it exists (Yes / No).
"""
async def check_exist(path: str):
"""Check if one or more paths exist. path: comma-separated share-relative paths.
Returns Yes/No table."""
from mcp_synology_filestation.client import SynologyError
paths = [p.strip() for p in path.split(",") if p.strip()]
@@ -602,18 +635,9 @@ def register_filestation(
path: str,
name: str,
create_parents: bool = False,
) -> str:
"""Create a new folder on the NAS.
Args:
path: Parent directory path (e.g. "/docker").
name: New folder name — not a full path (e.g. "my-app").
create_parents: Create missing intermediate parent directories
if True (default False).
Returns:
Full path of the created folder, or an Error: message.
"""
):
"""Create a folder. path: parent dir, name: folder name (not full path).
create_parents: make missing parents."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -634,17 +658,9 @@ def register_filestation(
return f"Created: {created_path}"
@mcp.tool()
async def rename(path: str, new_name: str) -> str:
"""Rename a file or folder on the NAS.
Args:
path: Absolute share-relative path to the item
(e.g. "/docker/old-name.yaml").
new_name: New name — not a full path (e.g. "new-name.yaml").
Returns:
New absolute path after rename, or an Error: message.
"""
async def rename(path: str, new_name: str):
"""Rename a file or folder. path: current share-relative path,
new_name: bare name only (not full path)."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -665,20 +681,9 @@ def register_filestation(
return f"Renamed to: {new_path}"
@mcp.tool()
async def copy(src: str, dst: str, overwrite: bool = False) -> str:
"""Copy a file or folder to a new location on the NAS.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing item at the destination.
Args:
src: Source absolute path (e.g. "/docker/app/compose.yaml").
dst: Destination directory path (e.g. "/backup/docker").
overwrite: Replace existing item at destination (default False).
Returns:
Destination path on success, or an Error: message.
"""
async def copy(src: str, dst: str, overwrite: bool = False):
"""Copy src to dst directory.
WARNING: overwrite=True replaces existing items (default False)."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -710,20 +715,9 @@ def register_filestation(
return f"Copied to: {dest_folder}/{filename}"
@mcp.tool()
async def move(src: str, dst: str, overwrite: bool = False) -> str:
"""Move a file or folder to a new location on the NAS.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing item at the destination.
Args:
src: Source absolute path (e.g. "/docker/app/old-compose.yaml").
dst: Destination directory path (e.g. "/backup/docker").
overwrite: Replace existing item at destination (default False).
Returns:
Destination path on success, or an Error: message.
"""
async def move(src: str, dst: str, overwrite: bool = False):
"""Move src to dst directory.
WARNING: overwrite=True replaces existing items (default False)."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -755,21 +749,9 @@ def register_filestation(
return f"Moved to: {dest_folder}/{filename}"
@mcp.tool()
async def delete(path: str, confirmed: bool = False) -> str:
"""Delete a file or folder on the NAS.
WARNING: This operation is irreversible. Without confirmed=True,
returns only a preview — no changes are made.
Args:
path: Absolute share-relative path to delete.
confirmed: Must be True to actually delete. Defaults to False
(preview only — no DSM call).
Returns:
Preview message if confirmed=False; success or Error: message
if confirmed=True.
"""
async def delete(path: str, confirmed: bool = False):
"""Delete a file or folder. IRREVERSIBLE.
confirmed=False (default) shows preview only; pass confirmed=True to actually delete."""
from mcp_synology_filestation.client import SynologyError
if not confirmed:
@@ -801,6 +783,189 @@ def register_filestation(
return f"Deleted: {path}"
@mcp.tool()
async def compress(
paths: list[str],
dest_file_path: str,
level: str = "moderate",
mode: str = "add",
format: str = "zip",
password: str = "",
):
"""Compress paths into an archive. dest_file_path: full path incl. filename.
level: store/fastest/fast/normal/moderate/maximum. format: zip/7z."""
from mcp_synology_filestation.client import SynologyError
_valid_levels = {"store", "fastest", "fast", "normal", "moderate", "maximum"}
_valid_modes = {"add", "update", "refreshen"}
_valid_formats = {"zip", "7z"}
if level not in _valid_levels:
return f"Error: level must be one of {sorted(_valid_levels)}"
if mode not in _valid_modes:
return f"Error: mode must be one of {sorted(_valid_modes)}"
if format not in _valid_formats:
return f"Error: format must be one of {sorted(_valid_formats)}"
if not paths:
return "Error: paths list must not be empty."
try:
start_data = await client.request(
"SYNO.FileStation.Compress",
"start",
version=3,
params={
"path": json.dumps(paths),
"dest_file_path": json.dumps(dest_file_path),
"level": level,
"mode": mode,
"format": format,
"compress_password": password,
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.Compress", 3, taskid)
if not ok:
return result # type: ignore[return-value]
return f"Compressed to: {dest_file_path}"
@mcp.tool()
async def extract(
file_path: str,
dest_folder_path: str,
overwrite: bool = False,
keep_dir: bool = True,
create_subfolder: bool = False,
password: str = "",
):
"""Extract a ZIP or 7z archive to dest_folder_path.
overwrite/keep_dir/create_subfolder/password optional."""
from mcp_synology_filestation.client import SynologyError
try:
start_data = await client.request(
"SYNO.FileStation.Extract",
"start",
version=2,
params={
"file_path": json.dumps(file_path),
"dest_folder_path": json.dumps(dest_folder_path),
"overwrite": "true" if overwrite else "false",
"keep_dir": "true" if keep_dir else "false",
"create_subfolder": "true" if create_subfolder else "false",
"codepage": "enu",
"password": password,
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.Extract", 2, taskid)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
dest = status.get("dest_folder_path", dest_folder_path)
return f"Extracted to: {dest}"
@mcp.tool()
async def dir_size(path: str):
"""Get total size, file count and folder count for one or more directories.
path: comma-separated share-relative paths."""
from mcp_synology_filestation.client import SynologyError
paths = [p.strip() for p in path.split(",") if p.strip()]
if not paths:
return "Error: no path provided."
try:
taskid, first_status = await client.start_and_poll_immediately(
"SYNO.FileStation.DirSize",
start_params={"path": json.dumps(paths)},
poll_version=1,
start_version=2,
)
except SynologyError as e:
return f"Error: {e}"
ok, result = await _poll_oneshot("SYNO.FileStation.DirSize", 1, taskid, first_status)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
num_dir = status.get("num_dir", 0)
num_file = status.get("num_file", 0)
total_size = status.get("total_size", 0)
path_label = ", ".join(paths)
w_path = max(len("Path"), len(path_label))
num_dir_str = str(num_dir)
num_file_str = str(num_file)
size_str = _fmt_size(total_size)
sep = (
f"+{'-' * (w_path + 2)}"
f"+{'-' * (max(len('Folders'), len(num_dir_str)) + 2)}"
f"+{'-' * (max(len('Files'), len(num_file_str)) + 2)}"
f"+{'-' * (max(len('Total Size'), len(size_str)) + 2)}+"
)
w_dir = max(len("Folders"), len(num_dir_str))
w_file = max(len("Files"), len(num_file_str))
w_size = max(len("Total Size"), len(size_str))
sep = (
f"+{'-' * (w_path + 2)}+{'-' * (w_dir + 2)}+{'-' * (w_file + 2)}+{'-' * (w_size + 2)}+"
)
header = (
f"| {'Path':<{w_path}} "
f"| {'Folders':<{w_dir}} "
f"| {'Files':<{w_file}} "
f"| {'Total Size':<{w_size}} |"
)
row = (
f"| {path_label:<{w_path}} "
f"| {num_dir_str:<{w_dir}} "
f"| {num_file_str:<{w_file}} "
f"| {size_str:<{w_size}} |"
)
return "\n".join([sep, header, sep, row, sep])
@mcp.tool()
async def get_md5(path: str):
"""Compute the MD5 checksum of a file on the NAS. path: share-relative file path."""
from mcp_synology_filestation.client import SynologyError
try:
taskid, first_status = await client.start_and_poll_immediately(
"SYNO.FileStation.MD5",
start_params={"file_path": json.dumps(path)},
poll_version=1,
start_version=2,
)
except SynologyError as e:
return f"Error: {e}"
ok, result = await _poll_oneshot("SYNO.FileStation.MD5", 1, taskid, first_status)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
md5 = status.get("md5", "")
if not md5:
return "Error: DSM returned no MD5 hash."
return f"MD5 of {path}: {md5}"
@mcp.tool()
async def upload(
path: str,
@@ -808,22 +973,9 @@ def register_filestation(
content_base64: str,
overwrite: bool = False,
create_parents: bool = True,
) -> str:
"""Upload a file to a directory on the NAS from base64-encoded content.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing file.
Args:
path: Destination directory path on the NAS (e.g. "/docker/app").
filename: Filename to create (e.g. "compose.yaml").
content_base64: Base64-encoded file content.
overwrite: Replace existing file at destination (default False).
create_parents: Create missing parent directories (default True).
Returns:
Full path of the uploaded file, or an Error: message.
"""
):
"""Upload base64-encoded content as filename into path (max 50 MB).
WARNING: overwrite=True replaces existing file (default False)."""
import base64
from mcp_synology_filestation.client import SynologyError
+123
View File
@@ -0,0 +1,123 @@
"""Wegwerfskript: DirSize + MD5 direkt gegen die NAS testen.
Ausfuehren: uv run python test_dirsize_md5.py
"""
import asyncio
import json
import time
import httpx
from mcp_synology_filestation.auth import AuthManager
from mcp_synology_filestation.client import FileStationClient
from mcp_synology_filestation.config import load_config
DIRSIZE_PATHS = ["/test-mcp", "/docker"]
MD5_PATH = "/test-mcp/test.zip"
def pp(label: str, data: object, elapsed_ms: float | None = None) -> None:
print(f"\n{'='*60}")
suffix = f" [{elapsed_ms:.1f} ms]" if elapsed_ms is not None else ""
print(f" {label}{suffix}")
print("=" * 60)
print(json.dumps(data, indent=2, ensure_ascii=False))
async def raw(http: httpx.AsyncClient, url: str, sid: str, **params) -> dict:
r = await http.get(url, params={"_sid": sid, **params})
r.raise_for_status()
try:
return r.json()
except Exception:
return {"_raw": r.text[:300], "_http_status": r.status_code}
async def probe_dirsize_long(
http: httpx.AsyncClient, sid: str, api_url: str, path: str
) -> None:
"""Start DirSize and poll v1 every 200ms for up to 15s.
Goal: find out if 599 means 'task still running' (keep polling)
or 'task gone' (give up). If the task eventually returns data,
599 = 'not ready yet'. If it never returns data, 599 = 'task gone'.
"""
print(f"\n{'#'*60}")
print(f" DIRSIZE {path} — long poll (15s, every 200ms)")
print(f"{'#'*60}")
t0 = time.perf_counter()
start_body = await raw(
http, api_url, sid,
api="SYNO.FileStation.DirSize", version="2", method="start",
path=json.dumps([path]),
)
elapsed_start = (time.perf_counter() - t0) * 1000
pp(f"DirSize::start ({path})", start_body, elapsed_start)
taskid = (start_body.get("data") or {}).get("taskid")
if not taskid:
print("[!] No taskid.")
return
print(f"\n[*] Polling status v1 every 200ms for up to 15s (taskid={taskid[:12]}...)")
for attempt in range(75): # 75 * 200ms = 15s
if attempt > 0:
await asyncio.sleep(0.2)
t = time.perf_counter()
r = await raw(
http, api_url, sid,
api="SYNO.FileStation.DirSize", version="1", method="status",
taskid=taskid,
)
elapsed = (t - t0) * 1000
success = r.get("success")
data = (r.get("data") or {})
finished = data.get("finished")
error_code = (r.get("error") or {}).get("code")
if finished:
print(f" [{elapsed:.0f}ms] attempt {attempt+1}: FERTIG! "
f"num_dir={data.get('num_dir')} "
f"num_file={data.get('num_file')} "
f"total_size={data.get('total_size')}")
pp(f"DirSize::status final ({path})", r, elapsed)
return
elif success and not finished:
# Still running — show current progress
print(f" [{elapsed:.0f}ms] attempt {attempt+1}: running... "
f"num_dir={data.get('num_dir', '?')} "
f"num_file={data.get('num_file', '?')} "
f"total_size={data.get('total_size', '?')}")
else:
print(f" [{elapsed:.0f}ms] attempt {attempt+1}: error code={error_code}")
# Continue polling — 599 might mean 'not ready yet'
print(f"\n[!] No result after 15s — task never returned data.")
async def main() -> None:
config = load_config()
auth = AuthManager(config)
async with FileStationClient(config.base_url, config.connection.verify_ssl) as client:
client.set_auth_manager(auth)
await client._ensure_initialized() # noqa: SLF001
sid = client.sid
base = config.base_url
info = client._api_cache.get("SYNO.FileStation.DirSize", {}) # noqa: SLF001
api_url = f"{base}/webapi/{info.get('path', 'entry.cgi')}"
print(f"[*] DirSize API: {api_url} v{info.get('minVersion')}-v{info.get('maxVersion')}")
async with httpx.AsyncClient(verify=config.connection.verify_ssl, timeout=30.0) as http:
for path in DIRSIZE_PATHS:
await probe_dirsize_long(http, sid, api_url, path)
await auth.logout(client)
print("\n[*] Logout OK.")
if __name__ == "__main__":
asyncio.run(main())
+562
View File
@@ -22,8 +22,24 @@ def config() -> AppConfig:
def _make_mcp_and_tools(config: AppConfig, client: MagicMock) -> dict:
"""Register FileStation tools on a mock FastMCP and collect them by name."""
from mcp_synology_filestation.client import FileStationClient
from mcp_synology_filestation.tools.filestation import register_filestation
# Bind the real start_and_poll_immediately so it delegates into the
# already-mocked client.request — no separate mock needed per test.
async def _start_and_poll_immediately(
api: str,
start_params: dict,
poll_version: int,
*,
start_version: int | None = None,
):
return await FileStationClient.start_and_poll_immediately(
client, api, start_params, poll_version, start_version=start_version
)
client.start_and_poll_immediately = _start_and_poll_immediately
registered: dict[str, object] = {}
mcp = MagicMock()
@@ -1152,6 +1168,305 @@ async def test_check_exist_multi_path(config: AppConfig) -> None:
assert "/ghost" in requested_paths
# ──────────────────────────────────────────────────────────────────────────
# compress
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_compress_success(config: AppConfig) -> None:
"""compress polls until finished and returns the archive path."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_compress1"}
if method == "status":
return {"finished": True}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/report.pdf", "/data/photos"],
dest_file_path="/backup/archive.zip",
)
assert result == "Compressed to: /backup/archive.zip"
# Verify DSM call parameters
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.Compress"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 3
p = start_call[1]["params"]
assert json.loads(p["path"]) == ["/data/report.pdf", "/data/photos"]
assert json.loads(p["dest_file_path"]) == "/backup/archive.zip"
assert p["level"] == "moderate"
assert p["mode"] == "add"
assert p["format"] == "zip"
assert p["compress_password"] == ""
@pytest.mark.asyncio
async def test_compress_polling_multiple_rounds(config: AppConfig) -> None:
"""compress returns success after multiple polling rounds."""
client = MagicMock()
poll_calls = 0
async def _request(api, method, version=None, params=None, **kwargs):
nonlocal poll_calls
if method == "start":
return {"taskid": "FileStation_compress2"}
if method == "status":
poll_calls += 1
return {"finished": poll_calls >= 3}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/big-folder"],
dest_file_path="/backup/big.7z",
format="7z",
level="maximum",
)
assert result == "Compressed to: /backup/big.7z"
assert poll_calls == 3
@pytest.mark.asyncio
async def test_compress_dsm_error_on_start(config: AppConfig) -> None:
"""compress returns Error: when the start call fails."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("No write permission", code=1801))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/file.txt"],
dest_file_path="/backup/out.zip",
)
assert result.startswith("Error:")
assert "permission" in result.lower()
@pytest.mark.asyncio
async def test_compress_invalid_level(config: AppConfig) -> None:
"""compress rejects unknown level values before making any DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["compress"](
paths=["/data/file.txt"],
dest_file_path="/backup/out.zip",
level="ultra",
)
assert result.startswith("Error:")
assert "level" in result
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_compress_invalid_format(config: AppConfig) -> None:
"""compress rejects unknown format values before making any DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["compress"](
paths=["/data/file.txt"],
dest_file_path="/backup/out.zip",
format="tar.gz",
)
assert result.startswith("Error:")
assert "format" in result
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_compress_empty_paths(config: AppConfig) -> None:
"""compress rejects an empty paths list before making any DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["compress"](paths=[], dest_file_path="/backup/out.zip")
assert result.startswith("Error:")
assert "paths" in result.lower() or "empty" in result.lower()
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_compress_timeout(config: AppConfig) -> None:
"""compress returns an error after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_compress_timeout"}
return {"finished": False}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/huge"],
dest_file_path="/backup/huge.zip",
)
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
# ──────────────────────────────────────────────────────────────────────────
# extract
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_extract_success(config: AppConfig) -> None:
"""extract polls until finished and returns the dest_folder_path from status."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract1"}
if method == "status":
return {
"finished": True,
"dest_folder_path": "/data/extracted",
"path": "/backup/archive.zip",
"progress": 1,
}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/archive.zip",
dest_folder_path="/data/extracted",
)
assert result == "Extracted to: /data/extracted"
# Verify DSM call parameters
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.Extract"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
p = start_call[1]["params"]
assert json.loads(p["file_path"]) == "/backup/archive.zip"
assert json.loads(p["dest_folder_path"]) == "/data/extracted"
assert p["overwrite"] == "false"
assert p["keep_dir"] == "true"
assert p["create_subfolder"] == "false"
assert p["codepage"] == "enu"
assert p["password"] == ""
@pytest.mark.asyncio
async def test_extract_overwrite_and_subfolder(config: AppConfig) -> None:
"""extract passes overwrite=true and create_subfolder=true when requested."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract2"}
return {"finished": True, "dest_folder_path": "/data/out"}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
await tools["extract"](
file_path="/backup/archive.zip",
dest_folder_path="/data/out",
overwrite=True,
create_subfolder=True,
)
p = client.request.call_args_list[0][1]["params"]
assert p["overwrite"] == "true"
assert p["create_subfolder"] == "true"
@pytest.mark.asyncio
async def test_extract_dest_folder_from_status(config: AppConfig) -> None:
"""extract uses dest_folder_path from status response when available."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract3"}
return {"finished": True, "dest_folder_path": "/data/real-dest"}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/archive.zip",
dest_folder_path="/data/requested",
)
# Should report what DSM confirmed, not what we requested
assert result == "Extracted to: /data/real-dest"
@pytest.mark.asyncio
async def test_extract_dsm_error_on_start(config: AppConfig) -> None:
"""extract returns Error: when the start call fails (e.g. bad path)."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/missing.zip",
dest_folder_path="/data/out",
)
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_extract_timeout(config: AppConfig) -> None:
"""extract returns an error after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract_timeout"}
return {"finished": False, "progress": 0.1}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/huge.zip",
dest_folder_path="/data/out",
)
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_check_exist_empty_path(config: AppConfig) -> None:
"""check_exist returns Error: when no path is given."""
@@ -1197,3 +1512,250 @@ async def test_check_exist_uses_getinfo(config: AppConfig) -> None:
call_args = client.request.call_args
assert call_args[0][0] == "SYNO.FileStation.List"
assert call_args[0][1] == "getinfo"
# ──────────────────────────────────────────────────────────────────────────
# dir_size
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_dir_size_success(config: AppConfig) -> None:
"""dir_size polls until finished and returns a formatted table."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize1"}
if method == "status":
return {
"finished": True,
"num_dir": 4,
"num_file": 23,
"total_size": 5_242_880,
}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data")
assert "Folders" in result
assert "Files" in result
assert "Total Size" in result
assert "4" in result
assert "23" in result
assert "5 MB" in result or "MB" in result
# Verify DSM call params
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.DirSize"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
assert json.loads(start_call[1]["params"]["path"]) == ["/data"]
@pytest.mark.asyncio
async def test_dir_size_multi_path(config: AppConfig) -> None:
"""dir_size passes all comma-separated paths as a JSON array."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize2"}
return {"finished": True, "num_dir": 1, "num_file": 2, "total_size": 1024}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data, /backup")
start_params = client.request.call_args_list[0][1]["params"]
assert json.loads(start_params["path"]) == ["/data", "/backup"]
assert "/data" in result
assert "/backup" in result
@pytest.mark.asyncio
async def test_dir_size_dsm_error_on_start(config: AppConfig) -> None:
"""dir_size returns Error: when start fails."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/missing")
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_dir_size_timeout(config: AppConfig) -> None:
"""dir_size returns Error: after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize_timeout"}
return {"finished": False, "num_dir": 0, "num_file": 0, "total_size": 0}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/huge")
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_dir_size_empty_path(config: AppConfig) -> None:
"""dir_size returns Error: for blank path without making a DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["dir_size"](path=" ")
assert result.startswith("Error:")
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_dir_size_retries_on_transient_599(config: AppConfig) -> None:
"""dir_size retries up to 4 times on code-599 then succeeds on 5th status call."""
client = MagicMock()
call_count = {"status": 0}
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize_599"}
call_count["status"] += 1
if call_count["status"] < 4:
raise SynologyError("DSM error code 599", code=599)
return {"finished": True, "num_dir": 2, "num_file": 10, "total_size": 1024}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data")
assert "Total Size" in result
assert call_count["status"] == 4
@pytest.mark.asyncio
async def test_dir_size_times_out_on_persistent_599(config: AppConfig) -> None:
"""dir_size times out after 60 s when DSM returns only 599s for every poll.
The immediate poll + burst both return 599; Phase 2 keeps polling (large
directories eventually surface) until the 60 s timeout fires.
"""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize_dead"}
raise SynologyError("DSM error code 599", code=599)
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/dead")
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
# ──────────────────────────────────────────────────────────────────────────
# get_md5
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_md5_success(config: AppConfig) -> None:
"""get_md5 polls until finished and returns the MD5 string."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_1"}
if method == "status":
return {"finished": True, "md5": "d41d8cd98f00b204e9800998ecf8427e"}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/file.zip")
assert result == "MD5 of /data/file.zip: d41d8cd98f00b204e9800998ecf8427e"
# Verify DSM call params
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.MD5"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
assert json.loads(start_call[1]["params"]["file_path"]) == "/data/file.zip"
@pytest.mark.asyncio
async def test_get_md5_dsm_error_on_start(config: AppConfig) -> None:
"""get_md5 returns Error: when start fails (e.g. file not found)."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/missing.zip")
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_get_md5_timeout(config: AppConfig) -> None:
"""get_md5 returns Error: after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_timeout"}
return {"finished": False}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/huge.iso")
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_get_md5_missing_hash_in_response(config: AppConfig) -> None:
"""get_md5 returns Error: when finished status contains no md5 field."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_nohash"}
return {"finished": True} # md5 field absent
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/file.zip")
assert result.startswith("Error:")
assert "md5" in result.lower() or "hash" in result.lower()
Generated
+1 -1
View File
@@ -362,7 +362,7 @@ wheels = [
[[package]]
name = "mcp-synology-filestation"
version = "0.1.0"
version = "0.2.4"
source = { editable = "." }
dependencies = [
{ name = "click" },