20 Commits

Author SHA1 Message Date
marcus 83bccbcb53 chore: remove throwaway test scripts, update SPEC.md for v0.2.10
- Delete test_dirsize_md5.py, test_extract.py, test_sharing.py
- SPEC.md: add all 20 tools, DSM quirks (one-shot, cold-start, FastMCP
  outputSchema, DirSize status v1, Sharing.delete id encoding), APIs
  confirmed unavailable, mark v0.2 complete, list v0.3 candidates

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 15:33:22 +02:00
marcus ae90e5f09a feat: add check_permission + 3 sharing tools (v0.2.10)
Implements Group 3 of the planned tool set:
- check_permission: SYNO.FileStation.CheckPermission/write
- create_sharing_link: SYNO.FileStation.Sharing/create (password + expiry optional)
- list_sharing_links: SYNO.FileStation.Sharing/list (paginated table)
- delete_sharing_link: SYNO.FileStation.Sharing/delete

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 15:04:00 +02:00
marcus 451ee7116f fix: cold-start 599 on DirSize/MD5 — restart task instead of giving up
DSM's DirSize and MD5 background service needs ~6-8 s to initialise
after a period of inactivity. During this cold-start window tasks are
registered but every status poll returns error 599 ("no such task").

Replace the ad-hoc start+_poll_task call in dir_size and get_md5 with
a new _start_and_poll_oneshot helper that:
- polls with exponential backoff (0.2 s → cap 2 s)
- on 5 consecutive 599s: restarts the task (up to 6 attempts total)
- honours a shared 60 s wall-clock budget across all restarts
- returns a clear error if all restart attempts are exhausted

Root cause confirmed by test_dirsize_md5.py: after ~6 s / 2 restarts
the service warms up and the very first poll on the new task succeeds.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 14:49:32 +02:00
marcus 8b2f07d9c3 revert: restore _poll_task and dir_size/get_md5 to 0.2.2 state
All changes since 0.2.2 to _poll_task, dir_size, and get_md5 (window_timeout,
_poll_oneshot, start_and_poll_immediately) are reverted. The 0.2.2 behaviour
worked reliably for small directories and is the last known-good baseline.

The remaining known limitation (occasional 599 on large directories) is
documented in SPEC.md. Retry the operation as a workaround.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 14:18:43 +02:00
marcus 62f8e41931 fix: _poll_oneshot for DirSize/MD5 with burst-retry on early 599
Add FileStationClient.start_and_poll_immediately: starts the async task and
immediately makes the first status poll within the same method, with no
intermediate awaits other than the two HTTP calls. This minimises scheduler
latency between start and first poll for one-shot tasks.

_poll_oneshot now accepts the first_status from start_and_poll_immediately:
- finished=True on first poll → return immediately
- finished=False → Phase 2 (exponential backoff, 60 s timeout)
- None (first poll was 599) → burst-retry 10× at 10 ms, then Phase 2
  (Phase 2 keeps polling through 599 until seen_alive, then fails fast)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:50:28 +02:00
marcus 0e8ffaa6df fix: _poll_oneshot for DirSize/MD5 with burst-retry on early 599
Replace _poll_task for one-shot tasks with _poll_oneshot, which uses two
phases: (1) a burst of up to 11 immediate polls at 50ms intervals to catch
tasks that complete in <500ms, and (2) exponential-backoff polling once
finished=False is observed. A 599 during burst → window missed (fail fast).
A 599 during Phase 2 (task was seen running) → same. _poll_task is
simplified back to a plain long-poll with no window_timeout logic.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:38:42 +02:00
marcus 6510493930 fix: window_timeout for one-shot tasks, drop debug stderr logging
_poll_task now accepts window_timeout. For DirSize and MD5 (one-shot result
windows), if only 599 errors arrive for window_timeout seconds without ever
seeing the task alive (finished=False), return a fast "result window missed —
please retry" error instead of waiting the full 60 s. Tasks that return
finished=False at least once (large dirs, large files) are unaffected.

Also removes the stale [dsm] debug stderr.write left in client.request().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:30:20 +02:00
marcus 4bf655236d fix: treat DSM 599 as task-not-ready in _poll_task, poll until 60s timeout
DirSize/MD5 return error 599 while the async task is still initialising on
the NAS, not only after the task is gone. Remove the 5-consecutive-599 abort
limit and the debug stderr logging; instead pass on 599 and keep polling
until the existing 60 s timeout fires. Rename the test that checked the old
limit to reflect the new timeout-based behaviour.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:19:32 +02:00
marcus c0d4c347c5 debug: add stderr logging to _poll_task for every status poll attempt
Logs [poll] lines to stderr so Claude Desktop's MCP log shows exactly
what DSM returns on each status call: attempt number, elapsed time,
finished flag, data keys (on success) or error code + message (on 599).

Version 0.2.3 — remove this logging once the 599 root cause is confirmed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:05:37 +02:00
marcus e3fa71b458 fix: retry _poll_task on transient 599 instead of aborting immediately
DirSize for large directories (e.g. /docker, 8441 folders, 46832 files)
takes ~800ms to compute. While running, status returns intermediate
progress (finished=false). But on the very first poll the task can return
599 transiently (task just started, not yet available). Previously
_poll_task caught any SynologyError and returned immediately, making
dir_size always fail on the first 599.

Fix: treat 599 as a transient condition and continue polling. Give up
only after 5 consecutive 599 responses. All other error codes remain
immediately fatal.

Investigation confirmed with test_dirsize_md5.py:
- /test-mcp (2937 B): finished=true at 0ms
- /docker (3.9 GB, 46832 files): finished=false at 35ms, finished=true at 789ms

Tests: 2 new cases (retry-succeeds, 5x-599-gives-up) → 95 total

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 13:01:27 +02:00
marcus 4d8eae752d chore: bump version to 0.2.1
Includes all DirSize/MD5 polling fixes:
- initial_delay=0.0 for DirSize and MD5 (poll immediately after start)
- MD5 status uses version=1 (v2 always returns 599 on this NAS)
- __version__ kept in sync with pyproject.toml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:52:42 +02:00
marcus c923da6f6a fix: sync __version__ to 0.2.0 to match pyproject.toml
__init__.py was still at 0.1.0 after the pyproject.toml bump in an
earlier commit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:50:59 +02:00
marcus d8d7c6fd47 test: update dirsize/md5 probe with path-variant comparison results
Three path formats tested against NAS:
  a) plain string        -> 599 (wrong, task fails silently)
  b) json.dumps([path])  -> works (JSON array is the correct format)
  c) json.dumps(path)    -> 599 (wrong, double-encoded string)

Confirms current implementation (json.dumps(paths)) is correct.
MD5 probe simplified to final confirmed settings (status v1, 0ms).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:39:37 +02:00
marcus 4145d929a6 fix: correct DirSize/MD5 polling — initial_delay=0 and MD5 status v1
Live NAS investigation (test_dirsize_md5.py) revealed two bugs:

1. _poll_task() always slept 200ms before the first status call.
   DirSize and MD5 complete near-instantly on small data, so the result
   window closes before the first poll. Fix: add initial_delay parameter
   (default 0.2s for CopyMove/Delete/Compress/Extract); DirSize and MD5
   pass initial_delay=0.0 to poll immediately after start.

2. get_md5 used status version=2, but the NAS only serves the result on
   status v1 (v2 always returns 599 regardless of timing). Fix: change
   _poll_task version to 1 for SYNO.FileStation.MD5.

MD5 is one-shot: the result is consumed on the first successful status
read. Polling at 0ms ensures we catch it before it expires.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:35:43 +02:00
marcus 04caaef003 fix: use correct status API versions for DirSize and MD5
- DirSize: _poll_task now uses status version=1 (v2 always returns 599)
- MD5: _poll_task keeps status version=2 (confirmed working via live NAS test)

