File size: 8,458 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Command-line argument parsing and validation for the ds_aio I/O scripts (folder/device mapping, I/O sizes, I/O engine selection).
"""

import argparse
import os
from .test_ds_aio_utils import refine_integer_value
from .ds_aio_constants import AIO_HANDLE, AIO_BASIC, TORCH_FAST_IO, TORCH_IO, VALID_ENGINES
from deepspeed.accelerator import get_accelerator

# Separator between the folder and the device id in each --folder_to_device_mapping
# entry, e.g. '/mnt/nvme0:0' maps folder /mnt/nvme0 to device 0.
MAPPING_DELIMITER = ':'


def refine_args(args):
    """Convert size-like string arguments into integers, in place.

    ``--io_size``, ``--block_size`` and ``--fast_io_size`` are parsed as strings
    so they can carry a size suffix (e.g. ``'1M'``). Each of them that is a
    non-empty string is converted with ``refine_integer_value``. Falsy values
    (``None``, ``''``, ``0``) and values that are already non-strings are left
    unchanged.

    Args:
        args: parsed argument namespace; modified in place.

    Returns:
        The same ``args`` object.
    """
    for name in ('io_size', 'block_size', 'fast_io_size'):
        value = getattr(args, name)
        # isinstance (rather than ``type(...) == str``) also accepts str subclasses.
        if value and isinstance(value, str):
            setattr(args, name, refine_integer_value(value))

    return args


def _get_mapping_dict(args):
    if args.folder is not None:
        d = {i: args.folder for i in range(args.multi_process)}
    else:
        d = {}
        for m in args.folder_to_device_mapping:
            fields = m.split(MAPPING_DELIMITER)
            d[fields[1]] = fields[0]

    return d


def _validate_folder_mapping(args):
    """Check every ``folder:device_id`` entry of ``--folder_to_device_mapping``.

    Reports these problems:
      * entries that lack ``MAPPING_DELIMITER``;
      * folders that do not exist;
      * when ``--gpu`` is set, device ids that are not integers in
        ``[0, get_accelerator().device_count())``.

    Malformed entries are collected as error messages instead of raising, so
    every problem is shown in one validation pass.

    Returns:
        ``(no_error, error_messages)``. ``no_error`` is True when every check
        passed. ``error_messages`` holds one message per failing check.
    """
    no_error = True
    error_messages = []
    mappings = args.folder_to_device_mapping

    invalid_mappings = [m for m in mappings if MAPPING_DELIMITER not in m]
    if invalid_mappings:
        error_messages.append(
            f'Missing delimiter ({MAPPING_DELIMITER}) in folder_to_device_mapping {invalid_mappings}')
        no_error = False

    folder_list = [m.split(MAPPING_DELIMITER)[0] for m in mappings]
    invalid_folders = [d for d in folder_list if not os.path.exists(d)]
    if invalid_folders:
        error_messages.append(f'Invalid folders in folder_to_device_mapping: {invalid_folders}')
        no_error = False

    if args.gpu:
        # Query the device count once rather than once per entry.
        device_count = get_accelerator().device_count()
        invalid_device_list = []
        for m in mappings:
            if MAPPING_DELIMITER not in m:
                # Already reported above; indexing [1] here would raise IndexError.
                continue
            device_field = m.split(MAPPING_DELIMITER)[1]
            try:
                dev_id = int(device_field)
            except ValueError:
                # Non-numeric ids are reported, not raised, so validation completes.
                invalid_device_list.append(device_field)
                continue
            # Negative ids are invalid too; previously only the upper bound was checked.
            if not 0 <= dev_id < device_count:
                invalid_device_list.append(dev_id)
        if invalid_device_list:
            error_messages.append(f'Invalid device ids in folder_to_device_mapping: {invalid_device_list}')
            no_error = False

    return no_error, error_messages


def validate_args(args):
    """Validate the parsed command-line arguments and print any problems found.

    All checks run, so every problem is reported at once:
      * exactly one of ``--folder`` / ``--folder_to_device_mapping`` is given;
      * the ``--folder`` path exists;
      * every ``--folder_to_device_mapping`` entry passes
        ``_validate_folder_mapping``;
      * ``--engine`` is one of ``VALID_ENGINES``;
      * ``--read`` is not combined with the ``TORCH_IO`` engine;
      * ``--use_gds`` is only used together with ``--gpu``.

    On failure the error count is printed once, followed by each numbered
    message.

    Returns:
        True if all checks passed, False otherwise.
    """
    no_error = True
    error_messages = []

    # --folder and --folder_to_device_mapping are mutually exclusive, and one of them is required.
    if args.folder is not None and len(args.folder_to_device_mapping) > 0:
        error_messages.append('--folder and --folder_to_device_mapping cannot be specified together.')
        no_error = False
    elif args.folder is None and len(args.folder_to_device_mapping) == 0:
        error_messages.append('At least one of --folder or --folder_to_device_mapping must be specified.')
        no_error = False

    # Validate --folder
    if args.folder is not None and not os.path.exists(args.folder):
        no_error = False
        error_messages.append(f'Invalid folder in --folder: {args.folder} ')

    # Validate --folder_to_device_mapping
    if len(args.folder_to_device_mapping) > 0:
        no_mapping_error, mapping_error_messages = _validate_folder_mapping(args)
        no_error = no_error and no_mapping_error
        error_messages += mapping_error_messages

    # Validate --engine
    if args.engine not in VALID_ENGINES:
        no_error = False
        error_messages.append(f'Invalid engine {args.engine}. Valid options = {VALID_ENGINES}')

    # Validate --engine=torch_io
    if args.engine == TORCH_IO:
        if args.read:
            no_error = False
            error_messages.append(f'Read not currently supported for --engine={TORCH_IO}')

    # Validate --gpu, --use_gds
    if args.use_gds and not args.gpu:
        error_messages.append('--gpu must be set to transfer with --use_gds')
        no_error = False

    # Report once, after every check has run. A premature summary here used to
    # print a second, possibly smaller, error count before the --use_gds check.
    if not no_error:
        print(f'Found {len(error_messages)} validation errors')
        for i, msg in enumerate(error_messages):
            print(f'{i+1}: {msg}')

    return no_error


def parse_arguments():
    """Define the command-line interface, parse ``sys.argv`` and return the namespace.

    Size-like options (``--io_size``, ``--block_size``, ``--fast_io_size``) are
    kept as strings here; ``refine_args`` converts them to integers afterwards.
    The parsed namespace is printed before it is returned.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('--folder', default=None, type=str, help='Folder to use for I/O.')

    # action='extend' with nargs='+' accumulates values across repeated flags, as the
    # help text promises. With the default 'store' action, a repeated flag silently
    # replaced the values given earlier.
    parser.add_argument('--folder_to_device_mapping',
                        default=[],
                        nargs='+',
                        action='extend',
                        help='Specification of mapping of folder to (gpu) device id, (ignored for cpu accesses). '
                        'Can be specified multiple times for multi-process runs, '
                        'e.g. --folder_to_device_mapping /mnt/nvme0:0 --folder_to_device_mapping /mnt/nvme1:15 --gpu '
                        'means access /mnt/nvme0 with gpu 0 and /mnt/nvme1 with gpu 15')

    parser.add_argument('--io_size', type=str, default=None, required=True, help='Number of bytes to read or write.')

    parser.add_argument('--fast_io_size', type=str, default='64M', help='Size of fast_io pinned buffer (bytes).')

    parser.add_argument('--read', action='store_true', help='Perform read I/O (default is write)')

    parser.add_argument('--multi_process',
                        type=int,
                        default=1,
                        help='Number of parallel processes doing I/O (default 1).')

    parser.add_argument('--block_size',
                        type=str,
                        default='1M',
                        help='I/O block size. Can use K, M, or G suffix (default 1M for 1 megabytes).')

    parser.add_argument('--queue_depth', type=int, default=32, help='I/O queue depth (default 32).')

    parser.add_argument('--single_submit',
                        action='store_true',
                        help='Submit I/O requests in singles (default is submit queue_depth amount at once.).')

    parser.add_argument(
        '--sequential_requests',
        action='store_true',
        help=
        'Delay I/O request submission until completion of prior requests (default is overlap I/O submission and completion requests.).'
    )

    parser.add_argument('--validate', action='store_true', help='Perform validation of I/O transfer in library.')

    # The help names the default via the constant so it stays in sync with default=.
    parser.add_argument(
        '--engine',
        type=str,
        default=AIO_HANDLE,
        help=
        f'Engine to perform I/O. Options are [{AIO_HANDLE}, {AIO_BASIC}, {TORCH_IO}, {TORCH_FAST_IO}]. Default is {AIO_HANDLE}'
    )

    parser.add_argument('--loops', type=int, default=3, help='Count of operation repetitions')

    parser.add_argument('--io_parallel', type=int, default=None, help='Per iop parallelism')

    parser.add_argument('--gpu', action='store_true', help='Use GPU memory')

    parser.add_argument('--use_gds', action='store_true', help='Enable GDS AIO')

    parser.add_argument('--slow_bounce_buffer',
                        action='store_true',
                        help='For GPU memory transfers, measure impact of bounce buffer pinning on critical path.')

    parser.add_argument('--torch_legacy_save', action='store_true', help='Use torch legacy save approach')

    parser.add_argument('--use_accelerator_pin_memory',
                        action='store_true',
                        help='Obtain pinned (CPU page-locked) tensors from accelerator')

    parser.add_argument('--warmup_loops', type=int, default=1, help='Count of operation warmup repetitions')

    parser.add_argument('--include_warmup_time', action='store_true', help='Include warmup latency in results')

    parser.add_argument('--different_file_each_iteration',
                        action='store_true',
                        help='Read/write a different file on each iteration.')

    args = parser.parse_args()
    print(f'args = {args}')
    return args


def get_validated_args():
    """Parse, normalize and validate the command line, then derive run settings.

    If validation fails, the process exits with status 1. ``validate_args`` has
    already printed the error messages by that point. On success the returned
    namespace also carries:

      * ``total_loops`` -- ``warmup_loops + loops``;
      * ``mapping_dict`` -- peer id -> folder, built by ``_get_mapping_dict``;
      * ``mapping_list`` -- the same mapping as a list of ``(peer_id, folder)`` pairs.

    The resulting mapping is printed before returning.
    """
    args = parse_arguments()
    args = refine_args(args)
    if not validate_args(args):
        # quit() is a site-module helper intended for the interactive shell and
        # exits with status 0. A failed validation must report a non-zero status.
        raise SystemExit(1)
    print('Successful validation of command line arguments')
    args.total_loops = args.warmup_loops + args.loops
    peer_tag = 'gpu' if args.gpu else 'process'
    args.mapping_dict = _get_mapping_dict(args)
    args.mapping_list = list(args.mapping_dict.items())
    print(f'Configuring {len(args.mapping_list)} {peer_tag} to folder mapping')
    for i, (device_id, folder) in enumerate(args.mapping_list):
        print(f'[{i}]: {peer_tag} {device_id} <----> {folder}')

    return args