|
|
|
|
|
|
|
|
|
""" |
|
Performance sweep of DeepNVMe read and write operations on (NVMe) storage devices.
|
""" |
|
import os |
|
import sys |
|
import argparse |
|
import json |
|
import itertools |
|
import shutil |
|
|
|
from deepspeed.ops.op_builder import AsyncIOBuilder, GDSBuilder |
|
from .ds_aio_job import Job, run_job |
|
from .perf_sweep_utils import READ_OP_DESC, WRITE_OP_DESC, BENCH_LOG_DIR, \ |
|
READ_LOG_DIR, WRITE_LOG_DIR |
|
|
|
# Options appended to every benchmark command line, whatever the swept parameters are.
OTHER_OPTIONS = '--engine aio_handle'

# File name of the benchmark script; it is resolved relative to the running script's folder.
PERF_SCRIPT = 'ds_io'

# Search space used when no sweep configuration JSON file is supplied.
# Each key becomes a `--<key>` command-line option and each list holds the values to sweep;
# boolean values toggle a bare flag on or off. Key order determines option order.
DEFAULT_SWEEP_CONFIG = dict(
    block_size=["1M", "8M"],
    queue_depth=[32, 128],
    sequential_requests=[True, False],
    single_submit=[False, True],
    io_parallel=[1, 2, 4, 8],
)
|
|
|
|
|
class SweepConfig(object):
    """Settings of one performance sweep, derived from the parsed command-line arguments.

    Attributes:
        folder_to_device_mapping: single-entry dict produced by get_ftd_map() from args.nvme_dir.
        search_space: option name -> list of values to sweep, including the folder mapping.
        read / write: whether the read / write sweep is enabled.
        flush_cache: value of args.flush_page_cache.
        log_dir: root folder for log files.
        verbose: value of args.verbose.
        other_options: option string shared by every benchmark command line.
    """

    def __init__(self, args):
        self.folder_to_device_mapping = get_ftd_map(args.nvme_dir)
        self.search_space = get_sweep_config_dict(args.sweep_config)
        # NOTE: update() modifies the dict returned by get_sweep_config_dict() in place.
        self.search_space.update(self.folder_to_device_mapping)

        # The command line exposes "disable" switches; store them as positive flags.
        self.read = not args.no_read
        self.write = not args.no_write

        self.flush_cache = args.flush_page_cache
        self.log_dir = args.log_dir
        self.verbose = args.verbose

        # Collect the fixed options, then the optional device flags, and join them once.
        option_parts = [f'{OTHER_OPTIONS} --loops {args.loops} --io_size {args.io_size}']
        if args.gpu:
            option_parts.append('--gpu')
        if args.gds:
            option_parts.append('--use_gds')
        self.other_options = ' '.join(option_parts)
|
|
|
|
|
def validate_arguments(args):
    """Abort the program when the environment cannot run the requested I/O operators.

    The async I/O check always runs; the GDS check runs only when ``args.gds`` is set.

    Args:
        args: parsed command-line arguments; only ``args.gds`` is read here.

    Raises:
        SystemExit: with exit status 1 after printing an explanation, when a
            required operator is reported as not compatible.
    """
    if not async_io_setup():
        error_msg = """
            Failing because environment is not properly configured for deepspeed async i/o module.
            Possible fix: apt install libaio-dev.
        """
        print(error_msg)
        # sys.exit() instead of quit(): quit() is an interactive helper installed by the
        # `site` module and exits with status 0, which hides the failure from callers.
        sys.exit(1)

    if args.gds and not gds_io_setup():
        error_msg = """
            Failing because environment is not properly configured for deepspeed GDS I/O operator.
        """
        print(error_msg)
        sys.exit(1)