Investigation notes documented in test_dirsize_md5.py:
both APIs use start v2; DirSize status needs v1, MD5 status needs v2;
tiny data causes one-shot race condition (no issue with real-world data).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:24:02 +02:00
marcus 1d0cf940b4 feat: add dir_size and get_md5 tools
dir_size: SYNO.FileStation.DirSize v2 — async scan of one or more
directories, returns folder/file count and total size as a table.
get_md5: SYNO.FileStation.MD5 v2 — async MD5 checksum of a file.
Both follow the existing _poll_task pattern. 9 new unit tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 12:10:51 +02:00
marcus fc706fb809 perf: shorten all tool docstrings to reduce tools/list payload
Reduced tools/list JSON payload from ~45 KB to 5.7 KB by replacing
verbose multi-paragraph docstrings with 1-2 line summaries on all
14 @mcp.tool() functions. Fixes Claude Desktop truncation of tools/list.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 11:58:11 +02:00
marcus 500dc73324 fix: remove -> str return annotations from all mcp.tool() functions
FastMCP generates an outputSchema from the return type annotation, which
inflates the tools/list payload and triggers truncation in Claude Desktop.
Dropping the annotations suppresses outputSchema generation entirely.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 11:49:04 +02:00
marcus 473c771c20 feat: add compress and extract tools
Implements SYNO.FileStation.Compress (v3) and SYNO.FileStation.Extract (v2)
with async polling identical to copy/move. Includes input validation for
compress (level, mode, format, empty paths) and 11 new unit tests.
Bumps version to 0.2.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 11:26:08 +02:00
marcus dbab842738 feat: add check_exist tool
SYNO.FileStation.CheckExist returns error 400 for every parameter format on
this DSM firmware, so the tool falls back to SYNO.FileStation.List::getinfo.
DSM returns an entry per requested path with name=None for non-existent paths,
which provides a reliable exists/not-exists signal.

