feat: add search and download tools

Implements SYNO.FileStation.Search with async polling (exponential
backoff 200ms→2s, 60s timeout) and SYNO.FileStation.Download with
base64 output and a 10 MB size cap.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-14 09:43:48 +02:00
parent 1bccf1e5d2
commit c47221dc8f
3 changed files with 441 additions and 1 deletions
@@ -2,6 +2,8 @@
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
from typing import TYPE_CHECKING
@@ -212,6 +214,182 @@ def register_filestation(
return "\n".join(lines)
@mcp.tool()
async def search(
path: str,
pattern: str,
recursive: bool = True,
max_results: int = 200,
) -> str:
"""Search for files matching a glob pattern within a directory.
Starts an async DSM search task, polls until complete, then cleans up.
Use share paths as returned by list_shares (e.g. "/docker").
Args:
path: Root directory to search from (e.g. "/docker").
pattern: Filename glob pattern (e.g. "*.yaml", "report*.pdf").
recursive: Search subdirectories (default True).
max_results: Maximum number of matches to return (default 200, max 1000).
Returns:
Formatted table with path, type, size, and modification time,
plus total match count.
"""
from mcp_synology_filestation.client import SynologyError
limit = max(1, min(max_results, 1000))
# 1. Start search task
try:
start_data = await client.request(
"SYNO.FileStation.Search",
"start",
params={
"folder_path": path,
"recursive": "true" if recursive else "false",
"pattern": pattern,
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a search task ID."
# 2. Poll until finished=True (exponential backoff: 200 ms → 2 s, timeout 60 s)
delay = 0.2
elapsed = 0.0
timeout = 60.0
files: list[dict] = []
while True:
await asyncio.sleep(delay)
elapsed += delay
try:
poll_data = await client.request(
"SYNO.FileStation.Search",
"list",
params={
"taskid": taskid,
"offset": 0,
"limit": limit,
"additional": json.dumps(["size", "time"]),
},
)
except SynologyError as e:
# Best-effort cleanup before surfacing the error
with contextlib.suppress(SynologyError):
await client.request(
"SYNO.FileStation.Search", "clean", params={"taskid": taskid}
)
return f"Error: {e}"
files = poll_data.get("files", [])
finished: bool = poll_data.get("finished", False)
if finished:
break
if elapsed >= timeout:
with contextlib.suppress(SynologyError):
await client.request(
"SYNO.FileStation.Search", "clean", params={"taskid": taskid}
)
return "Error: Search timed out after 60 seconds."
delay = min(delay * 2, 2.0)
# 3. Clean up the search task
with contextlib.suppress(SynologyError):
await client.request("SYNO.FileStation.Search", "clean", params={"taskid": taskid})
if not files:
return f"No files matching '{pattern}' found under '{path}'."
# 4. Format results
rows = []
for f in files:
item_path = f.get("path") or f.get("name", "")
is_dir = f.get("isdir", False)
ftype = "dir" if is_dir else "file"
add = f.get("additional", {})
size_str = "-" if is_dir else _fmt_size(add.get("size"))
mtime_str = _fmt_time((add.get("time") or {}).get("mtime"))
rows.append((item_path, ftype, size_str, mtime_str))
w_path = max(len("Path"), *(len(r[0]) for r in rows))
w_type = max(len("Type"), *(len(r[1]) for r in rows))
w_size = max(len("Size"), *(len(r[2]) for r in rows))
w_mtime = max(len("Modified"), *(len(r[3]) for r in rows))
sep = (
f"+{'-' * (w_path + 2)}"
f"+{'-' * (w_type + 2)}"
f"+{'-' * (w_size + 2)}"
f"+{'-' * (w_mtime + 2)}+"
)
header = (
f"| {'Path':<{w_path}} "
f"| {'Type':<{w_type}} "
f"| {'Size':<{w_size}} "
f"| {'Modified':<{w_mtime}} |"
)
lines = [f"Search: '{pattern}' under '{path}'", sep, header, sep]
for item_path, ftype, size_str, mtime_str in rows:
lines.append(
f"| {item_path:<{w_path}} "
f"| {ftype:<{w_type}} "
f"| {size_str:<{w_size}} "
f"| {mtime_str:<{w_mtime}} |"
)
lines.append(sep)
lines.append(f"\n{len(rows)} match(es) found.")
return "\n".join(lines)
@mcp.tool()
async def download(path: str) -> str:
"""Download a single file from the NAS and return its content as base64.
Files larger than 10 MB are rejected — use SFTP or another method instead.
Use share paths as returned by list_shares (e.g. "/docker/app/config.yaml").
Args:
path: Absolute share-relative path to the file on the NAS.
Returns:
JSON object with "filename", "size" (bytes), and "content_base64".
"""
import base64
from mcp_synology_filestation.client import SynologyError
max_download_bytes = 10 * 1024 * 1024 # 10 MB
try:
filename, content = await client.download_bytes(path)
except SynologyError as e:
return f"Error: {e}"
size = len(content)
if size > max_download_bytes:
return (
f"Error: File '{filename}' is {_fmt_size(size)}, which exceeds the 10 MB limit "
"for MCP downloads. Use SFTP or another file-transfer method instead."
)
return json.dumps(
{
"filename": filename,
"size": size,
"content_base64": base64.b64encode(content).decode(),
}
)
@mcp.tool()
async def get_info(path: str) -> str:
"""Get detailed metadata for one or more files or folders on the NAS.