|
|
|
|
|
def parse_sweep_arguments():
    """Parse and validate the command-line arguments of the performance sweep.

    Returns:
        argparse.Namespace: the parsed arguments, after validate_arguments() has
        accepted them.

    Note:
        The parsed arguments are printed when ``--verbose`` is given. argparse exits
        the program on malformed input, and validate_arguments() exits it when the
        environment is not usable.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('--nvme_dir',
                        nargs='+',
                        required=True,
                        help='Directory in which to perform I/O tests. A writeable directory on a NVMe device.')

    parser.add_argument('--sweep_config', type=str, default=None, help='Performance sweep configuration json file.')

    parser.add_argument('--no_read', action='store_true', help='Disable read performance measurements.')

    parser.add_argument('--no_write', action='store_true', help='Disable write performance measurements.')

    parser.add_argument('--io_size',
                        type=str,
                        default="400M",
                        help='Number of I/O bytes to read/write for performance measurements.')

    parser.add_argument('--gpu', action='store_true', help='Test tensor transfers between GPU device and NVME device.')

    parser.add_argument('--gds', action='store_true', help='Run the sweep over NVIDIA GPUDirectStorage operator')

    # The help text describes what enabling the flag does; the previous wording
    # described the opposite (a sweep that does not flush the cache).
    parser.add_argument(
        '--flush_page_cache',
        action='store_true',
        help='Flush the page cache before each measurement; otherwise reported read speeds '
        'may be higher than actual ***Requires sudo access***.')

    parser.add_argument(
        '--log_dir',
        type=str,
        default=BENCH_LOG_DIR,
        help=f'Output directory for performance log files. Default is {os.path.join(".", BENCH_LOG_DIR)}')

    parser.add_argument('--loops', type=int, default=1, help='Count of operation repetitions')

    parser.add_argument('--verbose', action='store_true', help='Print debugging information.')

    args = parser.parse_args()
    if args.verbose:
        print(f'args = {args}')
    validate_arguments(args)

    return args
|
|
|
|
|
def dump_cmd_lines(cmd_lines):
    """Print the number of command lines, then each one prefixed by its index."""
    total = len(cmd_lines)
    print(f'cmd line count = {total}')
    numbered_entries = (f'{index}: {cmd_line}' for index, cmd_line in enumerate(cmd_lines))
    for entry in numbered_entries:
        print(entry)
|
|
|
|
|
def get_ftd_map(nvme_dir_list):
    """Build the folder-to-device search-space entry for the given folders.

    Each folder is paired with its position in the list as ``<folder>:<index>``;
    the pairs are joined with spaces into one string.

    Args:
        nvme_dir_list: iterable of folder paths.

    Returns:
        dict: ``{'folder_to_device_mapping': [<joined pairs>]}`` - a one-element
        list, so the mapping contributes a single choice to the sweep.
    """
    joined_pairs = ' '.join(f'{folder}:{device_index}' for device_index, folder in enumerate(nvme_dir_list))
    return {'folder_to_device_mapping': [joined_pairs]}
|
|
|
|
|
def get_sweep_config_dict(sweep_config_json):
    """Return the sweep search space as a dict the caller is free to modify.

    Args:
        sweep_config_json: path of a JSON file describing the search space, or
            None to use the built-in default.

    Returns:
        dict: option name -> list of values to sweep. For the default search space
        this is a fresh copy (including the value lists), so callers that add or
        change entries do not alter the module-level DEFAULT_SWEEP_CONFIG.

    Raises:
        OSError: if the JSON file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    if sweep_config_json is None:
        # Copy rather than hand out the shared default: callers update the result in place.
        return {option: list(values) for option, values in DEFAULT_SWEEP_CONFIG.items()}

    with open(sweep_config_json) as fp:
        return json.load(fp)
|
|
|
|
|
def get_sweep_cmd_lines(sweep_config_dict):
    """Expand a search space into every combination of command-line options.

    Args:
        sweep_config_dict: option name -> list of values. Non-boolean values render
            as ``--<name> <value>``; True renders as the bare flag ``--<name>``;
            False renders as a single-space placeholder.

    Returns:
        list[list[str]]: one list of option strings per combination, in
        itertools.product order over the dict's key order.
    """

    def render_option(name, setting):
        # Booleans toggle a flag; everything else is passed as the option's value.
        if isinstance(setting, bool):
            return f'--{name}' if setting else ' '
        return f'--{name} {setting}'

    choices_per_option = [[render_option(name, setting) for setting in settings]
                          for name, settings in sweep_config_dict.items()]

    return [list(combination) for combination in itertools.product(*choices_per_option)]
|
|
|
|
|
def launch_sweep(sweep_jobs, sync_job, flush_cache_job, verbose):
    """Run every sweep job in order, surrounded by the housekeeping jobs.

    For each job: when a flush job is supplied, the sync job and then the flush job
    run first; then the performance job runs; then the sync job runs.

    Args:
        sweep_jobs: iterable of performance jobs.
        sync_job: job run before a flush and after every performance job.
        flush_cache_job: job run before every performance job, or None to skip it.
        verbose: forwarded to run_job() for every job.
    """
    flush_requested = flush_cache_job is not None
    for measurement_job in sweep_jobs:
        preparation = [sync_job, flush_cache_job] if flush_requested else []
        for step in preparation + [measurement_job, sync_job]:
            run_job(step, verbose)
