"""Wegwerfskript: DirSize + MD5 direkt gegen die NAS testen. Ergebnisse der Versionsanalyse (vollständig): DirSize: start v2, status v1 MD5: start v2, status v1 - Wichtig: MD5-Status ist ONE-SHOT — nach dem ersten erfolgreichen Abruf verfällt die Task-ID sofort (code 599 bei allen Folgeabfragen). - Erster status-Aufruf muss direkt nach start kommen (delay~0). DirSize status-Response Felder: finished: bool num_dir: int (Anzahl Unterordner) num_file: int (Anzahl Dateien) total_size: int (Gesamtgrösse in Bytes) MD5 status-Response Felder: finished: bool md5: str (Hex-String, 32 Zeichen) Ausführen: uv run python test_dirsize_md5.py """ import asyncio import json import httpx from mcp_synology_filestation.auth import AuthManager from mcp_synology_filestation.client import FileStationClient from mcp_synology_filestation.config import load_config DIRSIZE_PATH = "/test-mcp" MD5_PATH = "/test-mcp/test.zip" def pp(label: str, data: object) -> None: print(f"\n{'='*60}") print(f" {label}") print("=" * 60) print(json.dumps(data, indent=2, ensure_ascii=False)) async def raw(http: httpx.AsyncClient, base_url: str, sid: str, **params) -> dict: r = await http.get(f"{base_url}/webapi/entry.cgi", params={"_sid": sid, **params}) r.raise_for_status() try: return r.json() except Exception: return {"_raw": r.text[:300], "_http_status": r.status_code} async def main() -> None: config = load_config() auth = AuthManager(config) async with FileStationClient(config.base_url, config.connection.verify_ssl) as client: client.set_auth_manager(auth) await client._ensure_initialized() # noqa: SLF001 sid = client.sid base = config.base_url print(f"[*] SID: {'OK' if sid else 'MISSING'}") async with httpx.AsyncClient(verify=config.connection.verify_ssl, timeout=30.0) as http: # ── DirSize: start v2, status v1 ───────────────────────────── print(f"\n{'#'*60}") print(" SYNO.FileStation.DirSize (start v2, status v1)") print(f"{'#'*60}") start_body = await raw( http, base, sid, api="SYNO.FileStation.DirSize", version="2", method="start", path=json.dumps([DIRSIZE_PATH]), ) pp("DirSize::start", start_body) taskid = (start_body.get("data") or {}).get("taskid") if taskid: for attempt in range(1, 11): await asyncio.sleep(0.2) status_body = await raw( http, base, sid, api="SYNO.FileStation.DirSize", version="1", method="status", taskid=taskid, ) pp(f"DirSize::status (attempt {attempt})", status_body) data = (status_body.get("data") or {}) if data.get("finished"): print("\n[*] FINISHED. Fields:") for k, v in data.items(): print(f" {k!r}: {type(v).__name__} = {v!r}") break # ── MD5: start v2, status v1 (one-shot!) ───────────────────── print(f"\n{'#'*60}") print(" SYNO.FileStation.MD5 (start v2, status v1, ONE-SHOT)") print(f"{'#'*60}") start_body = await raw( http, base, sid, api="SYNO.FileStation.MD5", version="2", method="start", file_path=json.dumps(MD5_PATH), ) pp("MD5::start", start_body) taskid = (start_body.get("data") or {}).get("taskid") if taskid: # Immediately poll status v1 — one-shot window, don't delay for attempt in range(1, 6): if attempt > 1: await asyncio.sleep(0.05) status_body = await raw( http, base, sid, api="SYNO.FileStation.MD5", version="1", method="status", taskid=taskid, ) pp(f"MD5::status (attempt {attempt})", status_body) data = (status_body.get("data") or {}) if data.get("finished"): print("\n[*] FINISHED. Fields:") for k, v in data.items(): print(f" {k!r}: {type(v).__name__} = {v!r}") break if not status_body.get("success"): print("[!] 599 — task window closed, done probing.") break await auth.logout(client) print("\n[*] Logout OK.") if __name__ == "__main__": asyncio.run(main())