File size: 2,590 Bytes
e9b8340 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# 07_balance_dataset.py
import json
import math
import random
import shutil
from collections import Counter
from pathlib import Path
# === Selection limits
MAX_GOOD = 1_500   # upper bound on the number of "good" records kept
MAX_BAD = 15_000   # upper bound on the number of "bad" records kept
TOP_N = 30         # how many entries are taken from the head of the rules list

# === Paths
INPUT_PATH = Path("data/labeled/labeled_dockerfiles.jsonl")
BACKUP_PATH = Path("data/labeled/labeled_dockerfiles_backup.jsonl")
TOP_RULES_PATH = Path("data/metadata/top_rules.json")
# The balanced dataset is written back over the input file.
OUTPUT_PATH = INPUT_PATH
# === Backup
# Copy the input aside once, before it gets overwritten by the output step.
# An existing backup is never replaced, so it keeps whatever was copied first.
if INPUT_PATH.exists():
    if BACKUP_PATH.exists():
        print(f"ℹ️ Kopia zapasowa już istnieje: {BACKUP_PATH.name}")
    else:
        print(f"📦 Tworzę kopię zapasową → {BACKUP_PATH.name}")
        shutil.copy(INPUT_PATH, BACKUP_PATH)
# === Load the top-N rules
# The file must contain a JSON array (it is sliced); only its first TOP_N
# entries are kept. Presumably the array is ordered by importance - confirm
# against the script that produces it.
ranked_rules = json.loads(TOP_RULES_PATH.read_text(encoding="utf-8"))
top_rules = set(ranked_rules[:TOP_N])
print(f"🏆 Używamy top {TOP_N} reguł")
# === Load the data
# Read the JSONL input and split it into two pools:
#   * good_samples - every record labelled "good"
#   * bad_samples  - records labelled "bad" that trigger at least one top rule
# Records with any other label are ignored; a record without a "label" key
# still raises KeyError so malformed data is not silently dropped.
print("🔍 Wczytywanie danych...")
good_samples = []
bad_samples = []
with open(INPUT_PATH, encoding="utf-8") as f:
    for line in f:
        # A blank line is not valid JSON and would abort the whole run;
        # skip it instead (hand-edited or concatenated files may contain some).
        if not line.strip():
            continue
        obj = json.loads(line)
        label = obj["label"]
        if label == "good":
            good_samples.append(obj)
        elif label == "bad":
            # `or ()` treats both a missing key and an explicit JSON null
            # as "no rules triggered".
            rules = obj.get("rules_triggered") or ()
            if not top_rules.isdisjoint(rules):
                bad_samples.append(obj)
print(f"✅ Good: {len(good_samples)} | ❌ Bad zawierające top {TOP_N} reguły: {len(bad_samples)}")
# === Random selection of GOOD samples
# Draw without replacement; when fewer than MAX_GOOD records exist, take all
# of them (random.sample would raise if asked for more than the population).
good_quota = min(MAX_GOOD, len(good_samples))
balanced_good = random.sample(good_samples, good_quota)
# === Select BAD samples by rarity of the top rules
print("⚙️ Oceniam pliki BAD pod kątem rzadkości reguł...")
# Count how often each top rule appears across the retained BAD samples.
# NOTE(review): a rule listed more than once in a single sample's
# "rules_triggered" is counted each time here, whereas compute_score
# de-duplicates per sample - confirm whether the lists can hold duplicates.
rule_freq = Counter(
    rule
    for bad in bad_samples
    for rule in bad.get("rules_triggered", [])
    if rule in top_rules
)
def compute_score(sample, *, freq=None, allowed=None):
    """Return the rarity score of *sample*.

    The score is the sum of ``1 / count`` over the distinct tracked rules the
    sample triggers, so a sample carrying rarely-seen rules scores higher.

    Args:
        sample: Record (dict). Its optional ``"rules_triggered"`` entry is an
            iterable of rule ids; a missing key or ``None`` means no rules.
        freq: Mapping of rule id to occurrence count. Defaults to the
            module-level ``rule_freq``.
        allowed: Set of tracked rule ids. Defaults to the module-level
            ``top_rules``.

    Returns:
        The score as a float; ``0.0`` when the sample triggers no tracked rule
        with a positive count.
    """
    if freq is None:
        freq = rule_freq
    if allowed is None:
        allowed = top_rules
    tracked = allowed.intersection(sample.get("rules_triggered") or ())
    # Set iteration order over strings changes between interpreter runs, and a
    # plain float sum depends on that order. fsum is exactly rounded, so the
    # score (and therefore the ranking of near-ties) is reproducible.
    return math.fsum(
        1 / freq.get(rule, 0) for rule in tracked if freq.get(rule, 0) > 0
    )
def _selection_key(candidate):
    """Sort key for BAD samples: (rarity score, negated count of top rules)."""
    tracked = set(candidate.get("rules_triggered", [])) & top_rules
    return compute_score(candidate), -len(tracked)


# Highest rarity score first. The sort is descending, so the negated count
# means that among equal scores the sample with FEWER top rules comes first.
# NOTE(review): confirm that this tie-break direction is the intended one.
scored_bad = list(bad_samples)
scored_bad.sort(key=_selection_key, reverse=True)
# Keep at most MAX_BAD of the best-ranked samples.
balanced_bad = scored_bad[:MAX_BAD]
# === Merge and save
# Combine both pools and shuffle so the labels are interleaved in the file.
balanced_all = [*balanced_good, *balanced_bad]
random.shuffle(balanced_all)

# Write one JSON document per line (JSONL), creating the directory if needed.
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_PATH, "w", encoding="utf-8") as f_out:
    f_out.writelines(json.dumps(rec) + "\n" for rec in balanced_all)

# Summary of what was written.
print(f"\n✅ Zapisano zbalansowany zbiór (tylko top {TOP_N} reguły): {len(balanced_all)} → {OUTPUT_PATH.name}")
print(f" - Good: {len(balanced_good)}")
print(f" - Bad: {len(balanced_bad)}")
|