|
|
|
|
|
def create_cmd_tags(cmd_line):
    """Summarize command-line fragments as an option -> tag-value dict.

    Each fragment is split on whitespace; its first token is the key.

    Args:
        cmd_line: iterable of strings such as ``'--queue_depth 32'``.

    Returns:
        dict: for every non-blank fragment,
            * None when the option has no value (a bare flag),
            * the value itself when there is exactly one value, except for
              ``--folder_to_device_mapping``,
            * the number of values otherwise (always for
              ``--folder_to_device_mapping``).
        Blank fragments are ignored.
    """
    tags = {}
    for fragment in cmd_line:
        tokens = fragment.split()
        if not tokens:
            # Placeholder emitted for a disabled boolean flag.
            continue

        option, values = tokens[0], tokens[1:]
        if not values:
            tags[option] = None
        elif len(values) == 1 and option != '--folder_to_device_mapping':
            tags[option] = values[0]
        else:
            tags[option] = len(values)
    return tags
|
|
|
|
|
def get_log_file(io_op_desc, cmd_line):
    """Compose the log file name that identifies one benchmark configuration.

    The name is ``<io_op_desc>_<part>_..._<part>.txt`` with one part per known
    option, always in the same order. An option present on the command line
    contributes its prefix followed by its tag value (or the prefix alone for a
    bare flag); an absent option contributes its default.

    Args:
        io_op_desc: leading component of the file name.
        cmd_line: list of option strings, interpreted by create_cmd_tags().

    Returns:
        str: the log file name (no directory).
    """
    QUEUE_DEPTH = "--queue_depth"
    BLOCK_SIZE = "--block_size"
    SINGLE_SUBMIT = "--single_submit"
    SEQUENTIAL_REQUESTS = "--sequential_requests"
    FTD_MAP = "--folder_to_device_mapping"
    IO_PARALLEL = "--io_parallel"

    # (option, file-name prefix, default value), listed in file-name order.
    naming_rules = [
        (SINGLE_SUBMIT, "single", "block"),
        (SEQUENTIAL_REQUESTS, "sequential", "overlap"),
        (FTD_MAP, "ftd", 1),
        (QUEUE_DEPTH, "d", 1),
        (BLOCK_SIZE, "bs", "1M"),
        (IO_PARALLEL, "p", 1),
    ]
    # For these flags the default is a word of its own and is used without the prefix.
    unprefixed_defaults = (SINGLE_SUBMIT, SEQUENTIAL_REQUESTS)

    cmd_tags = create_cmd_tags(cmd_line)
    name_parts = [io_op_desc]
    for option, prefix, default in naming_rules:
        if option in cmd_tags:
            tag_value = cmd_tags[option]
            part = prefix if tag_value is None else f'{prefix}{tag_value}'
        elif option in unprefixed_defaults:
            part = default
        else:
            part = f'{prefix}{default}'
        name_parts.append(part)

    return '_'.join(name_parts) + '.txt'
|
|
|
|
|
def create_perf_jobs(io_op_desc, log_dir, cmd_lines):
    """Create one benchmark Job per command line.

    Args:
        io_op_desc: operation description used as the log file name prefix.
        log_dir: folder in which the log files are placed.
        cmd_lines: list of option lists; each becomes the arguments of one job.

    Returns:
        list: Job objects whose command starts with the benchmark script located in
        the running script's folder and whose output file is named by get_log_file().
    """
    launcher = [os.path.join(script_path(), PERF_SCRIPT)]

    jobs = []
    for options in cmd_lines:
        log_path = os.path.join(log_dir, get_log_file(io_op_desc, options))
        jobs.append(Job(cmd_line=launcher + options, output_file=log_path))
    return jobs
|
|
|
|
|
def script_path():
    """Return the folder containing the running script (``sys.argv[0]``), symlinks resolved."""
    resolved_script = os.path.realpath(sys.argv[0])
    return os.path.dirname(resolved_script)
