12d532da7b
Closes #1, #4, #6, #7. Container lifecycle (#1, #7): - start_container, stop_container, restart_container, pause_container, unpause_container — all via SYNO.Docker.Container with JSON-encoded name parameter, routed through _resolve_container_name for hash-prefix resolution. stop is live-verified; the other four are implemented by symmetry on the same API surface. inspect_image (#4): - Returns full image detail (layers, env, ports, entrypoint/cmd, labels) via SYNO.Docker.Image/get. Accepts name:tag, registry-prefixed names, and bare hashes. Defensive response parsing handles both wrapped (details.*) and flat envelopes. system_overview (#6): - Aggregates CPU %, RAM, network and block I/O across all running containers plus running/stopped counts. No new DSM endpoint — composed from list + stats, reusing the container_stats CPU formula. Per-source errors are non-fatal. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
603 lines
20 KiB
Python
603 lines
20 KiB
Python
"""Tests for modules/system.py."""
|
|
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
|
|
|
|
def make_mock_mcp():
    """Build a stand-in MCP server that records every registered tool.

    Returns:
        A ``(mcp, tools)`` pair. ``mcp.tool()`` yields a decorator that
        stores the decorated function in ``tools`` under its ``__name__``
        and hands the function back unchanged.
    """
    registry: dict = {}

    class MockMCP:
        def tool(self):
            def register(fn):
                # Keyed by function name so tests can look tools up directly.
                registry[fn.__name__] = fn
                return fn

            return register

    return MockMCP(), registry
|
|
|
|
|
|
def make_config():
    """Return an ``AppConfig`` pointing at ``nas.local:443`` over verified HTTPS."""
    from mcp_synology_container.config import AppConfig, ConnectionConfig

    connection = ConnectionConfig(
        host="nas.local",
        port=443,
        https=True,
        verify_ssl=True,
    )
    return AppConfig(schema_version=1, connection=connection)
