|
|
|
|
|
|
|
|
|
""" |
|
Functionality of swapping optimizer tensors to/from (NVMe) storage devices. |
|
""" |
|
|
|
import torch |
|
import os |
|
import time |
|
from deepspeed.ops.aio import AsyncIOBuilder |
|
from deepspeed.ops.op_builder import GDSBuilder |
|
from deepspeed.accelerator import get_accelerator |
|
from .test_ds_aio_utils import task_log, create_filename, create_file, create_page_locked_tensor |
|
from .ds_aio_constants import * |
|
|
|
|
|
class AIOHandle_Engine(object):
    """Benchmark engine that drives a DeepNVMe handle (aio or GDS) to read or
    write files on (NVMe) storage, timing each operation.

    The per-task state lives in ``self.ctxt``: the I/O handle, the target
    filenames, the device/bounce buffers, and the list of elapsed times.
    """

    def __init__(self, args, tid, read_op):
        # ctxt holds the handle, filenames, buffers, and timing list for this task.
        self.ctxt = self._create_context(args, tid, read_op)

    def fini(self):
        """Release the I/O buffers allocated in ``_create_context``.

        Only tensors that were explicitly pinned/locked through the handle are
        returned to it; ordinary tensors are simply dropped for the GC.
        """
        for buf in [BUFFER, BOUNCE_BUFFER]:
            tensor = self.ctxt[buf]
            if tensor is None:
                continue
            if self.ctxt[USE_CPU_LOCKED_TENSOR] and tensor.device.type == 'cpu':
                # Guard on device: in the bounce-buffer path BUFFER is a GPU
                # tensor and must not be handed to the CPU locked-tensor pool.
                self.ctxt[HANDLE].free_cpu_locked_tensor(tensor)
            elif self.ctxt[USE_GDS] and tensor.device.type != 'cpu':
                # NOTE(review): a GDS buffer was registered via
                # pin_device_tensor() in _create_context; unpin it here so the
                # cuFile registration is released — confirm against gds_handle API.
                self.ctxt[HANDLE].unpin_device_tensor(tensor)
            # The original code called tensor.detach() here as a statement;
            # Tensor.detach() returns a new tensor and has no side effect, so
            # the call was a no-op and is removed.
            self.ctxt[buf] = None

    def read(self, args, tid, loop_id):
        """Read file ``loop_id`` into the buffer and record the elapsed time.

        When a bounce buffer exists, the file is read into it and then copied
        into the (GPU) BUFFER; the copy is included in the timed interval.
        """
        handle = self.ctxt[HANDLE]

        start_time = time.time()
        dest_buffer = BOUNCE_BUFFER if self.ctxt[BOUNCE_BUFFER] is not None else BUFFER
        ret = handle.pread(self.ctxt[dest_buffer], self.ctxt[FILE][loop_id], args.validate, True)
        assert ret != -1
        handle.wait()
        if dest_buffer == BOUNCE_BUFFER:
            self.ctxt[BUFFER].data.copy_(self.ctxt[BOUNCE_BUFFER].data)
        end_time = time.time()
        self.ctxt[ELAPSED_SEC].append(end_time - start_time)

    def write(self, args, tid, loop_id):
        """Write the buffer to file ``loop_id`` and record the elapsed time.

        When a bounce buffer exists, the (GPU) BUFFER is staged through it
        first; the copy is included in the timed interval.
        """
        handle = self.ctxt[HANDLE]
        start_time = time.time()
        if self.ctxt[BOUNCE_BUFFER] is not None:
            source_buffer = BOUNCE_BUFFER
            self.ctxt[BOUNCE_BUFFER].data.copy_(self.ctxt[BUFFER].data)
        else:
            source_buffer = BUFFER
        ret = handle.pwrite(self.ctxt[source_buffer], self.ctxt[FILE][loop_id], args.validate, True)
        assert ret != -1
        handle.wait()
        end_time = time.time()
        self.ctxt[ELAPSED_SEC].append(end_time - start_time)

    def _create_files(self, args, folder, tid):
        """Return one filename per loop iteration, preparing them on disk.

        For reads, any missing or wrongly-sized file is (re)created at
        ``args.io_size`` bytes; for writes, stale files are removed so each
        run starts clean.
        """
        if args.different_file_each_iteration:
            filenames = [
                create_filename(folder, args.read, args.io_size, f'{tid}_{l}') for l in range(args.total_loops)
            ]
        else:
            # Same file reused every loop; suffix matches loop index 0.
            filenames = [
                create_filename(folder, args.read, args.io_size, f'{tid}_0') for _ in range(args.total_loops)
            ]

        if args.read:
            for f in filenames:
                if not (os.path.isfile(f) and os.path.getsize(f) == args.io_size):
                    create_file(f, args.io_size)
        else:
            for f in filenames:
                if os.path.isfile(f):
                    os.remove(f)

        return filenames

    def _create_context(self, args, tid, read_op):
        """Build the per-task context dict: handle, files, buffers, timers.

        Buffer layout depends on the configuration:
        - GPU + GDS: device tensor registered via ``pin_device_tensor``.
        - GPU + aio: device tensor plus a page-locked CPU bounce buffer
          (unless ``--slow_bounce_buffer`` requests unpinned staging).
        - CPU: a single page-locked tensor.
        """
        io_string = "Read" if read_op else "Write"
        device_id, folder = args.mapping_list[tid]
        filenames = self._create_files(args, folder, tid)

        gds = bool(args.use_gds)
        io_parallel = args.io_parallel if args.io_parallel else 1
        if gds:
            handle = GDSBuilder().load().gds_handle(args.block_size, args.queue_depth, args.single_submit,
                                                    not args.sequential_requests, io_parallel)
        else:
            handle = AsyncIOBuilder().load().aio_handle(args.block_size, args.queue_depth, args.single_submit,
                                                        not args.sequential_requests, io_parallel)
        task_log(tid, 'Created DeepNVMe handle engine')

        bounce_buffer = None
        if args.gpu:
            device_name = get_accelerator().device_name(device_id)
            buffer = torch.randint(high=128, size=(args.io_size, ), dtype=torch.uint8, device=device_name)
            if gds:
                handle.pin_device_tensor(buffer)
            elif not args.slow_bounce_buffer:
                bounce_buffer = create_page_locked_tensor(args.io_size, args.use_accelerator_pin_memory, handle)
        else:
            buffer = create_page_locked_tensor(args.io_size, args.use_accelerator_pin_memory, handle)
        task_log(tid, f'Allocate tensor of size {args.io_size} bytes')

        ctxt = {}
        ctxt[FILE] = filenames
        ctxt[NUM_BYTES] = args.io_size
        ctxt[HANDLE] = handle
        ctxt[USE_GDS] = gds
        ctxt[BUFFER] = buffer
        ctxt[BOUNCE_BUFFER] = bounce_buffer
        ctxt[ELAPSED_SEC] = []
        ctxt[USE_CPU_LOCKED_TENSOR] = not args.use_accelerator_pin_memory

        task_log(tid,
                 f'{io_string} file {filenames} of size {args.io_size} bytes from buffer on device {buffer.device}',
                 force=True)

        return ctxt
|
|