4c6de3bfc7
- background_tasks: SYNO.FileStation.BackgroundTask::list (v3) — paginated table of active/recent copy/move/delete/extract/compress tasks - list_snapshots: SYNO.FileStation.Snapshot::list (v2) — Btrfs snapshots per share; maps error 400 to a clear Btrfs-required message - 20 new tests (107 total) - SPEC.md and CLAUDE.md updated (26 tools) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1983 lines
72 KiB
Python
1983 lines
72 KiB
Python
"""Tests for tools/filestation.py: list_shares and list_dir."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import json
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
from mcp_synology_filestation.client import SynologyError
|
||
from mcp_synology_filestation.config import AppConfig, ConnectionConfig
|
||
|
||
|
||
@pytest.fixture()
|
||
def config() -> AppConfig:
|
||
return AppConfig(
|
||
schema_version=1,
|
||
connection=ConnectionConfig(host="nas.example.com"),
|
||
)
|
||
|
||
|
||
def _make_mcp_and_tools(config: AppConfig, client: MagicMock) -> dict:
|
||
"""Register FileStation tools on a mock FastMCP and collect them by name."""
|
||
from mcp_synology_filestation.tools.filestation import register_filestation
|
||
|
||
registered: dict[str, object] = {}
|
||
|
||
mcp = MagicMock()
|
||
|
||
def tool_decorator():
|
||
"""Simulate @mcp.tool() — capture the decorated function."""
|
||
|
||
def decorator(fn):
|
||
registered[fn.__name__] = fn
|
||
return fn
|
||
|
||
return decorator
|
||
|
||
mcp.tool = tool_decorator
|
||
register_filestation(mcp, config, client)
|
||
return registered
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# list_shares
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_shares_success(config: AppConfig) -> None:
|
||
"""list_shares returns a formatted table on success."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"shares": [
|
||
{
|
||
"name": "data",
|
||
"path": "/data",
|
||
"additional": {
|
||
"real_path": "/volume1/data",
|
||
"volume_status": {
|
||
"totalspace": 2_000_000_000,
|
||
"usedspace": 500_000_000,
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"name": "photos",
|
||
"path": "/photos",
|
||
"additional": {
|
||
"real_path": "/volume1/photos",
|
||
"volume_status": {},
|
||
},
|
||
},
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_shares"]()
|
||
|
||
assert "data" in result
|
||
assert "/data" in result # share path, not volume path
|
||
assert "/volume1/data" not in result
|
||
assert "photos" in result
|
||
assert "2 share(s) found" in result
|
||
# Check usage percentage appears
|
||
assert "%" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_shares_empty(config: AppConfig) -> None:
|
||
"""list_shares returns a friendly message when no shares exist."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"shares": []})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_shares"]()
|
||
|
||
assert "No shared folders" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_shares_dsm_error(config: AppConfig) -> None:
|
||
"""list_shares returns an Error: message on SynologyError."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
side_effect=SynologyError("Permission denied — check DSM user permissions", code=105)
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_shares"]()
|
||
|
||
assert result.startswith("Error:")
|
||
assert "Permission denied" in result
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# list_dir
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_dir_success(config: AppConfig) -> None:
|
||
"""list_dir returns a formatted table with name, type, size, and modified columns."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"total": 3,
|
||
"files": [
|
||
{
|
||
"name": "documents",
|
||
"isdir": True,
|
||
"additional": {"size": 0, "time": {"mtime": 1700000000}},
|
||
},
|
||
{
|
||
"name": "photo.jpg",
|
||
"isdir": False,
|
||
"additional": {"size": 2_048_000, "time": {"mtime": 1710000000}},
|
||
},
|
||
{
|
||
"name": "readme.txt",
|
||
"isdir": False,
|
||
"additional": {"size": 512, "time": {"mtime": 1720000000}},
|
||
},
|
||
],
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_dir"](path="/dev")
|
||
|
||
assert "documents" in result
|
||
assert "photo.jpg" in result
|
||
assert "readme.txt" in result
|
||
assert "dir" in result
|
||
assert "file" in result
|
||
# Size column: dirs show "-", files show human-readable size
|
||
assert "2 MB" in result or "1 MB" in result # photo.jpg ~2 MB
|
||
assert "512 B" in result # readme.txt
|
||
# Modified column present
|
||
assert "Modified" in result
|
||
assert "Showing 1–3 of 3 item(s)" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_dir_pagination(config: AppConfig) -> None:
|
||
"""list_dir shows pagination hint when more items are available."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"total": 200,
|
||
"files": [
|
||
{
|
||
"name": f"file{i}.txt",
|
||
"isdir": False,
|
||
"additional": {"size": 100, "time": {"mtime": 1700000000}},
|
||
}
|
||
for i in range(100)
|
||
],
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_dir"](path="/volume1/data", limit=100)
|
||
|
||
assert "Showing 1–100 of 200" in result
|
||
assert "offset=100" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_dir_empty(config: AppConfig) -> None:
|
||
"""list_dir returns a friendly message for empty directories."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"total": 0, "files": []})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_dir"](path="/volume1/empty")
|
||
|
||
assert "empty" in result.lower() or "does not exist" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_dir_invalid_sort_by(config: AppConfig) -> None:
|
||
"""list_dir returns an Error: message for invalid sort_by."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_dir"](path="/volume1/data", sort_by="invalid_field")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "sort_by" in result
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_dir_invalid_sort_direction(config: AppConfig) -> None:
|
||
"""list_dir returns an Error: message for invalid sort_direction."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_dir"](path="/volume1/data", sort_direction="random")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "sort_direction" in result
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_dir_limit_clamped(config: AppConfig) -> None:
|
||
"""list_dir clamps limit to _MAX_LIMIT (500)."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"total": 1, "files": []})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
await tools["list_dir"](path="/volume1/data", limit=9999)
|
||
|
||
call_params = client.request.call_args[1]["params"]
|
||
assert call_params["limit"] == 500
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_dir_dsm_error(config: AppConfig) -> None:
|
||
"""list_dir returns Error: on SynologyError (e.g. path not found)."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["list_dir"](path="/volume1/nonexistent")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# get_info
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_info_single_file(config: AppConfig) -> None:
|
||
"""get_info returns a table with metadata for a single file."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"files": [
|
||
{
|
||
"path": "/dev/notes.txt",
|
||
"name": "notes.txt",
|
||
"isdir": False,
|
||
"additional": {
|
||
"real_path": "/volume1/dev/notes.txt",
|
||
"size": 1024,
|
||
"time": {"mtime": 1700000000, "crtime": 1690000000},
|
||
"owner": {"user": "marcus", "group": "users"},
|
||
"perm": {"posix": 0o644},
|
||
},
|
||
}
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["get_info"](path="/dev/notes.txt")
|
||
|
||
assert "/dev/notes.txt" in result
|
||
assert "file" in result
|
||
assert "1 KB" in result or "1024 B" in result
|
||
assert "marcus" in result
|
||
assert "users" in result
|
||
assert "644" in result
|
||
assert "/volume1/dev/notes.txt" in result
|
||
assert "1 item(s)" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_info_directory(config: AppConfig) -> None:
|
||
"""get_info shows '-' for size of a directory."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"files": [
|
||
{
|
||
"path": "/dev",
|
||
"name": "dev",
|
||
"isdir": True,
|
||
"additional": {
|
||
"real_path": "/volume1/dev",
|
||
"size": 0,
|
||
"time": {"mtime": 1700000000, "crtime": 1690000000},
|
||
"owner": {"user": "marcus", "group": "users"},
|
||
"perm": {"posix": 0o755},
|
||
},
|
||
}
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["get_info"](path="/dev")
|
||
|
||
assert "dir" in result
|
||
assert "755" in result
|
||
# Size for directory should be "-"
|
||
rows = [line for line in result.splitlines() if "/dev" in line and "|" in line]
|
||
assert rows, "expected a data row containing /dev"
|
||
size_col = rows[0].split("|")[3].strip()
|
||
assert size_col == "-"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_info_multiple_paths(config: AppConfig) -> None:
|
||
"""get_info handles comma-separated paths and returns one row per item."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"files": [
|
||
{
|
||
"path": "/dev/a.txt",
|
||
"name": "a.txt",
|
||
"isdir": False,
|
||
"additional": {
|
||
"size": 100,
|
||
"time": {"mtime": 1700000000, "crtime": 1690000000},
|
||
"owner": {"user": "marcus", "group": "users"},
|
||
"perm": {"posix": 0o644},
|
||
},
|
||
},
|
||
{
|
||
"path": "/data/b.txt",
|
||
"name": "b.txt",
|
||
"isdir": False,
|
||
"additional": {
|
||
"size": 200,
|
||
"time": {"mtime": 1700000001, "crtime": 1690000001},
|
||
"owner": {"user": "marcus", "group": "users"},
|
||
"perm": {"posix": 0o644},
|
||
},
|
||
},
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["get_info"](path="/dev/a.txt,/data/b.txt")
|
||
|
||
assert "/dev/a.txt" in result
|
||
assert "/data/b.txt" in result
|
||
assert "2 item(s)" in result
|
||
|
||
# Verify the API received paths as a JSON array (confirmed working format)
|
||
call_params = client.request.call_args[1]["params"]
|
||
assert call_params["path"] == json.dumps(["/dev/a.txt", "/data/b.txt"])
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_info_empty_path(config: AppConfig) -> None:
|
||
"""get_info returns Error: when path is empty."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["get_info"](path=" ")
|
||
|
||
assert result.startswith("Error:")
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_info_dsm_error(config: AppConfig) -> None:
|
||
"""get_info returns Error: on SynologyError."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["get_info"](path="/dev/missing.txt")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_info_uses_getinfo_method(config: AppConfig) -> None:
|
||
"""get_info calls SYNO.FileStation.List with method='getinfo'."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"files": []})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
await tools["get_info"](path="/dev/file.txt")
|
||
|
||
client.request.assert_called_once()
|
||
call_args = client.request.call_args
|
||
assert call_args[0][0] == "SYNO.FileStation.List"
|
||
assert call_args[0][1] == "getinfo"
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# search
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
_SEARCH_FILE = {
|
||
"path": "/docker/app/compose.yaml",
|
||
"name": "compose.yaml",
|
||
"isdir": False,
|
||
"additional": {"size": 1024, "time": {"mtime": 1700000000}},
|
||
}
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_success(config: AppConfig) -> None:
|
||
"""search returns a formatted table after polling two rounds until finished=True."""
|
||
client = MagicMock()
|
||
|
||
call_count = 0
|
||
|
||
async def _request(api, method, **kwargs):
|
||
nonlocal call_count
|
||
call_count += 1
|
||
if method == "start":
|
||
return {"taskid": "abc123", "has_not_index_share": False}
|
||
if method == "list":
|
||
# First poll: not finished yet; second poll: finished
|
||
finished = call_count >= 4 # start=1, list1=2, list2=3 → finished on call 3
|
||
return {"files": [_SEARCH_FILE], "finished": finished, "total": 1}
|
||
if method == "clean":
|
||
return {}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["search"](path="/docker", pattern="*.yaml")
|
||
|
||
assert "/docker/app/compose.yaml" in result
|
||
assert "file" in result
|
||
assert "1 match(es) found" in result
|
||
# Verify start was called with correct params
|
||
start_call = client.request.call_args_list[0]
|
||
assert start_call[0][0] == "SYNO.FileStation.Search"
|
||
assert start_call[0][1] == "start"
|
||
assert json.loads(start_call[1]["params"]["folder_path"]) == ["/docker"]
|
||
assert start_call[1]["params"]["pattern"] == "*.yaml"
|
||
assert start_call[1]["params"]["recursive"] == "true"
|
||
# Verify clean was called last
|
||
last_call = client.request.call_args_list[-1]
|
||
assert last_call[0][1] == "clean"
|
||
assert last_call[1]["params"]["taskid"] == "abc123"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_polls_until_finished(config: AppConfig) -> None:
|
||
"""search keeps polling when finished=False and stops once finished=True."""
|
||
client = MagicMock()
|
||
poll_calls = 0
|
||
|
||
async def _request(api, method, **kwargs):
|
||
nonlocal poll_calls
|
||
if method == "start":
|
||
return {"taskid": "t1"}
|
||
if method == "list":
|
||
poll_calls += 1
|
||
return {
|
||
"files": [_SEARCH_FILE],
|
||
"finished": poll_calls >= 3, # finish on third list call
|
||
"total": 1,
|
||
}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["search"](path="/docker", pattern="*.yaml")
|
||
|
||
assert poll_calls == 3
|
||
assert "1 match(es) found" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_empty_final_poll(config: AppConfig) -> None:
|
||
"""search returns results from an earlier poll when the final finished=True poll is empty.
|
||
|
||
DSM can return files=[]] on the finishing poll even when results exist — the tool
|
||
must retain the last non-empty result set rather than overwriting with [].
|
||
"""
|
||
client = MagicMock()
|
||
poll_calls = 0
|
||
|
||
async def _request(api, method, **kwargs):
|
||
nonlocal poll_calls
|
||
if method == "start":
|
||
return {"taskid": "t_empty_final"}
|
||
if method == "list":
|
||
poll_calls += 1
|
||
if poll_calls == 1:
|
||
# First poll: results available, not yet finished
|
||
return {"files": [_SEARCH_FILE], "finished": False, "total": 1}
|
||
# Second poll: finished, but DSM returns empty files
|
||
return {"files": [], "finished": True, "total": 1}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["search"](path="/docker", pattern="*.yaml")
|
||
|
||
# Must surface the result from the first poll, not treat the empty final as "no results"
|
||
assert "1 match(es) found" in result
|
||
assert "/docker/app/compose.yaml" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_no_results(config: AppConfig) -> None:
|
||
"""search returns a friendly message when no files are found."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "t2"}
|
||
if method == "list":
|
||
return {"files": [], "finished": True, "total": 0}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["search"](path="/docker", pattern="*.txt")
|
||
|
||
assert "No files" in result or "not found" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_start_dsm_error(config: AppConfig) -> None:
|
||
"""search returns Error: immediately when the start call fails."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
side_effect=SynologyError("Permission denied — check DSM user permissions", code=105)
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["search"](path="/docker", pattern="*.yaml")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "Permission denied" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_list_dsm_error(config: AppConfig) -> None:
|
||
"""search returns Error: and cleans up when a list poll fails."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "t3"}
|
||
if method == "list":
|
||
raise SynologyError("Unknown error", code=100)
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["search"](path="/docker", pattern="*.yaml")
|
||
|
||
assert result.startswith("Error:")
|
||
# clean should have been attempted
|
||
clean_calls = [c for c in client.request.call_args_list if c[0][1] == "clean"]
|
||
assert len(clean_calls) == 1
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_recursive_false(config: AppConfig) -> None:
|
||
"""search passes recursive=false when recursive=False."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "t4"}
|
||
if method == "list":
|
||
return {"files": [], "finished": True, "total": 0}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
await tools["search"](path="/docker", pattern="*.yaml", recursive=False)
|
||
|
||
start_call = client.request.call_args_list[0]
|
||
assert start_call[1]["params"]["recursive"] == "false"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_search_additional_format(config: AppConfig) -> None:
|
||
"""search uses json.dumps(["size","time"]) for additional parameter."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "t5"}
|
||
if method == "list":
|
||
return {"files": [], "finished": True, "total": 0}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
await tools["search"](path="/docker", pattern="*.yaml")
|
||
|
||
list_call = client.request.call_args_list[1]
|
||
assert list_call[1]["params"]["additional"] == json.dumps(["size", "time"])
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# download
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_success(config: AppConfig) -> None:
|
||
"""download returns JSON with filename, size, and valid base64 content."""
|
||
content = b"hello, world"
|
||
client = MagicMock()
|
||
client.download_bytes = AsyncMock(return_value=("compose.yaml", content))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["download"](path="/docker/app/compose.yaml")
|
||
|
||
parsed = json.loads(result)
|
||
assert parsed["filename"] == "compose.yaml"
|
||
assert parsed["size"] == len(content)
|
||
assert base64.b64decode(parsed["content_base64"]) == content
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_size_limit(config: AppConfig) -> None:
|
||
"""download returns Error: when file exceeds 10 MB."""
|
||
large_content = b"x" * (10 * 1024 * 1024 + 1)
|
||
client = MagicMock()
|
||
client.download_bytes = AsyncMock(return_value=("bigfile.bin", large_content))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["download"](path="/data/bigfile.bin")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "10 MB" in result or "exceeds" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_dsm_error(config: AppConfig) -> None:
|
||
"""download returns Error: on SynologyError."""
|
||
client = MagicMock()
|
||
client.download_bytes = AsyncMock(
|
||
side_effect=SynologyError("File or folder not found", code=1800)
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["download"](path="/data/missing.txt")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_exactly_10mb(config: AppConfig) -> None:
|
||
"""download accepts files exactly at the 10 MB boundary."""
|
||
content = b"x" * (10 * 1024 * 1024)
|
||
client = MagicMock()
|
||
client.download_bytes = AsyncMock(return_value=("edge.bin", content))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["download"](path="/data/edge.bin")
|
||
|
||
parsed = json.loads(result)
|
||
assert parsed["size"] == len(content)
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# create_folder
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_create_folder_success(config: AppConfig) -> None:
|
||
"""create_folder returns the path of the created folder."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={"folders": [{"isdir": True, "name": "new-app", "path": "/docker/new-app"}]}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["create_folder"](path="/docker", name="new-app")
|
||
|
||
assert result == "Created: /docker/new-app"
|
||
call_params = client.request.call_args[1]["params"]
|
||
assert call_params["folder_path"] == json.dumps("/docker")
|
||
assert call_params["name"] == json.dumps("new-app")
|
||
assert call_params["force_parent"] == "false"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_create_folder_dsm_error(config: AppConfig) -> None:
|
||
"""create_folder returns Error: on SynologyError."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("No write permission", code=1801))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["create_folder"](path="/docker", name="new-app")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "permission" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_create_folder_create_parents(config: AppConfig) -> None:
|
||
"""create_folder passes force_parent=true when create_parents=True."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"folders": []})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
await tools["create_folder"](path="/docker/deep/path", name="new-app", create_parents=True)
|
||
|
||
call_params = client.request.call_args[1]["params"]
|
||
assert call_params["force_parent"] == "true"
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# rename
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_rename_success(config: AppConfig) -> None:
|
||
"""rename returns the new path of the renamed item."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={"files": [{"isdir": False, "name": "new.yaml", "path": "/docker/new.yaml"}]}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["rename"](path="/docker/old.yaml", new_name="new.yaml")
|
||
|
||
assert result == "Renamed to: /docker/new.yaml"
|
||
call_params = client.request.call_args[1]["params"]
|
||
assert call_params["path"] == json.dumps("/docker/old.yaml")
|
||
assert call_params["name"] == json.dumps("new.yaml")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_rename_dsm_error(config: AppConfig) -> None:
|
||
"""rename returns Error: on SynologyError."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["rename"](path="/docker/missing.yaml", new_name="new.yaml")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# copy
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_copy_success_with_polling(config: AppConfig) -> None:
|
||
"""copy polls two rounds then returns the destination path."""
|
||
client = MagicMock()
|
||
poll_calls = 0
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
nonlocal poll_calls
|
||
if method == "start":
|
||
return {"taskid": "FileStation_copy1"}
|
||
if method == "status":
|
||
poll_calls += 1
|
||
finished = poll_calls >= 2
|
||
return {
|
||
"finished": finished,
|
||
"progress": 1.0 if finished else 0.5,
|
||
"dest_folder_path": "/backup/docker",
|
||
}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["copy"](src="/docker/app/compose.yaml", dst="/backup/docker")
|
||
|
||
assert result == "Copied to: /backup/docker/compose.yaml"
|
||
assert poll_calls == 2
|
||
# Verify start used correct params
|
||
start_call = client.request.call_args_list[0]
|
||
assert start_call[0][0] == "SYNO.FileStation.CopyMove"
|
||
assert start_call[0][1] == "start"
|
||
start_params = start_call[1]["params"]
|
||
assert start_params["path"] == json.dumps("/docker/app/compose.yaml")
|
||
assert start_params["dest_folder_path"] == json.dumps("/backup/docker")
|
||
assert start_params["remove_src"] == "false"
|
||
assert start_params["overwrite"] == "false"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_copy_timeout(config: AppConfig) -> None:
|
||
"""copy returns an error message after polling times out."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_copy_timeout"}
|
||
return {"finished": False, "progress": 0.1}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["copy"](src="/docker/big.tar", dst="/backup")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "timed out" in result.lower() or "60 seconds" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_copy_dsm_error_on_start(config: AppConfig) -> None:
|
||
"""copy returns Error: when the start call fails."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("No write permission", code=1801))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["copy"](src="/docker/app.yaml", dst="/backup")
|
||
|
||
assert result.startswith("Error:")
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# move
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_move_success(config: AppConfig) -> None:
|
||
"""move returns destination path and passes remove_src=true."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_move1"}
|
||
return {"finished": True, "dest_folder_path": "/archive/docker"}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["move"](src="/docker/old.yaml", dst="/archive/docker")
|
||
|
||
assert result == "Moved to: /archive/docker/old.yaml"
|
||
start_params = client.request.call_args_list[0][1]["params"]
|
||
assert start_params["remove_src"] == "true"
|
||
assert start_params["path"] == json.dumps("/docker/old.yaml")
|
||
assert start_params["dest_folder_path"] == json.dumps("/archive/docker")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_move_dsm_error(config: AppConfig) -> None:
|
||
"""move returns Error: when the start call fails."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["move"](src="/docker/missing.yaml", dst="/backup")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# delete
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_delete_preview_no_dsm_call(config: AppConfig) -> None:
|
||
"""delete with confirmed=False returns a preview and makes no DSM requests."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["delete"](path="/docker/app", confirmed=False)
|
||
|
||
client.request.assert_not_called()
|
||
assert "/docker/app" in result
|
||
assert "confirmed=True" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_delete_confirmed_with_polling(config: AppConfig) -> None:
|
||
"""delete with confirmed=True polls until finished and returns success."""
|
||
client = MagicMock()
|
||
poll_calls = 0
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
nonlocal poll_calls
|
||
if method == "start":
|
||
return {"taskid": "FileStation_del1"}
|
||
if method == "status":
|
||
poll_calls += 1
|
||
return {"finished": poll_calls >= 2, "processed_num": poll_calls}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["delete"](path="/docker/old-app", confirmed=True)
|
||
|
||
assert result == "Deleted: /docker/old-app"
|
||
start_params = client.request.call_args_list[0][1]["params"]
|
||
assert start_params["path"] == json.dumps("/docker/old-app")
|
||
assert start_params["recursive"] == "true"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_delete_dsm_error_on_start(config: AppConfig) -> None:
|
||
"""delete returns Error: when the start call fails."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
side_effect=SynologyError("Permission denied — check DSM user permissions", code=105)
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["delete"](path="/docker/app", confirmed=True)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "Permission denied" in result
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# upload
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_upload_success(config: AppConfig) -> None:
|
||
"""upload decodes base64, calls upload_bytes, and returns the full path."""
|
||
import base64 as _b64
|
||
|
||
raw = b"version: '3'\nservices:\n app:\n image: nginx\n"
|
||
encoded = _b64.b64encode(raw).decode()
|
||
|
||
client = MagicMock()
|
||
client.upload_bytes = AsyncMock(return_value={})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["upload"](
|
||
path="/docker/app", filename="compose.yaml", content_base64=encoded
|
||
)
|
||
|
||
assert result == "Uploaded: /docker/app/compose.yaml"
|
||
client.upload_bytes.assert_called_once()
|
||
call_kwargs = client.upload_bytes.call_args[1]
|
||
assert call_kwargs["dest_folder"] == "/docker/app"
|
||
assert call_kwargs["filename"] == "compose.yaml"
|
||
assert call_kwargs["content"] == raw
|
||
assert call_kwargs["overwrite"] is False
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_upload_too_large(config: AppConfig) -> None:
|
||
"""upload returns Error: when decoded content exceeds 50 MB."""
|
||
import base64 as _b64
|
||
|
||
large = _b64.b64encode(b"x" * (50 * 1024 * 1024 + 1)).decode()
|
||
|
||
client = MagicMock()
|
||
client.upload_bytes = AsyncMock(return_value={})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["upload"](path="/data", filename="big.bin", content_base64=large)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "50 MB" in result or "exceeds" in result
|
||
client.upload_bytes.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_upload_create_parents(config: AppConfig) -> None:
|
||
"""upload passes create_parents=True to upload_bytes."""
|
||
import base64 as _b64
|
||
|
||
client = MagicMock()
|
||
client.upload_bytes = AsyncMock(return_value={})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
await tools["upload"](
|
||
path="/docker/deep/path",
|
||
filename="file.txt",
|
||
content_base64=_b64.b64encode(b"hello").decode(),
|
||
create_parents=True,
|
||
)
|
||
|
||
call_kwargs = client.upload_bytes.call_args[1]
|
||
assert call_kwargs["create_parents"] is True
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_upload_invalid_base64(config: AppConfig) -> None:
|
||
"""upload returns Error: when content_base64 is not valid base64."""
|
||
client = MagicMock()
|
||
client.upload_bytes = AsyncMock(return_value={})
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["upload"](path="/docker", filename="f.txt", content_base64="not-base64!!!")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "base64" in result.lower()
|
||
client.upload_bytes.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_upload_dsm_error(config: AppConfig) -> None:
|
||
"""upload returns Error: on SynologyError from upload_bytes."""
|
||
import base64 as _b64
|
||
|
||
client = MagicMock()
|
||
client.upload_bytes = AsyncMock(side_effect=SynologyError("No write permission", code=1801))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["upload"](
|
||
path="/docker",
|
||
filename="compose.yaml",
|
||
content_base64=_b64.b64encode(b"data").decode(),
|
||
)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "permission" in result.lower()
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# check_exist
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_exist_single_existing(config: AppConfig) -> None:
|
||
"""check_exist returns Yes for a path that exists."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"files": [
|
||
{"path": "/docker", "name": "docker", "isdir": True, "additional": {}},
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["check_exist"](path="/docker")
|
||
|
||
assert "/docker" in result
|
||
assert "Yes" in result
|
||
assert "No" not in result
|
||
assert "1 path(s) checked" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_exist_single_missing(config: AppConfig) -> None:
|
||
"""check_exist returns No for a path that does not exist (name=None from DSM)."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"files": [
|
||
{"path": "/no-such-path", "name": None, "isdir": None, "additional": None},
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["check_exist"](path="/no-such-path")
|
||
|
||
assert "/no-such-path" in result
|
||
assert "No" in result
|
||
assert "1 path(s) checked" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_exist_multi_path(config: AppConfig) -> None:
|
||
"""check_exist handles comma-separated paths and reports each correctly."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"files": [
|
||
{"path": "/docker", "name": "docker", "isdir": True, "additional": {}},
|
||
{"path": "/ghost", "name": None, "isdir": None, "additional": None},
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["check_exist"](path="/docker,/ghost")
|
||
|
||
assert "/docker" in result
|
||
assert "/ghost" in result
|
||
assert "Yes" in result
|
||
assert "No" in result
|
||
assert "2 path(s) checked" in result
|
||
|
||
# Verify DSM was called with both paths as a JSON array
|
||
call_params = client.request.call_args[1]["params"]
|
||
requested_paths = json.loads(call_params["path"])
|
||
assert "/docker" in requested_paths
|
||
assert "/ghost" in requested_paths
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# compress
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_compress_success(config: AppConfig) -> None:
|
||
"""compress polls until finished and returns the archive path."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_compress1"}
|
||
if method == "status":
|
||
return {"finished": True}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["compress"](
|
||
paths=["/data/report.pdf", "/data/photos"],
|
||
dest_file_path="/backup/archive.zip",
|
||
)
|
||
|
||
assert result == "Compressed to: /backup/archive.zip"
|
||
|
||
# Verify DSM call parameters
|
||
start_call = client.request.call_args_list[0]
|
||
assert start_call[0][0] == "SYNO.FileStation.Compress"
|
||
assert start_call[0][1] == "start"
|
||
assert start_call[1]["version"] == 3
|
||
p = start_call[1]["params"]
|
||
assert json.loads(p["path"]) == ["/data/report.pdf", "/data/photos"]
|
||
assert json.loads(p["dest_file_path"]) == "/backup/archive.zip"
|
||
assert p["level"] == "moderate"
|
||
assert p["mode"] == "add"
|
||
assert p["format"] == "zip"
|
||
assert p["compress_password"] == ""
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_compress_polling_multiple_rounds(config: AppConfig) -> None:
|
||
"""compress returns success after multiple polling rounds."""
|
||
client = MagicMock()
|
||
poll_calls = 0
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
nonlocal poll_calls
|
||
if method == "start":
|
||
return {"taskid": "FileStation_compress2"}
|
||
if method == "status":
|
||
poll_calls += 1
|
||
return {"finished": poll_calls >= 3}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["compress"](
|
||
paths=["/data/big-folder"],
|
||
dest_file_path="/backup/big.7z",
|
||
format="7z",
|
||
level="maximum",
|
||
)
|
||
|
||
assert result == "Compressed to: /backup/big.7z"
|
||
assert poll_calls == 3
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_compress_dsm_error_on_start(config: AppConfig) -> None:
|
||
"""compress returns Error: when the start call fails."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("No write permission", code=1801))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["compress"](
|
||
paths=["/data/file.txt"],
|
||
dest_file_path="/backup/out.zip",
|
||
)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "permission" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_compress_invalid_level(config: AppConfig) -> None:
|
||
"""compress rejects unknown level values before making any DSM call."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["compress"](
|
||
paths=["/data/file.txt"],
|
||
dest_file_path="/backup/out.zip",
|
||
level="ultra",
|
||
)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "level" in result
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_compress_invalid_format(config: AppConfig) -> None:
|
||
"""compress rejects unknown format values before making any DSM call."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["compress"](
|
||
paths=["/data/file.txt"],
|
||
dest_file_path="/backup/out.zip",
|
||
format="tar.gz",
|
||
)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "format" in result
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_compress_empty_paths(config: AppConfig) -> None:
|
||
"""compress rejects an empty paths list before making any DSM call."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["compress"](paths=[], dest_file_path="/backup/out.zip")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "paths" in result.lower() or "empty" in result.lower()
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_compress_timeout(config: AppConfig) -> None:
|
||
"""compress returns an error after polling times out."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_compress_timeout"}
|
||
return {"finished": False}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["compress"](
|
||
paths=["/data/huge"],
|
||
dest_file_path="/backup/huge.zip",
|
||
)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "timed out" in result.lower() or "60 seconds" in result
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# extract
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_success(config: AppConfig) -> None:
|
||
"""extract polls until finished and returns the dest_folder_path from status."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_extract1"}
|
||
if method == "status":
|
||
return {
|
||
"finished": True,
|
||
"dest_folder_path": "/data/extracted",
|
||
"path": "/backup/archive.zip",
|
||
"progress": 1,
|
||
}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["extract"](
|
||
file_path="/backup/archive.zip",
|
||
dest_folder_path="/data/extracted",
|
||
)
|
||
|
||
assert result == "Extracted to: /data/extracted"
|
||
|
||
# Verify DSM call parameters
|
||
start_call = client.request.call_args_list[0]
|
||
assert start_call[0][0] == "SYNO.FileStation.Extract"
|
||
assert start_call[0][1] == "start"
|
||
assert start_call[1]["version"] == 2
|
||
p = start_call[1]["params"]
|
||
assert json.loads(p["file_path"]) == "/backup/archive.zip"
|
||
assert json.loads(p["dest_folder_path"]) == "/data/extracted"
|
||
assert p["overwrite"] == "false"
|
||
assert p["keep_dir"] == "true"
|
||
assert p["create_subfolder"] == "false"
|
||
assert p["codepage"] == "enu"
|
||
assert p["password"] == ""
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_overwrite_and_subfolder(config: AppConfig) -> None:
|
||
"""extract passes overwrite=true and create_subfolder=true when requested."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_extract2"}
|
||
return {"finished": True, "dest_folder_path": "/data/out"}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
await tools["extract"](
|
||
file_path="/backup/archive.zip",
|
||
dest_folder_path="/data/out",
|
||
overwrite=True,
|
||
create_subfolder=True,
|
||
)
|
||
|
||
p = client.request.call_args_list[0][1]["params"]
|
||
assert p["overwrite"] == "true"
|
||
assert p["create_subfolder"] == "true"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_dest_folder_from_status(config: AppConfig) -> None:
|
||
"""extract uses dest_folder_path from status response when available."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_extract3"}
|
||
return {"finished": True, "dest_folder_path": "/data/real-dest"}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["extract"](
|
||
file_path="/backup/archive.zip",
|
||
dest_folder_path="/data/requested",
|
||
)
|
||
|
||
# Should report what DSM confirmed, not what we requested
|
||
assert result == "Extracted to: /data/real-dest"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_dsm_error_on_start(config: AppConfig) -> None:
|
||
"""extract returns Error: when the start call fails (e.g. bad path)."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["extract"](
|
||
file_path="/backup/missing.zip",
|
||
dest_folder_path="/data/out",
|
||
)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_timeout(config: AppConfig) -> None:
|
||
"""extract returns an error after polling times out."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_extract_timeout"}
|
||
return {"finished": False, "progress": 0.1}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["extract"](
|
||
file_path="/backup/huge.zip",
|
||
dest_folder_path="/data/out",
|
||
)
|
||
|
||
assert result.startswith("Error:")
|
||
assert "timed out" in result.lower() or "60 seconds" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_exist_empty_path(config: AppConfig) -> None:
|
||
"""check_exist returns Error: when no path is given."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["check_exist"](path=" ")
|
||
|
||
assert result.startswith("Error:")
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_exist_dsm_error(config: AppConfig) -> None:
|
||
"""check_exist propagates DSM errors as Error: messages."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("Permission denied", code=105))
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
result = await tools["check_exist"](path="/docker")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "Permission denied" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_exist_uses_getinfo(config: AppConfig) -> None:
|
||
"""check_exist uses SYNO.FileStation.List::getinfo as its DSM backend."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"files": [
|
||
{"path": "/docker", "name": "docker", "isdir": True, "additional": {}},
|
||
]
|
||
}
|
||
)
|
||
|
||
tools = _make_mcp_and_tools(config, client)
|
||
await tools["check_exist"](path="/docker")
|
||
|
||
client.request.assert_called_once()
|
||
call_args = client.request.call_args
|
||
assert call_args[0][0] == "SYNO.FileStation.List"
|
||
assert call_args[0][1] == "getinfo"
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# dir_size
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_success(config: AppConfig) -> None:
|
||
"""dir_size polls until finished and returns a formatted table."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_dirsize1"}
|
||
if method == "status":
|
||
return {
|
||
"finished": True,
|
||
"num_dir": 4,
|
||
"num_file": 23,
|
||
"total_size": 5_242_880,
|
||
}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["dir_size"](path="/data")
|
||
|
||
assert "Folders" in result
|
||
assert "Files" in result
|
||
assert "Total Size" in result
|
||
assert "4" in result
|
||
assert "23" in result
|
||
assert "5 MB" in result or "MB" in result
|
||
|
||
# Verify DSM call params
|
||
start_call = client.request.call_args_list[0]
|
||
assert start_call[0][0] == "SYNO.FileStation.DirSize"
|
||
assert start_call[0][1] == "start"
|
||
assert start_call[1]["version"] == 2
|
||
assert json.loads(start_call[1]["params"]["path"]) == ["/data"]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_multi_path(config: AppConfig) -> None:
|
||
"""dir_size passes all comma-separated paths as a JSON array."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_dirsize2"}
|
||
return {"finished": True, "num_dir": 1, "num_file": 2, "total_size": 1024}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["dir_size"](path="/data, /backup")
|
||
|
||
start_params = client.request.call_args_list[0][1]["params"]
|
||
assert json.loads(start_params["path"]) == ["/data", "/backup"]
|
||
assert "/data" in result
|
||
assert "/backup" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_dsm_error_on_start(config: AppConfig) -> None:
|
||
"""dir_size returns Error: when start fails."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["dir_size"](path="/missing")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_timeout(config: AppConfig) -> None:
|
||
"""dir_size returns Error: after polling times out."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_dirsize_timeout"}
|
||
return {"finished": False, "num_dir": 0, "num_file": 0, "total_size": 0}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["dir_size"](path="/huge")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "timed out" in result.lower() or "60 seconds" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_empty_path(config: AppConfig) -> None:
|
||
"""dir_size returns Error: for blank path without making a DSM call."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock()
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["dir_size"](path=" ")
|
||
|
||
assert result.startswith("Error:")
|
||
client.request.assert_not_called()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_retries_on_transient_599(config: AppConfig) -> None:
|
||
"""dir_size retries up to 4 times on code-599 then succeeds on 5th status call."""
|
||
client = MagicMock()
|
||
call_count = {"status": 0}
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_dirsize_599"}
|
||
call_count["status"] += 1
|
||
if call_count["status"] < 4:
|
||
raise SynologyError("DSM error code 599", code=599)
|
||
return {"finished": True, "num_dir": 2, "num_file": 10, "total_size": 1024}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["dir_size"](path="/data")
|
||
|
||
assert "Total Size" in result
|
||
assert call_count["status"] == 4
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_fails_after_5_consecutive_599(config: AppConfig) -> None:
|
||
"""dir_size gives up and returns Error: after exhausting all restart attempts."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_dirsize_dead"}
|
||
raise SynologyError("DSM error code 599", code=599)
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["dir_size"](path="/dead")
|
||
|
||
assert result.startswith("Error:")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_dir_size_cold_start_restart(config: AppConfig) -> None:
|
||
"""dir_size restarts the task after 5 consecutive 599s and succeeds on second attempt."""
|
||
client = MagicMock()
|
||
start_count = {"n": 0}
|
||
status_count = {"n": 0}
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
start_count["n"] += 1
|
||
return {"taskid": f"task_{start_count['n']}"}
|
||
status_count["n"] += 1
|
||
# First 5 status calls → 599 (simulates cold start)
|
||
if status_count["n"] <= 5:
|
||
raise SynologyError("DSM error code 599", code=599)
|
||
# After restart: immediately done
|
||
return {"finished": True, "num_dir": 1, "num_file": 5, "total_size": 1024}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["dir_size"](path="/coldstart")
|
||
|
||
assert "Total Size" in result
|
||
assert start_count["n"] == 2 # task was restarted once after cold-start 599s
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# get_md5
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_md5_success(config: AppConfig) -> None:
|
||
"""get_md5 polls until finished and returns the MD5 string."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_md5_1"}
|
||
if method == "status":
|
||
return {"finished": True, "md5": "d41d8cd98f00b204e9800998ecf8427e"}
|
||
return {}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["get_md5"](path="/data/file.zip")
|
||
|
||
assert result == "MD5 of /data/file.zip: d41d8cd98f00b204e9800998ecf8427e"
|
||
|
||
# Verify DSM call params
|
||
start_call = client.request.call_args_list[0]
|
||
assert start_call[0][0] == "SYNO.FileStation.MD5"
|
||
assert start_call[0][1] == "start"
|
||
assert start_call[1]["version"] == 2
|
||
assert json.loads(start_call[1]["params"]["file_path"]) == "/data/file.zip"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_md5_dsm_error_on_start(config: AppConfig) -> None:
|
||
"""get_md5 returns Error: when start fails (e.g. file not found)."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["get_md5"](path="/data/missing.zip")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "not found" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_md5_timeout(config: AppConfig) -> None:
|
||
"""get_md5 returns Error: after polling times out."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_md5_timeout"}
|
||
return {"finished": False}
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["get_md5"](path="/data/huge.iso")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "timed out" in result.lower() or "60 seconds" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_md5_missing_hash_in_response(config: AppConfig) -> None:
|
||
"""get_md5 returns Error: when finished status contains no md5 field."""
|
||
client = MagicMock()
|
||
|
||
async def _request(api, method, version=None, params=None, **kwargs):
|
||
if method == "start":
|
||
return {"taskid": "FileStation_md5_nohash"}
|
||
return {"finished": True} # md5 field absent
|
||
|
||
client.request = AsyncMock(side_effect=_request)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||
result = await tools["get_md5"](path="/data/file.zip")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "md5" in result.lower() or "hash" in result.lower()
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# background_tasks
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_background_tasks_empty(config: AppConfig) -> None:
|
||
"""background_tasks returns a 'no tasks' message when the list is empty."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"offset": 0, "tasks": [], "total": 0})
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["background_tasks"]()
|
||
|
||
assert "No background tasks" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_background_tasks_with_tasks(config: AppConfig) -> None:
|
||
"""background_tasks returns a formatted table when tasks are present."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"offset": 0,
|
||
"total": 1,
|
||
"tasks": [
|
||
{
|
||
"taskid": "FileStation_CopyMove_1",
|
||
"type": "CopyMove",
|
||
"status": "running",
|
||
"path": "/docker/dest",
|
||
"processed_num_file": 3,
|
||
"total_num_file": 10,
|
||
}
|
||
],
|
||
}
|
||
)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["background_tasks"]()
|
||
|
||
assert "FileStation_CopyMove_1" in result
|
||
assert "CopyMove" in result
|
||
assert "running" in result
|
||
assert "/docker/dest" in result
|
||
assert "3/10" in result
|
||
assert "1 task(s)" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_background_tasks_pagination_hint(config: AppConfig) -> None:
|
||
"""background_tasks shows a pagination hint when there are more results."""
|
||
client = MagicMock()
|
||
tasks = [
|
||
{
|
||
"taskid": f"task_{i}",
|
||
"type": "Delete",
|
||
"status": "running",
|
||
"path": f"/share/item{i}",
|
||
"total_num_file": 0,
|
||
}
|
||
for i in range(5)
|
||
]
|
||
client.request = AsyncMock(return_value={"offset": 0, "total": 20, "tasks": tasks})
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["background_tasks"](offset=0, limit=5)
|
||
|
||
assert "20 task(s)" in result
|
||
assert "offset" in result.lower()
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_background_tasks_dsm_error(config: AppConfig) -> None:
|
||
"""background_tasks returns Error: when DSM raises an exception."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
side_effect=SynologyError("Permission denied — check DSM user permissions", code=105)
|
||
)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["background_tasks"]()
|
||
|
||
assert result.startswith("Error:")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_background_tasks_dsm_api_call(config: AppConfig) -> None:
|
||
"""background_tasks calls the correct DSM API with offset and limit."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"offset": 0, "tasks": [], "total": 0})
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
await tools["background_tasks"](offset=10, limit=50)
|
||
|
||
call = client.request.call_args
|
||
assert call[0][0] == "SYNO.FileStation.BackgroundTask"
|
||
assert call[0][1] == "list"
|
||
assert call[1]["version"] == 3
|
||
assert call[1]["params"]["offset"] == 10
|
||
assert call[1]["params"]["limit"] == 50
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# list_snapshots
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_snapshots_btrfs_not_available(config: AppConfig) -> None:
|
||
"""list_snapshots returns a Btrfs-specific error message on DSM error 400."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(side_effect=SynologyError("Invalid parameter", code=400))
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["list_snapshots"](share_path="/docker")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "Btrfs" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_snapshots_empty(config: AppConfig) -> None:
|
||
"""list_snapshots returns a 'no snapshots' message when the list is empty."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"snapshots": [], "total": 0})
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["list_snapshots"](share_path="/docker")
|
||
|
||
assert "No snapshots" in result
|
||
assert "/docker" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_snapshots_with_data(config: AppConfig) -> None:
|
||
"""list_snapshots returns a formatted table when snapshots are present."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
return_value={
|
||
"total": 2,
|
||
"snapshots": [
|
||
{
|
||
"id": "snap_001",
|
||
"time": 1700000000,
|
||
"description": "Before upgrade",
|
||
"lock": True,
|
||
},
|
||
{"id": "snap_002", "time": 1700100000, "description": "", "lock": False},
|
||
],
|
||
}
|
||
)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["list_snapshots"](share_path="/docker")
|
||
|
||
assert "snap_001" in result
|
||
assert "snap_002" in result
|
||
assert "Before upgrade" in result
|
||
assert "Yes" in result # locked
|
||
assert "No" in result # not locked
|
||
assert "2 snapshot(s)" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_snapshots_dsm_error_other(config: AppConfig) -> None:
|
||
"""list_snapshots surfaces non-400 DSM errors as 'Error: …'."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(
|
||
side_effect=SynologyError("Permission denied — check DSM user permissions", code=105)
|
||
)
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["list_snapshots"](share_path="/docker")
|
||
|
||
assert result.startswith("Error:")
|
||
assert "Permission" in result
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_snapshots_dsm_api_call(config: AppConfig) -> None:
|
||
"""list_snapshots calls SYNO.FileStation.Snapshot::list v2 with the correct params."""
|
||
client = MagicMock()
|
||
client.request = AsyncMock(return_value={"snapshots": [], "total": 0})
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
await tools["list_snapshots"](share_path="/data", offset=0, limit=50)
|
||
|
||
call = client.request.call_args
|
||
assert call[0][0] == "SYNO.FileStation.Snapshot"
|
||
assert call[0][1] == "list"
|
||
assert call[1]["version"] == 2
|
||
assert call[1]["params"]["folder_path"] == "/data"
|
||
assert call[1]["params"]["offset"] == 0
|
||
assert call[1]["params"]["limit"] == 50
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_snapshots_pagination_hint(config: AppConfig) -> None:
|
||
"""list_snapshots shows a pagination hint when more results are available."""
|
||
client = MagicMock()
|
||
snaps = [
|
||
{"id": f"snap_{i}", "time": 1700000000 + i * 3600, "description": "", "lock": False}
|
||
for i in range(3)
|
||
]
|
||
client.request = AsyncMock(return_value={"snapshots": snaps, "total": 10})
|
||
tools = _make_mcp_and_tools(config, client)
|
||
|
||
result = await tools["list_snapshots"](share_path="/data", offset=0, limit=3)
|
||
|
||
assert "10 snapshot(s)" in result
|
||
assert "offset" in result.lower()
|