#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Repository Service

This module provides functionality for cloning and managing Git repositories.
"""
import logging
import os
import re
import shutil
import tempfile

from git import Repo
from git.exc import GitCommandError

# Module-level logger used by RepositoryService for all of its log output.
logger = logging.getLogger(__name__)
class RepositoryService:
    """
    Service for cloning and managing Git repositories.

    Every repository that is cloned (or opened for inspection) is tracked in
    ``self.repos``, a dict mapping the local repository path to its ``Repo``
    instance, so it can be inspected and cleaned up later.
    """

    # Accepted GitHub URL shapes. They are applied with ``fullmatch`` so the
    # whole string must match; ``re.match`` with a trailing ``$`` would also
    # accept a URL followed by a newline.
    _GITHUB_URL_PATTERNS = (
        re.compile(r'https?://github\.com/[\w.-]+/[\w.-]+(\.git)?'),  # https://github.com/user/repo[.git]
        re.compile(r'git@github\.com:[\w.-]+/[\w.-]+(\.git)?'),  # git@github.com:user/repo[.git]
    )

    def __init__(self, base_temp_dir=None):
        """
        Initialize the RepositoryService.

        Args:
            base_temp_dir (str, optional): Base directory for temporary repositories.
                If None (or empty), the system temp directory is used.
        """
        self.base_temp_dir = base_temp_dir or tempfile.gettempdir()
        self.repos = {}
        logger.info("Initialized RepositoryService with base temp dir: %s", self.base_temp_dir)

    def validate_github_url(self, url):
        """
        Validate if the provided URL is a valid GitHub repository URL.

        Args:
            url (str): The GitHub repository URL to validate.

        Returns:
            bool: True if the URL is an HTTP(S) or SSH GitHub repository URL,
                False otherwise (including when ``url`` is not a string).
        """
        if not isinstance(url, str):
            return False
        return any(pattern.fullmatch(url) for pattern in self._GITHUB_URL_PATTERNS)

    def normalize_github_url(self, url):
        """
        Normalize a GitHub URL to a consistent format.

        SSH URLs (``git@github.com:user/repo``) are converted to their HTTPS
        form, and a trailing ``.git`` is removed from SSH and HTTP(S) URLs.
        Any other string is returned unchanged.

        Args:
            url (str): The GitHub repository URL to normalize.

        Returns:
            str: The normalized URL.
        """
        ssh_prefix = 'git@github.com:'
        if url.startswith(ssh_prefix):
            user_repo = url[len(ssh_prefix):]
            if user_repo.endswith('.git'):
                user_repo = user_repo[:-4]
            return f"https://github.com/{user_repo}"

        if url.startswith('http') and url.endswith('.git'):
            return url[:-4]
        return url

    def extract_repo_name(self, url):
        """
        Extract repository name from a GitHub URL.

        Args:
            url (str): The GitHub repository URL.

        Returns:
            str: The last path segment of the normalized URL.
        """
        return self.normalize_github_url(url).rsplit('/', 1)[-1]

    def clone_repository(self, url, branch=None):
        """
        Clone a Git repository from the provided URL.

        The clone is placed in a uniquely named ``codereview_<repo>_<random>``
        directory under ``self.base_temp_dir`` and registered in ``self.repos``.

        Args:
            url (str): The repository URL to clone.
            branch (str, optional): The branch to checkout. If None, the default branch is used.

        Returns:
            str: The path to the cloned repository.

        Raises:
            ValueError: If the URL is not a valid GitHub repository URL.
            GitCommandError: If there's an error during the Git operation.
        """
        if not self.validate_github_url(url):
            raise ValueError(f"Invalid GitHub repository URL: {url}")

        repo_name = self.extract_repo_name(url)
        repo_dir = os.path.join(self.base_temp_dir, f"codereview_{repo_name}_{os.urandom(4).hex()}")
        logger.info("Cloning repository %s to %s", url, repo_dir)

        # Only pass ``branch`` when one was requested so git picks the default otherwise.
        clone_kwargs = {'branch': branch} if branch else {}
        try:
            repo = Repo.clone_from(url, repo_dir, **clone_kwargs)
        except GitCommandError as e:
            logger.error("Error cloning repository %s: %s", url, e)
            # Remove any partially created clone; a missing directory is ignored.
            shutil.rmtree(repo_dir, ignore_errors=True)
            raise

        branch_label = f"branch: {branch}" if branch else "default branch"
        logger.info("Cloned repository %s (%s) to %s", url, branch_label, repo_dir)

        # Store the repository instance
        self.repos[repo_dir] = repo
        return repo_dir

    @staticmethod
    def _directory_stats(root):
        """
        Return ``(total_size_in_bytes, file_count)`` for every file under ``root``.

        The tree is walked once. Files whose size cannot be read (for example
        dangling symlinks or files removed during the walk) are still counted
        but contribute no bytes.
        """
        total_bytes = 0
        file_count = 0
        for dirpath, _, filenames in os.walk(root):
            for filename in filenames:
                file_count += 1
                try:
                    total_bytes += os.path.getsize(os.path.join(dirpath, filename))
                except OSError:
                    # One unreadable entry must not invalidate the whole report.
                    continue
        return total_bytes, file_count

    def get_repository_info(self, repo_path):
        """
        Get information about a repository.

        If the path is not tracked yet, it is opened with ``Repo`` and added to
        ``self.repos``.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Repository information with the keys ``path``,
                ``active_branch``, ``latest_commit`` (``hash``, ``author``,
                ``date``, ``message``), ``remote_url``, ``size_bytes`` and
                ``file_count``. An empty dict is returned when the repository
                cannot be opened; a dict with ``path`` and ``error`` is
                returned when gathering the information fails.
        """
        repo = self.repos.get(repo_path)
        if repo is None:
            try:
                repo = Repo(repo_path)
            except Exception as e:
                logger.error("Error opening repository at %s: %s", repo_path, e)
                return {}
            self.repos[repo_path] = repo

        try:
            # Get the active branch
            try:
                active_branch = repo.active_branch.name
            except TypeError:
                # Detached HEAD state
                active_branch = 'HEAD detached'

            # Get the latest commit
            latest_commit = repo.head.commit

            # Get remote URL
            try:
                remote_url = repo.remotes.origin.url
            except AttributeError:
                remote_url = 'No remote URL found'

            # Repository size (approximate) and file count, gathered in one walk
            repo_size, file_count = self._directory_stats(repo_path)

            return {
                'path': repo_path,
                'active_branch': active_branch,
                'latest_commit': {
                    'hash': latest_commit.hexsha,
                    'author': f"{latest_commit.author.name} <{latest_commit.author.email}>",
                    'date': latest_commit.committed_datetime.isoformat(),
                    'message': latest_commit.message.strip(),
                },
                'remote_url': remote_url,
                'size_bytes': repo_size,
                'file_count': file_count,
            }
        except Exception as e:
            logger.error("Error getting repository info for %s: %s", repo_path, e)
            return {
                'path': repo_path,
                'error': str(e),
            }

    def cleanup_repository(self, repo_path):
        """
        Clean up a cloned repository.

        The repository is removed from ``self.repos`` (and closed, if it was
        tracked) before its directory is deleted on a best-effort basis.

        Args:
            repo_path (str): The path to the repository to clean up.

        Returns:
            bool: True if the path no longer exists afterwards, False otherwise.
        """
        logger.info("Cleaning up repository at %s", repo_path)

        # Remove the repository from the tracked repos and release its handles
        # so that open git resources do not block the directory removal.
        repo = self.repos.pop(repo_path, None)
        if repo is not None:
            try:
                repo.close()
            except Exception as e:
                logger.warning("Error closing repository at %s: %s", repo_path, e)

        # Remove the directory
        try:
            if os.path.exists(repo_path):
                shutil.rmtree(repo_path, ignore_errors=True)
            # rmtree swallows its own errors, so verify the result explicitly.
            removed = not os.path.exists(repo_path)
        except Exception as e:
            logger.error("Error cleaning up repository at %s: %s", repo_path, e)
            return False

        if not removed:
            logger.error("Error cleaning up repository at %s: %s", repo_path, "path still exists")
            return False
        return True

    def cleanup_all_repositories(self):
        """
        Clean up all cloned repositories.

        Returns:
            bool: True if all cleanups were successful, False otherwise.
        """
        logger.info("Cleaning up all repositories")
        # Build the full result list first so every repository is attempted
        # even after one cleanup fails; iterate a snapshot because
        # cleanup_repository mutates self.repos.
        results = [self.cleanup_repository(repo_path) for repo_path in list(self.repos)]
        return all(results)