|
|
|
|
|
import json |
|
import tempfile |
|
import subprocess |
|
from pathlib import Path |
|
from concurrent.futures import ProcessPoolExecutor, as_completed |
|
from datetime import datetime |
|
|
|
|
|
# --- Pipeline file locations ----------------------------------------------
INPUT_PATH = Path("data/raw/dockerfiles.jsonl")  # raw records: one JSON object per line

OUTPUT_PATH = Path("data/labeled/labeled_dockerfiles.jsonl")  # successfully labeled records

FAILED_LOG = Path("data/labeled/failed_dockerfiles.jsonl")  # records whose lint run failed

MISSING_FIXES_LOG = Path("data/labeled/missing_fixes.txt")  # rule codes that have no fix entry

# Ensure the output directory exists before anything tries to write to it.
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)


FIXES_PATH = Path("data/fixes/fixes.json")  # rule-code -> suggested-fix mapping

HADOLINT_BIN = "hadolint"  # hadolint executable; presumably on PATH — TODO confirm

MAX_WORKERS = 6  # process-pool size used by main()

TIMEOUT_SECONDS = 5  # per-file hadolint timeout


# Load the fix catalogue once at import time; each worker process inherits
# its own copy of this dict.
with open(FIXES_PATH, encoding="utf-8") as f:

    FIXES = json.load(f)


# Rule codes seen during linting that have no entry in FIXES.
# NOTE(review): mutated by attach_fixes(); with a ProcessPoolExecutor each
# worker mutates its own copy, so the parent process's set may stay empty.
MISSING_FIXES = set()
|
|
|
def attach_fixes(rules_triggered: list[str]) -> dict:
    """Return a mapping of each known rule code to its suggested fix.

    Rule codes that have no entry in the module-level FIXES catalogue are
    recorded in MISSING_FIXES as a side effect, so they can be reported
    later.
    """
    # Track every code we cannot resolve, then build the suggestion map
    # from the codes we can.
    MISSING_FIXES.update(code for code in rules_triggered if code not in FIXES)
    return {code: FIXES[code] for code in rules_triggered if code in FIXES}
|
|
|
def lint_dockerfile(entry: dict) -> dict:
    """Lint one Dockerfile record with hadolint and return a labeled record.

    Args:
        entry: record with "content" (list of source lines) plus "repo" and
            "path" provenance fields.

    Returns:
        A dict with "label" ("good"/"bad"), the triggered rule codes, a
        rule -> first-line map, fix suggestions, provenance fields and a
        timestamp. Failures (timeout, missing binary, malformed input) are
        reported as "bad" records with a sentinel rule code instead of
        raising, so the process pool keeps running.
    """

    def _record(label, rules, line_map, fixes, content):
        # Single builder for the output schema shared by every exit path
        # (the original duplicated this dict literal four times).
        return {
            "label": label,
            "rules_triggered": rules,
            "lines": line_map,
            "fix_suggestions": fixes,
            "repo": entry.get("repo"),
            "path": entry.get("path"),
            "content": content,
            "timestamp": datetime.now().isoformat(),
        }

    temp_path = None
    try:
        content = entry["content"]
        joined = "\n".join(content)

        # hadolint needs a real file path; delete=False so the file outlives
        # the `with` block. Encoding is set explicitly so non-ASCII
        # Dockerfiles don't break on platforms with a non-UTF-8 default.
        with tempfile.NamedTemporaryFile(
            "w", suffix=".Dockerfile", delete=False, encoding="utf-8"
        ) as tmp:
            tmp.write(joined)
            temp_path = tmp.name

        result = subprocess.run(
            [HADOLINT_BIN, temp_path, "-f", "json"],
            capture_output=True,
            text=True,
            timeout=TIMEOUT_SECONDS,
        )

        if result.returncode == 0:
            # Exit code 0: hadolint found nothing to complain about.
            return _record("good", [], {}, {}, content)

        try:
            findings = json.loads(result.stdout)
            rules = sorted({item["code"] for item in findings if "code" in item})
            line_map = {}
            for item in findings:
                code = item.get("code")
                line = item.get("line")
                if code and line:
                    # Keep only the first line each rule fires on.
                    line_map.setdefault(code, line)
            fix_suggestions = attach_fixes(rules)
        except Exception:
            # Output was not valid JSON (e.g. hadolint crashed mid-run);
            # label it explicitly rather than failing the record.
            rules = ["lint-parse-error"]
            line_map = {}
            fix_suggestions = {}

        return _record("bad", rules, line_map, fix_suggestions, content)

    except subprocess.TimeoutExpired:
        return _record("bad", ["lint-timeout"], {}, {}, entry.get("content"))

    except Exception as e:
        return _record("bad", [f"lint-error:{str(e)}"], {}, {}, entry.get("content"))

    finally:
        # BUG FIX: the original unlinked only on the happy path, leaking one
        # temp file per timeout or error. Always clean up here.
        if temp_path:
            Path(temp_path).unlink(missing_ok=True)
