# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Handle-based engine that reads/writes tensors from/to files through a
DeepNVMe (AIO or GDS) handle and records the elapsed time of each operation.
"""

import torch
import os
import time
from deepspeed.ops.aio import AsyncIOBuilder
from deepspeed.ops.op_builder import GDSBuilder
from deepspeed.accelerator import get_accelerator
from .test_ds_aio_utils import task_log, create_filename, create_file, create_page_locked_tensor
from .ds_aio_constants import *


class AIOHandle_Engine(object):
    """Runs timed file reads/writes through a GDS or AIO handle.

    All per-task state (file names, handle, buffers, elapsed times) lives in
    the ``self.ctxt`` dictionary built by ``_create_context``.
    """

    def __init__(self, args, tid, read_op):
        """Build the I/O context for task ``tid``.

        Args:
            args: Parsed benchmark arguments.
            tid: Task index; used to index ``args.mapping_list`` and to name files.
            read_op: Truthy for a read engine, falsy for a write engine
                (only affects the log message in ``_create_context``).
        """
        self.ctxt = self._create_context(args, tid, read_op)

    def fini(self):
        """Release the main and bounce buffers held in the context."""
        ctxt = self.ctxt
        for key in (BUFFER, BOUNCE_BUFFER):
            tensor = ctxt[key]
            if tensor is None:
                continue

            # Tensors obtained via the handle are handed back to the handle.
            if ctxt[USE_CPU_LOCKED_TENSOR]:
                ctxt[HANDLE].free_cpu_locked_tensor(tensor)

            tensor.detach()
            ctxt[key] = None

    def read(self, args, tid, loop_id):
        """Read file number ``loop_id`` into the buffer and record the elapsed time.

        Args:
            args: Parsed benchmark arguments; only ``args.validate`` is read here.
            tid: Unused by this method.
            loop_id: Index into the context's list of file names.
        """
        ctxt = self.ctxt
        io_handle = ctxt[HANDLE]

        began = time.time()
        # Land the data in the bounce buffer when one exists, else in the main buffer.
        if ctxt[BOUNCE_BUFFER] is None:
            target_key = BUFFER
        else:
            target_key = BOUNCE_BUFFER

        # NOTE(review): the trailing True is presumably the async flag, given
        # the wait() that follows — confirm against the handle API.
        status = io_handle.pread(ctxt[target_key], ctxt[FILE][loop_id], args.validate, True)
        assert status != -1
        io_handle.wait()

        if target_key == BOUNCE_BUFFER:
            # Second hop: bounce buffer -> main buffer (part of the timed section).
            ctxt[BUFFER].data.copy_(ctxt[BOUNCE_BUFFER].data)

        finished = time.time()
        ctxt[ELAPSED_SEC].append(finished - began)

    def write(self, args, tid, loop_id):
        """Write the buffer to file number ``loop_id`` and record the elapsed time.

        Args:
            args: Parsed benchmark arguments; only ``args.validate`` is read here.
            tid: Unused by this method.
            loop_id: Index into the context's list of file names.
        """
        # Overwriting an existing file could be artificially faster; stale
        # target files are deleted up front by _create_files.
        ctxt = self.ctxt
        io_handle = ctxt[HANDLE]

        began = time.time()
        if ctxt[BOUNCE_BUFFER] is None:
            origin_key = BUFFER
        else:
            # First hop: main buffer -> bounce buffer (part of the timed section).
            origin_key = BOUNCE_BUFFER
            ctxt[BOUNCE_BUFFER].data.copy_(ctxt[BUFFER].data)

        # NOTE(review): the trailing True is presumably the async flag, given
        # the wait() that follows — confirm against the handle API.
        status = io_handle.pwrite(ctxt[origin_key], ctxt[FILE][loop_id], args.validate, True)
        assert status != -1
        io_handle.wait()

        finished = time.time()
        ctxt[ELAPSED_SEC].append(finished - began)

    def _create_files(self, args, folder, tid):
        """Return one file name per loop iteration, preparing the files on disk.

        For reads, any file that is missing or whose size differs from
        ``args.io_size`` is (re)created. For writes, existing files are removed.

        Args:
            args: Parsed benchmark arguments.
            folder: Directory passed through to ``create_filename``.
            tid: Task index embedded in each file name.

        Returns:
            List of ``args.total_loops`` file names; all entries are the same
            name unless ``args.different_file_each_iteration`` is set.
        """
        unique_per_loop = args.different_file_each_iteration

        paths = []
        for loop_idx in range(args.total_loops):
            file_idx = loop_idx if unique_per_loop else 0
            paths.append(create_filename(folder, args.read, args.io_size, f'{tid}_{file_idx}'))

        if args.read:
            for path in paths:
                usable = os.path.isfile(path) and os.path.getsize(path) == args.io_size
                if not usable:
                    create_file(path, args.io_size)
        else:
            for path in paths:
                if os.path.isfile(path):
                    os.remove(path)

        return paths

    def _create_context(self, args, tid, read_op):
        """Create the handle, files and buffers for task ``tid``.

        Args:
            args: Parsed benchmark arguments.
            tid: Task index; selects the ``(device_id, folder)`` pair from
                ``args.mapping_list``.
            read_op: Truthy to label the log message "Read", else "Write".

        Returns:
            Dictionary keyed by the ``ds_aio_constants`` names holding the file
            list, byte count, handle, GDS flag, buffers, elapsed-time list and
            the CPU-locked-tensor flag.
        """
        io_string = "Read" if read_op else "Write"
        device_id, folder = args.mapping_list[tid]
        filenames = self._create_files(args, folder, tid)

        gds = bool(args.use_gds)
        io_parallel = args.io_parallel or 1

        # Both handle factories take the same positional arguments.
        if gds:
            make_handle = GDSBuilder().load().gds_handle
        else:
            make_handle = AsyncIOBuilder().load().aio_handle
        handle = make_handle(args.block_size, args.queue_depth, args.single_submit, not args.sequential_requests,
                             io_parallel)
        task_log(tid, f'Created DeepNVMe handle engine')

        bounce_buffer = None
        if not args.gpu:
            buffer = create_page_locked_tensor(args.io_size, args.use_accelerator_pin_memory, handle)
        else:
            device_name = get_accelerator().device_name(device_id)
            buffer = torch.randint(high=128, size=(args.io_size, ), dtype=torch.uint8, device=device_name)
            if gds:
                # GDS path: register the device tensor with the handle; no bounce buffer.
                handle.pin_device_tensor(buffer)
            elif not args.slow_bounce_buffer:
                # Non-GDS path: stage device I/O through a page-locked tensor.
                bounce_buffer = create_page_locked_tensor(args.io_size, args.use_accelerator_pin_memory, handle)
        task_log(tid, f'Allocate tensor of size {args.io_size} bytes')

        ctxt = {
            FILE: filenames,
            NUM_BYTES: args.io_size,
            HANDLE: handle,
            USE_GDS: gds,
            BUFFER: buffer,
            BOUNCE_BUFFER: bounce_buffer,
            ELAPSED_SEC: [],
            USE_CPU_LOCKED_TENSOR: not args.use_accelerator_pin_memory,
        }

        task_log(tid,
                 f'{io_string} file {filenames} of size {args.io_size} bytes from buffer on device {buffer.device}',
                 force=True)

        return ctxt