# 06_label_with_fixes.py – version v4
"""Label Dockerfiles with hadolint and attach fix suggestions.

Reads JSONL records from ``INPUT_PATH``, lints each record's Dockerfile
content with the ``hadolint`` binary in a pool of worker processes, and
writes the labeled records to ``OUTPUT_PATH``.  Records whose future raised
go to ``FAILED_LOG``; rule codes without an entry in ``FIXES`` are listed in
``MISSING_FIXES_LOG``.
"""
import json
import tempfile
import subprocess
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime

# === Paths ===
INPUT_PATH = Path("data/raw/dockerfiles.jsonl")
OUTPUT_PATH = Path("data/labeled/labeled_dockerfiles.jsonl")
FAILED_LOG = Path("data/labeled/failed_dockerfiles.jsonl")
MISSING_FIXES_LOG = Path("data/labeled/missing_fixes.txt")
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)

FIXES_PATH = Path("data/fixes/fixes.json")
HADOLINT_BIN = "hadolint"
MAX_WORKERS = 6
TIMEOUT_SECONDS = 5

# === Global fix dictionary ===
with open(FIXES_PATH, encoding="utf-8") as f:
    FIXES = json.load(f)

MISSING_FIXES = set()

# Rule names invented by this script (not reported by hadolint); they never
# have an entry in FIXES and must not be reported as "missing fixes".
_SYNTHETIC_RULE_PREFIXES = ("lint-", "future-error:")


def attach_fixes(rules_triggered: list[str]) -> dict:
    """Return ``{rule: fix}`` for every triggered rule that has a known fix.

    Rules without an entry in ``FIXES`` are added to ``MISSING_FIXES`` of the
    *current* process.  When this runs inside a pool worker the parent
    process does not see that mutation, which is why ``main`` recomputes the
    missing rules from the returned records.
    """
    suggestions = {}
    for rule in rules_triggered:
        if rule in FIXES:
            suggestions[rule] = FIXES[rule]
        else:
            MISSING_FIXES.add(rule)
    return suggestions


def _make_record(label, rules, lines, fixes, repo, path, content) -> dict:
    """Build one output record in the schema written to the JSONL files."""
    return {
        "label": label,
        "rules_triggered": rules,
        "lines": lines,
        "fix_suggestions": fixes,
        "repo": repo,
        "path": path,
        "content": content,
        "timestamp": datetime.now().isoformat(),
    }


def _missing_rules(record: dict) -> set:
    """Return the rule codes of *record* that have no entry in ``FIXES``."""
    return {
        rule
        for rule in record.get("rules_triggered", [])
        if rule not in FIXES and not rule.startswith(_SYNTHETIC_RULE_PREFIXES)
    }


def lint_dockerfile(entry: dict) -> dict:
    """Lint one Dockerfile record with hadolint and return its labeled record.

    Args:
        entry: Input record; ``content``, ``repo`` and ``path`` are read.
            ``content`` is joined with newlines when it is a sequence of
            lines and written as is when it is a single string.

    Returns:
        A record labeled ``"good"`` when hadolint exits with status 0,
        otherwise ``"bad"`` with the triggered rule codes, the first reported
        line per rule and the known fix suggestions.  Timeouts and any other
        error are reported as ``"bad"`` records with a synthetic
        ``lint-timeout`` / ``lint-error:<message>`` rule instead of raising.
    """
    try:
        content = entry["content"]
        joined = content if isinstance(content, str) else "\n".join(content)

        temp_path = None
        try:
            # Explicit UTF-8 so non-ASCII Dockerfiles do not depend on the
            # locale's default encoding.
            with tempfile.NamedTemporaryFile(
                "w", suffix=".Dockerfile", delete=False, encoding="utf-8"
            ) as tmp:
                temp_path = tmp.name
                tmp.write(joined)

            result = subprocess.run(
                [HADOLINT_BIN, temp_path, "-f", "json"],
                capture_output=True,
                text=True,
                timeout=TIMEOUT_SECONDS,
            )
        finally:
            # Remove the temp file even when hadolint times out or fails.
            if temp_path is not None:
                Path(temp_path).unlink(missing_ok=True)

        if result.returncode == 0:
            return _make_record(
                "good", [], {}, {}, entry["repo"], entry["path"], content
            )

        try:
            findings = json.loads(result.stdout)
            rules = sorted({item["code"] for item in findings if "code" in item})
            line_map = {}
            for item in findings:
                code = item.get("code")
                line = item.get("line")
                if code and line:
                    # Keep only the first reported line for each rule.
                    line_map.setdefault(code, line)
            fix_suggestions = attach_fixes(rules)
        except Exception:
            # Deliberate best effort: any unparsable hadolint output is
            # recorded as a synthetic rule rather than aborting the record.
            rules = ["lint-parse-error"]
            line_map = {}
            fix_suggestions = {}

        return _make_record(
            "bad",
            rules,
            line_map,
            fix_suggestions,
            entry["repo"],
            entry["path"],
            content,
        )
    except subprocess.TimeoutExpired:
        return _make_record(
            "bad",
            ["lint-timeout"],
            {},
            {},
            entry.get("repo"),
            entry.get("path"),
            entry.get("content"),
        )
    except Exception as e:
        return _make_record(
            "bad",
            [f"lint-error:{str(e)}"],
            {},
            {},
            entry.get("repo"),
            entry.get("path"),
            entry.get("content"),
        )


def main():
    """Lint every input record in parallel and write the result files."""
    with open(INPUT_PATH, encoding="utf-8") as f:
        records = [json.loads(line) for line in f if line.strip()]

    print(f"🚀 Start analizy {len(records)} Dockerfile (wątki={MAX_WORKERS})")

    results, failed = [], []
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(lint_dockerfile, row) for row in records]
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                result = future.result()
                if "rules_triggered" not in result:
                    failed.append(result)
                else:
                    results.append(result)
                    # Workers run in separate processes, so their additions
                    # to MISSING_FIXES never reach this process; collect the
                    # missing rules here from the returned record instead.
                    MISSING_FIXES.update(_missing_rules(result))
            except Exception as e:
                failed.append({
                    "label": "bad",
                    "rules_triggered": [f"future-error:{str(e)}"],
                    "lines": {},
                    "fix_suggestions": {},
                    "repo": "unknown",
                    "path": "unknown",
                    "content": [],
                    "timestamp": datetime.now().isoformat(),
                })

            if done % 250 == 0:
                print(f" 🔄 {done}/{len(records)} przetworzonych...")

    with open(OUTPUT_PATH, "w", encoding="utf-8") as f_out:
        for rec in results:
            json.dump(rec, f_out)
            f_out.write("\n")

    with open(FAILED_LOG, "w", encoding="utf-8") as f_fail:
        for rec in failed:
            json.dump(rec, f_fail)
            f_fail.write("\n")

    if MISSING_FIXES:
        print(f"\n⚠️ Brakuje fixów dla {len(MISSING_FIXES)} reguł – zapisuję do {MISSING_FIXES_LOG}")
        with open(MISSING_FIXES_LOG, "w", encoding="utf-8") as f_miss:
            for rule in sorted(MISSING_FIXES):
                f_miss.write(rule + "\n")
    else:
        print("✅ Wszystkie reguły mają przypisany fix!")

    print(f"\n✅ Zapisano {len(results)} Dockerfile z etykietami i fixami → {OUTPUT_PATH}")
    print(f"❌ Nieudanych: {len(failed)} → {FAILED_LOG}")


if __name__ == "__main__":
    main()