"""MCP tools for SYNO.Docker.Container: list, status, logs, exec.""" from __future__ import annotations import json import logging import re from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP from mcp_synology_container.config import AppConfig from mcp_synology_container.dsm_client import DsmClient logger = logging.getLogger(__name__) # Matches DSM hash-prefixed container names like "f93cb8b504f7_jenkins" _HASH_PREFIX_RE = re.compile(r"^[a-f0-9]{12}_(.+)$") def _strip_hash_prefix(name: str) -> str: """Strip 12-char hex hash prefix from container names. DSM sometimes returns names like 'f93cb8b504f7_jenkins' when the service name in compose.yaml differs from container_name. Returns the clean name (also strips a leading slash if present). """ clean = name.lstrip("/") match = _HASH_PREFIX_RE.match(clean) return match.group(1) if match else clean async def _resolve_container_name(client: DsmClient, user_name: str) -> str: """Resolve a user-supplied name to the actual DSM container name. Needed because DSM may store the container as 'f93cb8b504f7_jenkins' while the user passes 'jenkins'. Falls back to user_name unchanged if the list cannot be fetched or no match is found. Args: client: DsmClient instance. user_name: Name as provided by the user (may or may not have prefix). Returns: Actual container name as registered in DSM. """ clean_user = _strip_hash_prefix(user_name) try: data = await client.request( "SYNO.Docker.Container", "list", params={"limit": "-1", "offset": "0", "type": "all"}, ) for c in data.get("containers", []): actual = c.get("name", "") if actual == user_name or _strip_hash_prefix(actual) == clean_user: return actual except Exception: pass return user_name def register_containers(mcp: FastMCP, config: AppConfig, client: DsmClient) -> None: """Register all container management tools with the MCP server.""" @mcp.tool() async def list_containers(project_name: str | None = None): """List all containers, optionally filtered by project name.""" try: data = await client.request( "SYNO.Docker.Container", "list", params={"limit": "-1", "offset": "0", "type": "all"}, ) except Exception as e: return f"Error listing containers: {e}" containers: list[dict[str, Any]] = data.get("containers", []) if not containers: return "No containers found." # Filter by project if specified if project_name: containers = [ c for c in containers if c.get("project_name") == project_name or _container_in_project(c, project_name) ] if not containers: return f"No containers found for project '{project_name}'." lines = [f"Containers ({len(containers)} total):", ""] for container in sorted(containers, key=lambda c: c.get("name", "")): name = _strip_hash_prefix(container.get("name", "?")) state = container.get("status", container.get("state", "?")) image = container.get("image", "?") lines.append(f" {name}") lines.append(f" Status: {state}") lines.append(f" Image: {image}") lines.append("") return "\n".join(lines).rstrip() @mcp.tool() async def get_container_status(container_name: str): """Get detailed status, uptime, ports, and mounts for a container.""" # SYNO.Docker.Container/get accepts the clean name (no hash prefix). clean_name = _strip_hash_prefix(container_name) try: data = await client.request( "SYNO.Docker.Container", "get", params={"name": clean_name}, ) except Exception as e: return f"Error getting container '{container_name}': {e}" if not data: return f"Container '{container_name}' not found." return _format_container_detail(clean_name, data) @mcp.tool() async def get_container_logs( container_name: str, tail: int = 100, keyword: str | None = None, ): """Get recent log output from a container, with optional keyword filter.""" resolved_name = await _resolve_container_name(client, container_name) params: dict[str, Any] = { "name": resolved_name, "limit": tail, "offset": 0, "sort_dir": "DESC", } if keyword: params["keyword"] = keyword try: data = await client.request( "SYNO.Docker.Container.Log", "get", params=params, ) except Exception as e: return f"Error getting logs for '{container_name}': {e}" logs: list[dict[str, Any]] = data.get("logs", []) if not logs: return f"No logs found for container '{container_name}'." total = data.get("total", len(logs)) display_name = _strip_hash_prefix(container_name) header = f"Logs for {display_name} (showing {len(logs)} of {total}):\n" # Logs are returned in DESC order, reverse for chronological display lines = [] for entry in reversed(logs): timestamp = entry.get("created", "") stream = entry.get("stream", "") text = entry.get("text", "") stream_tag = f"[{stream}] " if stream else "" lines.append(f"{timestamp} {stream_tag}{text}") return header + "\n".join(lines) @mcp.tool() async def container_stats(container_name: str): """Get live CPU, memory, network, and block I/O stats for a container.""" try: data = await client.request("SYNO.Docker.Container", "stats") except Exception as e: return f"Error fetching container stats: {e}" if not data: return "No stats data returned." # Response is a dict keyed by container ID hash; each entry has "name" # with a leading slash (e.g. "/jenkins") and may have a hash prefix. clean_query = _strip_hash_prefix(container_name) target: dict[str, Any] | None = None for entry in data.values(): entry_name = _strip_hash_prefix(entry.get("name", "")) if entry_name == clean_query: target = entry break if target is None: available = ", ".join(_strip_hash_prefix(v.get("name", "?")) for v in data.values()) return f"Container '{container_name}' not found in stats. Available: {available}" # ── CPU % ──────────────────────────────────────────────────────────── cpu_stats = target.get("cpu_stats", {}) precpu_stats = target.get("precpu_stats", {}) cpu_usage = cpu_stats.get("cpu_usage", {}) precpu_usage = precpu_stats.get("cpu_usage", {}) cpu_delta = cpu_usage.get("total_usage", 0) - precpu_usage.get("total_usage", 0) system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get( "system_cpu_usage", 0 ) online_cpus = cpu_stats.get("online_cpus") or len(cpu_usage.get("percpu_usage") or [1]) if system_delta > 0 and cpu_delta >= 0: cpu_pct = (cpu_delta / system_delta) * online_cpus * 100.0 else: cpu_pct = 0.0 # ── Memory ─────────────────────────────────────────────────────────── mem_stats = target.get("memory_stats", {}) mem_usage = mem_stats.get("usage", 0) mem_limit = mem_stats.get("limit", 0) # ── Network I/O ────────────────────────────────────────────────────── net_rx = 0 net_tx = 0 for iface in (target.get("networks") or {}).values(): net_rx += iface.get("rx_bytes", 0) net_tx += iface.get("tx_bytes", 0) # ── Block I/O ──────────────────────────────────────────────────────── blk_read = 0 blk_write = 0 for entry_io in target.get("blkio_stats", {}).get("io_service_bytes_recursive") or []: op = entry_io.get("op", "").lower() val = entry_io.get("value", 0) if op == "read": blk_read += val elif op == "write": blk_write += val # ── Format ─────────────────────────────────────────────────────────── from mcp_synology_container.modules.images import _human_size # reuse helper mem_limit_str = f" / {_human_size(mem_limit)}" if mem_limit else "" lines = [ f"Stats for {container_name}:", f" CPU: {cpu_pct:.2f}% ({online_cpus} CPUs)", f" Memory: {_human_size(mem_usage)}{mem_limit_str}", f" Net I/O: rx {_human_size(net_rx)} / tx {_human_size(net_tx)}", f" Block I/O: read {_human_size(blk_read)} / write {_human_size(blk_write)}", ] return "\n".join(lines) @mcp.tool() async def exec_in_container( container_name: str, command: str, confirmed: bool = False, ): """Execute a shell command inside a running container. Requires confirmed=True.""" if not confirmed: return ( f"About to run in container '{container_name}':\n" f" $ {command}\n\n" f"Call this tool again with confirmed=True to proceed." ) resolved_name = await _resolve_container_name(client, container_name) try: data = await client.request( "SYNO.Docker.Container", "exec", params={ "name": resolved_name, "command": command, }, ) except Exception as e: return f"Error executing command in '{container_name}': {e}" output = data.get("output", "") exit_code = data.get("exit_code", 0) result_lines = [f"Command executed in '{container_name}':"] result_lines.append(f" Exit code: {exit_code}") if output: result_lines.append(" Output:") result_lines.append(output) return "\n".join(result_lines) @mcp.tool() async def delete_container(container_name: str, confirmed: bool = False): """Delete a stopped container. Requires confirmed=True.""" if not confirmed: return ( f"Preview: would delete container '{container_name}'.\n" f"Call this tool again with confirmed=True to proceed." ) resolved_name = await _resolve_container_name(client, container_name) display_name = _strip_hash_prefix(resolved_name) try: data = await client.request( "SYNO.Docker.Container", "get", params={"name": resolved_name}, ) except Exception as e: return f"Error inspecting container '{container_name}': {e}" if not data: return f"Container '{container_name}' not found." details = data.get("details", {}) or {} state = details.get("State", {}) or {} running = state.get("Running", False) if running: return ( f"Cannot delete '{display_name}': container is still running.\n" f"Stop the container first with stop_project or stop_container." ) try: await client.request( "SYNO.Docker.Container", "delete", params={ "name": json.dumps(resolved_name), "force": "false", "preserve_profile": "false", }, ) return f"Deleted container '{display_name}'." except Exception as e: return f"Error deleting '{display_name}': {e}" def _container_in_project(container: dict[str, Any], project_name: str) -> bool: """Check if a container belongs to a project based on its labels.""" labels = container.get("labels", {}) or {} if isinstance(labels, dict): return labels.get("com.docker.compose.project") == project_name return False def _format_container_detail(name: str, data: dict[str, Any]) -> str: """Format container inspect data as human-readable text. SYNO.Docker.Container/get returns two top-level keys: - "details": Docker Engine inspect format (State, NetworkSettings, Mounts, …) - "profile": DSM format (image, port_bindings, …) """ details: dict[str, Any] = data.get("details", {}) or {} profile: dict[str, Any] = data.get("profile", {}) or {} state: dict[str, Any] = details.get("State", {}) or {} status_str = state.get("Status", "?") running = state.get("Running", False) image_str = profile.get("image", "?") lines = [ f"Container: {name}", f" Status: {status_str}", f" Running: {running}", f" Image: {image_str}", ] if state.get("StartedAt"): lines.append(f" Started: {state['StartedAt']}") if state.get("FinishedAt") and not running: lines.append(f" Finished: {state['FinishedAt']}") lines.append(f" Exit code: {state.get('ExitCode', '?')}") # IP addresses from all attached networks networks: dict[str, Any] = (details.get("NetworkSettings", {}) or {}).get("Networks", {}) or {} for net_name, net in networks.items(): ip = (net or {}).get("IPAddress", "") if ip: lines.append(f" IP ({net_name}): {ip}") # Port bindings from DSM profile port_bindings: list[dict[str, Any]] = profile.get("port_bindings", []) or [] if port_bindings: lines.append(" Ports:") for pb in port_bindings: host = pb.get("host_port", "?") ctr = pb.get("container_port", "?") proto = pb.get("type", "tcp") lines.append(f" {host} → {ctr}/{proto}") # Mounts from Docker Engine inspect mounts: list[dict[str, Any]] = details.get("Mounts", []) or [] if mounts: lines.append(" Mounts:") for m in mounts: src = m.get("Source", "?") dst = m.get("Destination", "?") mtype = m.get("Type", "") rw = "" if m.get("RW", True) else " [ro]" type_tag = f" ({mtype})" if mtype else "" lines.append(f" {src} → {dst}{type_tag}{rw}") return "\n".join(lines)