Files
mcp-synology-container/src/mcp_synology_container/modules/images.py
T
marcus 6ba4c7ca92 chore: ruff cleanup — fix 7 long-standing lint findings
Mechanical, no behavior change. `ruff check src/ tests/` now passes
with zero findings.

- cli.py:147 (SIM105) — replace `try/except SynologyError/pass` around
  the cleanup logout with `contextlib.suppress(SynologyError)`.
- compose.py:271 (B007) — drop the unused `i` from the env_list
  preview-detection loop (the apply loop below still uses enumerate).
- compose.py:329 (E501) — extract `verb = "Updated" if … else "Added"`
  into a local before the return so the f-string fits in 100 cols.
- images.py:237 (E501) — extract `stopped_name = in_use_stopped[0]`
  before the return and split the message across two f-strings.
- test_auth.py:38, 127, 140 (SIM117) — combine nested `with patch(…):`
  / `with pytest.raises(…):` into single parenthesised with-statements.

236 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-18 09:15:50 +02:00

324 lines
12 KiB
Python

"""MCP tools for SYNO.Docker.Image: list, check updates, delete."""
from __future__ import annotations
import json
import logging
import sys
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_container.config import AppConfig
from mcp_synology_container.dsm_client import DsmClient
logger = logging.getLogger(__name__)
def _human_size(size_bytes: int) -> str:
"""Convert byte count to human-readable string (GiB / MiB / KiB)."""
if size_bytes >= 1024**3:
return f"{size_bytes / 1024**3:.1f} GiB"
if size_bytes >= 1024**2:
return f"{size_bytes / 1024**2:.0f} MiB"
if size_bytes >= 1024:
return f"{size_bytes / 1024:.0f} KiB"
return f"{size_bytes} B"
def _format_created(ts: int) -> str:
"""Format a Unix timestamp as a UTC date string."""
if not ts:
return "unknown"
try:
return datetime.fromtimestamp(ts, tz=UTC).strftime("%Y-%m-%d")
except (OSError, OverflowError, ValueError):
return "unknown"
async def _filter_images_for_project(
client: DsmClient,
project_name: str,
all_images: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Filter images to those used by a specific project.
Fetches project details and cross-references container image IDs.
Falls back to returning all images if project details are unavailable.
Args:
client: DsmClient instance.
project_name: Project name to filter for.
all_images: Full list of images from SYNO.Docker.Image.
Returns:
Subset of images used by the project.
"""
try:
list_data = await client.request("SYNO.Docker.Project", "list")
projects: dict[str, Any] = list_data if isinstance(list_data, dict) else {}
project_entry = next((p for p in projects.values() if p.get("name") == project_name), None)
if not project_entry:
return []
project_id = project_entry.get("id", "")
detail_data = await client.request(
"SYNO.Docker.Project",
"get",
params={"id": project_id},
)
containers = detail_data.get("containers", []) or []
image_ids: set[str] = set()
image_names: set[str] = set()
for container in containers:
img_id = container.get("Image", "")
if img_id:
image_ids.add(img_id)
cfg_image = container.get("Config", {}).get("Image", "")
if cfg_image:
name = cfg_image.split(":")[0] if ":" in cfg_image else cfg_image
image_names.add(name)
result = []
for img in all_images:
img_id = img.get("id", "")
repo = img.get("repository", "")
if img_id in image_ids or repo in image_names:
result.append(img)
return result
except Exception as e:
logger.debug("Could not filter images for project '%s': %s", project_name, e)
return all_images
def register_images(mcp: FastMCP, config: AppConfig, client: DsmClient) -> None:
"""Register all image management tools with the MCP server."""
@mcp.tool()
async def list_images():
"""List local Docker images sorted by size, showing tag, date, and in-use status."""
try:
img_data = await client.request(
"SYNO.Docker.Image",
"list",
params={"limit": "-1", "offset": "0", "show_dsm": "false"},
)
except Exception as e:
return f"Error listing images: {e}"
images: list[dict[str, Any]] = img_data.get("images", [])
if not images:
return "No local images found."
# Collect image IDs in use by containers
in_use_ids: set[str] = set()
try:
ctr_data = await client.request(
"SYNO.Docker.Container",
"list",
params={"limit": "-1", "offset": "0", "type": "all"},
)
for ctr in ctr_data.get("containers", []):
img_id = ctr.get("image_id") or ctr.get("ImageID") or ctr.get("Image", "")
if img_id:
in_use_ids.add(img_id)
except Exception as e:
logger.debug("Could not fetch containers for in-use check: %s", e)
# Sort by size descending
images_sorted = sorted(images, key=lambda x: x.get("size", 0), reverse=True)
total_size = sum(img.get("size", 0) for img in images)
lines = [f"Local images ({len(images)} total, {_human_size(total_size)}):", ""]
for img in images_sorted:
repo = img.get("repository", "<none>")
tags = img.get("tags") or ["<none>"]
tag_str = ", ".join(tags)
size = _human_size(img.get("size", 0))
created = _format_created(img.get("created", 0))
img_id = img.get("id", "")
used_marker = " [in use]" if img_id in in_use_ids else ""
upgradable = " [update available]" if img.get("upgradable") else ""
lines.append(f" {repo}:{tag_str} {size} {created}{used_marker}{upgradable}")
return "\n".join(lines)
@mcp.tool()
async def delete_image(image_id: str, confirmed: bool = False):
"""Delete a local image by name:tag or hash. Requires confirmed=True; refuses if in use."""
# Parse name and tag using the last ":" as separator so that
# registry-prefixed images (e.g. "ghcr.io/foo/bar:v1") are handled
# correctly. rpartition returns ("", "", original) when ":" is absent.
name, sep, tag = image_id.rpartition(":")
if not sep:
# No ":" found — bare name without explicit tag
name = image_id
tag = "latest"
# Fetch the local image list for size reporting and in-use detection
try:
img_data = await client.request(
"SYNO.Docker.Image",
"list",
params={"limit": "-1", "offset": "0", "show_dsm": "false"},
)
except Exception as e:
return f"Error fetching image list: {e}"
images: list[dict[str, Any]] = img_data.get("images", [])
# Locate the target image by name+tag or hash prefix
is_hash = image_id.startswith("sha256:") or (len(image_id) >= 12 and ":" not in image_id)
target: dict[str, Any] | None = None
for img in images:
if is_hash:
img_hash = img.get("id", "")
if img_hash == image_id or img_hash.startswith(image_id):
target = img
break
else:
repo = img.get("repository", "")
img_tags = img.get("tags") or []
if repo == name and tag in img_tags:
target = img
break
if target is None:
return f"Image '{image_id}' not found locally."
# Resolve display info from the found image
repo = target.get("repository", name)
img_tags = target.get("tags") or [tag]
display_name = f"{repo}:{img_tags[0]}"
size_str = _human_size(target.get("size", 0))
img_hash = target.get("id", "")
# Check if image is in use by any container
in_use_running: list[str] = []
in_use_stopped: list[str] = []
try:
ctr_data = await client.request(
"SYNO.Docker.Container",
"list",
params={"limit": "-1", "offset": "0", "type": "all"},
)
for ctr in ctr_data.get("containers", []):
ctr_img_id = ctr.get("image_id") or ctr.get("ImageID") or ctr.get("Image", "")
hash_prefix = img_hash[:12] if img_hash else ""
if img_hash and (
ctr_img_id == img_hash or (hash_prefix and ctr_img_id.startswith(hash_prefix))
):
ctr_name = ctr.get("name") or ctr.get("Names", ["?"])[0]
status = ctr.get("status", ctr.get("state", "")).lower()
if status == "running":
in_use_running.append(ctr_name)
else:
in_use_stopped.append(ctr_name)
except Exception as e:
logger.debug("Could not fetch containers for in-use check: %s", e)
if in_use_running:
return (
f"Cannot delete '{display_name}': image is used by running container(s): "
+ ", ".join(in_use_running)
)
if in_use_stopped:
stopped_name = in_use_stopped[0]
return (
f"Cannot delete '{display_name}': image is used by stopped container "
f"'{stopped_name}'.\n"
f"Delete the container first or run system_prune to clean up stopped containers."
)
if not confirmed:
return (
f"Preview: would delete {display_name} ({size_str}).\n"
f"Call delete_image(image_id={image_id!r}, confirmed=True) to confirm."
)
# DSM Container Manager expects a POST with version=1 and an
# "images" JSON array — confirmed via browser DevTools capture.
# Format: images=[{"repository": "nginx", "tags": ["1.24"]}]
delete_repo = repo
delete_tag = img_tags[0] if img_tags else tag
images_param = json.dumps([{"repository": delete_repo, "tags": [delete_tag]}])
sys.stderr.write(
f"[delete_image] POST SYNO.Docker.Image/delete v1 images={images_param!r}\n"
)
sys.stderr.flush()
try:
await client.post_request(
"SYNO.Docker.Image",
"delete",
version=1,
params={"images": images_param},
)
except Exception as e:
code = getattr(e, "code", "?")
sys.stderr.write(f"[delete_image] failed: {e} (DSM code {code})\n")
sys.stderr.flush()
return f"Error deleting '{display_name}': {e} [DSM code {code}]"
return f"Deleted {display_name}{size_str} freed."
@mcp.tool()
async def check_image_updates(project_name: str | None = None):
"""Check which local images have updates available (upgradable flag from NAS registry)."""
try:
data = await client.request(
"SYNO.Docker.Image",
"list",
params={"limit": "-1", "offset": "0", "show_dsm": "false"},
)
except Exception as e:
return f"Error listing images: {e}"
images: list[dict[str, Any]] = data.get("images", [])
if not images:
return "No images found."
if project_name:
images = await _filter_images_for_project(client, project_name, images)
if not images:
return f"No images found for project '{project_name}'."
header = f"Image update status for project '{project_name}':\n"
else:
header = f"Image update status (all {len(images)} images):\n"
upgradable = [img for img in images if img.get("upgradable")]
up_to_date = [img for img in images if not img.get("upgradable")]
lines = [header]
if upgradable:
lines.append(f"Updates available ({len(upgradable)}):")
for img in sorted(upgradable, key=lambda x: x.get("repository", "")):
repo = img.get("repository", "?")
tags = ", ".join(img.get("tags", []))
size_mb = img.get("size", 0) // (1024 * 1024)
lines.append(f" {repo}:{tags} ({size_mb} MiB) ← UPDATE AVAILABLE")
lines.append("")
if up_to_date:
lines.append(f"Up to date ({len(up_to_date)}):")
for img in sorted(up_to_date, key=lambda x: x.get("repository", "")):
repo = img.get("repository", "?")
tags = ", ".join(img.get("tags", []))
size_mb = img.get("size", 0) // (1024 * 1024)
lines.append(f" {repo}:{tags} ({size_mb} MiB)")
if not upgradable:
lines.append("All images are up to date.")
return "\n".join(lines)