File size: 5,619 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
"""Implementation of the abstract runner class.
This class defines the interface that the W&B launch runner uses to manage the lifecycle
of runs launched in different environments (e.g. runs launched locally or in a cluster).
"""
import logging
import os
import shutil
import subprocess
import sys
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Literal, Optional, Union
import wandb
from wandb.apis.internal import Api
from wandb.sdk.lib import runid
from .._project_spec import LaunchProject
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Closed set of state names a launched run can report.
State = Literal[
    "unknown", "starting", "running", "failed",
    "finished", "stopping", "stopped", "preempted",
]
class Status:
    """A run state plus any messages attached to it.

    Equality and hashing are based on ``state`` only, so a ``Status`` compares
    equal both to another ``Status`` with the same state and to the bare
    state value itself.
    """

    def __init__(self, state: "State" = "unknown", messages: List[str] = None):  # type: ignore
        """Store the state and the messages.

        Arguments:
            state: The state to record; defaults to ``"unknown"``.
            messages: Messages to attach. A falsy value (``None`` or an empty
                list) is replaced by a new empty list.
        """
        self.state = state
        self.messages = messages if messages else []

    def __repr__(self) -> "State":
        """Return the bare state value."""
        return self.state

    def __str__(self) -> str:
        """Return the bare state value."""
        return self.state

    def __eq__(self, __value: object) -> bool:
        """Compare by state against another ``Status`` or against a raw value."""
        # Unwrap the other side when it is a Status; otherwise compare as-is.
        other_state = __value.state if isinstance(__value, Status) else __value
        return self.state == other_state

    def __hash__(self) -> int:
        """Hash on the state so that hashing agrees with ``__eq__``."""
        return hash(self.state)
class AbstractRun(ABC):
    """Wrapper around a W&B launch run.

    A launched run is a subprocess running an entry point
    command, that exposes methods for waiting on and cancelling the run.
    This class defines the interface that the W&B launch runner uses to manage the lifecycle
    of runs launched in different environments (e.g. runs launched locally or in a cluster).

    ``AbstractRun`` is not thread-safe. That is, concurrent calls to wait() / cancel()
    from multiple threads may inadvertently kill resources (e.g. local processes) unrelated to the
    run.
    """

    def __init__(self) -> None:
        # A freshly created run starts with a default Status (state "unknown").
        self._status = Status()

    @property
    def status(self) -> "Status":
        """Return the Status object currently stored on this run."""
        return self._status

    @abstractmethod
    async def get_logs(self) -> Optional[str]:
        """Return the logs associated with the run."""

    def _run_cmd(
        self, cmd: List[str], output_only: Optional[bool] = False
    ) -> Optional[Union["subprocess.Popen[bytes]", bytes]]:
        """Run the command and return a popen object or the stdout of the command.

        Arguments:
            cmd: The command to run
            output_only: If true just return the stdout bytes

        Returns:
            The captured stdout bytes when ``output_only`` is true, otherwise the
            ``subprocess.Popen`` object for the started process (its stdout is a
            pipe the caller is responsible for reading). ``None`` if a
            ``subprocess.CalledProcessError`` was caught.
        """
        try:
            popen = subprocess.Popen(cmd, env=os.environ, stdout=subprocess.PIPE)
            if output_only:
                # communicate() drains stdout while waiting for the process to
                # exit and then closes the pipe. Calling wait() before reading
                # a PIPE can deadlock once the child fills the OS pipe buffer.
                stdout, _ = popen.communicate()
                return stdout
            return popen
        except subprocess.CalledProcessError as e:
            # NOTE(review): Popen itself reports launch failures (e.g. a missing
            # executable) as OSError, which still propagates to the caller.
            # This handler is kept unchanged so the exceptions callers see do
            # not change; confirm whether OSError should be handled here too.
            wandb.termerror(f"Command failed: {e}")
            return None

    @abstractmethod
    async def wait(self) -> bool:
        """Wait for the run to finish, returning True if the run succeeded and false otherwise.

        Note that in some cases, we may wait until the remote job completes rather than until the W&B run completes.
        """

    @abstractmethod
    async def get_status(self) -> "Status":
        """Get status of the run."""

    @abstractmethod
    async def cancel(self) -> None:
        """Cancel the run (interrupts the command subprocess, cancels the run, etc).

        Cancels the run and waits for it to terminate. The W&B run status may not be
        set correctly upon run cancellation.
        """

    @property
    @abstractmethod
    def id(self) -> Optional[str]:
        """Return the identifier of the run, or None; defined by subclasses."""
        pass
class AbstractRunner(ABC):
    """Abstract plugin base describing what is needed to execute W&B Launches.

    Subclasses of ``AbstractRunner`` can be exposed as third-party plugins so
    that W&B projects can run against custom execution backends (e.g. your
    team's in-house cluster or job scheduler).
    """

    # Declared for subclasses; no value is assigned in this base class.
    _type: str

    def __init__(
        self,
        api: Api,
        backend_config: Dict[str, Any],
    ) -> None:
        """Store the API handle and backend config, and record per-runner context.

        Arguments:
            api: The internal W&B API object; kept on ``self._api``.
            backend_config: Backend configuration mapping; kept as given.
        """
        self._api = api
        self.backend_config = backend_config
        # Working directory at construction time.
        self._cwd = os.getcwd()
        # Freshly generated id stored as this runner's namespace.
        self._namespace = runid.generate_id()

    def find_executable(
        self,
        cmd: str,
    ) -> Union[str, None]:
        """Cross platform check for whether a program is available.

        Returns whatever ``shutil.which`` returns for ``cmd``: the path to the
        executable, or ``None`` when it cannot be found.
        """
        located = shutil.which(cmd)
        return located

    @property
    def api_key(self) -> Any:
        """Return the API key held by the underlying API object."""
        return self._api.api_key

    def verify(self) -> bool:
        """Check on first boot that the needed commands and permissions are available.

        For now this only checks for an API key: when it is missing, an error
        is reported through `wandb.termerror` and `sys.exit(1)` is called.
        """
        key_missing = self._api.api_key is None
        if key_missing:
            wandb.termerror(
                "Couldn't find W&B api key, run wandb login or set WANDB_API_KEY"
            )
            sys.exit(1)
        return True

    @abstractmethod
    async def run(
        self,
        launch_project: LaunchProject,
        image_uri: str,
    ) -> Optional[AbstractRun]:
        """Submit a LaunchProject to be run.

        Implementations are expected to run the project asynchronously, i.e.
        trigger project execution and then immediately return an object that
        tracks execution status.

        Arguments:
            launch_project: Object of _project_spec.LaunchProject class representing a wandb launch project
            image_uri: URI of the image to run (presumably a container image; confirm against implementations)

        Returns:
            An ``AbstractRun`` used to track the execution, or ``None``.
        """
|