feat: add compress and extract tools

Implements SYNO.FileStation.Compress (v3) and SYNO.FileStation.Extract (v2)
with async polling identical to copy/move. Includes input validation for
compress (level, mode, format, empty paths) and 11 new unit tests.
Bumps version to 0.2.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-14 11:26:08 +02:00
parent dbab842738
commit 473c771c20
4 changed files with 432 additions and 2 deletions
@@ -801,6 +801,137 @@ def register_filestation(
return f"Deleted: {path}"
@mcp.tool()
async def compress(
paths: list[str],
dest_file_path: str,
level: str = "moderate",
mode: str = "add",
format: str = "zip",
password: str = "",
) -> str:
"""Compress files or folders into an archive on the NAS.
Creates a new archive asynchronously. Progress is polled until the operation
completes. Use share paths as returned by list_shares.
Args:
paths: List of share-relative paths to include in the archive
(e.g. ["/data/report.pdf", "/data/photos"]).
dest_file_path: Full destination path including filename
(e.g. "/backup/archive.zip").
level: Compression level — "store", "fastest", "fast", "normal",
"moderate" (default), or "maximum".
mode: Archive write mode — "add" (default), "update", or "refreshen".
format: Archive format — "zip" (default) or "7z".
password: Optional password to encrypt the archive (default: none).
Returns:
Path of the created archive on success, or an Error: message.
"""
from mcp_synology_filestation.client import SynologyError
_valid_levels = {"store", "fastest", "fast", "normal", "moderate", "maximum"}
_valid_modes = {"add", "update", "refreshen"}
_valid_formats = {"zip", "7z"}
if level not in _valid_levels:
return f"Error: level must be one of {sorted(_valid_levels)}"
if mode not in _valid_modes:
return f"Error: mode must be one of {sorted(_valid_modes)}"
if format not in _valid_formats:
return f"Error: format must be one of {sorted(_valid_formats)}"
if not paths:
return "Error: paths list must not be empty."
try:
start_data = await client.request(
"SYNO.FileStation.Compress",
"start",
version=3,
params={
"path": json.dumps(paths),
"dest_file_path": json.dumps(dest_file_path),
"level": level,
"mode": mode,
"format": format,
"compress_password": password,
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.Compress", 3, taskid)
if not ok:
return result # type: ignore[return-value]
return f"Compressed to: {dest_file_path}"
@mcp.tool()
async def extract(
file_path: str,
dest_folder_path: str,
overwrite: bool = False,
keep_dir: bool = True,
create_subfolder: bool = False,
password: str = "",
) -> str:
"""Extract an archive file to a destination folder on the NAS.
Supports ZIP and 7z archives. Runs asynchronously; progress is polled
until the extraction completes. Use share paths as returned by list_shares.
Args:
file_path: Share-relative path to the archive file
(e.g. "/backup/archive.zip").
dest_folder_path: Destination folder for extracted contents
(e.g. "/data/extracted").
overwrite: Replace existing files at the destination (default False).
keep_dir: Preserve the directory structure inside the archive
(default True).
create_subfolder: Create a subfolder named after the archive to hold
extracted contents (default False).
password: Password for encrypted archives (default: none).
Returns:
Destination folder path on success, or an Error: message.
"""
from mcp_synology_filestation.client import SynologyError
try:
start_data = await client.request(
"SYNO.FileStation.Extract",
"start",
version=2,
params={
"file_path": json.dumps(file_path),
"dest_folder_path": json.dumps(dest_folder_path),
"overwrite": "true" if overwrite else "false",
"keep_dir": "true" if keep_dir else "false",
"create_subfolder": "true" if create_subfolder else "false",
"codepage": "enu",
"password": password,
},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.Extract", 2, taskid)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
dest = status.get("dest_folder_path", dest_folder_path)
return f"Extracted to: {dest}"
@mcp.tool()
async def upload(
path: str,