feat: add check_permission + 3 sharing tools (v0.2.10)
Implements Group 3 of the planned tool set: - check_permission: SYNO.FileStation.CheckPermission/write - create_sharing_link: SYNO.FileStation.Sharing/create (password + expiry optional) - list_sharing_links: SYNO.FileStation.Sharing/list (paginated table) - delete_sharing_link: SYNO.FileStation.Sharing/delete Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,3 +1,3 @@
|
||||
"""MCP server for Synology FileStation."""
|
||||
|
||||
__version__ = "0.2.9"
|
||||
__version__ = "0.2.10"
|
||||
|
||||
@@ -993,3 +993,148 @@ def register_filestation(
|
||||
return f"Error: {e}"
|
||||
|
||||
return f"Uploaded: {path}/{filename}"
|
||||
|
||||
# ── permission + sharing tools ────────────────────────────────────────
|
||||
|
||||
@mcp.tool()
|
||||
async def check_permission(
|
||||
path: str,
|
||||
filename: str,
|
||||
overwrite: bool = False,
|
||||
create_only: bool = False,
|
||||
):
|
||||
"""Check write permission for filename in path.
|
||||
Returns 'Permission granted' or an error message."""
|
||||
from mcp_synology_filestation.client import SynologyError
|
||||
|
||||
req_params: dict[str, Any] = {"path": path, "filename": filename}
|
||||
if overwrite:
|
||||
req_params["overwrite"] = "true"
|
||||
if create_only:
|
||||
req_params["create_only"] = "true"
|
||||
|
||||
try:
|
||||
await client.request(
|
||||
"SYNO.FileStation.CheckPermission",
|
||||
"write",
|
||||
version=3,
|
||||
params=req_params,
|
||||
)
|
||||
except SynologyError as e:
|
||||
return f"Error: {e}"
|
||||
|
||||
return f"Permission granted: write {filename!r} into {path}"
|
||||
|
||||
@mcp.tool()
|
||||
async def create_sharing_link(
|
||||
path: str,
|
||||
password: str = "",
|
||||
date_expired: str = "",
|
||||
date_available: str = "",
|
||||
):
|
||||
"""Create a public sharing link for a file or folder.
|
||||
date_expired/date_available: YYYY-MM-DD, optional."""
|
||||
from mcp_synology_filestation.client import SynologyError
|
||||
|
||||
req_params: dict[str, Any] = {"path": json.dumps(path)}
|
||||
if password:
|
||||
req_params["password"] = password
|
||||
if date_expired:
|
||||
req_params["date_expired"] = date_expired
|
||||
if date_available:
|
||||
req_params["date_available"] = date_available
|
||||
|
||||
try:
|
||||
data = await client.request(
|
||||
"SYNO.FileStation.Sharing",
|
||||
"create",
|
||||
version=3,
|
||||
params=req_params,
|
||||
)
|
||||
except SynologyError as e:
|
||||
return f"Error: {e}"
|
||||
|
||||
links = data.get("links", [])
|
||||
if not links:
|
||||
return "Error: DSM returned no sharing link."
|
||||
|
||||
link = links[0]
|
||||
link_id = link.get("id", "?")
|
||||
url = link.get("url", "?")
|
||||
has_password = link.get("has_password", False)
|
||||
|
||||
lines = [f"Sharing link created: {url}", f"ID: {link_id}"]
|
||||
if has_password:
|
||||
lines.append("Password protected: yes")
|
||||
return "\n".join(lines)
|
||||
|
||||
@mcp.tool()
|
||||
async def list_sharing_links(offset: int = 0, limit: int = 100):
|
||||
"""List sharing links (paginated). Table: ID, URL, path, owner, expiry, status."""
|
||||
from mcp_synology_filestation.client import SynologyError
|
||||
|
||||
try:
|
||||
data = await client.request(
|
||||
"SYNO.FileStation.Sharing",
|
||||
"list",
|
||||
version=3,
|
||||
params={"offset": str(offset), "limit": str(limit)},
|
||||
)
|
||||
except SynologyError as e:
|
||||
return f"Error: {e}"
|
||||
|
||||
links = data.get("links", [])
|
||||
total = data.get("total", 0)
|
||||
|
||||
if not links:
|
||||
return f"No sharing links found. (total={total})"
|
||||
|
||||
rows = []
|
||||
for lnk in links:
|
||||
link_id = lnk.get("id", "?")
|
||||
url = lnk.get("url", "?")
|
||||
lpath = lnk.get("path", "?")
|
||||
owner = lnk.get("link_owner", "?")
|
||||
expiry = lnk.get("date_expired", "") or "never"
|
||||
status = lnk.get("status", "?")
|
||||
rows.append((link_id, url, lpath, owner, expiry, status))
|
||||
|
||||
headers = ("ID", "URL", "Path", "Owner", "Expires", "Status")
|
||||
col_widths = [max(len(h), *(len(r[i]) for r in rows)) for i, h in enumerate(headers)]
|
||||
|
||||
def _sep() -> str:
|
||||
return "+" + "+".join("-" * (w + 2) for w in col_widths) + "+"
|
||||
|
||||
def _row(vals: tuple[str, ...]) -> str:
|
||||
return "| " + " | ".join(f"{v:<{col_widths[i]}}" for i, v in enumerate(vals)) + " |"
|
||||
|
||||
lines = [_sep(), _row(headers), _sep()]
|
||||
for r in rows:
|
||||
lines.append(_row(r))
|
||||
lines.append(_sep())
|
||||
|
||||
if offset + len(rows) < total:
|
||||
lines.append(
|
||||
f"\nShowing {offset + 1}–{offset + len(rows)} of {total}. Pass offset to paginate."
|
||||
)
|
||||
else:
|
||||
lines.append(f"\n{len(rows)} of {total} link(s).")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
@mcp.tool()
|
||||
async def delete_sharing_link(link_id: str):
|
||||
"""Delete a sharing link by its ID. IRREVERSIBLE."""
|
||||
from mcp_synology_filestation.client import SynologyError
|
||||
|
||||
try:
|
||||
await client.request(
|
||||
"SYNO.FileStation.Sharing",
|
||||
"delete",
|
||||
version=3,
|
||||
params={"id": json.dumps(link_id)},
|
||||
)
|
||||
except SynologyError as e:
|
||||
return f"Error: {e}"
|
||||
|
||||
return f"Deleted sharing link: {link_id}"
|
||||
|
||||
Reference in New Issue
Block a user