import logging import httpx from pmcp.mcp_server.github_server.utils.repo_to_text_utils import fetch_file_contents, fetch_repo_sha, fetch_repo_tree, format_repo_contents, parse_repo_url GITHUB_API_BASE = "https://api.github.com/repos" class GithubClient: """ Client class for interacting with the Github API over REST. """ def __init__(self, api_key: str): self.api_key = api_key self.base_url = GITHUB_API_BASE self.client = httpx.AsyncClient(base_url=self.base_url) async def close(self): await self.client.aclose() async def GET(self, endpoint: str, params: dict = None): return await self._request("get", endpoint, params=params) async def POST(self, endpoint: str, json: dict): return await self._request("post", endpoint, json=json) async def PATCH(self, endpoint: str, json: dict): return await self._request("patch", endpoint, json=json) async def PUT(self, endpoint: str, json: dict = None): return await self._request("put", endpoint, json=json) async def _request(self, method: str, endpoint: str, **kwargs): headers = self._get_headers() try: response = await self.client.request(method, endpoint, headers=headers, **kwargs) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise httpx.HTTPStatusError( f"Failed to {method.upper()} {endpoint}: {str(e)}", request=e.request, response=e.response, ) except httpx.RequestError as e: raise httpx.RequestError(f"Failed to {method.upper()} {endpoint}: {str(e)}") def _get_headers(self): return { "Accept": "application/vnd.github+json", "Authorization": f"Bearer {self.api_key}", "X-GitHub-Api-Version": "2022-11-28", } async def REPO_TO_TEXT(self, repo_url: str): owner, repo, ref, path = parse_repo_url(repo_url) sha = fetch_repo_sha(owner, repo, ref, path, self.api_key) tree = fetch_repo_tree(owner, repo, sha, self.api_key) blobs = [item for item in tree if item['type'] == 'blob'] common_exts = ('.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.html', '.css') selected_files = [item for item in blobs if item['path'].lower().endswith(common_exts)] for item in selected_files: item['url'] = f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}?ref={ref}" if ref else f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}" contents = fetch_file_contents(selected_files,self. api_key) return format_repo_contents(contents)