""" MCP Tools exposing GitHub issue operations. """ from typing import List, Dict, Any from mcp.server.fastmcp import Context from pmcp.mcp_server.github_server.services.issues import IssueService from pmcp.mcp_server.github_server.github import github_client service = IssueService(github_client) # ───────────────────────────────────────────────────────────────────────── # # READ async def get_issues(ctx: Context, owner: str, repo: str) -> Dict[str, List[str]]: """ Retrieves issues title and number from repo Args: ctx: FastMCP request context (handles errors). owner (str): Repository owner. repo (str): Repository name. Returns: {"issues": [{"issue_number": 123, "issue_title": "Issue Title"}, ...]} """ try: issues = await service.get_issues(owner, repo) # Extract both title and number, skipping pull requests issue_info = [{"issue_number": i["number"], "issue_title": i["title"]} for i in issues if "pull_request" not in i] return {"issues": issue_info} except Exception as exc: error_msg = f"Failed to get issues: {str(exc)}" await ctx.error(str(exc)) raise # ───────────────────────────────────────────────────────────────────────── # # WRITE async def create_issue( ctx: Context, owner: str, repo: str, title: str, body: str | None = None ) -> Dict[str, Any]: """ Opens a new issue. Args: ctx: FastMCP request context (handles errors). owner (str): Repository owner. repo (str): Repository name. title (str): Issue title. body (str): Body of the issue. Returns: {"url": "...", "number": 123} """ try: result = await service.create_issue(owner, repo, title, body) return {"url": result["html_url"], "number": result["number"]} except Exception as exc: error_msg = f"Failed to create issue {str(exc)}" await ctx.error(str(exc)) raise async def comment_issue( ctx: Context, owner: str, repo: str, issue_number: int, body: str ) -> Dict[str, str]: """ Adds a comment on an existing issue. Args: ctx: FastMCP request context (handles errors). owner (str): Repository owner. repo (str): Repository name. issue_number (int): Issue number. body (str): Body of the issue. """ try: result = await service.comment_issue(owner, repo, issue_number, body) return {"url": result["html_url"]} except Exception as exc: error_msg = f"Failed to add the comment {str(exc)}" await ctx.error(str(exc)) raise async def close_issue( ctx: Context, owner: str, repo: str, issue_number: int ) -> Dict[str, str]: """ Closes an issue. Args: ctx: FastMCP request context (handles errors). owner (str): Repository owner. repo (str): Repository name. issue_number (int): Issue number. Returns: Dict """ try: await service.close_issue(owner, repo, issue_number) return {"status": "closed"} except Exception as exc: error_msg = f"Failed to close the issue number {issue_number} in repo {repo}" await ctx.error(str(exc)) raise