File size: 8,458 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
"""
Command-line argument parsing, refinement, and validation for the DeepSpeed async I/O (ds_aio) benchmark.
"""
import argparse
import os
from .test_ds_aio_utils import refine_integer_value
from .ds_aio_constants import AIO_HANDLE, AIO_BASIC, TORCH_FAST_IO, TORCH_IO, VALID_ENGINES
from deepspeed.accelerator import get_accelerator
# Separates the folder from the device id in each --folder_to_device_mapping entry,
# e.g. '/mnt/nvme0:0'.
MAPPING_DELIMITER = ':'
def refine_args(args):
    """Convert size arguments given as strings into integer values.

    ``--io_size``, ``--block_size`` and ``--fast_io_size`` are parsed by argparse
    as strings (e.g. ``'1M'``). Each one that is a non-empty string is passed
    through ``refine_integer_value`` and stored back on ``args``. Falsy values
    (``None``, ``''``) and values that are not strings are left untouched.

    Args:
        args: the namespace produced by ``parse_arguments``.

    Returns:
        The same ``args`` object, updated in place.
    """
    for name in ('io_size', 'block_size', 'fast_io_size'):
        value = getattr(args, name)
        # Only refine non-empty strings; already-converted values pass through.
        if value and isinstance(value, str):
            setattr(args, name, refine_integer_value(value))
    return args
def _get_mapping_dict(args):
    """Build the id -> folder mapping used to assign work to processes/devices.

    With ``--folder``, process ids ``0 .. multi_process-1`` all map to that
    single folder. Otherwise each ``--folder_to_device_mapping`` entry
    ``<folder>:<device_id>`` contributes ``{device_id: folder}`` (device id kept
    as the string from the command line; a repeated id keeps the last folder).
    """
    if args.folder is not None:
        return dict.fromkeys(range(args.multi_process), args.folder)

    split_entries = (entry.split(MAPPING_DELIMITER) for entry in args.folder_to_device_mapping)
    return {parts[1]: parts[0] for parts in split_entries}
def _validate_folder_mapping(args):
    """Check every ``--folder_to_device_mapping`` entry.

    Each entry is expected to look like ``<folder><MAPPING_DELIMITER><device_id>``.
    The following problems are reported (never raised):

    * entries that do not contain ``MAPPING_DELIMITER``;
    * folders that do not exist on the filesystem;
    * with ``--gpu`` only: device ids that are not integers, or that fall outside
      ``[0, get_accelerator().device_count())``.

    Args:
        args: parsed command-line namespace.

    Returns:
        ``(no_error, error_messages)``: ``no_error`` is ``True`` when no problem was
        found; ``error_messages`` holds one description per kind of problem.
    """
    error_messages = []
    mappings = args.folder_to_device_mapping

    invalid_mappings = [m for m in mappings if MAPPING_DELIMITER not in m]
    if invalid_mappings:
        error_messages.append(
            f'Missing delimiter ({MAPPING_DELIMITER}) in folder_to_device_mapping {invalid_mappings}')

    folder_list = [m.split(MAPPING_DELIMITER)[0] for m in mappings]
    invalid_folders = [d for d in folder_list if not os.path.exists(d)]
    if invalid_folders:
        error_messages.append(f'Invalid folders in folder_to_device_mapping: {invalid_folders}')

    if args.gpu:
        # Entries without a delimiter carry no device id (already reported above);
        # indexing [1] on them would raise IndexError, so skip them here.
        device_fields = [m.split(MAPPING_DELIMITER)[1] for m in mappings if MAPPING_DELIMITER in m]
        device_count = get_accelerator().device_count()
        invalid_device_list = []
        for field in device_fields:
            try:
                dev_id = int(field)
            except ValueError:
                # Report non-numeric ids instead of crashing the validator.
                invalid_device_list.append(field)
                continue
            # Negative ids are not valid device indices either.
            if not 0 <= dev_id < device_count:
                invalid_device_list.append(dev_id)
        if invalid_device_list:
            error_messages.append(f'Invalid device ids in folder_to_device_mapping: {invalid_device_list}')

    return len(error_messages) == 0, error_messages
def validate_args(args):
    """Validate the parsed command-line arguments.

    All problems are collected first and then printed once as a numbered list,
    so a single run reports every error. Checks performed:

    * ``--folder`` and ``--folder_to_device_mapping`` are not both given, and at
      least one of them is;
    * the ``--folder`` path (if given) exists;
    * ``--folder_to_device_mapping`` entries pass ``_validate_folder_mapping``;
    * ``--engine`` is in ``VALID_ENGINES``;
    * ``--read`` is not combined with ``--engine=TORCH_IO``;
    * ``--use_gds`` is only used together with ``--gpu``.

    Args:
        args: parsed (and refined) command-line namespace.

    Returns:
        ``True`` when no validation error was found, ``False`` otherwise.
    """
    no_error = True
    error_messages = []

    # --folder and --folder_to_device_mapping are mutually exclusive; one is required.
    if args.folder is not None and len(args.folder_to_device_mapping) > 0:
        error_messages.append('--folder and --folder_to_device_mapping cannot be specified together.')
        no_error = False
    elif args.folder is None and len(args.folder_to_device_mapping) == 0:
        error_messages.append('At least one of --folder or --folder_to_device_mapping must be specified.')
        no_error = False

    # Validate --folder
    if args.folder is not None and not os.path.exists(args.folder):
        no_error = False
        error_messages.append(f'Invalid folder in --folder: {args.folder} ')

    # Validate --folder_to_device_mapping
    if len(args.folder_to_device_mapping) > 0:
        no_mapping_error, mapping_error_messages = _validate_folder_mapping(args)
        no_error = no_error and no_mapping_error
        error_messages += mapping_error_messages

    # Validate --engine
    if args.engine not in VALID_ENGINES:
        no_error = False
        error_messages.append(f'Invalid engine {args.engine}. Valid options = {VALID_ENGINES}')

    # Validate --engine=torch_io: reads are not supported by that engine.
    if args.engine == TORCH_IO and args.read:
        no_error = False
        error_messages.append(f'Read not currently supported for --engine={TORCH_IO}')

    # Validate --gpu, --use_gds
    if args.use_gds and not args.gpu:
        error_messages.append('--gpu must be set to transfer with --use_gds')
        no_error = False

    # Report a single summary only after every check has run, so the count
    # includes all errors.
    if not no_error:
        print(f'Found {len(error_messages)} validation errors')
        for i, msg in enumerate(error_messages):
            print(f'{i+1}: {msg}')
    return no_error
