Initial implementation

This commit is contained in:
2026-04-13 14:22:37 +02:00
commit a0c1b6ed93
26 changed files with 4125 additions and 0 deletions
@@ -0,0 +1 @@
"""MCP tool modules for Synology Docker API."""
@@ -0,0 +1,324 @@
"""MCP tools for reading and writing compose files via FileStation API.
Compose files are read/written via SYNO.FileStation.Download / Upload.
Supported filenames: docker-compose.yml, docker-compose.yaml, compose.yml, compose.yaml
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
import yaml
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_container.config import AppConfig
from mcp_synology_container.dsm_client import DsmClient
logger = logging.getLogger(__name__)
# Recognized compose file names (in priority order)
_COMPOSE_FILENAMES = [
"docker-compose.yml",
"docker-compose.yaml",
"compose.yml",
"compose.yaml",
]
def register_compose(mcp: FastMCP, config: AppConfig, client: DsmClient) -> None:
"""Register all compose file management tools with the MCP server."""
@mcp.tool()
async def read_compose(project_name: str) -> str:
"""Read the compose file of a project.
Args:
project_name: Name of the Container Manager project.
Returns:
The compose file content as YAML text.
"""
path = await _find_compose_path(client, config, project_name)
if path is None:
return (
f"No compose file found for project '{project_name}'.\n"
f"Looked in {config.compose_base_path}/{project_name}/ for: "
+ ", ".join(_COMPOSE_FILENAMES)
)
try:
content = await client.download_text(path)
except Exception as e:
return f"Error reading compose file '{path}': {e}"
return f"Compose file: {path}\n\n{content}"
@mcp.tool()
async def update_image_tag(
project_name: str,
service_name: str,
new_tag: str,
confirmed: bool = False,
) -> str:
"""Update the image tag of a service in the compose file.
After confirming, suggests running redeploy_project.
Args:
project_name: Name of the Container Manager project.
service_name: Name of the service within the compose file.
new_tag: New image tag (e.g. "latest", "1.2.3").
confirmed: Must be True to proceed. Set to True to confirm the change.
"""
path = await _find_compose_path(client, config, project_name)
if path is None:
return f"No compose file found for project '{project_name}'."
try:
content = await client.download_text(path)
except Exception as e:
return f"Error reading compose file: {e}"
try:
compose = yaml.safe_load(content)
except yaml.YAMLError as e:
return f"Error parsing compose file: {e}"
services = compose.get("services", {}) or {}
if service_name not in services:
available = ", ".join(sorted(services.keys()))
return f"Service '{service_name}' not found. Available services: {available}"
current_image: str = services[service_name].get("image", "")
if not current_image:
return f"Service '{service_name}' has no 'image' field."
# Parse current image into name and tag
if ":" in current_image:
image_name, current_tag = current_image.rsplit(":", 1)
else:
image_name = current_image
current_tag = "latest"
new_image = f"{image_name}:{new_tag}"
if not confirmed:
return (
f"About to update service '{service_name}' in project '{project_name}':\n"
f" Before: {current_image}\n"
f" After: {new_image}\n\n"
f"Call this tool again with confirmed=True to apply the change."
)
services[service_name]["image"] = new_image
new_content = yaml.dump(compose, default_flow_style=False, sort_keys=False, allow_unicode=True)
folder_path = path.rsplit("/", 1)[0]
filename = path.rsplit("/", 1)[1]
try:
await client.upload_text(folder_path, filename, new_content)
except Exception as e:
return f"Error writing compose file: {e}"
return (
f"Updated '{service_name}' image in '{project_name}':\n"
f" {current_image}{new_image}\n\n"
f"Tip: Run redeploy_project('{project_name}', confirmed=True) to apply the change."
)
@mcp.tool()
async def update_env_var(
project_name: str,
service_name: str,
var_name: str,
var_value: str,
confirmed: bool = False,
) -> str:
"""Add or update an environment variable in a service's compose definition.
After confirming, suggests running redeploy_project.
Args:
project_name: Name of the Container Manager project.
service_name: Name of the service within the compose file.
var_name: Environment variable name.
var_value: New value for the variable.
confirmed: Must be True to proceed. Set to True to confirm the change.
"""
path = await _find_compose_path(client, config, project_name)
if path is None:
return f"No compose file found for project '{project_name}'."
try:
content = await client.download_text(path)
except Exception as e:
return f"Error reading compose file: {e}"
try:
compose = yaml.safe_load(content)
except yaml.YAMLError as e:
return f"Error parsing compose file: {e}"
services = compose.get("services", {}) or {}
if service_name not in services:
available = ", ".join(sorted(services.keys()))
return f"Service '{service_name}' not found. Available services: {available}"
service = services[service_name]
env_list = service.get("environment") or []
# Determine previous value and build description
old_value: str | None = None
if isinstance(env_list, list):
for i, entry in enumerate(env_list):
if isinstance(entry, str) and entry.startswith(f"{var_name}="):
old_value = entry.split("=", 1)[1]
break
elif entry == var_name:
old_value = "(no value)"
break
elif isinstance(env_list, dict):
old_value = str(env_list.get(var_name)) if var_name in env_list else None
action = "update" if old_value is not None else "add"
if not confirmed:
if old_value is not None:
return (
f"About to update environment variable in '{service_name}' ({project_name}):\n"
f" {var_name}={old_value}{var_name}={var_value}\n\n"
f"Call this tool again with confirmed=True to apply the change."
)
else:
return (
f"About to add environment variable to '{service_name}' ({project_name}):\n"
f" {var_name}={var_value}\n\n"
f"Call this tool again with confirmed=True to apply the change."
)
# Apply the change
if isinstance(env_list, list):
new_entry = f"{var_name}={var_value}"
updated = False
for i, entry in enumerate(env_list):
if isinstance(entry, str) and entry.startswith(f"{var_name}="):
env_list[i] = new_entry
updated = True
break
elif entry == var_name:
env_list[i] = new_entry
updated = True
break
if not updated:
env_list.append(new_entry)
service["environment"] = env_list
elif isinstance(env_list, dict):
env_list[var_name] = var_value
service["environment"] = env_list
else:
service["environment"] = [f"{var_name}={var_value}"]
new_content = yaml.dump(compose, default_flow_style=False, sort_keys=False, allow_unicode=True)
folder_path = path.rsplit("/", 1)[0]
filename = path.rsplit("/", 1)[1]
try:
await client.upload_text(folder_path, filename, new_content)
except Exception as e:
return f"Error writing compose file: {e}"
return (
f"{'Updated' if action == 'update' else 'Added'} env var in '{service_name}' ({project_name}):\n"
f" {var_name}={var_value}\n\n"
f"Tip: Run redeploy_project('{project_name}', confirmed=True) to apply the change."
)
@mcp.tool()
async def update_compose(
project_name: str,
new_content: str,
confirmed: bool = False,
) -> str:
"""Replace the entire compose file with new content.
Validates that the content is valid YAML before writing.
After confirming, suggests running redeploy_project.
Args:
project_name: Name of the Container Manager project.
new_content: Complete new content for the compose file (must be valid YAML).
confirmed: Must be True to proceed. Set to True to confirm the overwrite.
"""
# Validate YAML before anything else
try:
parsed = yaml.safe_load(new_content)
except yaml.YAMLError as e:
return f"Invalid YAML content: {e}"
if not isinstance(parsed, dict) or "services" not in parsed:
return "Invalid compose file: must be a YAML document with a 'services' key."
path = await _find_compose_path(client, config, project_name)
if path is None:
# Default to docker-compose.yml if no existing file found
path = f"{config.compose_base_path}/{project_name}/docker-compose.yml"
service_count = len(parsed.get("services", {}))
if not confirmed:
return (
f"About to overwrite compose file for project '{project_name}':\n"
f" Path: {path}\n"
f" Services defined: {service_count}\n\n"
f"Call this tool again with confirmed=True to apply the change."
)
folder_path = path.rsplit("/", 1)[0]
filename = path.rsplit("/", 1)[1]
try:
await client.upload_text(folder_path, filename, new_content)
except Exception as e:
return f"Error writing compose file: {e}"
return (
f"Compose file updated for project '{project_name}'.\n"
f" Path: {path}\n"
f" Services: {service_count}\n\n"
f"Tip: Run redeploy_project('{project_name}', confirmed=True) to apply the change."
)
async def _find_compose_path(
client: DsmClient, config: AppConfig, project_name: str
) -> str | None:
"""Find the compose file path for a project.
Tries each recognized filename under {compose_base_path}/{project_name}/.
Args:
client: DsmClient instance.
config: AppConfig with compose_base_path.
project_name: Project name.
Returns:
Full path to the compose file if found, None otherwise.
"""
base = f"{config.compose_base_path}/{project_name}"
for filename in _COMPOSE_FILENAMES:
path = f"{base}/{filename}"
try:
# Try to list the file; if it exists, return the path
await client.request(
"SYNO.FileStation.Info",
"get",
params={"path": path, "additional": "[]"},
)
logger.debug("Found compose file: %s", path)
return path
except Exception:
continue
return None
@@ -0,0 +1,215 @@
"""MCP tools for SYNO.Docker.Container: list, status, logs, exec."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_container.config import AppConfig
from mcp_synology_container.dsm_client import DsmClient
logger = logging.getLogger(__name__)
def register_containers(mcp: FastMCP, config: AppConfig, client: DsmClient) -> None:
"""Register all container management tools with the MCP server."""
@mcp.tool()
async def list_containers(project_name: str | None = None) -> str:
"""List containers, optionally filtered by project name.
Args:
project_name: Optional project name to filter containers.
If omitted, lists all containers.
"""
try:
data = await client.request(
"SYNO.Docker.Container",
"list",
params={"limit": "-1", "offset": "0", "type": "all"},
)
except Exception as e:
return f"Error listing containers: {e}"
containers: list[dict[str, Any]] = data.get("containers", [])
if not containers:
return "No containers found."
# Filter by project if specified
if project_name:
containers = [
c for c in containers
if c.get("project_name") == project_name
or _container_in_project(c, project_name)
]
if not containers:
return f"No containers found for project '{project_name}'."
lines = [f"Containers ({len(containers)} total):", ""]
for container in sorted(containers, key=lambda c: c.get("name", "")):
name = container.get("name", "?")
state = container.get("status", container.get("state", "?"))
image = container.get("image", "?")
lines.append(f" {name}")
lines.append(f" Status: {state}")
lines.append(f" Image: {image}")
lines.append("")
return "\n".join(lines).rstrip()
@mcp.tool()
async def get_container_status(container_name: str) -> str:
"""Get detailed status, uptime, and resource usage of a container.
Args:
container_name: Name of the container to inspect.
"""
try:
data = await client.request(
"SYNO.Docker.Container",
"get",
params={"name": container_name},
)
except Exception as e:
return f"Error getting container '{container_name}': {e}"
if not data:
return f"Container '{container_name}' not found."
return _format_container_detail(container_name, data)
@mcp.tool()
async def get_container_logs(
container_name: str,
tail: int = 100,
keyword: str | None = None,
) -> str:
"""Get log output from a container.
Args:
container_name: Name of the container.
tail: Number of recent log lines to return (default 100).
keyword: Optional keyword to filter log lines.
"""
params: dict[str, Any] = {
"name": container_name,
"limit": tail,
"offset": 0,
"sort_dir": "DESC",
}
if keyword:
params["keyword"] = keyword
try:
data = await client.request(
"SYNO.Docker.Container.Log",
"get",
params=params,
)
except Exception as e:
return f"Error getting logs for '{container_name}': {e}"
logs: list[dict[str, Any]] = data.get("logs", [])
if not logs:
return f"No logs found for container '{container_name}'."
total = data.get("total", len(logs))
header = f"Logs for {container_name} (showing {len(logs)} of {total}):\n"
# Logs are returned in DESC order, reverse for chronological display
lines = []
for entry in reversed(logs):
timestamp = entry.get("created", "")
stream = entry.get("stream", "")
text = entry.get("text", "")
stream_tag = f"[{stream}] " if stream else ""
lines.append(f"{timestamp} {stream_tag}{text}")
return header + "\n".join(lines)
@mcp.tool()
async def exec_in_container(
container_name: str,
command: str,
confirmed: bool = False,
) -> str:
"""Execute a command in a running container.
This executes a shell command inside the container. Use with caution.
Requires confirmation before executing.
Args:
container_name: Name of the container.
command: Shell command to execute.
confirmed: Must be True to proceed. Set to True to confirm execution.
"""
if not confirmed:
return (
f"About to run in container '{container_name}':\n"
f" $ {command}\n\n"
f"Call this tool again with confirmed=True to proceed."
)
try:
data = await client.request(
"SYNO.Docker.Container",
"exec",
params={
"name": container_name,
"command": command,
},
)
except Exception as e:
return f"Error executing command in '{container_name}': {e}"
output = data.get("output", "")
exit_code = data.get("exit_code", 0)
result_lines = [f"Command executed in '{container_name}':"]
result_lines.append(f" Exit code: {exit_code}")
if output:
result_lines.append(" Output:")
result_lines.append(output)
return "\n".join(result_lines)
def _container_in_project(container: dict[str, Any], project_name: str) -> bool:
"""Check if a container belongs to a project based on its labels."""
labels = container.get("labels", {}) or {}
if isinstance(labels, dict):
return labels.get("com.docker.compose.project") == project_name
return False
def _format_container_detail(name: str, data: dict[str, Any]) -> str:
"""Format container inspect data as human-readable text."""
state = data.get("State", {}) or {}
config = data.get("Config", {}) or {}
host_config = data.get("HostConfig", {}) or {}
lines = [
f"Container: {name}",
f" Status: {state.get('Status', '?')}",
f" Running: {state.get('Running', False)}",
f" Image: {config.get('Image', '?')}",
]
if state.get("StartedAt"):
lines.append(f" Started: {state.get('StartedAt')}")
if state.get("FinishedAt") and not state.get("Running"):
lines.append(f" Finished: {state.get('FinishedAt')}")
lines.append(f" Exit code: {state.get('ExitCode', '?')}")
memory = host_config.get("Memory", 0)
if memory:
mb = memory // (1024 * 1024)
lines.append(f" Memory limit: {mb} MiB")
env = config.get("Env", []) or []
if env:
lines.append(f" Env vars: {len(env)}")
return "\n".join(lines)
@@ -0,0 +1,145 @@
"""MCP tools for SYNO.Docker.Image: list and check for updates."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_container.config import AppConfig
from mcp_synology_container.dsm_client import DsmClient
logger = logging.getLogger(__name__)
def register_images(mcp: FastMCP, config: AppConfig, client: DsmClient) -> None:
"""Register all image management tools with the MCP server."""
@mcp.tool()
async def check_image_updates(project_name: str | None = None) -> str:
"""Check for available image updates for a project or all images.
Queries the local image list and reports which images have the
'upgradable' flag set by the NAS registry check.
Args:
project_name: Optional project name to filter images.
If omitted, checks all locally available images.
"""
try:
data = await client.request(
"SYNO.Docker.Image",
"list",
params={"limit": "-1", "offset": "0", "show_dsm": "false"},
)
except Exception as e:
return f"Error listing images: {e}"
images: list[dict[str, Any]] = data.get("images", [])
if not images:
return "No images found."
# If project_name given, cross-reference with project containers
if project_name:
images = await _filter_images_for_project(client, project_name, images)
if not images:
return f"No images found for project '{project_name}'."
header = f"Image update status for project '{project_name}':\n"
else:
header = f"Image update status (all {len(images)} images):\n"
upgradable = [img for img in images if img.get("upgradable")]
up_to_date = [img for img in images if not img.get("upgradable")]
lines = [header]
if upgradable:
lines.append(f"Updates available ({len(upgradable)}):")
for img in sorted(upgradable, key=lambda x: x.get("repository", "")):
repo = img.get("repository", "?")
tags = ", ".join(img.get("tags", []))
size_mb = img.get("size", 0) // (1024 * 1024)
lines.append(f" {repo}:{tags} ({size_mb} MiB) ← UPDATE AVAILABLE")
lines.append("")
if up_to_date:
lines.append(f"Up to date ({len(up_to_date)}):")
for img in sorted(up_to_date, key=lambda x: x.get("repository", "")):
repo = img.get("repository", "?")
tags = ", ".join(img.get("tags", []))
size_mb = img.get("size", 0) // (1024 * 1024)
lines.append(f" {repo}:{tags} ({size_mb} MiB)")
if not upgradable:
lines.append("All images are up to date.")
return "\n".join(lines)
async def _filter_images_for_project(
client: DsmClient,
project_name: str,
all_images: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Filter images to those used by a specific project.
Fetches project details and cross-references container image IDs.
Falls back to name-based matching if project details unavailable.
Args:
client: DsmClient instance.
project_name: Project name to filter for.
all_images: Full list of images from SYNO.Docker.Image.
Returns:
Subset of images used by the project.
"""
# Get project details to find used images
try:
# Find project by name
list_data = await client.request("SYNO.Docker.Project", "list")
projects: dict[str, Any] = list_data if isinstance(list_data, dict) else {}
project_entry = next(
(p for p in projects.values() if p.get("name") == project_name), None
)
if not project_entry:
return []
project_id = project_entry.get("id", "")
# Get project detail which includes container image info
detail_data = await client.request(
"SYNO.Docker.Project",
"get",
params={"id": project_id},
)
containers = detail_data.get("containers", []) or []
image_ids: set[str] = set()
image_names: set[str] = set()
for container in containers:
img_id = container.get("Image", "")
if img_id:
image_ids.add(img_id)
cfg_image = container.get("Config", {}).get("Image", "")
if cfg_image:
# Strip tag for name matching
name = cfg_image.split(":")[0] if ":" in cfg_image else cfg_image
image_names.add(name)
# Match images
result = []
for img in all_images:
img_id = img.get("id", "")
repo = img.get("repository", "")
if img_id in image_ids or repo in image_names:
result.append(img)
return result
except Exception as e:
logger.debug("Could not filter images for project '%s': %s", project_name, e)
# Fallback: return all images
return all_images
@@ -0,0 +1,227 @@
"""MCP tools for SYNO.Docker.Project: list, status, start, stop, redeploy."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
from mcp_synology_container.config import AppConfig
from mcp_synology_container.dsm_client import DsmClient
logger = logging.getLogger(__name__)
def register_projects(mcp: FastMCP, config: AppConfig, client: DsmClient) -> None:
"""Register all project management tools with the MCP server."""
@mcp.tool()
async def list_projects() -> str:
"""List all Container Manager projects with their current status.
Returns a formatted table of projects including name, status, path,
and container count.
"""
try:
data = await client.request("SYNO.Docker.Project", "list")
except Exception as e:
return f"Error listing projects: {e}"
projects: dict[str, Any] = data if isinstance(data, dict) else {}
if not projects:
return "No projects found."
lines = ["Projects:", ""]
for project_id, proj in sorted(projects.items(), key=lambda x: x[1].get("name", "")):
name = proj.get("name", "?")
status = proj.get("status", "?")
path = proj.get("path", "?")
container_count = len(proj.get("containerIds", []))
lines.append(f" {name}")
lines.append(f" Status: {status}")
lines.append(f" Path: {path}")
lines.append(f" Containers: {container_count}")
lines.append("")
return "\n".join(lines).rstrip()
@mcp.tool()
async def get_project_status(project_name: str) -> str:
"""Get detailed status of a specific project.
Args:
project_name: Name of the project to inspect.
"""
project = await _find_project(client, project_name)
if project is None:
return f"Project '{project_name}' not found."
return _format_project_detail(project)
@mcp.tool()
async def start_project(project_name: str) -> str:
"""Start a Container Manager project.
Args:
project_name: Name of the project to start.
"""
project = await _find_project(client, project_name)
if project is None:
return f"Project '{project_name}' not found."
project_id = project.get("id", "")
try:
await client.request(
"SYNO.Docker.Project",
"start",
params={"id": project_id},
)
return f"Project '{project_name}' started successfully."
except Exception as e:
return f"Error starting project '{project_name}': {e}"
@mcp.tool()
async def stop_project(project_name: str, confirmed: bool = False) -> str:
"""Stop a running Container Manager project.
This operation stops all containers in the project.
Requires confirmation before executing.
Args:
project_name: Name of the project to stop.
confirmed: Must be True to proceed. Set to True to confirm the stop operation.
"""
if not confirmed:
return (
f"Stopping project '{project_name}' will halt all its containers.\n"
f"Call this tool again with confirmed=True to proceed."
)
project = await _find_project(client, project_name)
if project is None:
return f"Project '{project_name}' not found."
project_id = project.get("id", "")
try:
await client.request(
"SYNO.Docker.Project",
"stop",
params={"id": project_id},
)
return f"Project '{project_name}' stopped successfully."
except Exception as e:
return f"Error stopping project '{project_name}': {e}"
@mcp.tool()
async def redeploy_project(project_name: str, confirmed: bool = False) -> str:
"""Redeploy a project: pull latest images, stop, and restart.
This operation will briefly take the project offline.
Requires confirmation before executing.
Args:
project_name: Name of the project to redeploy.
confirmed: Must be True to proceed. Set to True to confirm the redeploy.
"""
if not confirmed:
return (
f"Redeploying project '{project_name}' will:\n"
f" 1. Pull latest images\n"
f" 2. Stop all containers\n"
f" 3. Restart with new images\n\n"
f"Call this tool again with confirmed=True to proceed."
)
project = await _find_project(client, project_name)
if project is None:
return f"Project '{project_name}' not found."
project_id = project.get("id", "")
results = []
try:
# Step 1: Pull latest images via build (triggers compose pull)
results.append("Step 1/3: Pulling latest images...")
try:
await client.request(
"SYNO.Docker.Project",
"build",
params={"id": project_id, "force": "true"},
)
results.append(" Images pulled.")
except Exception as e:
results.append(f" Warning: pull step failed ({e}), continuing with restart.")
# Step 2: Stop the project
results.append("Step 2/3: Stopping project...")
await client.request(
"SYNO.Docker.Project",
"stop",
params={"id": project_id},
)
results.append(" Project stopped.")
# Step 3: Start the project
results.append("Step 3/3: Starting project...")
await client.request(
"SYNO.Docker.Project",
"start",
params={"id": project_id},
)
results.append(" Project started.")
results.append(f"\nProject '{project_name}' redeployed successfully.")
except Exception as e:
results.append(f"Error during redeploy: {e}")
return "\n".join(results)
async def _find_project(client: DsmClient, name: str) -> dict[str, Any] | None:
"""Find a project by name from the list.
Args:
client: DsmClient instance.
name: Project name to search for.
Returns:
Project dict if found, None otherwise.
"""
try:
data = await client.request("SYNO.Docker.Project", "list")
except Exception:
return None
projects: dict[str, Any] = data if isinstance(data, dict) else {}
for project in projects.values():
if project.get("name") == name:
return dict(project)
return None
def _format_project_detail(project: dict[str, Any]) -> str:
"""Format project details as human-readable text."""
lines = [
f"Project: {project.get('name', '?')}",
f" ID: {project.get('id', '?')}",
f" Status: {project.get('status', '?')}",
f" Path: {project.get('path', '?')}",
f" Share path: {project.get('share_path', '?')}",
f" Created: {project.get('created_at', '?')}",
f" Updated: {project.get('updated_at', '?')}",
]
container_ids = project.get("containerIds", [])
lines.append(f" Containers: {len(container_ids)}")
for cid in container_ids:
lines.append(f" - {cid[:12]}")
services = project.get("services") or []
if services:
lines.append(f" Services: {len(services)}")
for svc in services:
lines.append(f" - {svc.get('display_name', '?')}")
return "\n".join(lines)