# 04_clone_and_extract.py
# Dockerfile extraction – v3 (no parser, with correct JSONL output)

import json
import shutil
import hashlib
from pathlib import Path
from git import Repo
from datetime import datetime, timezone
import argparse

# === Paths
REPO_LIST_PATH = Path("data/metadata/repos_filtered.json")
CLONE_DIR = Path("temp_repos")
OUTPUT_FILE = Path("data/raw/dockerfiles.jsonl")
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

# Directories stripped from each clone before scanning (saves disk space and
# avoids picking up Dockerfiles living inside docs/tests fixtures).
REMOVE_DIRS = [".git", ".github", "docs", "tests", "__pycache__", ".idea", ".vscode"]


def clean_repo(path: Path) -> None:
    """Remove non-essential directories from a cloned repository in place."""
    for d in REMOVE_DIRS:
        shutil.rmtree(path / d, ignore_errors=True)


def compute_sha1(text: str) -> str:
    """Return the hex SHA-1 digest of *text* (used for content-level dedup)."""
    return hashlib.sha1(text.encode("utf-8")).hexdigest()


def is_valid_dockerfile(path: Path) -> bool:
    """Heuristically decide whether *path* looks like a real Dockerfile.

    Accepts files with at least 5 non-empty lines and at most 200 kB that
    have a FROM instruction within their first 10 non-empty lines and at
    least one RUN/CMD/COPY instruction anywhere.
    """
    try:
        text = path.read_text(encoding="utf-8").strip()
        lines = [l.strip().lower() for l in text.splitlines() if l.strip()]
        if len(lines) < 5 or path.stat().st_size > 200_000:
            return False
        top_lines = lines[:10]
        has_from = any(l.startswith("from") for l in top_lines)
        has_run = any(l.startswith(("run", "cmd", "copy")) for l in lines)
        return has_from and has_run
    except Exception as e:
        # Unreadable / non-UTF-8 files are reported and rejected.
        print(f"⚠️ Błąd walidacji pliku {path}: {e}")
        return False


def find_dockerfiles(repo_path: Path) -> list[Path]:
    """Return every file in the tree named exactly 'dockerfile' (case-insensitive)."""
    return [
        f
        for f in repo_path.rglob("*")
        if f.name.lower() == "dockerfile" and f.is_file()
    ]


def clone_repo(url: str, full_name: str) -> Path | None:
    """Shallow-clone *url* into CLONE_DIR and return the path, or None on failure.

    NOTE(review): an existing destination directory is assumed to be a
    complete clone from a previous run; a partially cloned directory left
    by a crash would be reused as-is — confirm this is acceptable.
    """
    dest = CLONE_DIR / full_name.replace("/", "__")
    if dest.exists():
        print(f"⚠️ Repo {full_name} już istnieje – pomijam klonowanie.")
        return dest
    try:
        print(f"⬇️ Klonuję {full_name}...")
        Repo.clone_from(url, dest, depth=1)  # depth=1: history is not needed
        clean_repo(dest)
        return dest
    except Exception as e:
        print(f"❌ Błąd klonowania {full_name}: {e}")
        return None


def main() -> None:
    """Clone each listed repository and write its valid Dockerfiles as JSONL.

    One JSON object per Dockerfile; duplicates (by SHA-1 of the normalized
    content) are skipped across all repositories. With --purge, each clone
    is deleted after extraction.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--purge", action="store_true", help="Usuń repozytorium po ekstrakcji")
    args = parser.parse_args()

    # Explicit encoding: do not depend on the platform default.
    with open(REPO_LIST_PATH, encoding="utf-8") as f:
        repos = json.load(f)

    saved, skipped = 0, 0
    seen_hashes: set[str] = set()  # SHA-1 of normalized content, dedup across repos

    with open(OUTPUT_FILE, "w", encoding="utf-8") as out_f:
        for repo in repos:
            full_name = repo["fullName"]
            url = repo["url"]
            repo_path = clone_repo(url, full_name)
            if not repo_path:
                continue
            for file in find_dockerfiles(repo_path):
                if not is_valid_dockerfile(file):
                    skipped += 1
                    continue
                try:
                    # Normalize: drop blank lines and trailing whitespace so the
                    # dedup hash ignores cosmetic differences.
                    raw_lines = file.read_text(encoding="utf-8").strip().splitlines()
                    content_lines = [l.rstrip() for l in raw_lines if l.strip()]
                    file_id = compute_sha1("\n".join(content_lines))
                    if file_id in seen_hashes:
                        skipped += 1
                        continue
                    seen_hashes.add(file_id)
                    json.dump({
                        "repo": full_name,
                        "path": str(file.relative_to(repo_path)),
                        "file_id": file_id,
                        "content": content_lines,
                        "size_bytes": file.stat().st_size,
                        "line_count": len(content_lines),
                        "valid": True,
                        # Timezone-aware UTC (was a naive local timestamp).
                        "cloned_at": datetime.now(timezone.utc).isoformat(),
                    }, out_f, ensure_ascii=False)  # keep non-ASCII paths readable
                    out_f.write("\n")
                    saved += 1
                except Exception as e:
                    print(f"⚠️ Błąd przy zapisie {file}: {e}")
                    skipped += 1
            if args.purge:
                shutil.rmtree(repo_path, ignore_errors=True)

    print(f"\n✅ Zapisano {saved} poprawnych Dockerfile do {OUTPUT_FILE}")
    print(f"🚫 Pominięto {skipped} plików (nieważne, błędne, zduplikowane)")


if __name__ == "__main__":
    main()