def parse_arguments():
    """Define and parse the benchmark's command-line options.

    Size options (``--io_size``, ``--block_size``, ``--fast_io_size``) are
    returned as strings; ``refine_args`` converts them afterwards. The parsed
    namespace is printed before being returned.

    Returns:
        The ``argparse.Namespace`` of parsed arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--folder', default=None, type=str, help='Folder to use for I/O.')
    # nargs='+' with the default store action means a repeated flag overwrites the
    # earlier list, so multiple mappings must be given space-separated after one flag.
    parser.add_argument('--folder_to_device_mapping',
                        default=[],
                        nargs='+',
                        help='Specification of mapping of folder to (gpu) device id, (ignored for cpu accesses). '
                        'Multiple space-separated mappings can be given for multi-process runs, '
                        'e.g. --folder_to_device_mapping /mnt/nvme0:0 /mnt/nvme1:15 --gpu '
                        'means access /mnt/nvme0 with gpu 0 and /mnt/nvme1 with gpu 15')
    parser.add_argument('--io_size', type=str, default=None, required=True, help='Number of bytes to read or write.')
    parser.add_argument('--fast_io_size', type=str, default='64M', help='Size of fast_io pinned buffer (bytes).')
    parser.add_argument('--read', action='store_true', help='Perform read I/O (default is write)')
    parser.add_argument('--multi_process',
                        type=int,
                        default=1,
                        help='Number of parallel processes doing I/O (default 1).')
    parser.add_argument('--block_size',
                        type=str,
                        default='1M',
                        help='I/O block size. Can use K, M, or G suffix (default 1M for 1 megabytes).')
    parser.add_argument('--queue_depth', type=int, default=32, help='I/O queue depth (default 32).')
    parser.add_argument('--single_submit',
                        action='store_true',
                        help='Submit I/O requests in singles (default is submit queue_depth amount at once.).')
    parser.add_argument(
        '--sequential_requests',
        action='store_true',
        help=
        'Delay I/O request submission until completion of prior requests (default is overlap I/O submission and completion requests.).'
    )
    parser.add_argument('--validate', action='store_true', help='Perform validation of I/O transfer in library.')
    parser.add_argument(
        '--engine',
        type=str,
        default=AIO_HANDLE,
        help=
        f'Engine to perform I/O. Options are [{AIO_HANDLE}, {AIO_BASIC}, {TORCH_IO}, {TORCH_FAST_IO}]. Default is {AIO_HANDLE}'
    )
    parser.add_argument('--loops', type=int, default=3, help='Count of operation repetitions')
    parser.add_argument('--io_parallel', type=int, default=None, help='Per iop parallelism')
    parser.add_argument('--gpu', action='store_true', help='Use GPU memory')
    parser.add_argument('--use_gds', action='store_true', help='Enable GDS AIO')
    parser.add_argument('--slow_bounce_buffer',
                        action='store_true',
                        help='For GPU memory transfers, measure impact of bounce buffer pinning on critical path.')
    parser.add_argument('--torch_legacy_save', action='store_true', help='Use torch legacy save approach')
    parser.add_argument('--use_accelerator_pin_memory',
                        action='store_true',
                        help='Obtain pinned (CPU page-locked) tensors from accelerator')
    parser.add_argument('--warmup_loops', type=int, default=1, help='Count of operation warmup repetitions')
    parser.add_argument('--include_warmup_time', action='store_true', help='Include warmup latency in results')
    parser.add_argument('--different_file_each_iteration',
                        action='store_true',
                        help='Read/write a different file on each iteration.')
    args = parser.parse_args()
    print(f'args = {args}')
    return args
def get_validated_args():
    """Parse, refine and validate the command line, then derive run settings.

    On validation failure the process exits with status 1 (the individual
    errors have already been printed by ``validate_args``).

    On success the returned namespace additionally carries:

    * ``total_loops``: ``warmup_loops + loops``;
    * ``mapping_dict``: id -> folder mapping from ``_get_mapping_dict``;
    * ``mapping_list``: ``mapping_dict`` items as a list of ``(device_id, folder)`` tuples.

    The resulting id -> folder assignment is printed.
    """
    args = refine_args(parse_arguments())
    if not validate_args(args):
        # quit() is a site-module convenience helper (absent under `python -S`) and
        # exits with status 0; report the failure to the caller/shell instead.
        raise SystemExit(1)
    print('Successful validation of command line arguments')

    args.total_loops = args.warmup_loops + args.loops
    peer_tag = 'gpu' if args.gpu else 'process'
    args.mapping_dict = _get_mapping_dict(args)
    args.mapping_list = list(args.mapping_dict.items())
    print(f'Configuring {len(args.mapping_list)} {peer_tag} to folder mapping')
    for i, (device_id, folder) in enumerate(args.mapping_list):
        print(f'[{i}]: {peer_tag} {device_id} <----> {folder}')
    return args
|