# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import time
from multiprocessing import Pool, Barrier

from .ds_aio_constants import AIO_BASIC, TORCH_FAST_IO, TORCH_IO
from .test_ds_aio_utils import report_results, task_log, task_barrier
from .ds_aio_handle import AIOHandle_Engine
from .ds_aio_basic import AIOBasic_Engine
from .torch_io import TorchIO_Engine
from .torch_fastio_engine import Torch_FastIO_Engine


def prepare_operation(args, tid, read_op):
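    """Instantiate the I/O engine selected by args.engine; read_op chooses read vs. write mode."""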
    if args.engine == TORCH_IO:
        io_engine = TorchIO_Engine(args, tid, read_op)
    elif args.engine == AIO_BASIC:
        io_engine = AIOBasic_Engine(args, tid, read_op)
    elif args.engine == TORCH_FAST_IO:
        io_engine = Torch_FastIO_Engine(args, tid, read_op)
    else:
        io_engine = AIOHandle_Engine(args, tid, read_op)

    return io_engine


def prepare_read(pool_params):
    args, tid = pool_params
    return prepare_operation(args, tid, True)


def prepare_write(pool_params):
    args, tid = pool_params
    return prepare_operation(args, tid, False)


def post_operation(pool_params):
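    """Tear down the I/O engine once all benchmark loops for this task have completed."""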
    _, _, io_engine = pool_params
    io_engine.fini()


def read_operation(pool_params):
    args, tid, loop_id, io_engine = pool_params
    return io_engine.read(args, tid, loop_id)


def write_operation(pool_params):
    args, tid, loop_id, io_engine = pool_params
    return io_engine.write(args, tid, loop_id)


def get_schedule(args, read_op):
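    """Build the pre/main/post callable schedule for either the read or the write benchmark."""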
    schedule = {}
    if read_op:
        schedule['pre'] = prepare_read
        schedule['post'] = post_operation
        schedule['main'] = read_operation
    else:
        schedule['pre'] = prepare_write
        schedule['post'] = post_operation
        schedule['main'] = write_operation

    return schedule


def io_engine_tasklet(pool_params):
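    """Per-process benchmark driver: prepare the engine, time each main loop, then clean up."""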
    args, tid, read_op = pool_params
    num_processes = len(args.mapping_dict)

    # Create schedule
    schedule = get_schedule(args, read_op)
    task_log(tid, f'schedule = {schedule}')
    task_barrier(aio_barrier, num_processes)

    # Run pre task
    task_log(tid, 'running pre-task')
    io_engine = schedule["pre"]((args, tid))
    task_barrier(aio_barrier, num_processes)

    # Run main tasks in a loop
    io_engine.ctxt["main_task_sec"] = []
    for i in range(args.total_loops):
        task_log(tid, f'running main task {i}')
        start_time = time.time()
        schedule["main"]((args, tid, i, io_engine))
        task_barrier(aio_barrier, num_processes)
        stop_time = time.time()
        io_engine.ctxt["main_task_sec"].append(stop_time - start_time)

    # Run post task
    task_log(tid, 'running post-task')
    schedule["post"]((args, tid, io_engine))
    task_barrier(aio_barrier, num_processes)

    ctxt = io_engine.ctxt
    if args.include_warmup_time:
        e2e_latency_sec = sum(ctxt["main_task_sec"])
        task_latency_sec = sum(ctxt["elapsed_sec"])
        actual_loops = args.total_loops
    else:
        e2e_latency_sec = sum(ctxt["main_task_sec"][args.warmup_loops:])
        task_latency_sec = sum(ctxt["elapsed_sec"][args.warmup_loops:])
        actual_loops = args.loops

    task_log(tid, f'elapsed_sec = {ctxt["elapsed_sec"]}')
    return e2e_latency_sec, task_latency_sec, ctxt["num_bytes"] * actual_loops


def _init_tasklet(b):
    # Stash the shared barrier in each pool worker so tasklets can synchronize.
    global aio_barrier
    aio_barrier = b


def io_engine_multiprocessing(args, read_op):
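    """Run one benchmark tasklet per entry in args.mapping_dict and report the combined results."""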
    num_processes = len(args.mapping_dict)
    b = Barrier(num_processes)
    pool_params = [(args, p, read_op) for p in range(num_processes)]
    with Pool(processes=num_processes, initializer=_init_tasklet, initargs=(b, )) as p:
        pool_results = p.map(io_engine_tasklet, pool_params)

    report_results(args, read_op, pool_results)