|
|
|
|
|
|
# Image-list payload: two tagged images plus one untagged "<none>" image.
SAMPLE_IMAGES = {
    "images": [
        {"id": "sha256:aaaa", "repository": "nginx", "tags": ["1.24"], "size": 50 * 1024**2},
        {"id": "sha256:bbbb", "repository": "postgres", "tags": ["15"], "size": 80 * 1024**2},
        {"id": "sha256:cccc", "repository": "<none>", "tags": [], "size": 10 * 1024**2},
    ]
}
|
|
|
|
# Container-list payload: two running containers and one stopped one.
# Only sha256:aaaa and sha256:bbbb are referenced as image_id.
SAMPLE_CONTAINERS = {
    "containers": [
        {
            "name": "web",
            "status": "running",
            "image_id": "sha256:aaaa",
        },
        {
            "name": "db",
            "status": "running",
            "image_id": "sha256:bbbb",
        },
        {
            "name": "old",
            "status": "stopped",
            "image_id": "sha256:aaaa",
        },
    ]
}
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# system_df
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_df_shows_image_stats():
    """system_df output mentions "Images", the count 3, and "Containers"."""
    from mcp_synology_container.modules.system import register_system

    canned = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        # APIs without a canned payload answer with an empty dict.
        return canned.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_df"]()

    assert "Images" in report
    assert "3" in report  # total images
    assert "Containers" in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_df_reclaimable():
    """Images that no container references are reported as reclaimable/unused."""
    from mcp_synology_container.modules.system import register_system

    canned = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        return canned.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_df"]()

    # sha256:cccc (<none>) has no container pointing at it → reclaimable.
    for keyword in ("reclaimable", "unused"):
        assert keyword in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_df_running_vs_stopped():
    """system_df output contains both the "running" and "stopped" labels."""
    from mcp_synology_container.modules.system import register_system

    canned = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        return canned.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_df"]()

    for state in ("running", "stopped"):
        assert state in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_df_image_api_error_graceful():
    """system_df still produces output when the image API raises SynologyError."""
    from mcp_synology_container.dsm_client import SynologyError
    from mcp_synology_container.modules.system import register_system

    async def fake_request(api, method, **kwargs):
        if api == "SYNO.Docker.Image":
            raise SynologyError("image API down", code=102)
        return SAMPLE_CONTAINERS if api == "SYNO.Docker.Container" else {}

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_df"]()

    # Should still show container section and a warning.
    # NOTE(review): this passes when EITHER marker is present; if both are
    # expected, split into two asserts once the output format is confirmed.
    assert any(marker in report for marker in ("Containers", "Warnings"))
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_df_no_images():
    """With empty image and container lists, the output contains a "0"."""
    from mcp_synology_container.modules.system import register_system

    async def fake_request(api, method, **kwargs):
        # Built per call so every request receives fresh, empty payloads.
        empty_payloads = {
            "SYNO.Docker.Image": {"images": []},
            "SYNO.Docker.Container": {"containers": []},
        }
        return empty_payloads.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_df"]()

    assert "0" in report
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# system_prune
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_prune_preview():
    """Without confirmation, system_prune only previews and never calls prune."""
    from mcp_synology_container.modules.system import register_system

    canned = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        return canned.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_prune"]()

    assert "preview" in report.lower()
    assert "confirmed=True" in report

    # prune API must NOT be called: compare the first two positional
    # arguments of every recorded request against (api, method).
    prune_call = ("SYNO.Docker.Utils", "prune")
    assert not any(
        recorded.args[:2] == prune_call for recorded in dsm.request.call_args_list
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_prune_preview_lists_dangling():
    """The preview names the dangling image and the stopped container."""
    from mcp_synology_container.modules.system import register_system

    canned = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        return canned.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_prune"]()

    # "<none>" is the dangling image; "old" is the stopped container.
    for expected in ("<none>", "old"):
        assert expected in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_prune_confirmed():
    """A confirmed prune reports completion and the space that was reclaimed."""
    from mcp_synology_container.modules.system import register_system

    listings = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        if (api, method) == ("SYNO.Docker.Utils", "prune"):
            return {"SpaceReclaimed": 10 * 1024 * 1024}
        return listings.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_prune"](confirmed=True)

    assert "completed" in report.lower()
    assert "MiB" in report or "reclaimed" in report.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_prune_confirmed_no_space_reported():
    """A confirmed prune still completes when DSM returns no reclaimed-bytes field."""
    from mcp_synology_container.modules.system import register_system

    listings = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        # The prune call (SYNO.Docker.Utils) gets the same empty dict as any
        # other unlisted API, i.e. a response without a reclaimed-size field.
        return listings.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_prune"](confirmed=True)

    assert "completed" in report.lower()
    assert "not reported" in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_prune_api_error():
    """A SynologyError raised by the prune call is reported as "Error" in the result."""
    from mcp_synology_container.dsm_client import SynologyError
    from mcp_synology_container.modules.system import register_system

    listings = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        if (api, method) == ("SYNO.Docker.Utils", "prune"):
            raise SynologyError("prune failed", code=100)
        return listings.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_prune"](confirmed=True)

    assert "Error" in report
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# M-6: system_prune preview now counts unused networks
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
# Network-list payload for the prune preview: two empty user-created networks,
# one user-created network in use, and the three built-in networks.
SAMPLE_NETWORKS_FOR_PRUNE = {
    "network": [
        # User-created and empty → expected in the prune preview.
        {
            "name": "orphan_net",
            "driver": "bridge",
            "containers": [],
        },
        # User-created but attached to containers → must NOT be counted.
        {
            "name": "myapp_default",
            "driver": "bridge",
            "containers": ["web", "db"],
        },
        # Built-ins: Docker never prunes these, even when they are empty.
        {
            "name": "bridge",
            "driver": "bridge",
            "containers": [],
        },
        {
            "name": "host",
            "driver": "host",
            "containers": [],
        },
        {
            "name": "none",
            "driver": "null",
            "containers": [],
        },
        # A second empty user-created network.
        {
            "name": "legacy_net",
            "driver": "bridge",
            "containers": [],
        },
    ]
}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_prune_preview_counts_unused_networks() -> None:
    """M-6: the preview lists empty user-created networks only.

    The three built-in networks (bridge/host/none) and any network that
    still has containers attached must be left out.
    """
    from mcp_synology_container.modules.system import register_system

    canned = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
        "SYNO.Docker.Network": SAMPLE_NETWORKS_FOR_PRUNE,
    }

    async def fake_request(api, method, **kwargs):
        return canned.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_prune"]()

    # Exactly the two empty user-created networks are counted and named.
    assert "Unused networks: 2" in report
    for prunable in ("orphan_net", "legacy_net"):
        assert prunable in report

    # Built-in network names must not show up as list entries.
    for builtin_entry in (" - bridge ", " - host ", " - none "):
        assert builtin_entry not in report

    # A network that still has containers must not be listed.
    assert "myapp_default" not in report
    # The old "not counted" placeholder must be gone.
    assert "not counted" not in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_prune_preview_network_fetch_failure_is_nonfatal() -> None:
    """A failing network list does not break the preview; it shows 0 networks."""
    from mcp_synology_container.dsm_client import SynologyError
    from mcp_synology_container.modules.system import register_system

    listings = {
        "SYNO.Docker.Image": SAMPLE_IMAGES,
        "SYNO.Docker.Container": SAMPLE_CONTAINERS,
    }

    async def fake_request(api, method, **kwargs):
        if api == "SYNO.Docker.Network":
            raise SynologyError("network list failed", code=100)
        return listings.get(api, {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_prune"]()

    # The preview is still rendered; the network count falls back to 0.
    assert "preview" in report.lower()
    assert "Unused networks: 0" in report
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# system_overview
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
# Stats snapshot for two containers ("/web" and "/db"), keyed by container id.
# The figures are round so the aggregation tests can check exact totals.
SAMPLE_STATS_OVERVIEW = {
    "aaa111": {
        "id": "aaa111",
        "name": "/web",
        "cpu_stats": {
            "cpu_usage": {"total_usage": 2_000_000_000, "percpu_usage": [1, 2]},
            "system_cpu_usage": 1_000_000_000_000,
            "online_cpus": 2,
        },
        "precpu_stats": {
            "cpu_usage": {"total_usage": 1_000_000_000},
            "system_cpu_usage": 999_000_000_000,
        },
        # 100 MiB used out of a 1 GiB limit.
        "memory_stats": {"usage": 100 * 1024**2, "limit": 1024**3},
        "networks": {"eth0": {"rx_bytes": 1_000_000, "tx_bytes": 500_000}},
        "blkio_stats": {
            "io_service_bytes_recursive": [
                {"op": "Read", "value": 10 * 1024**2},
                {"op": "Write", "value": 5 * 1024**2},
            ]
        },
    },
    "bbb222": {
        "id": "bbb222",
        "name": "/db",
        "cpu_stats": {
            "cpu_usage": {"total_usage": 3_000_000_000, "percpu_usage": [1, 2]},
            "system_cpu_usage": 1_000_000_000_000,
            "online_cpus": 2,
        },
        "precpu_stats": {
            "cpu_usage": {"total_usage": 2_000_000_000},
            "system_cpu_usage": 999_000_000_000,
        },
        # 200 MiB used out of a 2 GiB limit.
        "memory_stats": {"usage": 200 * 1024**2, "limit": 2 * 1024**3},
        "networks": {"eth0": {"rx_bytes": 2_000_000, "tx_bytes": 1_000_000}},
        "blkio_stats": {
            "io_service_bytes_recursive": [
                {"op": "Read", "value": 20 * 1024**2},
                {"op": "Write", "value": 7 * 1024**2},
            ]
        },
    },
}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_overview_aggregates_stats():
    """CPU %, memory, net I/O and block I/O are summed over every stats entry."""
    from mcp_synology_container.modules.system import register_system

    canned = {
        ("SYNO.Docker.Container", "list"): SAMPLE_CONTAINERS,
        ("SYNO.Docker.Container", "stats"): SAMPLE_STATS_OVERVIEW,
    }

    async def fake_request(api, method, **kwargs):
        return canned.get((api, method), {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_overview"]()

    # Headline and aggregation sections are rendered.
    for heading in ("Docker System Overview", "Aggregated"):
        assert heading in report

    # CPU per container: (1e9 / 1e9) * 2 * 100 = 200% each → sum ≈ 400%
    assert "400.00%" in report

    # Memory: usage 100 MiB + 200 MiB = 300 MiB; limit 1 GiB + 2 GiB = 3 GiB
    assert "300 MiB" in report
    assert "3.0 GiB" in report

    # Net I/O line is present with both directions (sums are non-zero).
    for fragment in ("Net I/O", "rx ", "tx "):
        assert fragment in report

    # Block I/O: read = 10 + 20 = 30 MiB, write = 5 + 7 = 12 MiB
    assert "30 MiB" in report
    assert "12 MiB" in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_overview_running_vs_stopped_count():
    """Running and stopped totals are taken from the container list response."""
    from mcp_synology_container.modules.system import register_system

    canned = {
        ("SYNO.Docker.Container", "list"): SAMPLE_CONTAINERS,
        ("SYNO.Docker.Container", "stats"): SAMPLE_STATS_OVERVIEW,
    }

    async def fake_request(api, method, **kwargs):
        return canned.get((api, method), {})

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_overview"]()

    # SAMPLE_CONTAINERS holds 2 running ("web", "db") and 1 stopped ("old").
    for fragment in ("running", "stopped", " 2 running", " 1 stopped"):
        assert fragment in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_overview_no_containers():
    """No containers and no stats → zero counts and no aggregation block."""
    from mcp_synology_container.modules.system import register_system

    async def fake_request(api, method, **kwargs):
        if (api, method) == ("SYNO.Docker.Container", "list"):
            return {"containers": []}
        # The stats call, like any other request, yields an empty payload.
        return {}

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_overview"]()

    assert " 0 running" in report
    assert " 0 stopped" in report
    # Without stats entries there is nothing to aggregate.
    assert "Aggregated" not in report
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_overview_container_list_error_graceful():
    """When list fails but stats succeeds, stats are aggregated and a warning shown."""
    from mcp_synology_container.dsm_client import SynologyError
    from mcp_synology_container.modules.system import register_system

    async def fake_request(api, method, **kwargs):
        if api != "SYNO.Docker.Container":
            return {}
        if method == "list":
            raise SynologyError("list failed", code=102)
        return SAMPLE_STATS_OVERVIEW if method == "stats" else {}

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_overview"]()

    # Counts default to 0 because list failed, but aggregation still runs.
    for fragment in ("Warnings", "list", "Aggregated"):
        assert fragment in report
    assert "400.00%" in report  # stats still aggregated
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_system_overview_stats_error_graceful():
    """When stats fails but list succeeds, counts are shown along with a warning."""
    from mcp_synology_container.dsm_client import SynologyError
    from mcp_synology_container.modules.system import register_system

    async def fake_request(api, method, **kwargs):
        if api != "SYNO.Docker.Container":
            return {}
        if method == "stats":
            raise SynologyError("stats failed", code=102)
        return SAMPLE_CONTAINERS if method == "list" else {}

    dsm = AsyncMock()
    dsm.request.side_effect = fake_request
    server, registered = make_mock_mcp()
    register_system(server, make_config(), dsm)

    report = await registered["system_overview"]()

    # Counts are still displayed.
    assert " 2 running" in report
    assert " 1 stopped" in report
    # The stats failure is surfaced as a warning.
    assert "Warnings" in report
    assert "stats" in report
    # No aggregation block, since stats were unavailable.
    assert "Aggregated" not in report
|