MatteoMass's picture
removed the tokens from env
9ad58d0
"""
MCP Tools exposing GitHub issue operations.
"""
from typing import List, Dict, Any
from mcp.server.fastmcp import Context
from pmcp.mcp_server.github_server.services.issues import IssueService
from pmcp.mcp_server.github_server.github import github_client
# Shared IssueService instance, built on the imported github_client, that
# every tool function in this module delegates to.
service = IssueService(github_client)
# ───────────────────────────────────────────────────────────────────────── #
# READ
async def get_issues(
    ctx: Context, owner: str, repo: str
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Retrieves the number and title of the issues in a repository.

    Entries that carry a "pull_request" key are skipped, so only plain
    issues are reported.

    Args:
        ctx: FastMCP request context, used to report errors.
        owner (str): Repository owner.
        repo (str): Repository name.

    Returns:
        {"issues": [{"issue_number": 123, "issue_title": "Issue Title"}, ...]}

    Raises:
        Exception: Any error raised while fetching or reshaping the issues
            is reported through ``ctx.error`` and then re-raised unchanged.
    """
    try:
        issues = await service.get_issues(owner, repo)
        # Keep only number and title, dropping pull-request entries.
        issue_info = [
            {"issue_number": i["number"], "issue_title": i["title"]}
            for i in issues
            if "pull_request" not in i
        ]
        return {"issues": issue_info}
    except Exception as exc:
        # Report the contextual message (not just the bare exception text)
        # so the client can tell which tool failed.
        await ctx.error(f"Failed to get issues: {exc}")
        raise
# ───────────────────────────────────────────────────────────────────────── #
# WRITE
async def create_issue(
    ctx: Context, owner: str, repo: str, title: str, body: str | None = None
) -> Dict[str, Any]:
    """
    Opens a new issue.

    Args:
        ctx: FastMCP request context, used to report errors.
        owner (str): Repository owner.
        repo (str): Repository name.
        title (str): Issue title.
        body (str | None): Body of the issue; passed through to the service
            as-is (defaults to None).

    Returns:
        {"url": "...", "number": 123} built from the "html_url" and "number"
        fields of the service result.

    Raises:
        Exception: Any error raised while creating the issue is reported
            through ``ctx.error`` and then re-raised unchanged.
    """
    try:
        result = await service.create_issue(owner, repo, title, body)
        return {"url": result["html_url"], "number": result["number"]}
    except Exception as exc:
        # Send the contextual message rather than only the raw exception
        # text, so the failing operation is identifiable on the client side.
        await ctx.error(f"Failed to create issue {exc}")
        raise
async def comment_issue(
    ctx: Context, owner: str, repo: str, issue_number: int, body: str
) -> Dict[str, str]:
    """
    Adds a comment on an existing issue.

    Args:
        ctx: FastMCP request context, used to report errors.
        owner (str): Repository owner.
        repo (str): Repository name.
        issue_number (int): Issue number.
        body (str): Body of the comment.

    Returns:
        {"url": "..."} built from the "html_url" field of the service result.

    Raises:
        Exception: Any error raised while adding the comment is reported
            through ``ctx.error`` and then re-raised unchanged.
    """
    try:
        result = await service.comment_issue(owner, repo, issue_number, body)
        return {"url": result["html_url"]}
    except Exception as exc:
        # Send the contextual message rather than only the raw exception
        # text, so the failing operation is identifiable on the client side.
        await ctx.error(f"Failed to add the comment {exc}")
        raise
async def close_issue(
    ctx: Context, owner: str, repo: str, issue_number: int
) -> Dict[str, str]:
    """
    Closes an issue.

    Args:
        ctx: FastMCP request context, used to report errors.
        owner (str): Repository owner.
        repo (str): Repository name.
        issue_number (int): Issue number.

    Returns:
        {"status": "closed"} once the service call completes without raising.

    Raises:
        Exception: Any error raised while closing the issue is reported
            through ``ctx.error`` and then re-raised unchanged.
    """
    try:
        await service.close_issue(owner, repo, issue_number)
        return {"status": "closed"}
    except Exception as exc:
        # Include both the operation context and the underlying cause; the
        # original message carried no exception details at all.
        await ctx.error(
            f"Failed to close the issue number {issue_number} in repo {repo}: {exc}"
        )
        raise