diff --git a/src/mcp_synology_filestation/tools/filestation.py b/src/mcp_synology_filestation/tools/filestation.py index b746fc0..af2dd8a 100644 --- a/src/mcp_synology_filestation/tools/filestation.py +++ b/src/mcp_synology_filestation/tools/filestation.py @@ -781,6 +781,101 @@ def register_filestation( dest = status.get("dest_folder_path", dest_folder_path) return f"Extracted to: {dest}" + @mcp.tool() + async def dir_size(path: str): + """Get total size, file count and folder count for one or more directories. + path: comma-separated share-relative paths.""" + from mcp_synology_filestation.client import SynologyError + + paths = [p.strip() for p in path.split(",") if p.strip()] + if not paths: + return "Error: no path provided." + + try: + start_data = await client.request( + "SYNO.FileStation.DirSize", + "start", + version=2, + params={"path": json.dumps(paths)}, + ) + except SynologyError as e: + return f"Error: {e}" + + taskid: str = start_data.get("taskid", "") + if not taskid: + return "Error: DSM did not return a task ID." + + ok, result = await _poll_task("SYNO.FileStation.DirSize", 2, taskid) + if not ok: + return result # type: ignore[return-value] + + status: dict[str, Any] = result # type: ignore[assignment] + num_dir = status.get("num_dir", 0) + num_file = status.get("num_file", 0) + total_size = status.get("total_size", 0) + + path_label = ", ".join(paths) + w_path = max(len("Path"), len(path_label)) + num_dir_str = str(num_dir) + num_file_str = str(num_file) + size_str = _fmt_size(total_size) + + sep = ( + f"+{'-' * (w_path + 2)}" + f"+{'-' * (max(len('Folders'), len(num_dir_str)) + 2)}" + f"+{'-' * (max(len('Files'), len(num_file_str)) + 2)}" + f"+{'-' * (max(len('Total Size'), len(size_str)) + 2)}+" + ) + w_dir = max(len("Folders"), len(num_dir_str)) + w_file = max(len("Files"), len(num_file_str)) + w_size = max(len("Total Size"), len(size_str)) + + sep = ( + f"+{'-' * (w_path + 2)}+{'-' * (w_dir + 2)}+{'-' * (w_file + 2)}+{'-' * (w_size + 2)}+" + ) + header = ( + f"| {'Path':<{w_path}} " + f"| {'Folders':<{w_dir}} " + f"| {'Files':<{w_file}} " + f"| {'Total Size':<{w_size}} |" + ) + row = ( + f"| {path_label:<{w_path}} " + f"| {num_dir_str:<{w_dir}} " + f"| {num_file_str:<{w_file}} " + f"| {size_str:<{w_size}} |" + ) + return "\n".join([sep, header, sep, row, sep]) + + @mcp.tool() + async def get_md5(path: str): + """Compute the MD5 checksum of a file on the NAS. path: share-relative file path.""" + from mcp_synology_filestation.client import SynologyError + + try: + start_data = await client.request( + "SYNO.FileStation.MD5", + "start", + version=2, + params={"file_path": json.dumps(path)}, + ) + except SynologyError as e: + return f"Error: {e}" + + taskid: str = start_data.get("taskid", "") + if not taskid: + return "Error: DSM did not return a task ID." + + ok, result = await _poll_task("SYNO.FileStation.MD5", 2, taskid) + if not ok: + return result # type: ignore[return-value] + + status: dict[str, Any] = result # type: ignore[assignment] + md5 = status.get("md5", "") + if not md5: + return "Error: DSM returned no MD5 hash." + return f"MD5 of {path}: {md5}" + @mcp.tool() async def upload( path: str, diff --git a/tests/test_tools_filestation.py b/tests/test_tools_filestation.py index 03cb379..00c0fe8 100644 --- a/tests/test_tools_filestation.py +++ b/tests/test_tools_filestation.py @@ -1496,3 +1496,202 @@ async def test_check_exist_uses_getinfo(config: AppConfig) -> None: call_args = client.request.call_args assert call_args[0][0] == "SYNO.FileStation.List" assert call_args[0][1] == "getinfo" + + +# ────────────────────────────────────────────────────────────────────────── +# dir_size +# ────────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_dir_size_success(config: AppConfig) -> None: + """dir_size polls until finished and returns a formatted table.""" + client = MagicMock() + + async def _request(api, method, version=None, params=None, **kwargs): + if method == "start": + return {"taskid": "FileStation_dirsize1"} + if method == "status": + return { + "finished": True, + "num_dir": 4, + "num_file": 23, + "total_size": 5_242_880, + } + return {} + + client.request = AsyncMock(side_effect=_request) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["dir_size"](path="/data") + + assert "Folders" in result + assert "Files" in result + assert "Total Size" in result + assert "4" in result + assert "23" in result + assert "5 MB" in result or "MB" in result + + # Verify DSM call params + start_call = client.request.call_args_list[0] + assert start_call[0][0] == "SYNO.FileStation.DirSize" + assert start_call[0][1] == "start" + assert start_call[1]["version"] == 2 + assert json.loads(start_call[1]["params"]["path"]) == ["/data"] + + +@pytest.mark.asyncio +async def test_dir_size_multi_path(config: AppConfig) -> None: + """dir_size passes all comma-separated paths as a JSON array.""" + client = MagicMock() + + async def _request(api, method, version=None, params=None, **kwargs): + if method == "start": + return {"taskid": "FileStation_dirsize2"} + return {"finished": True, "num_dir": 1, "num_file": 2, "total_size": 1024} + + client.request = AsyncMock(side_effect=_request) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["dir_size"](path="/data, /backup") + + start_params = client.request.call_args_list[0][1]["params"] + assert json.loads(start_params["path"]) == ["/data", "/backup"] + assert "/data" in result + assert "/backup" in result + + +@pytest.mark.asyncio +async def test_dir_size_dsm_error_on_start(config: AppConfig) -> None: + """dir_size returns Error: when start fails.""" + client = MagicMock() + client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800)) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["dir_size"](path="/missing") + + assert result.startswith("Error:") + assert "not found" in result.lower() + + +@pytest.mark.asyncio +async def test_dir_size_timeout(config: AppConfig) -> None: + """dir_size returns Error: after polling times out.""" + client = MagicMock() + + async def _request(api, method, version=None, params=None, **kwargs): + if method == "start": + return {"taskid": "FileStation_dirsize_timeout"} + return {"finished": False, "num_dir": 0, "num_file": 0, "total_size": 0} + + client.request = AsyncMock(side_effect=_request) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["dir_size"](path="/huge") + + assert result.startswith("Error:") + assert "timed out" in result.lower() or "60 seconds" in result + + +@pytest.mark.asyncio +async def test_dir_size_empty_path(config: AppConfig) -> None: + """dir_size returns Error: for blank path without making a DSM call.""" + client = MagicMock() + client.request = AsyncMock() + tools = _make_mcp_and_tools(config, client) + + result = await tools["dir_size"](path=" ") + + assert result.startswith("Error:") + client.request.assert_not_called() + + +# ────────────────────────────────────────────────────────────────────────── +# get_md5 +# ────────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_get_md5_success(config: AppConfig) -> None: + """get_md5 polls until finished and returns the MD5 string.""" + client = MagicMock() + + async def _request(api, method, version=None, params=None, **kwargs): + if method == "start": + return {"taskid": "FileStation_md5_1"} + if method == "status": + return {"finished": True, "md5": "d41d8cd98f00b204e9800998ecf8427e"} + return {} + + client.request = AsyncMock(side_effect=_request) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["get_md5"](path="/data/file.zip") + + assert result == "MD5 of /data/file.zip: d41d8cd98f00b204e9800998ecf8427e" + + # Verify DSM call params + start_call = client.request.call_args_list[0] + assert start_call[0][0] == "SYNO.FileStation.MD5" + assert start_call[0][1] == "start" + assert start_call[1]["version"] == 2 + assert json.loads(start_call[1]["params"]["file_path"]) == "/data/file.zip" + + +@pytest.mark.asyncio +async def test_get_md5_dsm_error_on_start(config: AppConfig) -> None: + """get_md5 returns Error: when start fails (e.g. file not found).""" + client = MagicMock() + client.request = AsyncMock(side_effect=SynologyError("File or folder not found", code=1800)) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["get_md5"](path="/data/missing.zip") + + assert result.startswith("Error:") + assert "not found" in result.lower() + + +@pytest.mark.asyncio +async def test_get_md5_timeout(config: AppConfig) -> None: + """get_md5 returns Error: after polling times out.""" + client = MagicMock() + + async def _request(api, method, version=None, params=None, **kwargs): + if method == "start": + return {"taskid": "FileStation_md5_timeout"} + return {"finished": False} + + client.request = AsyncMock(side_effect=_request) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["get_md5"](path="/data/huge.iso") + + assert result.startswith("Error:") + assert "timed out" in result.lower() or "60 seconds" in result + + +@pytest.mark.asyncio +async def test_get_md5_missing_hash_in_response(config: AppConfig) -> None: + """get_md5 returns Error: when finished status contains no md5 field.""" + client = MagicMock() + + async def _request(api, method, version=None, params=None, **kwargs): + if method == "start": + return {"taskid": "FileStation_md5_nohash"} + return {"finished": True} # md5 field absent + + client.request = AsyncMock(side_effect=_request) + tools = _make_mcp_and_tools(config, client) + + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await tools["get_md5"](path="/data/file.zip") + + assert result.startswith("Error:") + assert "md5" in result.lower() or "hash" in result.lower()