|
|
|
|
|
|
|
|
|
|
|
import os |
|
import sys |
|
import argparse |
|
import subprocess |
|
from deepspeed.utils import logger |
|
from deepspeed.launcher.constants import MPICH_LAUNCHER |
|
|
|
|
|
def parse_args(args=None): |
|
parser = argparse.ArgumentParser(description="DeepSpeed launcher helper to map environment variables for" |
|
"multi-node/multi-gpu training jobs.", |
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter) |
|
|
|
parser.add_argument("--launcher", |
|
default=MPICH_LAUNCHER, |
|
type=str, |
|
help="(optional) choose launcher backend for multi-node " |
|
"training. Options currently include MPICH.") |
|
|
|
parser.add_argument("--module", |
|
action="store_true", |
|
help="Change each process to interpret the launch " |
|
"script as a Python module, executing with the same " |
|
"behavior as 'python -m'.") |
|
|
|
parser.add_argument("--no_python", |
|
action="store_true", |
|
help="Skip prepending the training script with " |
|
"'python' - just execute it directly.") |
|
|
|
parser.add_argument("user_script", type=str, help="User script to launch, followed by any required " |
|
"arguments.") |
|
|
|
parser.add_argument('user_args', nargs=argparse.REMAINDER) |
|
|
|
parser.add_argument("--bind_cores_to_rank", |
|
action="store_true", |
|
help="Bind each rank to different cores of the host") |
|
|
|
parser.add_argument("--bind_core_list", |
|
type=str, |
|
default=None, |
|
help="List of cores to bind to with comma separated list of " |
|
"numbers and range. i.e. 1,3-5,7 => [1,3,4,5,7]. When not " |
|
"specified, all cores on system would be used rank binding") |
|
|
|
return parser.parse_args(args=args) |
|
|
|
|
|
def env_mapping(env, rank_name_list=None, local_rank_name_list=None): |
|
rank = None |
|
for rank_name in rank_name_list: |
|
if rank_name in env: |
|
if rank == None: |
|
rank = env.get(rank_name) |
|
elif rank != env.get(rank_name): |
|
raise EnvironmentError(f"rank number doesn't match!") |
|
if rank == None: |
|
raise EnvironmentError(f"rank number is not in current env!") |
|
env['RANK'] = rank |
|
|
|
local_rank = None |
|
for local_rank_name in local_rank_name_list: |
|
if local_rank_name in env: |
|
if local_rank == None: |
|
local_rank = env.get(local_rank_name) |
|
elif local_rank != env.get(local_rank_name): |
|
raise EnvironmentError(f"local_rank number doesn't match!") |
|
if local_rank == None: |
|
raise EnvironmentError(f"rank number is not in current env!") |
|
env['LOCAL_RANK'] = local_rank |
|
|
|
return env |
|
|
|
|
|
def main(args=None): |
|
args = parse_args(args) |
|
|
|
env = os.environ.copy() |
|
|
|
args.launcher = args.launcher.lower() |
|
if args.launcher == MPICH_LAUNCHER: |
|
rank_name_list = ["PMIX_RANK"] + ["PMI_RANK"] |
|
local_rank_name_list = ["PALS_LOCAL_RANKID"] + ["MPI_LOCALRANKID"] |
|
env = env_mapping(env, rank_name_list=rank_name_list, local_rank_name_list=local_rank_name_list) |
|
else: |
|
raise NotImplementedError(f"Unknown launcher {args.launcher}") |
|
|
|
python_exec = [] |
|
if not args.no_python: |
|
python_exec += [sys.executable, "-u"] |
|
if args.module: |
|
python_exec.append("-m") |
|
cmd = python_exec + [args.user_script] + args.user_args |
|
|
|
logger.info(f"launcher_helper cmd = {' '.join(cmd)}") |
|
|
|
result = subprocess.Popen(cmd, env=env, close_fds=False) |
|
result.wait() |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|