diff --git a/src/mcp_synology_filestation/tools/filestation.py b/src/mcp_synology_filestation/tools/filestation.py index af2dd8a..f6060cf 100644 --- a/src/mcp_synology_filestation/tools/filestation.py +++ b/src/mcp_synology_filestation/tools/filestation.py @@ -805,7 +805,7 @@ def register_filestation( if not taskid: return "Error: DSM did not return a task ID." - ok, result = await _poll_task("SYNO.FileStation.DirSize", 2, taskid) + ok, result = await _poll_task("SYNO.FileStation.DirSize", 1, taskid) if not ok: return result # type: ignore[return-value] diff --git a/test_dirsize_md5.py b/test_dirsize_md5.py new file mode 100644 index 0000000..1db1166 --- /dev/null +++ b/test_dirsize_md5.py @@ -0,0 +1,133 @@ +"""Wegwerfskript: DirSize + MD5 direkt gegen die NAS testen. + +Ergebnisse der Versionsanalyse (vollständig): + DirSize: start v2, status v1 + MD5: start v2, status v1 + - Wichtig: MD5-Status ist ONE-SHOT — nach dem ersten erfolgreichen + Abruf verfällt die Task-ID sofort (code 599 bei allen Folgeabfragen). + - Erster status-Aufruf muss direkt nach start kommen (delay~0). + +DirSize status-Response Felder: + finished: bool + num_dir: int (Anzahl Unterordner) + num_file: int (Anzahl Dateien) + total_size: int (Gesamtgrösse in Bytes) + +MD5 status-Response Felder: + finished: bool + md5: str (Hex-String, 32 Zeichen) + +Ausführen: uv run python test_dirsize_md5.py +""" + +import asyncio +import json + +import httpx + +from mcp_synology_filestation.auth import AuthManager +from mcp_synology_filestation.client import FileStationClient +from mcp_synology_filestation.config import load_config + +DIRSIZE_PATH = "/test-mcp" +MD5_PATH = "/test-mcp/test.zip" + + +def pp(label: str, data: object) -> None: + print(f"\n{'='*60}") + print(f" {label}") + print("=" * 60) + print(json.dumps(data, indent=2, ensure_ascii=False)) + + +async def raw(http: httpx.AsyncClient, base_url: str, sid: str, **params) -> dict: + r = await http.get(f"{base_url}/webapi/entry.cgi", params={"_sid": sid, **params}) + r.raise_for_status() + try: + return r.json() + except Exception: + return {"_raw": r.text[:300], "_http_status": r.status_code} + + +async def main() -> None: + config = load_config() + auth = AuthManager(config) + + async with FileStationClient(config.base_url, config.connection.verify_ssl) as client: + client.set_auth_manager(auth) + await client._ensure_initialized() # noqa: SLF001 + sid = client.sid + base = config.base_url + print(f"[*] SID: {'OK' if sid else 'MISSING'}") + + async with httpx.AsyncClient(verify=config.connection.verify_ssl, timeout=30.0) as http: + + # ── DirSize: start v2, status v1 ───────────────────────────── + print(f"\n{'#'*60}") + print(" SYNO.FileStation.DirSize (start v2, status v1)") + print(f"{'#'*60}") + + start_body = await raw( + http, base, sid, + api="SYNO.FileStation.DirSize", version="2", method="start", + path=json.dumps([DIRSIZE_PATH]), + ) + pp("DirSize::start", start_body) + + taskid = (start_body.get("data") or {}).get("taskid") + if taskid: + for attempt in range(1, 11): + await asyncio.sleep(0.2) + status_body = await raw( + http, base, sid, + api="SYNO.FileStation.DirSize", version="1", method="status", + taskid=taskid, + ) + pp(f"DirSize::status (attempt {attempt})", status_body) + data = (status_body.get("data") or {}) + if data.get("finished"): + print("\n[*] FINISHED. Fields:") + for k, v in data.items(): + print(f" {k!r}: {type(v).__name__} = {v!r}") + break + + # ── MD5: start v2, status v1 (one-shot!) ───────────────────── + print(f"\n{'#'*60}") + print(" SYNO.FileStation.MD5 (start v2, status v1, ONE-SHOT)") + print(f"{'#'*60}") + + start_body = await raw( + http, base, sid, + api="SYNO.FileStation.MD5", version="2", method="start", + file_path=json.dumps(MD5_PATH), + ) + pp("MD5::start", start_body) + + taskid = (start_body.get("data") or {}).get("taskid") + if taskid: + # Immediately poll status v1 — one-shot window, don't delay + for attempt in range(1, 6): + if attempt > 1: + await asyncio.sleep(0.05) + status_body = await raw( + http, base, sid, + api="SYNO.FileStation.MD5", version="1", method="status", + taskid=taskid, + ) + pp(f"MD5::status (attempt {attempt})", status_body) + data = (status_body.get("data") or {}) + if data.get("finished"): + print("\n[*] FINISHED. Fields:") + for k, v in data.items(): + print(f" {k!r}: {type(v).__name__} = {v!r}") + break + if not status_body.get("success"): + print("[!] 599 — task window closed, done probing.") + break + + await auth.logout(client) + print("\n[*] Logout OK.") + + +if __name__ == "__main__": + asyncio.run(main())