|
|
|
|
|
|
|
import json |
|
import shutil |
|
import hashlib |
|
from pathlib import Path |
|
from git import Repo |
|
from datetime import datetime |
|
import argparse |
|
|
|
|
|
# Input: filtered repository list produced by an earlier pipeline step.
REPO_LIST_PATH = Path("data/metadata/repos_filtered.json")

# Working directory where repositories are shallow-cloned.
CLONE_DIR = Path("temp_repos")

# Output: JSON Lines file, one record per accepted Dockerfile.
OUTPUT_FILE = Path("data/raw/dockerfiles.jsonl")

# NOTE(review): directory creation runs at import time as a module side effect.
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

# Directories stripped from every clone before scanning (VCS metadata, docs,
# tests, editor/config caches) — see clean_repo().
REMOVE_DIRS = [".git", ".github", "docs", "tests", "__pycache__", ".idea", ".vscode"]
|
|
|
def clean_repo(path: Path):
    """Strip unwanted directories (VCS metadata, docs, caches) from a clone.

    Missing directories are ignored (`ignore_errors=True`).
    """
    for unwanted in REMOVE_DIRS:
        shutil.rmtree(path / unwanted, ignore_errors=True)
|
|
|
def compute_sha1(text: str) -> str:
    """Return the hexadecimal SHA-1 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha1()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
|
|
|
def is_valid_dockerfile(path: Path) -> bool:
    """Heuristically decide whether *path* looks like a usable Dockerfile.

    Accepts files that have at least five non-empty lines, are at most
    200 kB on disk, start with a FROM instruction somewhere in the first
    ten non-empty lines, and contain at least one RUN/CMD/COPY instruction
    anywhere. Any read/stat failure is reported and treated as invalid.
    """
    try:
        raw = path.read_text(encoding="utf-8").strip()
        normalized = [ln.strip().lower() for ln in raw.splitlines() if ln.strip()]
        if len(normalized) < 5:
            return False
        if path.stat().st_size > 200_000:
            return False
        if not any(ln.startswith("from") for ln in normalized[:10]):
            return False
        return any(ln.startswith(("run", "cmd", "copy")) for ln in normalized)
    except Exception as e:
        print(f"⚠️ Błąd walidacji pliku {path}: {e}")
        return False
|
|
|
def find_dockerfiles(repo_path: Path) -> list[Path]:
    """Recursively collect every regular file named 'Dockerfile' (case-insensitive)."""
    matches: list[Path] = []
    for candidate in repo_path.rglob("*"):
        if candidate.is_file() and candidate.name.lower() == "dockerfile":
            matches.append(candidate)
    return matches
|
|
|
def clone_repo(url: str, full_name: str) -> Path | None:
    """Shallow-clone *url* into CLONE_DIR and prune it; return the path.

    An existing clone is reused without re-cloning. Returns ``None`` when
    cloning (or the post-clone cleanup) fails.
    """
    target = CLONE_DIR / full_name.replace("/", "__")
    if target.exists():
        print(f"⚠️ Repo {full_name} już istnieje – pomijam klonowanie.")
        return target
    try:
        print(f"⬇️ Klonuję {full_name}...")
        # depth=1: history is irrelevant, only the working tree is scanned.
        Repo.clone_from(url, target, depth=1)
        clean_repo(target)
        return target
    except Exception as e:
        print(f"❌ Błąd klonowania {full_name}: {e}")
        return None
|
|
|
def main():
    """Clone every listed repository and extract its valid, deduplicated
    Dockerfiles into OUTPUT_FILE as JSON Lines.

    With ``--purge`` each clone is deleted after extraction. Duplicate
    files (identical normalized content) are skipped via SHA-1 hashing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--purge", action="store_true", help="Usuń repozytorium po ekstrakcji")
    args = parser.parse_args()

    with open(REPO_LIST_PATH) as repo_list:
        repos = json.load(repo_list)

    saved = 0
    skipped = 0
    seen_hashes: set[str] = set()

    with open(OUTPUT_FILE, "w", encoding="utf-8") as sink:
        for entry in repos:
            full_name = entry["fullName"]
            repo_path = clone_repo(entry["url"], full_name)
            if not repo_path:
                continue

            for dockerfile in find_dockerfiles(repo_path):
                if not is_valid_dockerfile(dockerfile):
                    skipped += 1
                    continue

                try:
                    # Normalize: drop blank lines and trailing whitespace so the
                    # dedup hash ignores cosmetic differences.
                    content = [
                        line.rstrip()
                        for line in dockerfile.read_text(encoding="utf-8").strip().splitlines()
                        if line.strip()
                    ]
                    file_id = compute_sha1("\n".join(content))
                    if file_id in seen_hashes:
                        skipped += 1
                        continue
                    seen_hashes.add(file_id)

                    record = {
                        "repo": full_name,
                        "path": str(dockerfile.relative_to(repo_path)),
                        "file_id": file_id,
                        "content": content,
                        "size_bytes": dockerfile.stat().st_size,
                        "line_count": len(content),
                        "valid": True,
                        "cloned_at": datetime.now().isoformat(),
                    }
                    json.dump(record, sink)
                    sink.write("\n")
                    saved += 1

                except Exception as e:
                    print(f"⚠️ Błąd przy zapisie {dockerfile}: {e}")
                    skipped += 1

            if args.purge:
                shutil.rmtree(repo_path, ignore_errors=True)

    print(f"\n✅ Zapisano {saved} poprawnych Dockerfile do {OUTPUT_FILE}")
    print(f"🚫 Pominięto {skipped} plików (nieważne, błędne, zduplikowane)")


if __name__ == "__main__":
    main()
|
|