# 09.1_prepare_binary_dataset.py (v3)
import json
from pathlib import Path
from collections import Counter

import numpy as np
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split

# === Configuration ===
INPUT_PATH = Path("data/labeled/labeled_dockerfiles.jsonl")
OUTPUT_DIR = Path("data/processed/dataset_binary")
TOKENIZER_NAME = "microsoft/codebert-base"
MAX_LENGTH = 512
SEED = 42
MIN_LINES = 5  # sanity check: skip Dockerfiles shorter than this many lines


def load_data():
    print("📂 Loading data...")
    records = []
    with INPUT_PATH.open(encoding="utf-8") as f:
        for line in f:
            obj = json.loads(line)
            if obj["label"] not in ("good", "bad"):
                continue

            # "content" may be a list of lines or a single string
            content = obj.get("content", [])
            if isinstance(content, list):
                content_text = "\n".join(content)
            else:
                content_text = content

            # Skip near-empty files and files with too few lines
            # (count lines of the joined text so the check also works for string content)
            if len(content_text.strip()) < 10 or len(content_text.splitlines()) < MIN_LINES:
                continue

            records.append({
                "text": content_text,
                "label": 0 if obj["label"] == "good" else 1,  # 0 = good, 1 = bad
            })

    label_counts = Counter(r["label"] for r in records)
    lengths = [len(r["text"].splitlines()) for r in records]
    print(f"✅ Loaded {len(records)} records")
    print(f"📊 Class distribution: {dict(label_counts)}")
    print(f"📏 Average file length: {np.mean(lengths):.2f} lines")
    return records


def split_data(records):
    print("🔀 Splitting data into train/val/test...")
    # Stratified 80/10/10 split: first hold out 10% for test, then take
    # 0.1111 of the remaining 90% (~10% of the total) for validation.
    train_val, test = train_test_split(
        records,
        test_size=0.1,
        random_state=SEED,
        stratify=[r["label"] for r in records],
    )
    train, val = train_test_split(
        train_val,
        test_size=0.1111,
        random_state=SEED,
        stratify=[r["label"] for r in train_val],
    )
    return train, val, test


def tokenize_dataset(train, val, test):
    print("🔤 Tokenizing data...")
    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME)

    ds = DatasetDict({
        "train": Dataset.from_list(train),
        "validation": Dataset.from_list(val),
        "test": Dataset.from_list(test),
    })

    def tokenize(example):
        return tokenizer(
            example["text"],
            padding="max_length",
            truncation=True,
            max_length=MAX_LENGTH,
        )

    ds_tokenized = ds.map(tokenize, batched=True)
    ds_tokenized = ds_tokenized.remove_columns(["text"])
    return ds_tokenized


def save_dataset(ds_tokenized):
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"💾 Saving dataset to {OUTPUT_DIR} ...")
    ds_tokenized.save_to_disk(str(OUTPUT_DIR))
    print("✅ Done.")


def main():
    records = load_data()
    train, val, test = split_data(records)
    ds_tokenized = tokenize_dataset(train, val, test)
    save_dataset(ds_tokenized)


if __name__ == "__main__":
    main()
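
# --- Quick sanity check (a sketch, not part of the original pipeline) ---
# Assumption: after this script runs, the tokenized splits can be reloaded
# with datasets.load_from_disk for inspection or for a downstream training
# script, e.g.:
#
#   from datasets import load_from_disk
#   ds = load_from_disk("data/processed/dataset_binary")
#   print(ds)                        # DatasetDict with train/validation/test splits
#   print(ds["train"][0]["label"])   # 0 = good, 1 = bad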