|
|
|
def main():
    """Fan Dockerfile records out to a process pool, then write results.

    Reads INPUT_PATH (JSONL), lints every record via lint_dockerfile in
    worker processes, and writes labeled records to OUTPUT_PATH, failures
    to FAILED_LOG, and unresolved rule codes to MISSING_FIXES_LOG.
    """
    with open(INPUT_PATH, encoding="utf-8") as f:
        records = [json.loads(line) for line in f if line.strip()]

    print(f"🚀 Start analizy {len(records)} Dockerfile (wątki={MAX_WORKERS})")

    results, failed = [], []

    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(lint_dockerfile, row) for row in records]

        for i, future in enumerate(as_completed(futures)):
            try:
                result = future.result()
                # Records missing the expected schema are treated as failures.
                if "rules_triggered" not in result:
                    failed.append(result)
                else:
                    results.append(result)
            except Exception as e:
                # The worker itself blew up (e.g. pickling error); keep a
                # placeholder record so counts stay honest.
                failed.append({
                    "label": "bad",
                    "rules_triggered": [f"future-error:{str(e)}"],
                    "lines": {},
                    "fix_suggestions": {},
                    "repo": "unknown",
                    "path": "unknown",
                    "content": [],
                    "timestamp": datetime.now().isoformat(),
                })

            if (i + 1) % 250 == 0:
                print(f" 🔄 {i+1}/{len(records)} przetworzonych...")

    with open(OUTPUT_PATH, "w", encoding="utf-8") as f_out:
        for rec in results:
            json.dump(rec, f_out)
            f_out.write("\n")

    with open(FAILED_LOG, "w", encoding="utf-8") as f_fail:
        for rec in failed:
            json.dump(rec, f_fail)
            f_fail.write("\n")

    # BUG FIX: attach_fixes() mutates MISSING_FIXES inside the *worker*
    # processes, and module globals are not shared across processes — so the
    # parent's set was always empty and the report below never fired.
    # Recompute missing rules here from the returned records. Sentinel codes
    # emitted by this pipeline itself (lint-*, future-*) never went through
    # attach_fixes, so they are skipped.
    MISSING_FIXES.update(
        rule
        for rec in results
        for rule in rec.get("rules_triggered", [])
        if rule not in FIXES and not rule.startswith(("lint-", "future-"))
    )

    if MISSING_FIXES:
        print(f"\n⚠️ Brakuje fixów dla {len(MISSING_FIXES)} reguł – zapisuję do {MISSING_FIXES_LOG}")
        with open(MISSING_FIXES_LOG, "w", encoding="utf-8") as f_miss:
            for rule in sorted(MISSING_FIXES):
                f_miss.write(rule + "\n")
    else:
        print("✅ Wszystkie reguły mają przypisany fix!")

    print(f"\n✅ Zapisano {len(results)} Dockerfile z etykietami i fixami → {OUTPUT_PATH}")
    print(f"❌ Nieudanych: {len(failed)} → {FAILED_LOG}")


if __name__ == "__main__":
    main()
|
|