| """ | |
| Download, preprocess and serve the TinyStories dataset as a DataLoader. | |
| """ | |
| import argparse | |
| import glob | |
| import json | |
| import os | |
| import random | |
| from typing import List | |
| from concurrent.futures import ProcessPoolExecutor | |
| from functools import partial | |
| import numpy as np | |
| import requests | |
| import sentencepiece as spm | |
| import torch | |
| import torch.distributed as dist | |
| from tqdm import tqdm | |
| from tokenizer import Tokenizer | |
| DATA_CACHE_DIR = "data" | |
def download_file(url: str, fname: str, chunk_size=1024):
    """Helper function to download a file from a given url"""
    resp = requests.get(url, stream=True)
    total = int(resp.headers.get("content-length", 0))
    with open(fname, "wb") as file, tqdm(
        desc=fname,
        total=total,
        unit="iB",
        unit_scale=True,
        unit_divisor=1024,
    ) as bar:
        for data in resp.iter_content(chunk_size=chunk_size):
            size = file.write(data)
            bar.update(size)

def download():
    """Downloads the TinyStories dataset to DATA_CACHE_DIR"""
    os.makedirs(DATA_CACHE_DIR, exist_ok=True)

    # download the TinyStories dataset, unless it's already downloaded
    data_url = "https://huggingface.co/datasets/roneneldan/TinyStories/resolve/main/TinyStories_all_data.tar.gz"
    data_filename = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data.tar.gz")
    if not os.path.exists(data_filename):
        print(f"Downloading {data_url} to {data_filename}...")
        download_file(data_url, data_filename)
    else:
        print(f"{data_filename} already exists, skipping download...")

    # unpack the tar.gz file into all the data shards (json files)
    data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
    if not os.path.exists(data_dir):
        os.makedirs(data_dir, exist_ok=True)
        print(f"Unpacking {data_filename}...")
        os.system(f"tar -xzf {data_filename} -C {data_dir}")
    else:
        print(f"{data_dir} already exists, skipping unpacking...")

    # print a single example just for debugging and such
    shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))
    with open(shard_filenames[0], "r") as f:
        data = json.load(f)
    print("Download done.")
    print(f"Number of shards: {len(shard_filenames)}")
    print(f"Example story:\n{data[0]}")

def train_vocab(vocab_size):
    """
    Trains a custom sentencepiece tokenizer on the TinyStories dataset.
    The custom tokenizer files will be saved in DATA_CACHE_DIR/tok{N} directories,
    where N is the vocab size. This is also where the pretok .bin files will go.
    """
    assert vocab_size > 0, "Vocab size must be positive"

    # output file prefix path for sentencepiece
    prefix = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}")

    # how many shards we'll use for vocab training, kept low for efficiency
    num_shards = 10

    # 1) export a large chunk of text as a single text file tiny.txt
    tiny_file = os.path.join(DATA_CACHE_DIR, "tiny.txt")
    data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
    shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))

    print(f"Writing temporary file {tiny_file} with {num_shards} shards...")
    with open(tiny_file, "w", encoding="utf-8") as of:
        for shard in tqdm(shard_filenames[:num_shards]):
            with open(shard, "r") as f:
                data = json.load(f)
            for example in data:
                text = example["story"]
                text = text.strip()
                of.write(text + "\n")
    print(f"Size is: {os.path.getsize(tiny_file) / 1024 / 1024:.2f} MB")

    # 2) train the sentencepiece model
    print("Will now train the vocab...")
    spm.SentencePieceTrainer.train(
        input=tiny_file,
        model_prefix=prefix,
        model_type="bpe",
        vocab_size=vocab_size,
        self_test_sample_size=0,
        input_format="text",
        character_coverage=1.0,
        num_threads=os.cpu_count(),
        split_digits=True,
        allow_whitespace_only_pieces=True,
        byte_fallback=True,
        unk_surface=r" \342\201\207 ",
        normalization_rule_name="identity",
    )

    # 3) optional cleanup, ask the user if they'd like to delete tiny.txt
    dec = input(f"Delete the temporary file {tiny_file}? [y/N] ")
    if dec.lower() == "y":
        os.remove(tiny_file)
        print(f"Deleted {tiny_file}")

    print(f"Trained tokenizer is in {prefix}.model")
    print("Done.")
def process_shard(args, vocab_size):
    shard_id, shard = args
    tokenizer_model = get_tokenizer_model_path(vocab_size)
    enc = Tokenizer(tokenizer_model)
    with open(shard, "r") as f:
        data = json.load(f)
    all_tokens = []
    for example in tqdm(data, position=shard_id):
        text = example["story"]
        text = text.strip()  # get rid of leading/trailing whitespace
        tokens = enc.encode(text, bos=True, eos=False)  # encode the text, use BOS
        all_tokens.extend(tokens)
    # convert to uint16 nparray
    all_tokens = np.array(all_tokens, dtype=np.uint16)
    # calculate the output filename
    if vocab_size == 0:
        # if we're using Llama 2, just save the tokenized file in the same dir
        tokenized_filename = shard.replace(".json", ".bin")
    else:
        # save .bin files into a new tok{N} directory
        bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}")
        shard_basename = os.path.basename(shard)
        bin_basename = shard_basename.replace(".json", ".bin")
        tokenized_filename = os.path.join(bin_dir, bin_basename)
    # write the bytes
    with open(tokenized_filename, "wb") as f:
        f.write(all_tokens.tobytes())
    # calculate the average sequence length (they are separated by BOS=1)
    avg_seq_len = all_tokens.size / ((all_tokens == 1).sum())
    print(f"Saved {tokenized_filename}, average seqlen: {avg_seq_len:.2f}")

def pretokenize(vocab_size):
    # iterate the shards and tokenize all of them one by one
    data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
    shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))
    if vocab_size > 0:
        # .bin files will be saved into tok{N} directory, create it once here
        bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}")
        os.makedirs(bin_dir, exist_ok=True)

    # process all the shards in a process pool
    fun = partial(process_shard, vocab_size=vocab_size)
    with ProcessPoolExecutor() as executor:
        executor.map(fun, enumerate(shard_filenames))
    print("Done.")
class PretokDataset(torch.utils.data.IterableDataset):
    """Loads pretokenized examples from disk and yields them as PyTorch tensors."""

    def __init__(self, split, max_seq_len, vocab_size, vocab_source):
        super().__init__()
        self.split = split
        self.max_seq_len = max_seq_len
        self.vocab_size = vocab_size
        self.vocab_source = vocab_source

    def __iter__(self):
        # get worker info within a DataLoader
        worker_info = torch.utils.data.get_worker_info()
        worker_id = worker_info.id if worker_info else 0
        # get DDP rank info
        rank = dist.get_rank() if dist.is_initialized() else 0
        # combine the worker_id and worker_rank to create a unique seed for rng
        seed = 42 + worker_id + 1337 * rank
        rng = random.Random(seed)
        print(f"Created a PretokDataset with rng seed {seed}")
        if self.vocab_source == "llama2":
            # the .bin files are right along the .json files
            bin_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
            shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))
        elif self.vocab_source == "custom":
            # the .bin files are in the tok{N} directory
            bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{self.vocab_size}")
            shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))
        # train/test split. let's use only shard 0 for test split, rest train
        shard_filenames = shard_filenames[1:] if self.split == "train" else shard_filenames[:1]
        assert len(shard_filenames) > 0, f"No bin files found in {bin_dir}"
        while True:
            rng.shuffle(shard_filenames)
            for shard in shard_filenames:
                # open the dataset for reading but keep it on disk with memmap
                m = np.memmap(shard, dtype=np.uint16, mode="r")
                num_batches = len(m) // self.max_seq_len
                num_batches -= 1  # drop the last partial batch
                assert num_batches > 0, "this shard is way too small? investigate."
                ixs = list(range(num_batches))
                rng.shuffle(ixs)
                for ix in ixs:
                    start = ix * self.max_seq_len
                    end = start + self.max_seq_len + 1
                    # calling .astype will copy the data into a new numpy array, now in RAM
                    chunk = torch.from_numpy((m[start:end]).astype(np.int64))
                    x = chunk[:-1]
                    y = chunk[1:]
                    yield x, y

# -----------------------------------------------------------------------------
# public interface functions

def get_tokenizer_model_path(vocab_size):
    """
    Returns the path to the sentencepiece tokenizer model for a given vocab size.
    vocab_size = 0 designates the default Llama 2 tokenizer; in that case
    None is returned.
    """
    if vocab_size == 0:
        return None
    else:
        return os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}.model")

class Task:

    @staticmethod
    def iter_batches(batch_size, device, num_workers=0, **dataset_kwargs):
        ds = PretokDataset(**dataset_kwargs)
        dl = torch.utils.data.DataLoader(
            ds, batch_size=batch_size, pin_memory=True, num_workers=num_workers
        )
        for x, y in dl:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)
            yield x, y
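
# Illustrative usage sketch, not part of the original script: how Task.iter_batches
# might be consumed by a training loop. The argument values below are placeholders,
# not canonical settings.
def example_batches():
    batch_iter = Task.iter_batches(
        batch_size=32,
        device="cpu",  # or "cuda" when training on GPU
        num_workers=0,
        split="train",
        max_seq_len=256,
        vocab_size=0,  # 0 -> read the Llama 2 tokenizer .bin files
        vocab_source="llama2",
    )
    x, y = next(batch_iter)
    print(x.shape, y.shape)  # both (32, 256), dtype torch.int64
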
# -----------------------------------------------------------------------------
# CLI for constructing the dataset

if __name__ == "__main__":
    """
    These stages are designed to be run in order.

    To tokenize data with the Llama 2 tokenizer:
    python tinystories.py download
    python tinystories.py pretokenize

    To tokenize data with a custom tokenizer we train ourselves with sentencepiece, e.g.:
    python tinystories.py download
    python tinystories.py train_vocab --vocab_size=2048
    python tinystories.py pretokenize --vocab_size=2048
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("stage", type=str, choices=["download", "pretokenize", "train_vocab"])
    parser.add_argument("--vocab_size", type=int, default=0, help="pretokenization vocab size. 0 = use Llama 2 tokenizer.")
    args = parser.parse_args()

    # depending on the stage call the appropriate function
    if args.stage == "download":
        download()
    elif args.stage == "train_vocab":
        train_vocab(vocab_size=args.vocab_size)
    elif args.stage == "pretokenize":
        pretokenize(vocab_size=args.vocab_size)
    else:
        raise ValueError(f"Unknown stage {args.stage}")