|
|
|
|
|
def async_io_setup():
    """Return the result of ``AsyncIOBuilder().is_compatible()`` for this environment."""
    aio_builder = AsyncIOBuilder()
    return aio_builder.is_compatible()
|
|
|
|
|
def gds_io_setup():
    """Return the result of ``GDSBuilder().is_compatible()`` for this environment."""
    gds_builder = GDSBuilder()
    return gds_builder.is_compatible()
|
|
|
|
|
def remove_folder(folder):
    """Delete ``folder`` and everything below it.

    Args:
        folder: path of an existing directory.

    Raises:
        AssertionError: if ``folder`` is not an existing directory.
    """
    folder_exists = os.path.isdir(folder)
    assert folder_exists, f"Error: cannot remove {folder} - folder not found"
    shutil.rmtree(folder)
|
|
|
|
|
def run_read_sweep(sweep_config, flush_cache_job, sync_job, cmd_lines):
    """Run the read half of the sweep.

    Every command line is prefixed with ``--read`` plus the shared options, the
    read log folder is created under ``sweep_config.log_dir`` if needed, and the
    resulting jobs are launched.

    Args:
        sweep_config: SweepConfig supplying other_options, log_dir and verbose.
        flush_cache_job: job forwarded to launch_sweep(), or None.
        sync_job: job forwarded to launch_sweep().
        cmd_lines: list of option lists, one per swept configuration.
    """
    read_cmd_lines = [[f'--read {sweep_config.other_options}'] + options for options in cmd_lines]

    read_log_folder = os.path.join(sweep_config.log_dir, f'{READ_LOG_DIR}')
    os.makedirs(read_log_folder, exist_ok=True)

    read_jobs = create_perf_jobs(READ_OP_DESC, read_log_folder, read_cmd_lines)
    launch_sweep(read_jobs, sync_job, flush_cache_job, sweep_config.verbose)
|
|
|
|
|
def run_write_sweep(sweep_config, flush_cache_job, sync_job, cmd_lines):
    """Run the write half of the sweep.

    Every command line is prefixed with the shared options (no ``--read`` flag),
    the write log folder is created under ``sweep_config.log_dir`` if needed, and
    the resulting jobs are launched.

    Args:
        sweep_config: SweepConfig supplying other_options, log_dir and verbose.
        flush_cache_job: job forwarded to launch_sweep(), or None.
        sync_job: job forwarded to launch_sweep().
        cmd_lines: list of option lists, one per swept configuration.
    """
    write_cmd_lines = [[f'{sweep_config.other_options}'] + options for options in cmd_lines]

    write_log_folder = os.path.join(sweep_config.log_dir, f'{WRITE_LOG_DIR}')
    os.makedirs(write_log_folder, exist_ok=True)

    write_jobs = create_perf_jobs(WRITE_OP_DESC, write_log_folder, write_cmd_lines)
    launch_sweep(write_jobs, sync_job, flush_cache_job, sweep_config.verbose)
|
|
|
|
|
def sweep_main(args):
    """Run the configured read and/or write sweeps.

    Args:
        args: parsed command-line arguments, as consumed by SweepConfig.
    """
    sweep_config = SweepConfig(args)
    cmd_lines = get_sweep_cmd_lines(sweep_config.search_space)

    # The cache-flush job exists only when flushing was requested.
    flush_cache_job = None
    if sweep_config.flush_cache:
        flush_cache_job = Job(cmd_line=['sudo', 'bash -c', "'echo 1 > /proc/sys/vm/drop_caches'"])

    sync_job = Job(cmd_line=['sync'])

    # Reads run before writes; each sweep runs only if it is enabled.
    enabled_sweeps = ((sweep_config.read, run_read_sweep), (sweep_config.write, run_write_sweep))
    for enabled, run_sweep in enabled_sweeps:
        if enabled:
            run_sweep(sweep_config, flush_cache_job, sync_job, cmd_lines)
|
|
|
|
|
def main():
    """Command-line entry point: parse the arguments, announce the run, start the sweep."""
    cli_args = parse_sweep_arguments()
    target_dirs = cli_args.nvme_dir
    print(f"Running DeepNVMe performance sweep on {target_dirs}")
    sweep_main(cli_args)
|
|
|
|
|
if __name__ == "__main__":
    # Enter through main(): it parses the command line and passes the result on.
    # sweep_main() takes a required `args` parameter and cannot be called bare.
    main()
|
|