File size: 5,619 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""Implementation of the abstract runner class.

This class defines the interface that the W&B launch runner uses to manage the lifecycle
of runs launched in different environments (e.g. runs launched locally or in a cluster).
"""

import logging
import os
import shutil
import subprocess
import sys
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Literal, Optional, Union

import wandb
from wandb.apis.internal import Api
from wandb.sdk.lib import runid

from .._project_spec import LaunchProject

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


# Closed set of state strings a launched run can report. ``Status.state`` is
# annotated with this type and defaults to "unknown".
State = Literal[
    "unknown",
    "starting",
    "running",
    "failed",
    "finished",
    "stopping",
    "stopped",
    "preempted",
]


class Status:
    """State of a run plus an optional list of messages.

    Instances compare equal to another ``Status`` with the same state and also
    to the bare state value itself, and hash like that state value.
    """

    def __init__(self, state: "State" = "unknown", messages: List[str] = None):  # type: ignore
        # A missing or empty ``messages`` argument is replaced by a fresh list.
        self.messages = messages if messages else []
        self.state = state

    def __repr__(self) -> "State":
        # The repr is the bare state value, same as ``str()``.
        current = self.state
        return current

    def __str__(self) -> str:
        current = self.state
        return current

    def __eq__(self, __value: object) -> bool:
        # Compare against the other object's state when it is a Status,
        # otherwise against the object itself (e.g. a plain state string).
        other_state = __value.state if isinstance(__value, Status) else __value
        return self.state == other_state

    def __hash__(self) -> int:
        # Hash by state so the hash agrees with the equality above.
        return hash(self.state)


class AbstractRun(ABC):
    """Wrapper around a W&B launch run.

    A launched run is a subprocess running an entry point
    command, that exposes methods for waiting on and cancelling the run.
    This class defines the interface that the W&B launch runner uses to manage the lifecycle
    of runs launched in different environments (e.g. runs launched locally or in a cluster).
    ``AbstractRun`` is not thread-safe. That is, concurrent calls to wait() / cancel()
    from multiple threads may inadvertently kill resources (e.g. local processes) unrelated to the
    run.
    """

    def __init__(self) -> None:
        # Every run starts with a default Status (state "unknown").
        self._status = Status()

    @property
    def status(self) -> "Status":
        """Return the status object stored on this run."""
        return self._status

    @abstractmethod
    async def get_logs(self) -> Optional[str]:
        """Return the logs associated with the run."""

    def _run_cmd(
        self, cmd: List[str], output_only: Optional[bool] = False
    ) -> Optional[Union["subprocess.Popen[bytes]", bytes]]:
        """Run a command and return either the process handle or its stdout.

        The child inherits the current process environment and its stdout is
        connected to a pipe.

        Arguments:
            cmd: The command to run, as an argument list.
            output_only: If true, wait for the command to exit and return its
                stdout bytes instead of the ``Popen`` object.

        Returns:
            The ``subprocess.Popen`` object when ``output_only`` is false, the
            captured stdout bytes when it is true, or ``None`` if
            ``subprocess.CalledProcessError`` is raised.
        """
        try:
            popen = subprocess.Popen(cmd, env=os.environ, stdout=subprocess.PIPE)
            if output_only:
                # communicate() drains stdout while waiting for exit. Calling
                # wait() first and reading afterwards can deadlock once the
                # child fills the OS pipe buffer. It also closes the pipe.
                stdout, _ = popen.communicate()
                return stdout
            return popen
        except subprocess.CalledProcessError as e:
            # NOTE(review): Popen itself reports launch failures as OSError, so
            # this handler is not expected to fire; kept so the exceptions seen
            # by callers are unchanged.
            wandb.termerror(f"Command failed: {e}")
            return None

    @abstractmethod
    async def wait(self) -> bool:
        """Wait for the run to finish, returning True if the run succeeded and false otherwise.

        Note that in some cases, we may wait until the remote job completes rather than until the W&B run completes.
        """

    @abstractmethod
    async def get_status(self) -> "Status":
        """Get status of the run."""

    @abstractmethod
    async def cancel(self) -> None:
        """Cancel the run (interrupts the command subprocess, cancels the run, etc).

        Cancels the run and waits for it to terminate. The W&B run status may not be
        set correctly upon run cancellation.
        """

    @property
    @abstractmethod
    def id(self) -> Optional[str]:
        """Return the identifier of this run, or ``None`` if it has none."""
        pass


class AbstractRunner(ABC):
    """Base class for backends that execute W&B Launches.

    Subclass ``AbstractRunner`` and expose the subclass as a third-party plugin
    to run W&B projects against a custom execution backend (e.g. your team's
    in-house cluster or job scheduler).
    """

    # Declared here without a value.
    # NOTE(review): presumably set by each concrete runner — confirm.
    _type: str

    def __init__(
        self,
        api: Api,
        backend_config: Dict[str, Any],
    ) -> None:
        """Store the API handle and backend config on the runner.

        Also records the current working directory and a freshly generated id
        used as this runner's namespace.
        """
        self._api = api
        self.backend_config = backend_config
        self._cwd = os.getcwd()
        self._namespace = runid.generate_id()

    def find_executable(
        self,
        cmd: str,
    ) -> Union[str, None]:
        """Return the path of ``cmd`` as found by ``shutil.which``, or None."""
        located = shutil.which(cmd)
        return located

    @property
    def api_key(self) -> Any:
        """The API key held by the wrapped ``Api`` object."""
        key = self._api.api_key
        return key

    def verify(self) -> bool:
        """Check on first boot that the runner has what it needs to operate.

        Currently this only checks for an API key; when it is missing, an error
        is printed with `wandb.termerror` and `sys.exit(1)` is called.
        """
        key_missing = self._api.api_key is None
        if key_missing:
            wandb.termerror(
                "Couldn't find W&B api key, run wandb login or set WANDB_API_KEY"
            )
            sys.exit(1)
        return True

    @abstractmethod
    async def run(
        self,
        launch_project: LaunchProject,
        image_uri: str,
    ) -> Optional[AbstractRun]:
        """Submit a LaunchProject to be run.

        Arguments:
            launch_project: Object of _project_spec.LaunchProject class
                representing a wandb launch project.
            image_uri: URI of the image for the run.
                NOTE(review): presumably a container image — confirm against
                the concrete runners.

        Returns:
            An ``AbstractRun`` used to track the execution, or ``None``.
            Implementations are expected to run the project asynchronously,
            i.e. trigger project execution and then return immediately.
        """