File size: 3,771 Bytes
e0be88b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import json

import datasets

from tests.trainer.test_trainer import StoreLossCallback
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    HfArgumentParser,
    Trainer,
    TrainingArguments,
    set_seed,
)
from transformers.testing_utils import (
    TestCasePlus,
    backend_device_count,
    execute_subprocess_async,
    get_torch_dist_unique_port,
    require_torch_multi_accelerator,
    torch_device,
)


class TestTrainerDistributedLoss(TestCasePlus):
    """Compare the losses logged by single-process and multi-process runs of this file's training script."""

    @require_torch_multi_accelerator
    def test_trainer(self):
        """Launch three `torchrun` trainings and compare their logged losses.

        The runs are:

        - "base": one process, per-device batch size `min_bs * device_count`,
          `--average_tokens_across_devices True`.
        - "broken": `device_count` processes, per-device batch size `min_bs`,
          `--average_tokens_across_devices False`.
        - "fixed": `device_count` processes, per-device batch size `min_bs`,
          `--average_tokens_across_devices True`.

        Each run is expected to leave its losses in `<output_dir>/<name>_losses.json`. The test then requires
        that the "broken" run deviates from "base" by more than 0.5 on at least one entry while its summed loss
        stays within 10% of the base sum, and that the "fixed" run never deviates from "base" by 0.005 or more.
        """
        device_count = backend_device_count(torch_device)
        min_bs = 2
        output_dir = self.get_auto_remove_tmp_dir()
        script_path = f"{self.test_file_dir}/test_trainer_distributed_loss.py"
        runs = (
            (1, True, min_bs * device_count, "base"),
            (device_count, False, min_bs, "broken"),
            (device_count, True, min_bs, "fixed"),
        )
        for gpu_num, enable, bs, name in runs:
            # Build argv as a list (rather than splitting a formatted string) so that paths containing
            # whitespace are passed through as single arguments.
            cmd = [
                "torchrun",
                f"--nproc_per_node={gpu_num}",
                f"--master_port={get_torch_dist_unique_port()}",
                script_path,
                "--output_dir",
                f"{output_dir}/{name}",
                "--per_device_train_batch_size",
                str(bs),
                "--average_tokens_across_devices",
                str(enable),
            ]
            execute_subprocess_async(cmd, env=self.get_env())

        losses = {}
        for _, _, _, name in runs:
            with open(f"{output_dir}/{name}_losses.json") as f:
                losses[name] = json.load(f)
        base_loss = losses["base"]
        broken_loss = losses["broken"]
        fixed_loss = losses["fixed"]

        # Fail with a readable message instead of an IndexError / ZeroDivisionError / empty `max()` below.
        self.assertGreater(len(base_loss), 0, "the base run did not record any loss")
        self.assertEqual(len(broken_loss), len(base_loss), "the broken run recorded a different number of losses")
        self.assertEqual(len(fixed_loss), len(base_loss), "the fixed run recorded a different number of losses")

        broken_diff = [abs(base - broken) for base, broken in zip(base_loss, broken_loss)]
        fixed_diff = [abs(base - fixed) for base, fixed in zip(base_loss, fixed_loss)]
        sum_base = sum(base_loss)
        sum_broken = sum(broken_loss)
        relative_broken = abs(sum_base - sum_broken) / max(sum_base, sum_broken)

        # The gap may be smaller for other models, but that is still OK.
        self.assertGreater(max(broken_diff), 0.5)
        self.assertLess(max(fixed_diff), 0.005)
        self.assertLess(relative_broken, 0.1)


def run_distributed_training(training_args):
    """Train a tiny causal LM for a few steps and save the logged losses as JSON.

    The first 100 rows of the wikitext-2 raw train split are tokenized to fixed-length sequences of 16 tokens
    and used to train `nickypro/tinyllama-15M` with a `Trainer` that has a `StoreLossCallback` attached.

    Args:
        training_args: The `TrainingArguments` parsed from the command line. It is modified in place:
            `logging_steps`, `max_steps`, `learning_rate`, `disable_tqdm`, `dataloader_drop_last` and
            `report_to` are overridden before training.

    Side effects:
        Writes the callback's `losses` to the file `training_args.output_dir + "_losses.json"`. Only the
        world-zero process writes, so concurrent ranks do not truncate and rewrite the same file.
    """
    set_seed(42)
    model_name = "nickypro/tinyllama-15M"
    dataset_name = "wikitext"
    dataset_config = "wikitext-2-raw-v1"
    dataset = datasets.load_dataset(dataset_name, dataset_config, split="train[:100]")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Use EOS as the pad token for the fixed-length padding below (and for the collator).
    tokenizer.pad_token = tokenizer.eos_token

    def tokenize_function(examples):
        """Tokenize a batch of rows, padding/truncating each text to exactly 16 tokens."""
        return tokenizer(examples["text"], max_length=16, padding="max_length", truncation=True)

    tokenized_dataset = dataset.map(tokenize_function, batched=True)

    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    model = AutoModelForCausalLM.from_pretrained(model_name)

    loss_callback = StoreLossCallback()

    # Log every step so each of the 10 optimization steps can contribute a loss entry.
    training_args.logging_steps = 1
    training_args.max_steps = 10
    training_args.learning_rate = 3e-4
    training_args.disable_tqdm = True
    training_args.dataloader_drop_last = True
    training_args.report_to = []

    trainer = Trainer(
        model,
        training_args,
        train_dataset=tokenized_dataset,
        callbacks=[loss_callback],
        data_collator=data_collator,
    )
    trainer.train()

    # Every rank runs this function; let a single process own the output file.
    if trainer.is_world_process_zero():
        with open(training_args.output_dir + "_losses.json", "w") as f:
            json.dump(loss_callback.losses, f)


if __name__ == "__main__":
    # Script entry point: read `TrainingArguments` from the command line and run the training with them.
    cli_parser = HfArgumentParser((TrainingArguments,))
    parsed_dataclasses = cli_parser.parse_args_into_dataclasses()
    run_distributed_training(parsed_dataclasses[0])