Files
marcus 036429e9bf feat: v0.6.0 — read build_stream log instead of dropping it (#2)
DSM emits a readable plaintext build log over the build_stream HTTP
body (one short status line per step) and closes the connection when
the build is done. The 0.2.5 implementation sent the request and
dropped the body unread, leaving users with nothing more than a
BUILD_FAILED polling status and no actionable diagnostic.

DsmClient.trigger_build_stream now consumes the body line-by-line and
returns the collected log as a string. Wall-clock budget of 210 s
(under the Claude Desktop ~4 min ceiling); on timeout the partial log
is returned with a "[build_stream: timeout — stream still open
server-side]" marker so callers know the build continues server-side.
Per-chunk ReadTimeout is treated the same way. JSON error envelope,
transport-error mapping (M-4), and SID-scrubbed HTTP-error formatting
are unchanged.

redeploy_project and create_project now parse the returned log via
_parse_build_stream_log (any line containing "Error response from
daemon:" or ending in " Error" counts as a failure). On a failed log
the tools abort immediately, surface the daemon line(s) in the result
(e.g. "Error response from daemon: manifest for nginx:9.9.9 not
found: manifest unknown"), and skip the polling step. The BUILD_FAILED
polling guard (M-5) stays as a second safety net for late failures
where the stream was clean but the container exited after start.

No new MCP tool: the build log is a live stream and cannot be
re-fetched after the build ends, so it is surfaced during
redeploy_project / create_project rather than exposed as a standalone
get_project_build_log call.

Minor version bump because redeploy_project and create_project return
materially different strings on a failed build and exit earlier in
the failure path. Signatures unchanged.

Tests: streamed-log collection, daemon-error log, header ReadTimeout
marker, per-chunk ReadTimeout partial log, wall-clock budget
truncation, _parse_build_stream_log unit tests, redeploy/create end-
to-end behavior with a failing log.

Closes #2

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-18 13:58:55 +02:00

1215 lines
45 KiB
Python

"""Tests for dsm_client.py.
Covers the critical paths of DsmClient:
- Pure helpers (_scrub_url, _error_message).
- request() happy-path, envelope handling, HTTPStatusError scrubbing,
sensitive-param log masking.
- Session re-auth retry: single-retry semantics, thundering-herd,
auth-manager-absent, re-auth failure.
- trigger_build_stream: streamed log collection, JSON error detection,
ReadTimeout marker, HTTP-error scrubbing.
- upload_text / download_text happy-path + error-response.
- _ensure_initialized double-checked-locking and M4 negative-cache cooldown.
All tests run offline. No real HTTP traffic. No new test dependencies —
only pytest, pytest-asyncio, and unittest.mock.
"""
from __future__ import annotations
import asyncio
import json
import logging
from types import SimpleNamespace
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import httpx
import pytest
from mcp_synology_container.dsm_client import (
INIT_ERROR_COOLDOWN,
DsmClient,
SynologyError,
_error_message,
_scrub_url,
)
# ──────────────────────────────────────────────────────────────────────
# Test helpers
# ──────────────────────────────────────────────────────────────────────
def make_response(
body: dict | None,
*,
status: int = 200,
content_type: str = "application/json",
text: str | None = None,
url: str = "https://nas.local:443/webapi/entry.cgi",
) -> MagicMock:
"""Build a mock object that quacks like httpx.Response.
Exposes: status_code, headers (dict), json(), text, raise_for_status(),
request.url. raise_for_status() raises httpx.HTTPStatusError for 4xx/5xx.
"""
resp = MagicMock(name="Response")
resp.status_code = status
resp.headers = {"content-type": content_type}
if body is not None:
resp.json = MagicMock(return_value=body)
resp.text = text if text is not None else json.dumps(body)
else:
resp.json = MagicMock(side_effect=json.JSONDecodeError("no body", "", 0))
resp.text = text if text is not None else ""
request = MagicMock(name="Request")
request.url = url
resp.request = request
def _raise_for_status() -> None:
if status >= 400:
raise httpx.HTTPStatusError(
f"{status} error",
request=request, # type: ignore[arg-type]
response=resp, # type: ignore[arg-type]
)
resp.raise_for_status = MagicMock(side_effect=_raise_for_status)
return resp
def seed_api_cache(client: DsmClient) -> None:
"""Populate the API cache with the APIs the tests use."""
client._api_cache = {
"SYNO.API.Info": {"path": "query.cgi", "minVersion": 1, "maxVersion": 1},
"SYNO.API.Auth": {"path": "auth.cgi", "minVersion": 1, "maxVersion": 7},
"SYNO.Docker.Project": {
"path": "entry.cgi",
"minVersion": 1,
"maxVersion": 1,
},
"SYNO.FileStation.Upload": {
"path": "FileStation/api_upload.cgi",
"minVersion": 2,
"maxVersion": 2,
},
"SYNO.FileStation.Download": {
"path": "FileStation/file_download.cgi",
"minVersion": 1,
"maxVersion": 2,
},
}
def mark_initialized(client: DsmClient) -> None:
"""Bypass _ensure_initialized so tests can focus on request()."""
client._initialized = True
seed_api_cache(client)
def make_stream_ctx(response: MagicMock) -> MagicMock:
"""Mock for httpx.AsyncClient.stream() — returns an async ctx manager."""
ctx = MagicMock(name="StreamCtx")
ctx.__aenter__ = AsyncMock(return_value=response)
ctx.__aexit__ = AsyncMock(return_value=None)
return ctx
async def _aiter_from_bytes(chunks: list[bytes]):
"""Async iterator that yields the given byte chunks."""
for c in chunks:
yield c
# ──────────────────────────────────────────────────────────────────────
# _scrub_url
# ──────────────────────────────────────────────────────────────────────
def test_scrub_url_replaces_sid_value() -> None:
assert _scrub_url("https://nas/api?_sid=abc123") == "https://nas/api?_sid=***"
def test_scrub_url_sid_between_other_params() -> None:
assert (
_scrub_url("https://nas/api?foo=bar&_sid=xyz&baz=qux")
== "https://nas/api?foo=bar&_sid=***&baz=qux"
)
def test_scrub_url_sid_first() -> None:
assert _scrub_url("https://nas/api?_sid=xyz&foo=bar") == "https://nas/api?_sid=***&foo=bar"
def test_scrub_url_without_sid_unchanged() -> None:
url = "https://nas.local/webapi/entry.cgi?api=SYNO.Foo&method=list"
assert _scrub_url(url) == url
def test_scrub_url_multiple_sid_occurrences_all_replaced() -> None:
# Pathological: both values get masked; re.sub replaces all non-overlapping
# occurrences by default.
got = _scrub_url("https://nas/api?_sid=aaa&_sid=bbb")
assert got == "https://nas/api?_sid=***&_sid=***"
def test_scrub_url_empty_string() -> None:
assert _scrub_url("") == ""
def test_scrub_url_sid_value_preserved_as_asterisks_not_removed() -> None:
# The secret must be gone entirely; the key survives.
url = "https://nas/api?_sid=supersecrettoken12345"
got = _scrub_url(url)
assert "supersecrettoken12345" not in got
assert "_sid=***" in got
# ──────────────────────────────────────────────────────────────────────
# _error_message
# ──────────────────────────────────────────────────────────────────────
@pytest.mark.parametrize(
("code", "expected"),
[
(100, "Unknown error"),
(101, "Invalid parameter"),
(102, "API does not exist on this NAS"),
(103, "Method does not exist"),
(104, "API version not supported"),
(105, "Permission denied — check DSM user permissions"),
(106, "Session timeout"),
(107, "Session displaced by another login"),
(119, "Session invalid"),
],
)
def test_error_message_common_codes(code: int, expected: str) -> None:
assert _error_message(code) == expected
@pytest.mark.parametrize(
("code", "expected"),
[
(400, "Incorrect username or password"),
(401, "Account disabled"),
(402, "Permission denied for this service"),
(403, "2FA code required"),
(404, "2FA code incorrect or expired"),
(407, "Too many failed login attempts — account temporarily locked"),
(408, "IP blocked due to excessive failed attempts"),
],
)
def test_error_message_auth_codes_require_auth_api(code: int, expected: str) -> None:
assert _error_message(code, "SYNO.API.Auth") == expected
def test_error_message_400_without_auth_api_falls_through() -> None:
# 400 is not in the common table, so without "Auth" in the api name we
# expect the unknown-code fallback.
assert _error_message(400, "SYNO.Docker.Project") == "DSM error code 400"
def test_error_message_unknown_code_fallback() -> None:
assert _error_message(9999) == "DSM error code 9999"
def test_error_message_unknown_code_with_auth_api_fallback() -> None:
# 9999 not in auth or common — still falls back.
assert _error_message(9999, "SYNO.API.Auth") == "DSM error code 9999"
def test_error_message_session_codes_common_not_auth() -> None:
# 106 is in common but also accessed via auth flows; common wins regardless.
assert _error_message(106, "SYNO.API.Auth") == "Session timeout"
# ──────────────────────────────────────────────────────────────────────
# DsmClient.request() — happy-path + error envelope
# ──────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_request_success_returns_data() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response({"success": True, "data": {"x": 1}})
)
result = await client.request("SYNO.Docker.Project", "list")
assert result == {"x": 1}
@pytest.mark.asyncio
async def test_request_success_no_data_returns_empty_dict() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
client._http.get = AsyncMock(return_value=make_response({"success": True}))
result = await client.request("SYNO.Docker.Project", "list")
assert result == {}
@pytest.mark.asyncio
async def test_request_api_not_cached_raises_102() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._initialized = True
client._api_cache = {} # no APIs at all
client._http = AsyncMock()
with pytest.raises(SynologyError) as exc_info:
await client.request("SYNO.Does.Not.Exist", "list")
assert exc_info.value.code == 102
assert "SYNO.Does.Not.Exist" in str(exc_info.value)
@pytest.mark.asyncio
async def test_request_error_envelope_raises_with_code() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response({"success": False, "error": {"code": 101}})
)
with pytest.raises(SynologyError) as exc_info:
await client.request("SYNO.Docker.Project", "list")
assert exc_info.value.code == 101
assert "Invalid parameter" in str(exc_info.value)
@pytest.mark.asyncio
async def test_request_http_error_scrubs_sid_from_url() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "secret123"
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response(
None,
status=500,
url="https://nas.local/webapi/entry.cgi?api=SYNO.Docker.Project&_sid=secret123",
)
)
with pytest.raises(SynologyError) as exc_info:
await client.request("SYNO.Docker.Project", "list")
msg = str(exc_info.value)
assert "secret123" not in msg
assert "_sid=***" in msg
assert exc_info.value.code == 500
@pytest.mark.asyncio
async def test_request_sensitive_params_masked_in_log(
caplog: pytest.LogCaptureFixture,
) -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "leakedsid"
client._http = AsyncMock()
client._http.get = AsyncMock(return_value=make_response({"success": True, "data": {}}))
with caplog.at_level(logging.DEBUG, logger="mcp_synology_container.dsm_client"):
await client.request(
"SYNO.API.Auth",
"login",
params={
"passwd": "s3cr3tP4ss",
"device_id": "deviceABC",
"otp_code": "654321",
"device_token": "tokXYZ",
"account": "admin", # not sensitive — keep visible
},
)
log_text = "\n".join(rec.getMessage() for rec in caplog.records)
assert "s3cr3tP4ss" not in log_text
assert "deviceABC" not in log_text
assert "654321" not in log_text
assert "tokXYZ" not in log_text
assert "leakedsid" not in log_text
# Non-sensitive param and masking token must be visible.
assert "admin" in log_text
assert "***" in log_text
# ──────────────────────────────────────────────────────────────────────
# Session re-auth retry
# ──────────────────────────────────────────────────────────────────────
def make_auth_manager(new_sid: str = "fresh_sid") -> AsyncMock:
"""Build a fake AuthManager with a counted login()."""
mgr = AsyncMock()
mgr.login = AsyncMock(return_value=new_sid)
return mgr
@pytest.mark.asyncio
async def test_request_reauths_on_106_and_retries() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "old_sid"
client._auth_manager = make_auth_manager("new_sid")
client._http = AsyncMock()
responses = [
make_response({"success": False, "error": {"code": 106}}),
make_response({"success": True, "data": {"ok": 1}}),
]
client._http.get = AsyncMock(side_effect=responses)
result = await client.request("SYNO.Docker.Project", "list")
assert result == {"ok": 1}
assert client._auth_manager.login.call_count == 1
assert client._sid == "new_sid"
assert client._http.get.call_count == 2
@pytest.mark.asyncio
async def test_request_reauth_single_retry_only() -> None:
"""Second 106 after re-auth must NOT trigger another login."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "old_sid"
client._auth_manager = make_auth_manager("new_sid")
client._http = AsyncMock()
responses = [
make_response({"success": False, "error": {"code": 106}}),
make_response({"success": False, "error": {"code": 106}}),
]
client._http.get = AsyncMock(side_effect=responses)
with pytest.raises(SynologyError) as exc_info:
await client.request("SYNO.Docker.Project", "list")
assert exc_info.value.code == 106
assert client._auth_manager.login.call_count == 1
assert client._http.get.call_count == 2
@pytest.mark.asyncio
async def test_request_reauth_login_failure_surfaces_reauth_error() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "old_sid"
client._auth_manager = AsyncMock()
client._auth_manager.login = AsyncMock(side_effect=RuntimeError("login broke"))
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response({"success": False, "error": {"code": 106}})
)
with pytest.raises(SynologyError) as exc_info:
await client.request("SYNO.Docker.Project", "list")
assert exc_info.value.code == 106
assert "Re-authentication failed" in str(exc_info.value)
assert isinstance(exc_info.value.__cause__, RuntimeError)
@pytest.mark.asyncio
async def test_request_no_auth_manager_no_retry_on_106() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "old_sid"
client._auth_manager = None
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response({"success": False, "error": {"code": 106}})
)
with pytest.raises(SynologyError) as exc_info:
await client.request("SYNO.Docker.Project", "list")
assert exc_info.value.code == 106
assert client._http.get.call_count == 1
@pytest.mark.asyncio
async def test_request_reauth_thundering_herd_login_called_once() -> None:
"""Two concurrent requests both hitting 106 must share one login.
The reauth lock plus the old_sid snapshot check in dsm_client ensures the
second task observes the already-refreshed SID and skips the redundant
login call.
"""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "old_sid"
client._auth_manager = make_auth_manager("new_sid")
# Four sequential responses consumed in call order by asyncio.gather:
# both initial calls get 106, both retries get success.
responses = [
make_response({"success": False, "error": {"code": 106}}),
make_response({"success": False, "error": {"code": 106}}),
make_response({"success": True, "data": {"n": 1}}),
make_response({"success": True, "data": {"n": 2}}),
]
client._http = AsyncMock()
client._http.get = AsyncMock(side_effect=responses)
# Make login slow so the two tasks genuinely contend for the lock.
login_barrier = asyncio.Event()
async def slow_login(_client: Any) -> str:
await asyncio.sleep(0.01)
login_barrier.set()
return "new_sid"
client._auth_manager.login = AsyncMock(side_effect=slow_login)
results = await asyncio.gather(
client.request("SYNO.Docker.Project", "list"),
client.request("SYNO.Docker.Project", "list"),
)
assert all(isinstance(r, dict) for r in results)
assert client._auth_manager.login.call_count == 1
assert client._sid == "new_sid"
# 2 initial + 2 retry = 4 total GETs.
assert client._http.get.call_count == 4
# ──────────────────────────────────────────────────────────────────────
# trigger_build_stream
# ──────────────────────────────────────────────────────────────────────
async def _aiter_from_lines(lines: list[str]):
"""Async iterator that yields the given strings (httpx aiter_lines shape)."""
for line in lines:
yield line
@pytest.mark.asyncio
async def test_build_stream_collects_streamed_log_lines() -> None:
"""The streamed plaintext body is consumed line-by-line and returned."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
resp = make_response(
{"placeholder": True},
content_type="text/html",
)
resp.aiter_lines = lambda: _aiter_from_lines(
[
"Container vault Running",
"Container vault-db Running",
"",
]
)
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
result = await client.trigger_build_stream("proj-1")
assert isinstance(result, str)
assert "Container vault Running" in result
assert "Container vault-db Running" in result
# Empty lines are dropped.
assert "\n\n" not in result
@pytest.mark.asyncio
async def test_build_stream_collects_failure_log_with_daemon_error() -> None:
"""Failure log (svc Error + daemon-error line) is returned verbatim."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
resp = make_response({"placeholder": True}, content_type="text/html")
resp.aiter_lines = lambda: _aiter_from_lines(
[
"nginx Error",
(
"Error response from daemon: manifest for nginx:9.9.9-nonexistent "
"not found: manifest unknown"
),
]
)
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
result = await client.trigger_build_stream("proj-1")
assert "nginx Error" in result
assert "Error response from daemon:" in result
assert "manifest unknown" in result
@pytest.mark.asyncio
async def test_build_stream_json_error_raises_synology_error() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
body = {"success": False, "error": {"code": 1401}}
resp = make_response(body, content_type="application/json")
resp.aiter_bytes = lambda: _aiter_from_bytes([json.dumps(body).encode("utf-8")])
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
with pytest.raises(SynologyError) as exc_info:
await client.trigger_build_stream("proj-1")
assert exc_info.value.code == 1401
@pytest.mark.asyncio
async def test_build_stream_json_success_accepted() -> None:
"""JSON envelope with success=true → no streamed log, returns empty string."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
body = {"success": True}
resp = make_response(body, content_type="application/json")
resp.aiter_bytes = lambda: _aiter_from_bytes([json.dumps(body).encode("utf-8")])
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
result = await client.trigger_build_stream("proj-1")
assert result == ""
@pytest.mark.asyncio
async def test_build_stream_read_timeout_returns_marker() -> None:
"""Header-arrival ReadTimeout returns the timeout marker (build still running)."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
# stream() itself raises ReadTimeout before entering the ctx manager.
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(side_effect=httpx.ReadTimeout("headers timed out"))
ctx.__aexit__ = AsyncMock(return_value=None)
client._http.stream = MagicMock(return_value=ctx)
result = await client.trigger_build_stream("proj-1")
assert result == DsmClient.BUILD_STREAM_TIMEOUT_MARKER
@pytest.mark.asyncio
async def test_build_stream_per_chunk_read_timeout_returns_partial_log() -> None:
"""If a streamed chunk times out mid-build, return the partial log + marker."""
async def aiter_partial_then_timeout():
yield "Container vault Pulling"
raise httpx.ReadTimeout("chunk timed out")
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
resp = make_response({"placeholder": True}, content_type="text/html")
resp.aiter_lines = aiter_partial_then_timeout
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
result = await client.trigger_build_stream("proj-1")
assert "Container vault Pulling" in result
assert DsmClient.BUILD_STREAM_TIMEOUT_MARKER in result
@pytest.mark.asyncio
async def test_build_stream_wallclock_budget_breaks_loop(monkeypatch) -> None:
"""When the wall-clock budget is exhausted, the loop returns partial log + marker."""
import mcp_synology_container.dsm_client as dsm_mod
# Force the budget to a value just below the initial loop.time() so the
# check after the first emitted line trips immediately.
monkeypatch.setattr(DsmClient, "BUILD_STREAM_BUDGET", -1.0)
async def aiter_many():
yield "Container vault Pulling"
yield "Container vault Running" # Should NOT appear in the result.
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
resp = make_response({"placeholder": True}, content_type="text/html")
resp.aiter_lines = aiter_many
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
result = await client.trigger_build_stream("proj-1")
assert "Container vault Pulling" in result
assert "Container vault Running" not in result
assert DsmClient.BUILD_STREAM_TIMEOUT_MARKER in result
# Keep dsm_mod referenced so the test doesn't trip an unused-import warning.
assert dsm_mod.DsmClient is DsmClient
@pytest.mark.asyncio
async def test_build_stream_connect_error_raises_synology_error() -> None:
"""M-4: ConnectError (DSM unreachable) must surface as SynologyError, not raw httpx."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(side_effect=httpx.ConnectError("nas offline"))
ctx.__aexit__ = AsyncMock(return_value=None)
client._http.stream = MagicMock(return_value=ctx)
with pytest.raises(SynologyError) as exc_info:
await client.trigger_build_stream("proj-1")
msg = str(exc_info.value)
assert "transport error" in msg
assert "ConnectError" in msg
# Cause is suppressed via `from None` to keep error message clean.
assert exc_info.value.__cause__ is None
@pytest.mark.asyncio
async def test_build_stream_remote_protocol_error_raises_synology_error() -> None:
"""M-4: RemoteProtocolError (broken response framing) is also converted."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(side_effect=httpx.RemoteProtocolError("server disconnected"))
ctx.__aexit__ = AsyncMock(return_value=None)
client._http.stream = MagicMock(return_value=ctx)
with pytest.raises(SynologyError) as exc_info:
await client.trigger_build_stream("proj-1")
assert "RemoteProtocolError" in str(exc_info.value)
@pytest.mark.asyncio
async def test_build_stream_http_500_scrubs_sid() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "topsecret"
client._http = AsyncMock()
resp = make_response(
None,
status=500,
url="https://nas.local/webapi/entry.cgi?api=SYNO.Docker.Project&_sid=topsecret",
)
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
with pytest.raises(SynologyError) as exc_info:
await client.trigger_build_stream("proj-1")
msg = str(exc_info.value)
assert "topsecret" not in msg
assert "_sid=***" in msg
assert exc_info.value.code == 500
@pytest.mark.asyncio
async def test_build_stream_malformed_json_body_treated_as_accepted() -> None:
"""Defensive branch: JSON content-type but body fails to parse → empty string."""
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
resp = make_response({"placeholder": True}, content_type="application/json")
resp.aiter_bytes = lambda: _aiter_from_bytes([b"not-json-at-all{{"])
client._http.stream = MagicMock(return_value=make_stream_ctx(resp))
result = await client.trigger_build_stream("proj-1")
assert result == ""
# ──────────────────────────────────────────────────────────────────────
# upload_text / download_text
# ──────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_upload_text_happy_path_builds_form_and_file_parts() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "sid-up"
client._http = AsyncMock()
client._http.post = AsyncMock(
return_value=make_response({"success": True, "data": {"wrote": 1}})
)
result = await client.upload_text(
"/volume1/docker/app", "compose.yaml", "version: '3'\n", overwrite=True
)
assert result == {"wrote": 1}
assert client._http.post.call_count == 1
_, kwargs = client._http.post.call_args
form = kwargs["data"]
assert form["api"] == "SYNO.FileStation.Upload"
assert form["version"] == "2"
assert form["method"] == "upload"
assert form["path"] == "/volume1/docker/app"
assert form["overwrite"] == "true"
assert form["create_parents"] == "true"
files = kwargs["files"]
name, blob, ctype = files["file"]
assert name == "compose.yaml"
assert blob == b"version: '3'\n"
assert ctype == "text/plain"
assert kwargs["params"] == {"_sid": "sid-up"}
@pytest.mark.asyncio
async def test_upload_text_error_response_raises() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
client._http.post = AsyncMock(
return_value=make_response({"success": False, "error": {"code": 900}})
)
with pytest.raises(SynologyError) as exc_info:
await client.upload_text("/volume1/docker/app", "compose.yaml", "x")
assert exc_info.value.code == 900
@pytest.mark.asyncio
async def test_download_text_plain_text_returns_body() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
body_text = "version: '3'\nservices: {}\n"
resp = make_response(
None,
content_type="text/plain",
text=body_text,
)
client._http.get = AsyncMock(return_value=resp)
result = await client.download_text("/volume1/docker/app/compose.yaml")
assert result == body_text
@pytest.mark.asyncio
async def test_download_text_json_error_branch_raises() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
resp = make_response(
{"success": False, "error": {"code": 408}},
content_type="application/json",
)
client._http.get = AsyncMock(return_value=resp)
with pytest.raises(SynologyError) as exc_info:
await client.download_text("/volume1/docker/app/missing.yaml")
# 408 without "Auth" in api name → common fallback.
assert exc_info.value.code == 408
# ──────────────────────────────────────────────────────────────────────
# _ensure_initialized — double-checked-locking + M4 negative cache
# ──────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_ensure_initialized_runs_once_under_concurrency(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Two parallel _ensure_initialized() calls on a fresh client → single init.
The double-checked locking in _ensure_initialized guarantees only one of
the two tasks actually runs the init body; the second awaits on the
asyncio.Lock, then finds _initialized already True and returns early.
"""
async with DsmClient(base_url="https://nas.local:443") as client:
client._http = AsyncMock()
query_calls = 0
async def fake_query_api_info() -> dict[str, dict[str, Any]]:
nonlocal query_calls
query_calls += 1
# Yield control so the other task enters the lock-wait branch.
await asyncio.sleep(0.01)
seed_api_cache(client)
return client._api_cache
monkeypatch.setattr(client, "query_api_info", fake_query_api_info)
auth_mgr = make_auth_manager("sid-init")
client._auth_manager = auth_mgr
await asyncio.gather(
client._ensure_initialized(),
client._ensure_initialized(),
)
assert query_calls == 1
assert auth_mgr.login.call_count == 1
assert client._initialized is True
@pytest.mark.asyncio
async def test_ensure_initialized_negative_cache_reraises_without_retry(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""M4: after init failure, cached error is re-raised during cooldown."""
async with DsmClient(base_url="https://nas.local:443") as client:
client._http = AsyncMock()
query_calls = 0
async def failing_query() -> dict[str, Any]:
nonlocal query_calls
query_calls += 1
raise httpx.ConnectError("nas offline")
monkeypatch.setattr(client, "query_api_info", failing_query)
# Pin the clock so the cooldown window is deterministic.
fake_time = [1000.0]
monkeypatch.setattr(
"asyncio.get_event_loop",
lambda: SimpleNamespace(time=lambda: fake_time[0]),
)
with pytest.raises(httpx.ConnectError):
await client._ensure_initialized()
assert query_calls == 1
assert client._init_error is not None
assert client._init_error_until == pytest.approx(1000.0 + INIT_ERROR_COOLDOWN)
# Second call within cooldown → cached error, no new query_api_info.
fake_time[0] = 1010.0
with pytest.raises(httpx.ConnectError):
await client._ensure_initialized()
assert query_calls == 1
@pytest.mark.asyncio
async def test_ensure_initialized_retries_after_cooldown_expires(
monkeypatch: pytest.MonkeyPatch,
) -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._http = AsyncMock()
query_calls = 0
async def failing_query() -> dict[str, Any]:
nonlocal query_calls
query_calls += 1
raise httpx.ConnectError("nas offline")
monkeypatch.setattr(client, "query_api_info", failing_query)
fake_time = [2000.0]
monkeypatch.setattr(
"asyncio.get_event_loop",
lambda: SimpleNamespace(time=lambda: fake_time[0]),
)
with pytest.raises(httpx.ConnectError):
await client._ensure_initialized()
assert query_calls == 1
# Fast-forward past the cooldown window. The code snapshots `now`
# BEFORE checking _init_error_until, so bumping the clock is enough.
fake_time[0] = 2000.0 + INIT_ERROR_COOLDOWN + 1.0
with pytest.raises(httpx.ConnectError):
await client._ensure_initialized()
assert query_calls == 2
@pytest.mark.asyncio
async def test_ensure_initialized_success_clears_cached_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""After a successful init, _init_error and _init_error_until reset."""
async with DsmClient(base_url="https://nas.local:443") as client:
client._http = AsyncMock()
# Pre-populate stale error state (as if a previous init failed).
client._init_error = httpx.ConnectError("stale")
client._init_error_until = 0.0 # already expired
async def ok_query() -> dict[str, Any]:
seed_api_cache(client)
return client._api_cache
monkeypatch.setattr(client, "query_api_info", ok_query)
client._auth_manager = None
await client._ensure_initialized()
assert client._initialized is True
assert client._init_error is None
assert client._init_error_until == 0.0
# ──────────────────────────────────────────────────────────────────────
# Auxiliary coverage: sid property, set_auth_manager, _get_http guard,
# query_api_info, post_request
# ──────────────────────────────────────────────────────────────────────
def test_sid_property_roundtrip() -> None:
client = DsmClient(base_url="https://nas.local:443")
assert client.sid is None
client.sid = "new-sid"
assert client.sid == "new-sid"
assert client._sid == "new-sid"
def test_set_auth_manager_registers_instance() -> None:
client = DsmClient(base_url="https://nas.local:443")
dummy = object()
client.set_auth_manager(dummy) # type: ignore[arg-type]
assert client._auth_manager is dummy
def test_get_http_without_context_manager_raises() -> None:
client = DsmClient(base_url="https://nas.local:443")
with pytest.raises(RuntimeError, match="async context manager"):
client._get_http()
@pytest.mark.asyncio
async def test_query_api_info_success_caches_apis() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response(
{
"success": True,
"data": {
"SYNO.Docker.Project": {
"path": "entry.cgi",
"minVersion": 1,
"maxVersion": 2,
},
"SYNO.API.Auth": {
"path": "auth.cgi",
# minVersion/maxVersion omitted → defaults.
},
},
}
)
)
result = await client.query_api_info()
assert set(result.keys()) == {"SYNO.Docker.Project", "SYNO.API.Auth"}
assert result["SYNO.Docker.Project"]["maxVersion"] == 2
assert result["SYNO.API.Auth"]["minVersion"] == 1 # default
assert result["SYNO.API.Auth"]["maxVersion"] == 1 # default
assert client._api_cache == result
@pytest.mark.asyncio
async def test_query_api_info_error_envelope_raises() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response({"success": False, "error": {"code": 100}})
)
with pytest.raises(SynologyError) as exc_info:
await client.query_api_info()
assert exc_info.value.code == 100
@pytest.mark.asyncio
async def test_query_api_info_http_error_scrubs_sid() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response(
None,
status=500,
url="https://nas.local/webapi/query.cgi?_sid=hushhush",
)
)
with pytest.raises(SynologyError) as exc_info:
await client.query_api_info()
assert "hushhush" not in str(exc_info.value)
assert "_sid=***" in str(exc_info.value)
@pytest.mark.asyncio
async def test_post_request_success_returns_data() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "sid-post"
client._http = AsyncMock()
client._http.post = AsyncMock(
return_value=make_response({"success": True, "data": {"deleted": 1}})
)
result = await client.post_request("SYNO.Docker.Project", "delete", params={"id": "proj-1"})
assert result == {"deleted": 1}
_, kwargs = client._http.post.call_args
# Form body carries api/version/method plus custom params.
assert kwargs["data"]["api"] == "SYNO.Docker.Project"
assert kwargs["data"]["method"] == "delete"
assert kwargs["data"]["id"] == "proj-1"
# SID travels in the query string, not the form body.
assert kwargs["params"] == {"_sid": "sid-post"}
@pytest.mark.asyncio
async def test_post_request_api_not_cached_raises_102() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._initialized = True
client._api_cache = {}
client._http = AsyncMock()
with pytest.raises(SynologyError) as exc_info:
await client.post_request("SYNO.Does.Not.Exist", "delete")
assert exc_info.value.code == 102
@pytest.mark.asyncio
async def test_post_request_error_envelope_raises() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._http = AsyncMock()
client._http.post = AsyncMock(
return_value=make_response({"success": False, "error": {"code": 105}})
)
with pytest.raises(SynologyError) as exc_info:
await client.post_request("SYNO.Docker.Project", "delete")
assert exc_info.value.code == 105
assert "Permission denied" in str(exc_info.value)
@pytest.mark.asyncio
async def test_post_request_http_error_scrubs_sid() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "shh"
client._http = AsyncMock()
client._http.post = AsyncMock(
return_value=make_response(
None,
status=500,
url="https://nas.local/webapi/entry.cgi?_sid=shh",
)
)
with pytest.raises(SynologyError) as exc_info:
await client.post_request("SYNO.Docker.Project", "delete")
assert "shh" not in str(exc_info.value) or "_sid=***" in str(exc_info.value)
assert exc_info.value.code == 500
@pytest.mark.asyncio
async def test_upload_text_api_not_cached_raises_102() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._initialized = True
client._api_cache = {} # FileStation.Upload missing
client._http = AsyncMock()
with pytest.raises(SynologyError) as exc_info:
await client.upload_text("/tmp", "x.txt", "content")
assert exc_info.value.code == 102
@pytest.mark.asyncio
async def test_download_text_api_not_cached_raises_102() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._initialized = True
client._api_cache = {}
client._http = AsyncMock()
with pytest.raises(SynologyError) as exc_info:
await client.download_text("/tmp/x.txt")
assert exc_info.value.code == 102
@pytest.mark.asyncio
async def test_download_text_http_error_scrubs_sid() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "dontleak"
client._http = AsyncMock()
client._http.get = AsyncMock(
return_value=make_response(
None,
status=500,
url="https://nas.local/webapi/FileStation/file_download.cgi?_sid=dontleak",
)
)
with pytest.raises(SynologyError) as exc_info:
await client.download_text("/volume1/docker/app/compose.yaml")
assert "dontleak" not in str(exc_info.value)
assert "_sid=***" in str(exc_info.value)
@pytest.mark.asyncio
async def test_upload_text_http_error_scrubs_sid() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
mark_initialized(client)
client._sid = "uploadsecret"
client._http = AsyncMock()
client._http.post = AsyncMock(
return_value=make_response(
None,
status=500,
url="https://nas.local/webapi/FileStation/api_upload.cgi?_sid=uploadsecret",
)
)
with pytest.raises(SynologyError) as exc_info:
await client.upload_text("/tmp", "x.txt", "content")
assert "uploadsecret" not in str(exc_info.value)
assert "_sid=***" in str(exc_info.value)
@pytest.mark.asyncio
async def test_build_stream_api_not_cached_raises_102() -> None:
async with DsmClient(base_url="https://nas.local:443") as client:
client._initialized = True
client._api_cache = {}
client._http = AsyncMock()
with pytest.raises(SynologyError) as exc_info:
await client.trigger_build_stream("proj-1")
assert exc_info.value.code == 102