feat: v0.4.0 — welle A (8 new tools: container lifecycle, inspect_image, system_overview)
Closes #1, #4, #6, #7. Container lifecycle (#1, #7): - start_container, stop_container, restart_container, pause_container, unpause_container — all via SYNO.Docker.Container with JSON-encoded name parameter, routed through _resolve_container_name for hash- prefix resolution. stop is live-verified; the other four are implemented by symmetry on the same API surface. inspect_image (#4): - Returns full image detail (layers, env, ports, entrypoint/cmd, labels) via SYNO.Docker.Image/get. Accepts name:tag, registry- prefixed names, and bare hashes. Defensive response parsing handles both wrapped (details.*) and flat envelopes. system_overview (#6): - Aggregates CPU %, RAM, network and block I/O across all running containers plus running/stopped counts. No new DSM endpoint — composed from list + stats, reusing the container_stats CPU formula. Per-source errors are non-fatal. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -704,3 +704,157 @@ async def test_container_stats_no_precpu_graceful():
|
||||
|
||||
result = await tools["container_stats"]("myapp")
|
||||
assert "0.00%" in result # graceful fallback to 0
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Lifecycle: start / stop / restart / pause / unpause
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
# Tools that don't have a confirmation gate
|
||||
_NO_CONFIRM_LIFECYCLE = [
|
||||
("start_container", "start", "Started"),
|
||||
("unpause_container", "unpause", "Unpaused"),
|
||||
]
|
||||
|
||||
# Tools that require confirmed=True
|
||||
_CONFIRM_LIFECYCLE = [
|
||||
("stop_container", "stop", "Stopped", "stop"),
|
||||
("restart_container", "restart", "Restarted", "restart"),
|
||||
("pause_container", "pause", "Paused", "pause"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("tool_name, dsm_method, success_word", _NO_CONFIRM_LIFECYCLE)
|
||||
@pytest.mark.asyncio
|
||||
async def test_lifecycle_no_confirm_calls_dsm(tool_name, dsm_method, success_word):
|
||||
"""start_container and unpause_container call DSM directly with json-encoded name."""
|
||||
from mcp_synology_container.modules.containers import register_containers
|
||||
|
||||
client = AsyncMock()
|
||||
client.request.return_value = {}
|
||||
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_containers(mcp, make_config(), client)
|
||||
|
||||
result = await tools[tool_name]("myapp_web")
|
||||
|
||||
# Find the matching lifecycle call (a list() may also occur via _resolve_container_name)
|
||||
calls = [c for c in client.request.call_args_list if c.args[1] == dsm_method]
|
||||
assert len(calls) == 1
|
||||
call = calls[0]
|
||||
assert call.args[0] == "SYNO.Docker.Container"
|
||||
assert call.kwargs["version"] == 1
|
||||
assert call.kwargs["params"] == {"name": '"myapp_web"'}
|
||||
assert success_word in result
|
||||
assert "myapp_web" in result
|
||||
|
||||
|
||||
@pytest.mark.parametrize("tool_name, dsm_method, success_word, _action", _CONFIRM_LIFECYCLE)
|
||||
@pytest.mark.asyncio
|
||||
async def test_lifecycle_confirmed_calls_dsm(tool_name, dsm_method, success_word, _action):
|
||||
"""stop/restart/pause call DSM with confirmed=True using json-encoded name."""
|
||||
from mcp_synology_container.modules.containers import register_containers
|
||||
|
||||
client = AsyncMock()
|
||||
client.request.return_value = {}
|
||||
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_containers(mcp, make_config(), client)
|
||||
|
||||
result = await tools[tool_name]("myapp_web", confirmed=True)
|
||||
|
||||
calls = [c for c in client.request.call_args_list if c.args[1] == dsm_method]
|
||||
assert len(calls) == 1
|
||||
call = calls[0]
|
||||
assert call.args[0] == "SYNO.Docker.Container"
|
||||
assert call.kwargs["version"] == 1
|
||||
assert call.kwargs["params"] == {"name": '"myapp_web"'}
|
||||
assert success_word in result
|
||||
assert "myapp_web" in result
|
||||
|
||||
|
||||
@pytest.mark.parametrize("tool_name, _dsm_method, _success_word, action", _CONFIRM_LIFECYCLE)
|
||||
@pytest.mark.asyncio
|
||||
async def test_lifecycle_preview_without_confirmation(
|
||||
tool_name, _dsm_method, _success_word, action
|
||||
):
|
||||
"""stop/restart/pause without confirmed=True must NOT call DSM."""
|
||||
from mcp_synology_container.modules.containers import register_containers
|
||||
|
||||
client = AsyncMock()
|
||||
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_containers(mcp, make_config(), client)
|
||||
|
||||
result = await tools[tool_name]("myapp_web")
|
||||
assert "Preview" in result
|
||||
assert action in result
|
||||
assert "myapp_web" in result
|
||||
assert "confirmed=True" in result
|
||||
client.request.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"tool_name, dsm_method, kwargs",
|
||||
[
|
||||
("start_container", "start", {}),
|
||||
("unpause_container", "unpause", {}),
|
||||
("stop_container", "stop", {"confirmed": True}),
|
||||
("restart_container", "restart", {"confirmed": True}),
|
||||
("pause_container", "pause", {"confirmed": True}),
|
||||
],
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_lifecycle_resolves_hash_prefix(tool_name, dsm_method, kwargs):
|
||||
"""All lifecycle tools resolve 'jenkins' to 'f93cb8b504f7_jenkins' for the DSM call."""
|
||||
from mcp_synology_container.modules.containers import register_containers
|
||||
|
||||
async def mock_request(api, method, **kw):
|
||||
if api == "SYNO.Docker.Container" and method == "list":
|
||||
return HASH_PREFIXED_CONTAINERS_DATA
|
||||
if api == "SYNO.Docker.Container" and method == dsm_method:
|
||||
assert kw["params"]["name"] == '"f93cb8b504f7_jenkins"'
|
||||
assert kw["version"] == 1
|
||||
return {}
|
||||
return {}
|
||||
|
||||
client = AsyncMock()
|
||||
client.request.side_effect = mock_request
|
||||
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_containers(mcp, make_config(), client)
|
||||
|
||||
# User passes the clean name; resolved to the hash-prefixed name for DSM,
|
||||
# but the display name in the success message must be hash-stripped.
|
||||
result = await tools[tool_name]("jenkins", **kwargs)
|
||||
assert "jenkins" in result
|
||||
assert "f93cb8b504f7" not in result
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"tool_name, kwargs, error_verb",
|
||||
[
|
||||
("start_container", {}, "starting"),
|
||||
("unpause_container", {}, "unpausing"),
|
||||
("stop_container", {"confirmed": True}, "stopping"),
|
||||
("restart_container", {"confirmed": True}, "restarting"),
|
||||
("pause_container", {"confirmed": True}, "pausing"),
|
||||
],
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_lifecycle_api_error(tool_name, kwargs, error_verb):
|
||||
"""API errors during lifecycle ops return a human-readable error message."""
|
||||
from mcp_synology_container.dsm_client import SynologyError
|
||||
from mcp_synology_container.modules.containers import register_containers
|
||||
|
||||
client = AsyncMock()
|
||||
client.request.side_effect = SynologyError("API error", code=102)
|
||||
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_containers(mcp, make_config(), client)
|
||||
|
||||
result = await tools[tool_name]("myapp_web", **kwargs)
|
||||
assert "Error" in result
|
||||
assert error_verb in result
|
||||
assert "myapp_web" in result
|
||||
|
||||
@@ -411,6 +411,216 @@ async def test_delete_image_api_error():
|
||||
assert "114" in result
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# inspect_image
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
SAMPLE_INSPECT = {
|
||||
"details": {
|
||||
"Id": "sha256:aaaa",
|
||||
"RepoTags": ["nginx:1.24"],
|
||||
"Size": 50 * 1024 * 1024,
|
||||
"Created": "2024-01-01T00:00:00Z",
|
||||
"Config": {
|
||||
"Env": ["NGINX_VERSION=1.24", "PATH=/usr/local/sbin"],
|
||||
"ExposedPorts": {"80/tcp": {}, "443/tcp": {}},
|
||||
"Entrypoint": ["/docker-entrypoint.sh"],
|
||||
"Cmd": ["nginx", "-g", "daemon off;"],
|
||||
"WorkingDir": "/",
|
||||
"Labels": {"maintainer": "NGINX Docker Maintainers"},
|
||||
},
|
||||
"RootFS": {
|
||||
"Type": "layers",
|
||||
"Layers": ["sha256:layer1", "sha256:layer2"],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def _make_inspect_client(inspect_payload=None, images_payload=None):
|
||||
"""Build a mock DsmClient that returns SAMPLE_IMAGES for list and inspect_payload for get."""
|
||||
client = AsyncMock()
|
||||
|
||||
async def mock_request(api, method, **kwargs):
|
||||
if api == "SYNO.Docker.Image" and method == "list":
|
||||
return images_payload if images_payload is not None else SAMPLE_IMAGES
|
||||
if api == "SYNO.Docker.Image" and method == "get":
|
||||
return inspect_payload if inspect_payload is not None else SAMPLE_INSPECT
|
||||
return {}
|
||||
|
||||
client.request.side_effect = mock_request
|
||||
return client
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_by_name_tag():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
client = _make_inspect_client()
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="nginx:1.24")
|
||||
assert "nginx" in result
|
||||
assert "1.24" in result
|
||||
assert "MiB" in result # size formatted via _human_size
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_by_hash():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
# Inspect data shaped for redis (sha256:cccc)
|
||||
redis_inspect = {
|
||||
"details": {
|
||||
"Id": "sha256:cccc",
|
||||
"RepoTags": ["redis:7"],
|
||||
"Size": 30 * 1024 * 1024,
|
||||
"Config": {"Env": [], "ExposedPorts": {}, "Cmd": ["redis-server"]},
|
||||
"RootFS": {"Layers": ["sha256:rlayer1"]},
|
||||
}
|
||||
}
|
||||
client = _make_inspect_client(inspect_payload=redis_inspect)
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="sha256:cccc")
|
||||
assert "redis" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_not_found():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
client = _make_inspect_client()
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="bogus:latest")
|
||||
assert "not found" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_shows_env_vars():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
client = _make_inspect_client()
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="nginx:1.24")
|
||||
assert "NGINX_VERSION=1.24" in result
|
||||
assert "PATH=/usr/local/sbin" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_shows_exposed_ports():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
client = _make_inspect_client()
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="nginx:1.24")
|
||||
assert "80/tcp" in result
|
||||
assert "443/tcp" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_shows_layers():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
client = _make_inspect_client()
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="nginx:1.24")
|
||||
assert "Layers" in result
|
||||
# Layer hashes truncated to 12 chars after sha256:
|
||||
assert "layer1" in result
|
||||
assert "layer2" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_shows_entrypoint_cmd():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
client = _make_inspect_client()
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="nginx:1.24")
|
||||
assert "/docker-entrypoint.sh" in result
|
||||
assert "nginx" in result
|
||||
assert "daemon off;" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_registry_prefixed():
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
registry_images = {
|
||||
"images": [
|
||||
{
|
||||
"id": "sha256:dddd",
|
||||
"repository": "ghcr.io/foo/bar",
|
||||
"tags": ["v1"],
|
||||
"size": 100 * 1024 * 1024,
|
||||
"created": 1700000000,
|
||||
"upgradable": False,
|
||||
}
|
||||
]
|
||||
}
|
||||
registry_inspect = {
|
||||
"details": {
|
||||
"Id": "sha256:dddd",
|
||||
"RepoTags": ["ghcr.io/foo/bar:v1"],
|
||||
"Size": 100 * 1024 * 1024,
|
||||
"Config": {"Env": [], "ExposedPorts": {}, "Cmd": ["/app"]},
|
||||
"RootFS": {"Layers": ["sha256:rlayer1"]},
|
||||
}
|
||||
}
|
||||
client = _make_inspect_client(inspect_payload=registry_inspect, images_payload=registry_images)
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="ghcr.io/foo/bar:v1")
|
||||
assert "ghcr.io/foo/bar" in result
|
||||
assert "v1" in result
|
||||
|
||||
# Verify the get call used the full registry-prefixed repository name
|
||||
get_calls = [
|
||||
c for c in client.request.call_args_list if c.args[:2] == ("SYNO.Docker.Image", "get")
|
||||
]
|
||||
assert get_calls, "inspect_image must call SYNO.Docker.Image/get"
|
||||
params = get_calls[0].kwargs.get("params") or {}
|
||||
assert params.get("name") == "ghcr.io/foo/bar"
|
||||
assert params.get("tag") == "v1"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inspect_image_api_error():
|
||||
from mcp_synology_container.dsm_client import SynologyError
|
||||
from mcp_synology_container.modules.images import register_images
|
||||
|
||||
client = AsyncMock()
|
||||
|
||||
async def mock_request(api, method, **kwargs):
|
||||
if api == "SYNO.Docker.Image" and method == "list":
|
||||
return SAMPLE_IMAGES
|
||||
if api == "SYNO.Docker.Image" and method == "get":
|
||||
raise SynologyError("inspect failed", code=120)
|
||||
return {}
|
||||
|
||||
client.request.side_effect = mock_request
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_images(mcp, make_config(), client)
|
||||
|
||||
result = await tools["inspect_image"](image_id="nginx:1.24")
|
||||
assert "Error" in result
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# check_image_updates (existing tests preserved)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -394,3 +394,209 @@ async def test_system_prune_preview_network_fetch_failure_is_nonfatal() -> None:
|
||||
# Preview still renders; networks count falls back to 0.
|
||||
assert "preview" in result.lower()
|
||||
assert "Unused networks: 0" in result
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# system_overview
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
# Two-container stats snapshot used to verify aggregation arithmetic.
|
||||
SAMPLE_STATS_OVERVIEW = {
|
||||
"aaa111": {
|
||||
"id": "aaa111",
|
||||
"name": "/web",
|
||||
"cpu_stats": {
|
||||
"cpu_usage": {"total_usage": 2_000_000_000, "percpu_usage": [1, 2]},
|
||||
"system_cpu_usage": 1_000_000_000_000,
|
||||
"online_cpus": 2,
|
||||
},
|
||||
"precpu_stats": {
|
||||
"cpu_usage": {"total_usage": 1_000_000_000},
|
||||
"system_cpu_usage": 999_000_000_000,
|
||||
},
|
||||
"memory_stats": {"usage": 100 * 1024 * 1024, "limit": 1024 * 1024 * 1024},
|
||||
"networks": {
|
||||
"eth0": {"rx_bytes": 1_000_000, "tx_bytes": 500_000},
|
||||
},
|
||||
"blkio_stats": {
|
||||
"io_service_bytes_recursive": [
|
||||
{"op": "Read", "value": 10 * 1024 * 1024},
|
||||
{"op": "Write", "value": 5 * 1024 * 1024},
|
||||
]
|
||||
},
|
||||
},
|
||||
"bbb222": {
|
||||
"id": "bbb222",
|
||||
"name": "/db",
|
||||
"cpu_stats": {
|
||||
"cpu_usage": {"total_usage": 3_000_000_000, "percpu_usage": [1, 2]},
|
||||
"system_cpu_usage": 1_000_000_000_000,
|
||||
"online_cpus": 2,
|
||||
},
|
||||
"precpu_stats": {
|
||||
"cpu_usage": {"total_usage": 2_000_000_000},
|
||||
"system_cpu_usage": 999_000_000_000,
|
||||
},
|
||||
"memory_stats": {"usage": 200 * 1024 * 1024, "limit": 2 * 1024 * 1024 * 1024},
|
||||
"networks": {
|
||||
"eth0": {"rx_bytes": 2_000_000, "tx_bytes": 1_000_000},
|
||||
},
|
||||
"blkio_stats": {
|
||||
"io_service_bytes_recursive": [
|
||||
{"op": "Read", "value": 20 * 1024 * 1024},
|
||||
{"op": "Write", "value": 7 * 1024 * 1024},
|
||||
]
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_system_overview_aggregates_stats():
|
||||
"""Sums CPU%, memory, net I/O, and block I/O across all containers in stats."""
|
||||
from mcp_synology_container.modules.system import register_system
|
||||
|
||||
client = AsyncMock()
|
||||
|
||||
async def mock_request(api, method, **kwargs):
|
||||
if api == "SYNO.Docker.Container" and method == "list":
|
||||
return SAMPLE_CONTAINERS
|
||||
if api == "SYNO.Docker.Container" and method == "stats":
|
||||
return SAMPLE_STATS_OVERVIEW
|
||||
return {}
|
||||
|
||||
client.request.side_effect = mock_request
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_system(mcp, make_config(), client)
|
||||
|
||||
result = await tools["system_overview"]()
|
||||
|
||||
# Headline section exists
|
||||
assert "Docker System Overview" in result
|
||||
assert "Aggregated" in result
|
||||
|
||||
# CPU per container: (1e9 / 1e9) * 2 * 100 = 200% each → sum ≈ 400%
|
||||
assert "400.00%" in result
|
||||
|
||||
# Memory total usage = 100 MiB + 200 MiB = 300 MiB; limit = 1 GiB + 2 GiB = 3 GiB
|
||||
assert "300 MiB" in result
|
||||
assert "3.0 GiB" in result
|
||||
|
||||
# Net I/O present (rx/tx sums non-zero)
|
||||
assert "Net I/O" in result
|
||||
assert "rx " in result
|
||||
assert "tx " in result
|
||||
|
||||
# Block I/O: read = 30 MiB, write = 12 MiB
|
||||
assert "30 MiB" in result
|
||||
assert "12 MiB" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_system_overview_running_vs_stopped_count():
|
||||
"""Counts running and stopped containers from the list response."""
|
||||
from mcp_synology_container.modules.system import register_system
|
||||
|
||||
client = AsyncMock()
|
||||
|
||||
async def mock_request(api, method, **kwargs):
|
||||
if api == "SYNO.Docker.Container" and method == "list":
|
||||
return SAMPLE_CONTAINERS
|
||||
if api == "SYNO.Docker.Container" and method == "stats":
|
||||
return SAMPLE_STATS_OVERVIEW
|
||||
return {}
|
||||
|
||||
client.request.side_effect = mock_request
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_system(mcp, make_config(), client)
|
||||
|
||||
result = await tools["system_overview"]()
|
||||
|
||||
# SAMPLE_CONTAINERS has 2 running ("web", "db") and 1 stopped ("old")
|
||||
assert "running" in result
|
||||
assert "stopped" in result
|
||||
assert " 2 running" in result
|
||||
assert " 1 stopped" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_system_overview_no_containers():
|
||||
"""Empty container list and empty stats — shows zero counts, no aggregation block."""
|
||||
from mcp_synology_container.modules.system import register_system
|
||||
|
||||
client = AsyncMock()
|
||||
|
||||
async def mock_request(api, method, **kwargs):
|
||||
if api == "SYNO.Docker.Container" and method == "list":
|
||||
return {"containers": []}
|
||||
if api == "SYNO.Docker.Container" and method == "stats":
|
||||
return {}
|
||||
return {}
|
||||
|
||||
client.request.side_effect = mock_request
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_system(mcp, make_config(), client)
|
||||
|
||||
result = await tools["system_overview"]()
|
||||
assert " 0 running" in result
|
||||
assert " 0 stopped" in result
|
||||
# No aggregation block when there are no stats entries
|
||||
assert "Aggregated" not in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_system_overview_container_list_error_graceful():
|
||||
"""list fails, stats works → still shows aggregated stats and a warning."""
|
||||
from mcp_synology_container.dsm_client import SynologyError
|
||||
from mcp_synology_container.modules.system import register_system
|
||||
|
||||
client = AsyncMock()
|
||||
|
||||
async def mock_request(api, method, **kwargs):
|
||||
if api == "SYNO.Docker.Container" and method == "list":
|
||||
raise SynologyError("list failed", code=102)
|
||||
if api == "SYNO.Docker.Container" and method == "stats":
|
||||
return SAMPLE_STATS_OVERVIEW
|
||||
return {}
|
||||
|
||||
client.request.side_effect = mock_request
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_system(mcp, make_config(), client)
|
||||
|
||||
result = await tools["system_overview"]()
|
||||
# Counts default to 0 (list failed), but aggregation still runs
|
||||
assert "Warnings" in result
|
||||
assert "list" in result
|
||||
assert "Aggregated" in result
|
||||
assert "400.00%" in result # stats still aggregated
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_system_overview_stats_error_graceful():
|
||||
"""stats fails, list works → still shows counts and a warning."""
|
||||
from mcp_synology_container.dsm_client import SynologyError
|
||||
from mcp_synology_container.modules.system import register_system
|
||||
|
||||
client = AsyncMock()
|
||||
|
||||
async def mock_request(api, method, **kwargs):
|
||||
if api == "SYNO.Docker.Container" and method == "list":
|
||||
return SAMPLE_CONTAINERS
|
||||
if api == "SYNO.Docker.Container" and method == "stats":
|
||||
raise SynologyError("stats failed", code=102)
|
||||
return {}
|
||||
|
||||
client.request.side_effect = mock_request
|
||||
mcp, tools = make_mock_mcp()
|
||||
register_system(mcp, make_config(), client)
|
||||
|
||||
result = await tools["system_overview"]()
|
||||
# Counts still displayed
|
||||
assert " 2 running" in result
|
||||
assert " 1 stopped" in result
|
||||
# Warning surfaced for the stats failure
|
||||
assert "Warnings" in result
|
||||
assert "stats" in result
|
||||
# No aggregation block — stats unavailable
|
||||
assert "Aggregated" not in result
|
||||
|
||||
Reference in New Issue
Block a user