Accepts a single path or a comma-separated list; returns a table of
Path | Exists (Yes/No) with a count footer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 10:49:50 +02:00
7 changed files with 1497 additions and 260 deletions
+303 -97
View File
@@ -62,21 +62,83 @@ All requests use `GET /webapi/entry.cgi` with query parameters unless noted.
| `SYNO.FileStation.Rename` | 2 | `rename` |
| `SYNO.FileStation.CopyMove` | 3 | `start`, `status`, `stop` |
| `SYNO.FileStation.Delete` | 2 | `start`, `status`, `stop` |
| `SYNO.FileStation.Compress` | 3 | `start`, `status`, `stop` |
| `SYNO.FileStation.Extract` | 2 | `start`, `status`, `stop` |
| `SYNO.FileStation.DirSize` | 2/1 | `start` (v2), `status` (v1) |
| `SYNO.FileStation.MD5` | 2/1 | `start` (v2), `status` (v1) |
| `SYNO.FileStation.CheckPermission` | 3 | `write` |
| `SYNO.FileStation.Sharing` | 3 | `create`, `list`, `delete` |
### Async task pattern (CopyMove, Delete, Search)
### Async task pattern (CopyMove, Delete, Search, Compress, Extract)
DSM returns a `taskid` from `start`; the client polls `status` with exponential backoff
(initial 200 ms, max 2 s, timeout 60 s) until `finished=true`, then calls `stop`/`clean`.
Implemented in `_poll_task()`.
### One-shot task pattern (DirSize, MD5)
`DirSize` and `MD5` differ from the async task pattern: DSM delivers `finished=true` exactly
once, then immediately discards the result (subsequent polls return error 599). Implemented
in `_start_and_poll_oneshot()`:
1. Call `start`.
2. Poll `status` repeatedly until `finished=true` or timeout.
3. On 599: if within the first ~8 s after start, the background service may still be warming
up ("cold start") — restart the task and try again (up to 6 restarts).
4. On `finished=true`: return the data and stop polling (never poll again).
**Cold-start behaviour:** After a period of inactivity, DSM's DirSize/MD5 background service
takes ~68 s to initialise. During this window every `status` poll returns error 599. The
correct recovery is to wait a moment, then restart the task. Retrying the same `taskid` does
not help — the service must be cold-started via a new `start` call.
**DirSize `status` version:** Must use `version=1`. Using `version=2` always returns 599
regardless of service state.
---
## Tools
## Tools (v0.2.10 — 20 tools)
### Read-only
#### `list_shares`
List all shared folders visible to the authenticated user.
| Tool | Description |
|------|-------------|
| `list_shares` | List all shared folders with volume usage |
| `list_dir` | List directory contents with pagination and sorting |
| `get_info` | Get detailed metadata for one or more paths |
| `check_exist` | Check if one or more paths exist (Yes/No table) |
| `search` | Search for files by glob pattern with async polling |
| `download` | Download a file as base64 (max 10 MB) |
| `dir_size` | Total size, file count, folder count for directories |
| `get_md5` | Compute MD5 checksum of a file |
### Write
| Tool | Description |
|------|-------------|
| `create_folder` | Create a new folder (optionally with parent dirs) |
| `rename` | Rename a file or folder |
| `copy` | Copy a file or folder (async polling, overwrite=False default) |
| `move` | Move a file or folder (async polling, overwrite=False default) |
| `delete` | Delete a file or folder — requires confirmed=True |
| `upload` | Upload base64-encoded content to a path (max 50 MB) |
| `compress` | Compress paths into a ZIP or 7z archive |
| `extract` | Extract a ZIP or 7z archive to a destination folder |
### Permission & Sharing
| Tool | Description |
|------|-------------|
| `check_permission` | Check write permission for a filename in a directory |
| `create_sharing_link` | Create a public sharing link (optional password + expiry) |
| `list_sharing_links` | List all sharing links (paginated table) |
| `delete_sharing_link` | Delete a sharing link by ID |
---
### Tool details
#### `list_shares`
**Parameters:** none
**Returns:** Formatted table of share names, paths, and volume usage.
@@ -92,8 +154,6 @@ additional=["volume_status"]
---
#### `list_dir`
List contents of a directory with optional pagination and sorting.
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
@@ -119,17 +179,11 @@ additional=["size","time"]
>
> **`additional` format:** Must be a JSON array serialised as a string
> (`json.dumps(["size","time"])` → `'["size", "time"]'`). A comma-separated string
> (`"size,time"`) is silently ignored by DSM — the `additional` field will be absent from
> every file entry.
>
> **`SYNO.FileStation.Stat`:** Not available on this NAS's API registry. Use
> `SYNO.FileStation.List::getinfo` for per-path metadata instead.
> (`"size,time"`) is silently ignored by DSM.
---
#### `get_info`
Get detailed metadata for one or more files or folders.
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
@@ -140,19 +194,25 @@ creation time, and real volume path for each requested item.
**DSM call:** `SYNO.FileStation.List::getinfo`
```
path={comma-joined paths},
path=json.dumps([paths...]),
additional=["real_path","size","time","perm","owner","type"]
```
> **Note:** `SYNO.FileStation.Stat` is not available on all NAS firmware versions and is
> absent from this NAS's API registry. `SYNO.FileStation.List::getinfo` returns identical
> data and is confirmed working.
---
#### `check_exist`
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `path` | str | yes | One or more share-relative paths, comma-separated |
**Returns:** Yes/No table per path.
**DSM call:** `SYNO.FileStation.List::getinfo` — entries with `name=null` are non-existent.
---
#### `search`
Search for files matching a pattern within a directory.
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
@@ -164,36 +224,48 @@ Search for files matching a pattern within a directory.
**Returns:** List of matching paths with type, size, and modification time.
**DSM calls:** `SYNO.FileStation.Search::start` → poll `::list``::stop` + `::clean`
```
start: folder_path={path}, recursive={recursive}, pattern={pattern}
list: taskid={taskid}, offset=0, limit={max_results}
```
---
#### `download`
Download a single file and return its content.
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `path` | str | yes | Absolute path to the file on the NAS |
| `path` | str | yes | Share-relative file path |
**Returns:** Object with `filename`, `size`, `content_base64` (base64-encoded file bytes).
Files larger than 10 MB return an error suggesting `sftp` instead.
**Returns:** JSON `{filename, size, content_base64}`. Files > 10 MB return an error.
**DSM call:** `SYNO.FileStation.Download::download` (streaming GET)
```
path={path}, mode=download
```
**DSM call:** `SYNO.FileStation.Download::download` (streaming GET, `mode=download`)
---
### Write (require explicit confirmation where noted)
#### `dir_size`
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `path` | str | yes | Comma-separated share-relative paths |
**Returns:** Table with total size, file count, folder count.
**DSM calls:** `SYNO.FileStation.DirSize::start` (v2) → poll `::status` (v1)
> See *One-shot task pattern* above for polling behaviour and cold-start recovery.
---
#### `get_md5`
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `path` | str | yes | Share-relative file path |
**Returns:** `MD5 of {path}: {hash}`
**DSM calls:** `SYNO.FileStation.MD5::start` (v2) → poll `::status` (v1)
---
#### `create_folder`
Create a new directory (and optionally all intermediate parents).
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
@@ -201,113 +273,169 @@ Create a new directory (and optionally all intermediate parents).
| `name` | str | yes | — | New folder name |
| `create_parents` | bool | no | false | Create missing parent directories |
**Returns:** Path of the created folder or an error message.
**DSM call:** `SYNO.FileStation.CreateFolder::create`
```
folder_path={path}, name={name}, force_parent={create_parents}
```
---
#### `rename`
Rename a file or directory.
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `path` | str | yes | Absolute path to the item |
| `new_name` | str | yes | New filename (not a full path) |
**Returns:** New absolute path after rename.
| `path` | str | yes | Current share-relative path |
| `new_name` | str | yes | New filename (bare name, not full path) |
**DSM call:** `SYNO.FileStation.Rename::rename`
```
path={path}, name={new_name}
```
---
#### `move`
Move a file or directory to a new location.
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `src` | str | yes | — | Source absolute path |
| `dst` | str | yes | — | Destination directory path |
| `overwrite` | bool | no | false | Overwrite if destination exists |
**Returns:** Destination path on success, or a descriptive error.
**DSM call:** `SYNO.FileStation.CopyMove::start` (async task)
```
path={src}, dest_folder_path={dst}, overwrite={overwrite}, remove_src=true
```
---
#### `copy`
Copy a file or directory to a new location.
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `src` | str | yes | — | Source absolute path |
| `src` | str | yes | — | Source path |
| `dst` | str | yes | — | Destination directory path |
| `overwrite` | bool | no | false | Overwrite if destination exists |
**Returns:** Destination path on success, or a descriptive error.
**DSM call:** `SYNO.FileStation.CopyMove::start` (async, `remove_src=false`)
**DSM call:** `SYNO.FileStation.CopyMove::start` (async task)
```
path={src}, dest_folder_path={dst}, overwrite={overwrite}, remove_src=false
```
---
#### `move`
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `src` | str | yes | — | Source path |
| `dst` | str | yes | — | Destination directory path |
| `overwrite` | bool | no | false | Overwrite if destination exists |
**DSM call:** `SYNO.FileStation.CopyMove::start` (async, `remove_src=true`)
---
#### `delete`
**Destructive — requires `confirmed=True`.**
Delete a file or directory. Without confirmation, returns a preview of what would be deleted.
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `path` | str | yes | — | Absolute path to delete |
| `path` | str | yes | — | Path to delete |
| `confirmed` | bool | yes | false | Must be `true` to proceed |
**Returns:**
- `confirmed=false`: Preview message listing the path and item type.
- `confirmed=true`: Success message or error detail.
- `confirmed=false`: Preview of what would be deleted.
- `confirmed=true`: Success message or error.
**DSM call:** `SYNO.FileStation.Delete::start` (async task)
```
path={path}, recursive=true, accurate_progress=false
```
**DSM call:** `SYNO.FileStation.Delete::start` (async, `recursive=true`)
---
#### `upload`
Upload a file to the NAS from base64-encoded content.
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `path` | str | yes | — | Destination directory path on the NAS |
| `path` | str | yes | — | Destination directory path |
| `filename` | str | yes | — | Filename to create |
| `content_base64` | str | yes | — | Base64-encoded file content |
| `overwrite` | bool | no | false | Overwrite if a file with this name already exists |
| `overwrite` | bool | no | false | Overwrite if file exists |
| `create_parents` | bool | no | true | Create missing parent directories |
**Returns:** Full path of the uploaded file or an error message.
Files exceeding 50 MB should not be uploaded via MCP; return a clear error.
**Returns:** Full path of uploaded file. Files > 50 MB are rejected.
**DSM call:** `SYNO.FileStation.Upload::upload` (POST multipart/form-data)
```
path={path}, create_parents={create_parents}, overwrite={overwrite},
file=<binary content decoded from content_base64>
```
---
#### `compress`
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `paths` | list[str] | yes | — | Paths to compress |
| `dest_file_path` | str | yes | — | Output archive path incl. filename |
| `level` | str | no | `moderate` | `store`/`fastest`/`fast`/`normal`/`moderate`/`maximum` |
| `mode` | str | no | `add` | `add`/`update`/`refreshen` |
| `format` | str | no | `zip` | `zip` or `7z` |
| `password` | str | no | `""` | Archive password |
**DSM call:** `SYNO.FileStation.Compress::start` (async, v3)
---
#### `extract`
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `file_path` | str | yes | — | Archive path on NAS |
| `dest_folder_path` | str | yes | — | Destination directory |
| `overwrite` | bool | no | false | Overwrite existing files |
| `keep_dir` | bool | no | true | Preserve directory structure |
| `create_subfolder` | bool | no | false | Extract into a subfolder |
| `password` | str | no | `""` | Archive password |
**DSM call:** `SYNO.FileStation.Extract::start` (async, v2)
---
#### `check_permission`
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `path` | str | yes | — | Directory to check |
| `filename` | str | yes | — | Filename to check write access for |
| `overwrite` | bool | no | false | Check overwrite permission |
| `create_only` | bool | no | false | Check create-only permission |
**Returns:** `"Permission granted: write {filename} into {path}"` or `"Error: …"`
**DSM call:** `SYNO.FileStation.CheckPermission::write` (v3)
> **Parameter format:** `path` and `filename` are passed as plain strings (no `json.dumps`).
> On success, DSM returns `{"blSkip": false}` — the tool maps this to a human-readable message.
---
#### `create_sharing_link`
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `path` | str | yes | — | File or folder path to share |
| `password` | str | no | `""` | Password to protect the link |
| `date_expired` | str | no | `""` | Expiry date (`YYYY-MM-DD`) |
| `date_available` | str | no | `""` | Availability start date (`YYYY-MM-DD`) |
**Returns:** Sharing URL + link ID, with password-protection flag.
**DSM call:** `SYNO.FileStation.Sharing::create` (v3)
> `path` must be `json.dumps()`-encoded. The `date_expired`/`date_available` fields are not
> echoed back in the create response — confirm via `list_sharing_links` if needed.
---
#### `list_sharing_links`
**Parameters:**
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `offset` | int | no | 0 | Pagination offset |
| `limit` | int | no | 100 | Max links to return |
**Returns:** Table with ID, URL, path, owner, expiry, status. Includes total count and
pagination hint when more results are available.
**DSM call:** `SYNO.FileStation.Sharing::list` (v3)
---
#### `delete_sharing_link`
**Parameters:**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `link_id` | str | yes | Sharing link ID (from `list_sharing_links`) |
**Returns:** `"Deleted sharing link: {link_id}"` or error.
**DSM call:** `SYNO.FileStation.Sharing::delete` (v3)
> `link_id` must be passed as `json.dumps(link_id)` in the `id` parameter.
---
@@ -317,8 +445,7 @@ file=<binary content decoded from content_base64>
1. DSM error codes are mapped to human-readable messages before surfacing to Claude.
2. Never expose raw stack traces or session IDs in tool responses.
3. Auth errors (codes 400403) trigger a clear message with a hint to run `setup`.
4. Session expiry errors (106, 107, 119) are retried once transparently; if the retry also
fails, the user sees "Session expired — please restart the MCP server."
4. Session expiry errors (106, 107, 119) are retried once transparently.
5. Network errors (timeouts, connection refused) are reported as
"Cannot reach NAS at {host} — check connectivity."
6. Unknown DSM error codes are reported as "DSM error {code}: {raw_message}".
@@ -340,6 +467,7 @@ file=<binary content decoded from content_base64>
| 403 | 2FA required | "Two-factor authentication required — run setup." |
| 404 | 2FA failed | "OTP code incorrect." |
| 408 | Device token required | "Device token required — run setup again." |
| 599 | Background service not ready | (handled by `_start_and_poll_oneshot` — restart task) |
| 1800 | File not found | "File or folder not found: {path}" |
| 1801 | No write permission | "No write permission for: {path}" |
| 1802 | File exists | "A file already exists at this path." |
@@ -351,6 +479,59 @@ file=<binary content decoded from content_base64>
---
## DSM API Quirks
### Parameter encoding
- **JSON array parameters** (`path`, `additional`, etc.) must be serialised with
`json.dumps()`. A plain Python list or comma-separated string is silently ignored
by DSM — the field will be absent from the response.
- **`path` for multi-item operations** (e.g. `List::getinfo`, `DirSize::start`):
always pass as `json.dumps(["/path1", "/path2"])`, even for a single path.
- **`path` for single-item operations** (e.g. `CopyMove::start`, `Rename::rename`,
`Sharing::create`): pass as `json.dumps("/path")` (a JSON-encoded string).
- **`Sharing::delete` `id` parameter:** must be `json.dumps(link_id)` — plain string
is not accepted.
- **`CheckPermission::write` `path` and `filename`:** pass as plain strings (no
`json.dumps`).
- **Share paths vs. volume paths:** always use share paths (`/docker`, `/homes/marcus`).
Volume paths (`/volume1/docker`) cause DSM error 408.
### APIs confirmed NOT available on this NAS
| API | Status | Workaround |
|-----|--------|------------|
| `SYNO.FileStation.Stat` | Not in API registry | Use `List::getinfo` |
| `SYNO.FileStation.CheckExist` | Returns error 400 | Use `List::getinfo` (entries with `name=null` don't exist) |
### FastMCP `outputSchema` / tools/list payload limit
Do **not** add `-> str` return type annotations to `@mcp.tool()` functions. FastMCP
infers `outputSchema` from the annotation, which doubles the tools/list payload size.
Claude Desktop truncates the tool list when the payload exceeds its limit, making some
tools invisible. Omitting the return annotation suppresses `outputSchema` generation.
### DirSize / MD5 one-shot behaviour
`DirSize` and `MD5` tasks are consumed on first read: `finished=true` is returned exactly
once, then the task is deleted and all subsequent `status` polls return error 599.
Implementation: `_start_and_poll_oneshot()` — never call `status` a second time after
receiving `finished=true`.
### DirSize cold-start
After a period of inactivity, DSM's DirSize background service needs ~68 s to initialise.
During cold start, `status` returns 599 on every poll. The fix is to wait briefly and
**restart** the task (new `start` call) — polling the original `taskid` again does not help.
`_start_and_poll_oneshot()` retries up to 6 task restarts before giving up.
### DirSize `status` API version
`SYNO.FileStation.DirSize::status` must be called with `version=1`. Using `version=2`
always returns error 599 regardless of whether the background service is running.
---
## Configuration Model
```yaml
@@ -402,3 +583,28 @@ Validate config, resolve credentials, test login, list available FileStation API
Load config and credentials, create `FileStationClient` (lazy — no immediate connection),
create and run `FastMCP("mcp-synology-filestation")` over stdio. Uses `anyio.run()` for
Windows compatibility.
---
## Roadmap
### v0.2 — complete (20 tools)
All tools shipped in v0.2.10:
**Group 1:** `list_shares`, `list_dir`, `get_info`, `check_exist`, `search`, `download`
**Group 2:** `create_folder`, `rename`, `copy`, `move`, `delete`, `upload`, `compress`,
`extract`, `dir_size`, `get_md5`
**Group 3:** `check_permission`, `create_sharing_link`, `list_sharing_links`,
`delete_sharing_link`
### v0.3 — candidates
| Tool | API | Notes |
|------|-----|-------|
| `get_thumbnail` | `SYNO.FileStation.Thumb` | Return base64 thumbnail for image/video |
| `list_favorites` | `SYNO.FileStation.Favorite` | List user-pinned favourite paths |
| `add_favorite` | `SYNO.FileStation.Favorite` | Pin a path as favourite |
| `delete_favorite` | `SYNO.FileStation.Favorite` | Remove a favourite |
| `list_snapshots` | `SYNO.FileStation.Snapshot` | List Btrfs snapshots for a share |
| `background_tasks` | `SYNO.FileStation.BackgroundTask` | List/cancel background operations |
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "mcp-synology-filestation"
version = "0.1.0"
version = "0.2.10"
description = "MCP server for Synology FileStation"
requires-python = ">=3.12"
dependencies = [
+1 -1
View File
@@ -1,3 +1,3 @@
"""MCP server for Synology FileStation."""
__version__ = "0.1.0"
__version__ = "0.2.10"
-2
View File
@@ -247,8 +247,6 @@ class FileStationClient:
Raises:
SynologyError: On API errors.
"""
sys.stderr.write(f"[dsm] request: {api}/{method}\n")
sys.stderr.flush()
if not self._initializing:
await self._ensure_initialized()
http = self._get_http()
+497 -155
View File
@@ -59,15 +59,104 @@ def register_filestation(
client: FileStationClient for DSM API calls.
"""
# ── internal polling helper ──────────────────────────────────────────
# ── internal polling helpers ──────────────────────────────────────────
async def _poll_task(api: str, version: int, taskid: str) -> tuple[bool, dict[str, Any] | str]:
"""Poll a DSM async task (CopyMove / Delete) until finished or timeout.
async def _start_and_poll_oneshot(
api: str,
start_params: dict[str, Any],
start_version: int,
poll_version: int,
) -> tuple[bool, dict[str, Any] | str]:
"""Start a one-shot DSM task and poll until finished, restarting on cold-start 599s.
DirSize and MD5 are "one-shot" tasks: DSM delivers ``finished=True`` exactly
once, then discards the result. Additionally, the DSM background service for
these tasks occasionally needs a few seconds to initialise after a period of
inactivity ("cold start"). During cold start the service registers task IDs
but returns error 599 on every status poll. The correct recovery is to restart
the task once the service has had time to wake up.
Args:
api: DSM API name (e.g. "SYNO.FileStation.DirSize").
start_params: Parameters forwarded to the ``start`` method.
start_version: API version for the ``start`` call.
poll_version: API version for the ``status`` call.
Returns:
``(True, status_dict)`` on success, or ``(False, "Error: …")`` on
DSM error or timeout.
"""
from mcp_synology_filestation.client import SynologyError as _SynologyError
max_restarts = 6
timeout = 60.0
total_elapsed = 0.0
for _attempt in range(max_restarts):
try:
start_data = await client.request(
api, "start", version=start_version, params=start_params
)
except _SynologyError as e:
return False, f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return False, "Error: DSM did not return a task ID."
# Poll with exponential backoff; restart on 5 consecutive 599s
delay = 0.2
consecutive_599 = 0
while True:
try:
status_data = await client.request(
api, "status", version=poll_version, params={"taskid": taskid}
)
consecutive_599 = 0
if status_data.get("finished"):
return True, status_data
# Still running — keep polling
except _SynologyError as e:
if e.code != 599:
return False, f"Error: {e}"
consecutive_599 += 1
if consecutive_599 >= 5:
# 5× 599 in a row: either cold start or missed result window.
# Restart the task so DSM can re-queue it.
break
if total_elapsed >= timeout:
return (
False,
"Error: Operation timed out after 60 seconds — check NAS manually.",
)
await asyncio.sleep(delay)
total_elapsed += delay
delay = min(delay * 2, 2.0)
return (
False,
"Error: DSM did not return results after multiple retries"
" (service may be starting up — try again in a moment).",
)
async def _poll_task(
api: str,
version: int,
taskid: str,
initial_delay: float = 0.2,
) -> tuple[bool, dict[str, Any] | str]:
"""Poll a DSM async task until finished or timeout.
Args:
api: DSM API name (e.g. "SYNO.FileStation.CopyMove").
version: API version to use for the status call.
taskid: Task ID returned by the corresponding start method.
initial_delay: Seconds to wait before the first status poll.
Set to 0.0 for tasks that may finish before the first poll
interval (e.g. DirSize on small directories, MD5 on small files).
Returns:
``(True, status_dict)`` on success, or ``(False, "Error: …")`` on
@@ -76,13 +165,14 @@ def register_filestation(
from mcp_synology_filestation.client import SynologyError as _SynologyError
delay = 0.2
elapsed = 0.0
elapsed = initial_delay
timeout = 60.0
consecutive_599 = 0
if initial_delay > 0:
await asyncio.sleep(initial_delay)
while True:
await asyncio.sleep(delay)
elapsed += delay
try:
status_data = await client.request(
api,
@@ -90,9 +180,17 @@ def register_filestation(
version=version,
params={"taskid": taskid},
)
consecutive_599 = 0
except _SynologyError as e:
if e.code == 599:
# 599 can be transient (task just started, not yet available).
# Retry up to 5 times before giving up.
consecutive_599 += 1
if consecutive_599 >= 5:
return False, f"Error: {e}"
else:
return False, f"Error: {e}"
else:
if status_data.get("finished"):
return True, status_data
@@ -102,14 +200,13 @@ def register_filestation(
"Error: Operation timed out after 60 seconds — check NAS manually.",
)
await asyncio.sleep(delay)
elapsed += delay
delay = min(delay * 2, 2.0)
@mcp.tool()
async def list_shares() -> str:
"""List all shared folders visible to the authenticated user.
Returns a formatted table with share name, path, and volume status.
"""
async def list_shares():
"""List all shared folders. Returns name/path/volume-usage table."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -163,24 +260,9 @@ def register_filestation(
limit: int = 100,
sort_by: str = "name",
sort_direction: str = "asc",
) -> str:
"""List the contents of a directory on the NAS.
Use share paths as returned by list_shares (e.g. "/dev", "/data"),
not volume paths (e.g. "/volume1/dev" will not work).
Args:
path: Share-relative path on the NAS (e.g. "/dev" or "/data/photos").
offset: Number of items to skip (for pagination).
limit: Maximum items to return (1-500, default 100).
sort_by: Sort field — one of: name, size, user, group, mtime, atime,
crtime, posix, type.
sort_direction: "asc" or "desc".
Returns:
Formatted table with name and type, plus the total item count
for pagination context.
"""
):
"""List directory contents. path: share-relative (e.g. /docker).
offset/limit for pagination, sort_by/sort_direction for ordering."""
from mcp_synology_filestation.client import SynologyError
# Validate inputs
@@ -265,22 +347,9 @@ def register_filestation(
pattern: str,
recursive: bool = True,
max_results: int = 200,
) -> str:
"""Search for files matching a glob pattern within a directory.
Starts an async DSM search task, polls until complete, then cleans up.
Use share paths as returned by list_shares (e.g. "/docker").
Args:
path: Root directory to search from (e.g. "/docker").
pattern: Filename glob pattern (e.g. "*.yaml", "report*.pdf").
recursive: Search subdirectories (default True).
max_results: Maximum number of matches to return (default 200, max 1000).
Returns:
Formatted table with path, type, size, and modification time,
plus total match count.
"""
):
"""Search files by glob pattern under path. pattern: e.g. "*.yaml".
recursive/max_results optional."""
from mcp_synology_filestation.client import SynologyError
limit = max(1, min(max_results, 1000))
@@ -399,18 +468,9 @@ def register_filestation(
return "\n".join(lines)
@mcp.tool()
async def download(path: str) -> str:
"""Download a single file from the NAS and return its content as base64.
Files larger than 10 MB are rejected — use SFTP or another method instead.
Use share paths as returned by list_shares (e.g. "/docker/app/config.yaml").
Args:
path: Absolute share-relative path to the file on the NAS.
Returns:
JSON object with "filename", "size" (bytes), and "content_base64".
"""
async def download(path: str):
"""Download a file as base64 (max 10 MB). path: share-relative.
Returns JSON {filename, size, content_base64}."""
import base64
from mcp_synology_filestation.client import SynologyError
@@ -438,20 +498,9 @@ def register_filestation(
)
@mcp.tool()
async def get_info(path: str) -> str:
"""Get detailed metadata for one or more files or folders on the NAS.
Accepts a single path or a comma-separated list of paths.
Use share paths as returned by list_shares (e.g. "/dev/file.txt").
Args:
path: One or more share-relative paths, comma-separated
(e.g. "/dev/notes.txt" or "/dev/notes.txt,/data/photo.jpg").
Returns:
Formatted table with type, size, owner, permissions, and timestamps
for each requested path.
"""
async def get_info(path: str):
"""Get metadata (type/size/owner/permissions/timestamps) for one or more paths.
path: comma-separated share-relative paths."""
from mcp_synology_filestation.client import SynologyError
paths = [p.strip() for p in path.split(",") if p.strip()]
@@ -538,6 +587,49 @@ def register_filestation(
return "\n".join(lines)
@mcp.tool()
async def check_exist(path: str):
"""Check if one or more paths exist. path: comma-separated share-relative paths.
Returns Yes/No table."""
from mcp_synology_filestation.client import SynologyError
paths = [p.strip() for p in path.split(",") if p.strip()]
if not paths:
return "Error: no path provided."
try:
data = await client.request(
"SYNO.FileStation.List",
"getinfo",
params={
"path": json.dumps(paths),
"additional": json.dumps([]),
},
)
except SynologyError as e:
return f"Error: {e}"
files: list[dict] = data.get("files", [])
if not files:
return "No information returned for the given path(s)."
# A path that doesn't exist still gets an entry but with name=None
rows = [(f.get("path", ""), "Yes" if f.get("name") is not None else "No") for f in files]
w_path = max(len("Path"), *(len(r[0]) for r in rows))
w_exists = len("Exists") # "Yes" / "No" always shorter
sep = f"+{'-' * (w_path + 2)}+{'-' * (w_exists + 2)}+"
header = f"| {'Path':<{w_path}} | {'Exists':<{w_exists}} |"
lines = [sep, header, sep]
for item_path, exists_str in rows:
lines.append(f"| {item_path:<{w_path}} | {exists_str:<{w_exists}} |")
lines.append(sep)
lines.append(f"\n{len(rows)} path(s) checked.")
return "\n".join(lines)
# ── write tools ───────────────────────────────────────────────────────
@mcp.tool()
@@ -545,18 +637,9 @@ def register_filestation(
path: str,
name: str,
create_parents: bool = False,
) -> str:
"""Create a new folder on the NAS.
Args:
path: Parent directory path (e.g. "/docker").
name: New folder name — not a full path (e.g. "my-app").
create_parents: Create missing intermediate parent directories
if True (default False).
Returns:
Full path of the created folder, or an Error: message.
"""
):
"""Create a folder. path: parent dir, name: folder name (not full path).
create_parents: make missing parents."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -577,17 +660,9 @@ def register_filestation(
return f"Created: {created_path}"
@mcp.tool()
async def rename(path: str, new_name: str) -> str:
"""Rename a file or folder on the NAS.
Args:
path: Absolute share-relative path to the item
(e.g. "/docker/old-name.yaml").
new_name: New name — not a full path (e.g. "new-name.yaml").
Returns:
New absolute path after rename, or an Error: message.
"""
async def rename(path: str, new_name: str):
"""Rename a file or folder. path: current share-relative path,
new_name: bare name only (not full path)."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -608,20 +683,9 @@ def register_filestation(
return f"Renamed to: {new_path}"
@mcp.tool()
async def copy(src: str, dst: str, overwrite: bool = False) -> str:
"""Copy a file or folder to a new location on the NAS.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing item at the destination.
Args:
src: Source absolute path (e.g. "/docker/app/compose.yaml").
dst: Destination directory path (e.g. "/backup/docker").
overwrite: Replace existing item at destination (default False).
Returns:
Destination path on success, or an Error: message.
"""
async def copy(src: str, dst: str, overwrite: bool = False):
"""Copy src to dst directory.
WARNING: overwrite=True replaces existing items (default False)."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -653,20 +717,9 @@ def register_filestation(
return f"Copied to: {dest_folder}/{filename}"
@mcp.tool()
async def move(src: str, dst: str, overwrite: bool = False) -> str:
"""Move a file or folder to a new location on the NAS.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing item at the destination.
Args:
src: Source absolute path (e.g. "/docker/app/old-compose.yaml").
dst: Destination directory path (e.g. "/backup/docker").
overwrite: Replace existing item at destination (default False).
Returns:
Destination path on success, or an Error: message.
"""
async def move(src: str, dst: str, overwrite: bool = False):
"""Move src to dst directory.
WARNING: overwrite=True replaces existing items (default False)."""
from mcp_synology_filestation.client import SynologyError
try:
@@ -698,21 +751,9 @@ def register_filestation(
return f"Moved to: {dest_folder}/{filename}"
@mcp.tool()
async def delete(path: str, confirmed: bool = False) -> str:
"""Delete a file or folder on the NAS.
WARNING: This operation is irreversible. Without confirmed=True,
returns only a preview — no changes are made.
Args:
path: Absolute share-relative path to delete.
confirmed: Must be True to actually delete. Defaults to False
(preview only — no DSM call).
Returns:
Preview message if confirmed=False; success or Error: message
if confirmed=True.
"""
async def delete(path: str, confirmed: bool = False):
"""Delete a file or folder. IRREVERSIBLE.
confirmed=False (default) shows preview only; pass confirmed=True to actually delete."""
from mcp_synology_filestation.client import SynologyError
if not confirmed:
@@ -744,6 +785,175 @@ def register_filestation(
return f"Deleted: {path}"
@mcp.tool()
async def compress(
paths: list[str],
dest_file_path: str,
level: str = "moderate",
mode: str = "add",
format: str = "zip",
password: str = "",
):
"""Compress paths into an archive. dest_file_path: full path incl. filename.
level: store/fastest/fast/normal/moderate/maximum. format: zip/7z."""
from mcp_synology_filestation.client import SynologyError
_valid_levels = {"store", "fastest", "fast", "normal", "moderate", "maximum"}
_valid_modes = {"add", "update", "refreshen"}
_valid_formats = {"zip", "7z"}
if level not in _valid_levels:
return f"Error: level must be one of {sorted(_valid_levels)}"
if mode not in _valid_modes:
return f"Error: mode must be one of {sorted(_valid_modes)}"
if format not in _valid_formats:
return f"Error: format must be one of {sorted(_valid_formats)}"
if not paths:
return "Error: paths list must not be empty."
try:
start_data = await client.request(
"SYNO.FileStation.Compress",
"start",
version=3,
params={
"path": json.dumps(paths),
"dest_file_path": json.dumps(dest_file_path),
"level": level,
"mode": mode,
"format": format,
"compress_password": password,
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.Compress", 3, taskid)
if not ok:
return result # type: ignore[return-value]
return f"Compressed to: {dest_file_path}"
@mcp.tool()
async def extract(
file_path: str,
dest_folder_path: str,
overwrite: bool = False,
keep_dir: bool = True,
create_subfolder: bool = False,
password: str = "",
):
"""Extract a ZIP or 7z archive to dest_folder_path.
overwrite/keep_dir/create_subfolder/password optional."""
from mcp_synology_filestation.client import SynologyError
try:
start_data = await client.request(
"SYNO.FileStation.Extract",
"start",
version=2,
params={
"file_path": json.dumps(file_path),
"dest_folder_path": json.dumps(dest_folder_path),
"overwrite": "true" if overwrite else "false",
"keep_dir": "true" if keep_dir else "false",
"create_subfolder": "true" if create_subfolder else "false",
"codepage": "enu",
"password": password,
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.Extract", 2, taskid)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
dest = status.get("dest_folder_path", dest_folder_path)
return f"Extracted to: {dest}"
@mcp.tool()
async def dir_size(path: str):
"""Get total size, file count and folder count for one or more directories.
path: comma-separated share-relative paths."""
paths = [p.strip() for p in path.split(",") if p.strip()]
if not paths:
return "Error: no path provided."
ok, result = await _start_and_poll_oneshot(
"SYNO.FileStation.DirSize",
start_params={"path": json.dumps(paths)},
start_version=2,
poll_version=1,
)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
num_dir = status.get("num_dir", 0)
num_file = status.get("num_file", 0)
total_size = status.get("total_size", 0)
path_label = ", ".join(paths)
w_path = max(len("Path"), len(path_label))
num_dir_str = str(num_dir)
num_file_str = str(num_file)
size_str = _fmt_size(total_size)
sep = (
f"+{'-' * (w_path + 2)}"
f"+{'-' * (max(len('Folders'), len(num_dir_str)) + 2)}"
f"+{'-' * (max(len('Files'), len(num_file_str)) + 2)}"
f"+{'-' * (max(len('Total Size'), len(size_str)) + 2)}+"
)
w_dir = max(len("Folders"), len(num_dir_str))
w_file = max(len("Files"), len(num_file_str))
w_size = max(len("Total Size"), len(size_str))
sep = (
f"+{'-' * (w_path + 2)}+{'-' * (w_dir + 2)}+{'-' * (w_file + 2)}+{'-' * (w_size + 2)}+"
)
header = (
f"| {'Path':<{w_path}} "
f"| {'Folders':<{w_dir}} "
f"| {'Files':<{w_file}} "
f"| {'Total Size':<{w_size}} |"
)
row = (
f"| {path_label:<{w_path}} "
f"| {num_dir_str:<{w_dir}} "
f"| {num_file_str:<{w_file}} "
f"| {size_str:<{w_size}} |"
)
return "\n".join([sep, header, sep, row, sep])
@mcp.tool()
async def get_md5(path: str):
"""Compute the MD5 checksum of a file on the NAS. path: share-relative file path."""
ok, result = await _start_and_poll_oneshot(
"SYNO.FileStation.MD5",
start_params={"file_path": json.dumps(path)},
start_version=2,
poll_version=1,
)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
md5 = status.get("md5", "")
if not md5:
return "Error: DSM returned no MD5 hash."
return f"MD5 of {path}: {md5}"
@mcp.tool()
async def upload(
path: str,
@@ -751,22 +961,9 @@ def register_filestation(
content_base64: str,
overwrite: bool = False,
create_parents: bool = True,
) -> str:
"""Upload a file to a directory on the NAS from base64-encoded content.
WARNING: Set overwrite=True only when you intentionally want to replace
an existing file.
Args:
path: Destination directory path on the NAS (e.g. "/docker/app").
filename: Filename to create (e.g. "compose.yaml").
content_base64: Base64-encoded file content.
overwrite: Replace existing file at destination (default False).
create_parents: Create missing parent directories (default True).
Returns:
Full path of the uploaded file, or an Error: message.
"""
):
"""Upload base64-encoded content as filename into path (max 50 MB).
WARNING: overwrite=True replaces existing file (default False)."""
import base64
from mcp_synology_filestation.client import SynologyError
@@ -796,3 +993,148 @@ def register_filestation(
return f"Error: {e}"
return f"Uploaded: {path}/{filename}"
# ── permission + sharing tools ────────────────────────────────────────
@mcp.tool()
async def check_permission(
path: str,
filename: str,
overwrite: bool = False,
create_only: bool = False,
):
"""Check write permission for filename in path.
Returns 'Permission granted' or an error message."""
from mcp_synology_filestation.client import SynologyError
req_params: dict[str, Any] = {"path": path, "filename": filename}
if overwrite:
req_params["overwrite"] = "true"
if create_only:
req_params["create_only"] = "true"
try:
await client.request(
"SYNO.FileStation.CheckPermission",
"write",
version=3,
params=req_params,
)
except SynologyError as e:
return f"Error: {e}"
return f"Permission granted: write {filename!r} into {path}"
@mcp.tool()
async def create_sharing_link(
path: str,
password: str = "",
date_expired: str = "",
date_available: str = "",
):
"""Create a public sharing link for a file or folder.
date_expired/date_available: YYYY-MM-DD, optional."""
from mcp_synology_filestation.client import SynologyError
req_params: dict[str, Any] = {"path": json.dumps(path)}
if password:
req_params["password"] = password
if date_expired:
req_params["date_expired"] = date_expired
if date_available:
req_params["date_available"] = date_available
try:
data = await client.request(
"SYNO.FileStation.Sharing",
"create",
version=3,
params=req_params,
)
except SynologyError as e:
return f"Error: {e}"
links = data.get("links", [])
if not links:
return "Error: DSM returned no sharing link."
link = links[0]
link_id = link.get("id", "?")
url = link.get("url", "?")
has_password = link.get("has_password", False)
lines = [f"Sharing link created: {url}", f"ID: {link_id}"]
if has_password:
lines.append("Password protected: yes")
return "\n".join(lines)
@mcp.tool()
async def list_sharing_links(offset: int = 0, limit: int = 100):
"""List sharing links (paginated). Table: ID, URL, path, owner, expiry, status."""
from mcp_synology_filestation.client import SynologyError
try:
data = await client.request(
"SYNO.FileStation.Sharing",
"list",
version=3,
params={"offset": str(offset), "limit": str(limit)},
)
except SynologyError as e:
return f"Error: {e}"
links = data.get("links", [])
total = data.get("total", 0)
if not links:
return f"No sharing links found. (total={total})"
rows = []
for lnk in links:
link_id = lnk.get("id", "?")
url = lnk.get("url", "?")
lpath = lnk.get("path", "?")
owner = lnk.get("link_owner", "?")
expiry = lnk.get("date_expired", "") or "never"
status = lnk.get("status", "?")
rows.append((link_id, url, lpath, owner, expiry, status))
headers = ("ID", "URL", "Path", "Owner", "Expires", "Status")
col_widths = [max(len(h), *(len(r[i]) for r in rows)) for i, h in enumerate(headers)]
def _sep() -> str:
return "+" + "+".join("-" * (w + 2) for w in col_widths) + "+"
def _row(vals: tuple[str, ...]) -> str:
return "| " + " | ".join(f"{v:<{col_widths[i]}}" for i, v in enumerate(vals)) + " |"
lines = [_sep(), _row(headers), _sep()]
for r in rows:
lines.append(_row(r))
lines.append(_sep())
if offset + len(rows) < total:
lines.append(
f"\nShowing {offset + 1}{offset + len(rows)} of {total}. Pass offset to paginate."
)
else:
lines.append(f"\n{len(rows)} of {total} link(s).")
return "\n".join(lines)
@mcp.tool()
async def delete_sharing_link(link_id: str):
"""Delete a sharing link by its ID. IRREVERSIBLE."""
from mcp_synology_filestation.client import SynologyError
try:
await client.request(
"SYNO.FileStation.Sharing",
"delete",
version=3,
params={"id": json.dumps(link_id)},
)
except SynologyError as e:
return f"Error: {e}"
return f"Deleted sharing link: {link_id}"
+691
View File
@@ -1075,3 +1075,694 @@ async def test_upload_dsm_error(config: AppConfig) -> None:
assert result.startswith("Error:")
assert "permission" in result.lower()
# ──────────────────────────────────────────────────────────────────────────
# check_exist
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_check_exist_single_existing(config: AppConfig) -> None:
"""check_exist returns Yes for a path that exists."""
client = MagicMock()
client.request = AsyncMock(
return_value={
"files": [
{"path": "/docker", "name": "docker", "isdir": True, "additional": {}},
]
}
)
tools = _make_mcp_and_tools(config, client)
result = await tools["check_exist"](path="/docker")
assert "/docker" in result
assert "Yes" in result
assert "No" not in result
assert "1 path(s) checked" in result
@pytest.mark.asyncio
async def test_check_exist_single_missing(config: AppConfig) -> None:
"""check_exist returns No for a path that does not exist (name=None from DSM)."""
client = MagicMock()
client.request = AsyncMock(
return_value={
"files": [
{"path": "/no-such-path", "name": None, "isdir": None, "additional": None},
]
}
)
tools = _make_mcp_and_tools(config, client)
result = await tools["check_exist"](path="/no-such-path")
assert "/no-such-path" in result
assert "No" in result
assert "1 path(s) checked" in result
@pytest.mark.asyncio
async def test_check_exist_multi_path(config: AppConfig) -> None:
"""check_exist handles comma-separated paths and reports each correctly."""
client = MagicMock()
client.request = AsyncMock(
return_value={
"files": [
{"path": "/docker", "name": "docker", "isdir": True, "additional": {}},
{"path": "/ghost", "name": None, "isdir": None, "additional": None},
]
}
)
tools = _make_mcp_and_tools(config, client)
result = await tools["check_exist"](path="/docker,/ghost")
assert "/docker" in result
assert "/ghost" in result
assert "Yes" in result
assert "No" in result
assert "2 path(s) checked" in result
# Verify DSM was called with both paths as a JSON array
call_params = client.request.call_args[1]["params"]
requested_paths = json.loads(call_params["path"])
assert "/docker" in requested_paths
assert "/ghost" in requested_paths
# ──────────────────────────────────────────────────────────────────────────
# compress
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_compress_success(config: AppConfig) -> None:
"""compress polls until finished and returns the archive path."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_compress1"}
if method == "status":
return {"finished": True}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/report.pdf", "/data/photos"],
dest_file_path="/backup/archive.zip",
)
assert result == "Compressed to: /backup/archive.zip"
# Verify DSM call parameters
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.Compress"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 3
p = start_call[1]["params"]
assert json.loads(p["path"]) == ["/data/report.pdf", "/data/photos"]
assert json.loads(p["dest_file_path"]) == "/backup/archive.zip"
assert p["level"] == "moderate"
assert p["mode"] == "add"
assert p["format"] == "zip"
assert p["compress_password"] == ""
@pytest.mark.asyncio
async def test_compress_polling_multiple_rounds(config: AppConfig) -> None:
"""compress returns success after multiple polling rounds."""
client = MagicMock()
poll_calls = 0
async def _request(api, method, version=None, params=None, **kwargs):
nonlocal poll_calls
if method == "start":
return {"taskid": "FileStation_compress2"}
if method == "status":
poll_calls += 1
return {"finished": poll_calls >= 3}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/big-folder"],
dest_file_path="/backup/big.7z",
format="7z",
level="maximum",
)
assert result == "Compressed to: /backup/big.7z"
assert poll_calls == 3
@pytest.mark.asyncio
async def test_compress_dsm_error_on_start(config: AppConfig) -> None:
"""compress returns Error: when the start call fails."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("No write permission", code=1801))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/file.txt"],
dest_file_path="/backup/out.zip",
)
assert result.startswith("Error:")
assert "permission" in result.lower()
@pytest.mark.asyncio
async def test_compress_invalid_level(config: AppConfig) -> None:
"""compress rejects unknown level values before making any DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["compress"](
paths=["/data/file.txt"],
dest_file_path="/backup/out.zip",
level="ultra",
)
assert result.startswith("Error:")
assert "level" in result
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_compress_invalid_format(config: AppConfig) -> None:
"""compress rejects unknown format values before making any DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["compress"](
paths=["/data/file.txt"],
dest_file_path="/backup/out.zip",
format="tar.gz",
)
assert result.startswith("Error:")
assert "format" in result
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_compress_empty_paths(config: AppConfig) -> None:
"""compress rejects an empty paths list before making any DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["compress"](paths=[], dest_file_path="/backup/out.zip")
assert result.startswith("Error:")
assert "paths" in result.lower() or "empty" in result.lower()
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_compress_timeout(config: AppConfig) -> None:
"""compress returns an error after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_compress_timeout"}
return {"finished": False}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["compress"](
paths=["/data/huge"],
dest_file_path="/backup/huge.zip",
)
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
# ──────────────────────────────────────────────────────────────────────────
# extract
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_extract_success(config: AppConfig) -> None:
"""extract polls until finished and returns the dest_folder_path from status."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract1"}
if method == "status":
return {
"finished": True,
"dest_folder_path": "/data/extracted",
"path": "/backup/archive.zip",
"progress": 1,
}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/archive.zip",
dest_folder_path="/data/extracted",
)
assert result == "Extracted to: /data/extracted"
# Verify DSM call parameters
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.Extract"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
p = start_call[1]["params"]
assert json.loads(p["file_path"]) == "/backup/archive.zip"
assert json.loads(p["dest_folder_path"]) == "/data/extracted"
assert p["overwrite"] == "false"
assert p["keep_dir"] == "true"
assert p["create_subfolder"] == "false"
assert p["codepage"] == "enu"
assert p["password"] == ""
@pytest.mark.asyncio
async def test_extract_overwrite_and_subfolder(config: AppConfig) -> None:
"""extract passes overwrite=true and create_subfolder=true when requested."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract2"}
return {"finished": True, "dest_folder_path": "/data/out"}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
await tools["extract"](
file_path="/backup/archive.zip",
dest_folder_path="/data/out",
overwrite=True,
create_subfolder=True,
)
p = client.request.call_args_list[0][1]["params"]
assert p["overwrite"] == "true"
assert p["create_subfolder"] == "true"
@pytest.mark.asyncio
async def test_extract_dest_folder_from_status(config: AppConfig) -> None:
"""extract uses dest_folder_path from status response when available."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract3"}
return {"finished": True, "dest_folder_path": "/data/real-dest"}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/archive.zip",
dest_folder_path="/data/requested",
)
# Should report what DSM confirmed, not what we requested
assert result == "Extracted to: /data/real-dest"
@pytest.mark.asyncio
async def test_extract_dsm_error_on_start(config: AppConfig) -> None:
"""extract returns Error: when the start call fails (e.g. bad path)."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/missing.zip",
dest_folder_path="/data/out",
)
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_extract_timeout(config: AppConfig) -> None:
"""extract returns an error after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_extract_timeout"}
return {"finished": False, "progress": 0.1}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["extract"](
file_path="/backup/huge.zip",
dest_folder_path="/data/out",
)
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_check_exist_empty_path(config: AppConfig) -> None:
"""check_exist returns Error: when no path is given."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["check_exist"](path=" ")
assert result.startswith("Error:")
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_check_exist_dsm_error(config: AppConfig) -> None:
"""check_exist propagates DSM errors as Error: messages."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("Permission denied", code=105))
tools = _make_mcp_and_tools(config, client)
result = await tools["check_exist"](path="/docker")
assert result.startswith("Error:")
assert "Permission denied" in result
@pytest.mark.asyncio
async def test_check_exist_uses_getinfo(config: AppConfig) -> None:
"""check_exist uses SYNO.FileStation.List::getinfo as its DSM backend."""
client = MagicMock()
client.request = AsyncMock(
return_value={
"files": [
{"path": "/docker", "name": "docker", "isdir": True, "additional": {}},
]
}
)
tools = _make_mcp_and_tools(config, client)
await tools["check_exist"](path="/docker")
client.request.assert_called_once()
call_args = client.request.call_args
assert call_args[0][0] == "SYNO.FileStation.List"
assert call_args[0][1] == "getinfo"
# ──────────────────────────────────────────────────────────────────────────
# dir_size
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_dir_size_success(config: AppConfig) -> None:
"""dir_size polls until finished and returns a formatted table."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize1"}
if method == "status":
return {
"finished": True,
"num_dir": 4,
"num_file": 23,
"total_size": 5_242_880,
}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data")
assert "Folders" in result
assert "Files" in result
assert "Total Size" in result
assert "4" in result
assert "23" in result
assert "5 MB" in result or "MB" in result
# Verify DSM call params
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.DirSize"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
assert json.loads(start_call[1]["params"]["path"]) == ["/data"]
@pytest.mark.asyncio
async def test_dir_size_multi_path(config: AppConfig) -> None:
"""dir_size passes all comma-separated paths as a JSON array."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize2"}
return {"finished": True, "num_dir": 1, "num_file": 2, "total_size": 1024}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data, /backup")
start_params = client.request.call_args_list[0][1]["params"]
assert json.loads(start_params["path"]) == ["/data", "/backup"]
assert "/data" in result
assert "/backup" in result
@pytest.mark.asyncio
async def test_dir_size_dsm_error_on_start(config: AppConfig) -> None:
"""dir_size returns Error: when start fails."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/missing")
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_dir_size_timeout(config: AppConfig) -> None:
"""dir_size returns Error: after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize_timeout"}
return {"finished": False, "num_dir": 0, "num_file": 0, "total_size": 0}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/huge")
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_dir_size_empty_path(config: AppConfig) -> None:
"""dir_size returns Error: for blank path without making a DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["dir_size"](path=" ")
assert result.startswith("Error:")
client.request.assert_not_called()
@pytest.mark.asyncio
async def test_dir_size_retries_on_transient_599(config: AppConfig) -> None:
"""dir_size retries up to 4 times on code-599 then succeeds on 5th status call."""
client = MagicMock()
call_count = {"status": 0}
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize_599"}
call_count["status"] += 1
if call_count["status"] < 4:
raise SynologyError("DSM error code 599", code=599)
return {"finished": True, "num_dir": 2, "num_file": 10, "total_size": 1024}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data")
assert "Total Size" in result
assert call_count["status"] == 4
@pytest.mark.asyncio
async def test_dir_size_fails_after_5_consecutive_599(config: AppConfig) -> None:
"""dir_size gives up and returns Error: after exhausting all restart attempts."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize_dead"}
raise SynologyError("DSM error code 599", code=599)
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/dead")
assert result.startswith("Error:")
@pytest.mark.asyncio
async def test_dir_size_cold_start_restart(config: AppConfig) -> None:
"""dir_size restarts the task after 5 consecutive 599s and succeeds on second attempt."""
client = MagicMock()
start_count = {"n": 0}
status_count = {"n": 0}
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
start_count["n"] += 1
return {"taskid": f"task_{start_count['n']}"}
status_count["n"] += 1
# First 5 status calls → 599 (simulates cold start)
if status_count["n"] <= 5:
raise SynologyError("DSM error code 599", code=599)
# After restart: immediately done
return {"finished": True, "num_dir": 1, "num_file": 5, "total_size": 1024}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/coldstart")
assert "Total Size" in result
assert start_count["n"] == 2 # task was restarted once after cold-start 599s
# ──────────────────────────────────────────────────────────────────────────
# get_md5
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_md5_success(config: AppConfig) -> None:
"""get_md5 polls until finished and returns the MD5 string."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_1"}
if method == "status":
return {"finished": True, "md5": "d41d8cd98f00b204e9800998ecf8427e"}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/file.zip")
assert result == "MD5 of /data/file.zip: d41d8cd98f00b204e9800998ecf8427e"
# Verify DSM call params
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.MD5"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
assert json.loads(start_call[1]["params"]["file_path"]) == "/data/file.zip"
@pytest.mark.asyncio
async def test_get_md5_dsm_error_on_start(config: AppConfig) -> None:
"""get_md5 returns Error: when start fails (e.g. file not found)."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/missing.zip")
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_get_md5_timeout(config: AppConfig) -> None:
"""get_md5 returns Error: after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_timeout"}
return {"finished": False}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/huge.iso")
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_get_md5_missing_hash_in_response(config: AppConfig) -> None:
"""get_md5 returns Error: when finished status contains no md5 field."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_nohash"}
return {"finished": True} # md5 field absent
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/file.zip")
assert result.startswith("Error:")
assert "md5" in result.lower() or "hash" in result.lower()
Generated
+1 -1
View File
@@ -362,7 +362,7 @@ wheels = [
[[package]]
name = "mcp-synology-filestation"
version = "0.1.0"
version = "0.2.4"
source = { editable = "." }
dependencies = [
{ name = "click" },