"""Wegwerfskript: DirSize + MD5 direkt gegen die NAS testen. Ergebnisse der Versionsanalyse (vollständig): DirSize: start v2, status v1 MD5: start v2, status v2 - Wichtig: MD5-Status ist ONE-SHOT -- nach dem ersten erfolgreichen Abruf verfaellt die Task-ID sofort (code 599 bei allen Folgeabfragen). - Erster status-Aufruf muss direkt nach start kommen (delay~0). DirSize status-Response Felder: finished: bool num_dir: int (Anzahl Unterordner) num_file: int (Anzahl Dateien) total_size: int (Gesamtgroesse in Bytes) MD5 status-Response Felder: finished: bool md5: str (Hex-String, 32 Zeichen) Hypothese: _poll_task() schlaeft 200ms vor dem ersten status-Call. Winzige Daten -> Task abgeschlossen bevor erster Poll -> 599. Fix: initial_delay=0 fuer DirSize und MD5. WICHTIG: Rohe HTTP-Tests treffen evt. falschen API-Pfad (entry.cgi), waehrend die API wirklich auf einem anderen Pfad (z.B. FileStation.cgi) laeuft. Test unten zeigt API-Pfad aus dem Cache. Ausfuehren: uv run python test_dirsize_md5.py """ import asyncio import json import time import httpx from mcp_synology_filestation.auth import AuthManager from mcp_synology_filestation.client import FileStationClient from mcp_synology_filestation.config import load_config DIRSIZE_PATH = "/test-mcp" MD5_PATH = "/test-mcp/test.zip" def pp(label: str, data: object, elapsed_ms: float | None = None) -> None: print(f"\n{'='*60}") suffix = f" [{elapsed_ms:.1f} ms nach start]" if elapsed_ms is not None else "" print(f" {label}{suffix}") print("=" * 60) print(json.dumps(data, indent=2, ensure_ascii=False)) async def raw(http: httpx.AsyncClient, url: str, sid: str, **params) -> dict: """Hit a specific URL (not hardcoded to entry.cgi).""" r = await http.get(url, params={"_sid": sid, **params}) r.raise_for_status() try: return r.json() except Exception: return {"_raw": r.text[:300], "_http_status": r.status_code} async def probe_dirsize( http: httpx.AsyncClient, base: str, sid: str, api_url: str ) -> None: print(f"\n{'#'*60}") print(f" DIRSIZE — Timing-Probe (URL: {api_url})") print(" Delays: 0ms, 50ms, 200ms, 500ms, stale, fake") print(f"{'#'*60}") t0 = time.perf_counter() start_body = await raw( http, api_url, sid, api="SYNO.FileStation.DirSize", version="2", method="start", path=json.dumps([DIRSIZE_PATH]), ) pp("DirSize::start", start_body, (time.perf_counter() - t0) * 1000) taskid = (start_body.get("data") or {}).get("taskid") if not taskid: print("[!] No taskid -- aborting probe.") return for label, pre_sleep in [ ("0ms delay", 0), ("~50ms delay", 0.05), ("~200ms delay", 0.15), ("~500ms delay", 0.3), ("stale ~2.5s", 2.0), ]: if pre_sleep: await asyncio.sleep(pre_sleep) t1 = time.perf_counter() r = await raw( http, api_url, sid, api="SYNO.FileStation.DirSize", version="1", method="status", taskid=taskid, ) pp(f"DirSize::status [{label}]", r, (t1 - t0) * 1000) data = (r.get("data") or {}) if data.get("finished"): print("[*] finished=True. Stopping here.") break if not r.get("success"): error_code = (r.get("error") or {}).get("code") print(f"[!] Error {error_code}") # Fake taskid rf = await raw( http, api_url, sid, api="SYNO.FileStation.DirSize", version="1", method="status", taskid="fake-task-id-does-not-exist", ) pp("DirSize::status [fake taskid]", rf) async def probe_md5( http: httpx.AsyncClient, base: str, sid: str, api_url: str ) -> None: print(f"\n{'#'*60}") print(f" MD5 -- Timing-Probe (URL: {api_url})") print(" Delays: 0ms, 50ms, 200ms, stale, fake") print(f"{'#'*60}") t0 = time.perf_counter() start_body = await raw( http, api_url, sid, api="SYNO.FileStation.MD5", version="2", method="start", file_path=json.dumps(MD5_PATH), ) pp("MD5::start", start_body, (time.perf_counter() - t0) * 1000) taskid = (start_body.get("data") or {}).get("taskid") if not taskid: print("[!] No taskid -- aborting probe.") return # Try both versions at 0ms for the same taskid for ver in [1, 2]: t1 = time.perf_counter() r = await raw( http, api_url, sid, api="SYNO.FileStation.MD5", version=str(ver), method="status", taskid=taskid, ) pp(f"MD5::status v{ver} [0ms delay]", r, (t1 - t0) * 1000) data = (r.get("data") or {}) if data.get("finished"): print(f"[*] v{ver} finished=True!") for label, pre_sleep in [ ("~50ms delay", 0.05), ("~200ms delay", 0.15), ("stale ~2.5s", 2.0), ]: await asyncio.sleep(pre_sleep) for ver in [1, 2]: t1 = time.perf_counter() r = await raw( http, api_url, sid, api="SYNO.FileStation.MD5", version=str(ver), method="status", taskid=taskid, ) pp(f"MD5::status v{ver} [{label}]", r, (t1 - t0) * 1000) data = (r.get("data") or {}) if data.get("finished"): print(f"[*] v{ver} finished=True! md5={data.get('md5')}") # What does start response look like for a new task? print("\n[*] Starting a second MD5 task to compare...") t2 = time.perf_counter() start2 = await raw( http, api_url, sid, api="SYNO.FileStation.MD5", version="2", method="start", file_path=json.dumps(MD5_PATH), ) pp("MD5::start (2nd)", start2, (time.perf_counter() - t2) * 1000) taskid2 = (start2.get("data") or {}).get("taskid") if taskid2: # Poll immediately with both versions for ver in [1, 2]: t3 = time.perf_counter() r = await raw( http, api_url, sid, api="SYNO.FileStation.MD5", version=str(ver), method="status", taskid=taskid2, ) pp(f"MD5::status v{ver} [0ms, task2]", r, (t3 - t2) * 1000) data = (r.get("data") or {}) if data.get("finished"): print(f"[*] v{ver} task2 finished=True! md5={data.get('md5')}") # Fake taskid rf = await raw( http, api_url, sid, api="SYNO.FileStation.MD5", version="2", method="status", taskid="fake-task-id-does-not-exist", ) pp("MD5::status [fake taskid]", rf) async def main() -> None: config = load_config() auth = AuthManager(config) async with FileStationClient(config.base_url, config.connection.verify_ssl) as client: client.set_auth_manager(auth) await client._ensure_initialized() # noqa: SLF001 sid = client.sid base = config.base_url # --- Show API paths from cache --- print(f"\n[*] SID: {'OK' if sid else 'MISSING'}") for api_name in ["SYNO.FileStation.DirSize", "SYNO.FileStation.MD5"]: info = client._api_cache.get(api_name) # noqa: SLF001 if info: print(f"[*] {api_name}: path={info['path']} v{info['minVersion']}-v{info['maxVersion']}") else: print(f"[!] {api_name}: NOT in API cache!") dirsize_info = client._api_cache.get("SYNO.FileStation.DirSize", {}) # noqa: SLF001 md5_info = client._api_cache.get("SYNO.FileStation.MD5", {}) # noqa: SLF001 dirsize_url = f"{base}/webapi/{dirsize_info.get('path', 'entry.cgi')}" md5_url = f"{base}/webapi/{md5_info.get('path', 'entry.cgi')}" async with httpx.AsyncClient(verify=config.connection.verify_ssl, timeout=30.0) as http: await probe_dirsize(http, base, sid, dirsize_url) await probe_md5(http, base, sid, md5_url) await auth.logout(client) print("\n[*] Logout OK.") if __name__ == "__main__": asyncio.run(main())