binary-dockerfile-model / scripts /06_label_with_fixes.py
LeeSek's picture
Add scripts
e9b8340 verified
# 06_label_with_fixes.py – version v4
import json
import tempfile
import subprocess
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
# === Paths ===
# Input: one JSON object per line.
INPUT_PATH = Path("data") / "raw" / "dockerfiles.jsonl"
# Outputs: labeled records, failed records, and the list of rules without a fix.
OUTPUT_PATH = Path("data") / "labeled" / "labeled_dockerfiles.jsonl"
FAILED_LOG = Path("data") / "labeled" / "failed_dockerfiles.jsonl"
MISSING_FIXES_LOG = Path("data") / "labeled" / "missing_fixes.txt"
# All three output files share this directory, so creating it once is enough.
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
FIXES_PATH = Path("data") / "fixes" / "fixes.json"

# === Linter settings ===
HADOLINT_BIN = "hadolint"  # executable name passed to subprocess.run
MAX_WORKERS = 6  # size of the process pool used in main()
TIMEOUT_SECONDS = 5  # per-file limit for a single hadolint run

# === Global fix dictionary ===
# Loaded at import time; attach_fixes() looks rule codes up in it.
with FIXES_PATH.open(encoding="utf-8") as f:
    FIXES = json.loads(f.read())

# Rule codes that were triggered but have no entry in FIXES.
MISSING_FIXES = set()
def attach_fixes(rules_triggered: list[str]) -> dict:
    """Return the known fix for each triggered rule.

    Rules present in the module-level ``FIXES`` mapping are copied into the
    result under their own code. Rules that are absent are recorded in the
    module-level ``MISSING_FIXES`` set and left out of the result.
    """
    matched = {}
    for rule_code in rules_triggered:
        if rule_code not in FIXES:
            # No fix known for this rule: remember it for the report.
            MISSING_FIXES.add(rule_code)
            continue
        matched[rule_code] = FIXES[rule_code]
    return matched
def _lint_record(label, rules, lines, fixes, repo, path, content):
    """Build one output record with the key order used by every result."""
    return {
        "label": label,
        "rules_triggered": rules,
        "lines": lines,
        "fix_suggestions": fixes,
        "repo": repo,
        "path": path,
        "content": content,
        "timestamp": datetime.now().isoformat(),
    }


def lint_dockerfile(entry: dict) -> dict:
    """Run hadolint on one Dockerfile entry and return a labeled record.

    Args:
        entry: Mapping with the keys ``"content"``, ``"repo"`` and ``"path"``.
            ``"content"`` is joined with newlines before linting; a plain
            string is written unchanged (presumably it is normally a list of
            lines -- confirm against the producer of the input file).

    Returns:
        A dict with the keys ``label``, ``rules_triggered``, ``lines``,
        ``fix_suggestions``, ``repo``, ``path``, ``content`` and ``timestamp``.
        ``label`` is ``"good"`` when hadolint exits with code 0 and ``"bad"``
        otherwise. Failures never raise for dict input; they are reported as
        pseudo-rules in ``rules_triggered``: ``"lint-parse-error"`` (hadolint
        output could not be interpreted), ``"lint-timeout"`` (the run exceeded
        ``TIMEOUT_SECONDS``) or ``"lint-error:<message>"`` (anything else).
    """
    try:
        content = entry["content"]
        # Joining a str would interleave newlines between its characters.
        text = content if isinstance(content, str) else "\n".join(content)

        temp_path = None
        try:
            # hadolint needs a real file. delete=False lets us close the file
            # before the linter opens it; we remove it ourselves below.
            with tempfile.NamedTemporaryFile(
                "w", suffix=".Dockerfile", delete=False, encoding="utf-8"
            ) as tmp:
                temp_path = tmp.name
                tmp.write(text)
            result = subprocess.run(
                [HADOLINT_BIN, temp_path, "-f", "json"],
                capture_output=True,
                text=True,
                timeout=TIMEOUT_SECONDS,
            )
        finally:
            # Always clean up, including on timeout or a missing binary;
            # otherwise every failed run leaves a file in the temp directory.
            if temp_path is not None:
                Path(temp_path).unlink(missing_ok=True)

        if result.returncode == 0:
            return _lint_record(
                "good", [], {}, {}, entry["repo"], entry["path"], content
            )

        try:
            findings = json.loads(result.stdout)
            rules = sorted({item["code"] for item in findings if "code" in item})
            line_map = {}
            for item in findings:
                code = item.get("code")
                line = item.get("line")
                if code and line:
                    # Keep only the first reported line for each rule.
                    line_map.setdefault(code, line)
            fix_suggestions = attach_fixes(rules)
        except Exception:
            # Non-JSON or unexpectedly shaped output from hadolint.
            rules = ["lint-parse-error"]
            line_map = {}
            fix_suggestions = {}

        return _lint_record(
            "bad",
            rules,
            line_map,
            fix_suggestions,
            entry["repo"],
            entry["path"],
            content,
        )
    except subprocess.TimeoutExpired:
        return _lint_record(
            "bad",
            ["lint-timeout"],
            {},
            {},
            entry.get("repo"),
            entry.get("path"),
            entry.get("content"),
        )
    except Exception as e:
        return _lint_record(
            "bad",
            [f"lint-error:{str(e)}"],
            {},
            {},
            entry.get("repo"),
            entry.get("path"),
            entry.get("content"),
        )
