File size: 3,460 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""wandb server.

Start up socket transport servers.
"""

import logging
import os
import sys
from typing import Optional

import wandb

from . import _startup_debug, port_file
from .server_sock import SocketServer
from .streams import StreamMux


class WandbServer:
    _pid: Optional[int]
    _sock_port: Optional[int]
    _debug: bool
    _sock_server: Optional[SocketServer]
    _startup_debug_enabled: bool

    def __init__(
        self,
        sock_port: Optional[int] = None,
        port_fname: Optional[str] = None,
        address: Optional[str] = None,
        pid: Optional[int] = None,
        debug: bool = True,
    ) -> None:
        self._sock_port = sock_port
        self._port_fname = port_fname
        self._address = address
        self._pid = pid
        self._debug = debug
        self._sock_server = None
        self._startup_debug_enabled = _startup_debug.is_enabled()

        if debug:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    def _inform_used_ports(self, sock_port: Optional[int]) -> None:
        if not self._port_fname:
            return
        pf = port_file.PortFile(sock_port=sock_port)
        pf.write(self._port_fname)

    def _start_sock(self, mux: StreamMux) -> int:
        address: str = self._address or "127.0.0.1"
        port: int = self._sock_port or 0
        self._sock_server = SocketServer(mux=mux, address=address, port=port)
        try:
            self._sock_server.start()
            port = self._sock_server.port
            if self._pid:
                mux.set_pid(self._pid)
        except KeyboardInterrupt:
            mux.cleanup()
            raise
        except Exception:
            mux.cleanup()
            raise
        return port

    def _stop_servers(self) -> None:
        if self._sock_server:
            self._sock_server.stop()

    def _startup_debug_print(self, message: str) -> None:
        if not self._startup_debug_enabled:
            return
        _startup_debug.print_message(message)

    def _setup_proctitle(self, sock_port: Optional[int]) -> None:
        # TODO: the internal_process should have a better way to have access to
        # settings.
        disable_setproctitle = os.environ.get("WANDB_X_DISABLE_SETPROCTITLE")
        if disable_setproctitle:
            return

        setproctitle = wandb.util.get_optional_module("setproctitle")
        if setproctitle:
            service_ver = 2
            pid = str(self._pid or 0)
            transport = "s" if sock_port else "g"
            port = sock_port or 0
            # this format is similar to the service token, but it's purely informative now
            # (consider unifying this in the future)
            service_id = f"{service_ver}-{pid}-{transport}-{port}"
            proc_title = f"wandb-service({service_id})"
            self._startup_debug_print("before_setproctitle")
            setproctitle.setproctitle(proc_title)
            self._startup_debug_print("after_setproctitle")

    def serve(self) -> None:
        mux = StreamMux()
        self._startup_debug_print("before_network")
        sock_port = self._start_sock(mux=mux)
        self._startup_debug_print("after_network")
        self._inform_used_ports(sock_port=sock_port)
        self._startup_debug_print("after_inform")
        self._setup_proctitle(sock_port=sock_port)
        self._startup_debug_print("before_loop")
        mux.loop()
        self._stop_servers()