feat: add dir_size and get_md5 tools

dir_size: SYNO.FileStation.DirSize v2 — async scan of one or more
directories, returns folder/file count and total size as a table.
get_md5: SYNO.FileStation.MD5 v2 — async MD5 checksum of a file.
Both follow the existing _poll_task pattern. 9 new unit tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-14 12:10:51 +02:00
parent fc706fb809
commit 1d0cf940b4
2 changed files with 294 additions and 0 deletions
@@ -781,6 +781,101 @@ def register_filestation(
dest = status.get("dest_folder_path", dest_folder_path) dest = status.get("dest_folder_path", dest_folder_path)
return f"Extracted to: {dest}" return f"Extracted to: {dest}"
@mcp.tool()
async def dir_size(path: str):
"""Get total size, file count and folder count for one or more directories.
path: comma-separated share-relative paths."""
from mcp_synology_filestation.client import SynologyError
paths = [p.strip() for p in path.split(",") if p.strip()]
if not paths:
return "Error: no path provided."
try:
start_data = await client.request(
"SYNO.FileStation.DirSize",
"start",
version=2,
params={"path": json.dumps(paths)},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.DirSize", 2, taskid)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
num_dir = status.get("num_dir", 0)
num_file = status.get("num_file", 0)
total_size = status.get("total_size", 0)
path_label = ", ".join(paths)
w_path = max(len("Path"), len(path_label))
num_dir_str = str(num_dir)
num_file_str = str(num_file)
size_str = _fmt_size(total_size)
sep = (
f"+{'-' * (w_path + 2)}"
f"+{'-' * (max(len('Folders'), len(num_dir_str)) + 2)}"
f"+{'-' * (max(len('Files'), len(num_file_str)) + 2)}"
f"+{'-' * (max(len('Total Size'), len(size_str)) + 2)}+"
)
w_dir = max(len("Folders"), len(num_dir_str))
w_file = max(len("Files"), len(num_file_str))
w_size = max(len("Total Size"), len(size_str))
sep = (
f"+{'-' * (w_path + 2)}+{'-' * (w_dir + 2)}+{'-' * (w_file + 2)}+{'-' * (w_size + 2)}+"
)
header = (
f"| {'Path':<{w_path}} "
f"| {'Folders':<{w_dir}} "
f"| {'Files':<{w_file}} "
f"| {'Total Size':<{w_size}} |"
)
row = (
f"| {path_label:<{w_path}} "
f"| {num_dir_str:<{w_dir}} "
f"| {num_file_str:<{w_file}} "
f"| {size_str:<{w_size}} |"
)
return "\n".join([sep, header, sep, row, sep])
@mcp.tool()
async def get_md5(path: str):
"""Compute the MD5 checksum of a file on the NAS. path: share-relative file path."""
from mcp_synology_filestation.client import SynologyError
try:
start_data = await client.request(
"SYNO.FileStation.MD5",
"start",
version=2,
params={"file_path": json.dumps(path)},
)
except SynologyError as e:
return f"Error: {e}"
taskid: str = start_data.get("taskid", "")
if not taskid:
return "Error: DSM did not return a task ID."
ok, result = await _poll_task("SYNO.FileStation.MD5", 2, taskid)
if not ok:
return result # type: ignore[return-value]
status: dict[str, Any] = result # type: ignore[assignment]
md5 = status.get("md5", "")
if not md5:
return "Error: DSM returned no MD5 hash."
return f"MD5 of {path}: {md5}"
@mcp.tool() @mcp.tool()
async def upload( async def upload(
path: str, path: str,
+199
View File
@@ -1496,3 +1496,202 @@ async def test_check_exist_uses_getinfo(config: AppConfig) -> None:
call_args = client.request.call_args call_args = client.request.call_args
assert call_args[0][0] == "SYNO.FileStation.List" assert call_args[0][0] == "SYNO.FileStation.List"
assert call_args[0][1] == "getinfo" assert call_args[0][1] == "getinfo"
# ──────────────────────────────────────────────────────────────────────────
# dir_size
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_dir_size_success(config: AppConfig) -> None:
"""dir_size polls until finished and returns a formatted table."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize1"}
if method == "status":
return {
"finished": True,
"num_dir": 4,
"num_file": 23,
"total_size": 5_242_880,
}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data")
assert "Folders" in result
assert "Files" in result
assert "Total Size" in result
assert "4" in result
assert "23" in result
assert "5 MB" in result or "MB" in result
# Verify DSM call params
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.DirSize"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
assert json.loads(start_call[1]["params"]["path"]) == ["/data"]
@pytest.mark.asyncio
async def test_dir_size_multi_path(config: AppConfig) -> None:
"""dir_size passes all comma-separated paths as a JSON array."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize2"}
return {"finished": True, "num_dir": 1, "num_file": 2, "total_size": 1024}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/data, /backup")
start_params = client.request.call_args_list[0][1]["params"]
assert json.loads(start_params["path"]) == ["/data", "/backup"]
assert "/data" in result
assert "/backup" in result
@pytest.mark.asyncio
async def test_dir_size_dsm_error_on_start(config: AppConfig) -> None:
"""dir_size returns Error: when start fails."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/missing")
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_dir_size_timeout(config: AppConfig) -> None:
"""dir_size returns Error: after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_dirsize_timeout"}
return {"finished": False, "num_dir": 0, "num_file": 0, "total_size": 0}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["dir_size"](path="/huge")
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_dir_size_empty_path(config: AppConfig) -> None:
"""dir_size returns Error: for blank path without making a DSM call."""
client = MagicMock()
client.request = AsyncMock()
tools = _make_mcp_and_tools(config, client)
result = await tools["dir_size"](path=" ")
assert result.startswith("Error:")
client.request.assert_not_called()
# ──────────────────────────────────────────────────────────────────────────
# get_md5
# ──────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_md5_success(config: AppConfig) -> None:
"""get_md5 polls until finished and returns the MD5 string."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_1"}
if method == "status":
return {"finished": True, "md5": "d41d8cd98f00b204e9800998ecf8427e"}
return {}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/file.zip")
assert result == "MD5 of /data/file.zip: d41d8cd98f00b204e9800998ecf8427e"
# Verify DSM call params
start_call = client.request.call_args_list[0]
assert start_call[0][0] == "SYNO.FileStation.MD5"
assert start_call[0][1] == "start"
assert start_call[1]["version"] == 2
assert json.loads(start_call[1]["params"]["file_path"]) == "/data/file.zip"
@pytest.mark.asyncio
async def test_get_md5_dsm_error_on_start(config: AppConfig) -> None:
"""get_md5 returns Error: when start fails (e.g. file not found)."""
client = MagicMock()
client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800))
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/missing.zip")
assert result.startswith("Error:")
assert "not found" in result.lower()
@pytest.mark.asyncio
async def test_get_md5_timeout(config: AppConfig) -> None:
"""get_md5 returns Error: after polling times out."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_timeout"}
return {"finished": False}
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/huge.iso")
assert result.startswith("Error:")
assert "timed out" in result.lower() or "60 seconds" in result
@pytest.mark.asyncio
async def test_get_md5_missing_hash_in_response(config: AppConfig) -> None:
"""get_md5 returns Error: when finished status contains no md5 field."""
client = MagicMock()
async def _request(api, method, version=None, params=None, **kwargs):
if method == "start":
return {"taskid": "FileStation_md5_nohash"}
return {"finished": True} # md5 field absent
client.request = AsyncMock(side_effect=_request)
tools = _make_mcp_and_tools(config, client)
with patch("asyncio.sleep", new_callable=AsyncMock):
result = await tools["get_md5"](path="/data/file.zip")
assert result.startswith("Error:")
assert "md5" in result.lower() or "hash" in result.lower()