def main():
    """Lint every Dockerfile in INPUT_PATH in parallel and write the results.

    Reads one JSON record per non-blank line from ``INPUT_PATH``, runs
    ``lint_dockerfile`` on each in a process pool, then writes:

    * ``OUTPUT_PATH`` -- every record returned by ``lint_dockerfile``;
    * ``FAILED_LOG`` -- records whose worker future raised;
    * ``MISSING_FIXES_LOG`` -- triggered rules with no entry in ``FIXES``
      (written only when there is at least one).
    """
    with open(INPUT_PATH, encoding="utf-8") as f:
        records = [json.loads(line) for line in f if line.strip()]
    print(f"🚀 Start analizy {len(records)} Dockerfile (wątki={MAX_WORKERS})")

    results, failed = [], []
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Map each future back to its input row so a crashed worker can still
        # be reported with the repo/path it was processing.
        futures = {executor.submit(lint_dockerfile, row): row for row in records}
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                result = future.result()
            except Exception as e:
                row = futures[future]
                is_mapping = isinstance(row, dict)
                failed.append({
                    "label": "bad",
                    "rules_triggered": [f"future-error:{str(e)}"],
                    "lines": {},
                    "fix_suggestions": {},
                    "repo": row.get("repo", "unknown") if is_mapping else "unknown",
                    "path": row.get("path", "unknown") if is_mapping else "unknown",
                    "content": [],
                    "timestamp": datetime.now().isoformat(),
                })
            else:
                if "rules_triggered" not in result:
                    failed.append(result)
                else:
                    results.append(result)
            if done % 250 == 0:
                print(f" 🔄 {done}/{len(records)} przetworzonych...")

    with open(OUTPUT_PATH, "w", encoding="utf-8") as f_out:
        for rec in results:
            f_out.write(json.dumps(rec) + "\n")
    with open(FAILED_LOG, "w", encoding="utf-8") as f_fail:
        for rec in failed:
            f_fail.write(json.dumps(rec) + "\n")

    # attach_fixes() runs inside worker processes, so anything it adds to
    # MISSING_FIXES never reaches this process. Rebuild the set here from the
    # returned records: a real rule is missing a fix when it was triggered but
    # has no suggestion. "lint-*" entries are pseudo-rules, not hadolint codes.
    for rec in results:
        suggested = rec.get("fix_suggestions") or {}
        for rule in rec.get("rules_triggered") or []:
            if rule not in suggested and not rule.startswith("lint-"):
                MISSING_FIXES.add(rule)

    if MISSING_FIXES:
        print(f"\n⚠️ Brakuje fixów dla {len(MISSING_FIXES)} reguł – zapisuję do {MISSING_FIXES_LOG}")
        with open(MISSING_FIXES_LOG, "w", encoding="utf-8") as f_miss:
            for rule in sorted(MISSING_FIXES):
                f_miss.write(rule + "\n")
    else:
        print("✅ Wszystkie reguły mają przypisany fix!")
    print(f"\n✅ Zapisano {len(results)} Dockerfile z etykietami i fixami → {OUTPUT_PATH}")
    # Separator added: the count and the path used to be printed fused together.
    print(f"❌ Nieudanych: {len(failed)} → {FAILED_LOG}")


if __name__ == "__main__":
    main()