MatteoMass's picture
merged
a65e06d
import logging
import httpx
from pmcp.mcp_server.github_server.utils.repo_to_text_utils import fetch_file_contents, fetch_repo_sha, fetch_repo_tree, format_repo_contents, parse_repo_url
GITHUB_API_BASE = "https://api.github.com/repos"
class GithubClient:
"""
Client class for interacting with the Github API over REST.
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = GITHUB_API_BASE
self.client = httpx.AsyncClient(base_url=self.base_url)
async def close(self):
await self.client.aclose()
async def GET(self, endpoint: str, params: dict = None):
return await self._request("get", endpoint, params=params)
async def POST(self, endpoint: str, json: dict):
return await self._request("post", endpoint, json=json)
async def PATCH(self, endpoint: str, json: dict):
return await self._request("patch", endpoint, json=json)
async def PUT(self, endpoint: str, json: dict = None):
return await self._request("put", endpoint, json=json)
async def _request(self, method: str, endpoint: str, **kwargs):
headers = self._get_headers()
try:
response = await self.client.request(method, endpoint, headers=headers, **kwargs)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise httpx.HTTPStatusError(
f"Failed to {method.upper()} {endpoint}: {str(e)}",
request=e.request,
response=e.response,
)
except httpx.RequestError as e:
raise httpx.RequestError(f"Failed to {method.upper()} {endpoint}: {str(e)}")
def _get_headers(self):
return {
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {self.api_key}",
"X-GitHub-Api-Version": "2022-11-28",
}
async def REPO_TO_TEXT(self, repo_url: str):
owner, repo, ref, path = parse_repo_url(repo_url)
sha = fetch_repo_sha(owner, repo, ref, path, self.api_key)
tree = fetch_repo_tree(owner, repo, sha, self.api_key)
blobs = [item for item in tree if item['type'] == 'blob']
common_exts = ('.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.html', '.css')
selected_files = [item for item in blobs if item['path'].lower().endswith(common_exts)]
for item in selected_files:
item['url'] = f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}?ref={ref}" if ref else f"https://api.github.com/repos/{owner}/{repo}/contents/{item['path']}"
contents = fetch_file_contents(selected_files,self. api_key)
return format_repo_contents(contents)