|
"""wandb server. |
|
|
|
Start up socket transport servers. |
|
""" |
|
|
|
import logging |
|
import os |
|
import sys |
|
from typing import Optional |
|
|
|
import wandb |
|
|
|
from . import _startup_debug, port_file |
|
from .server_sock import SocketServer |
|
from .streams import StreamMux |
|
|
|
|
|
class WandbServer:
    """Start the socket transport server and run the stream multiplexer loop.

    ``serve()`` is the entry point: it starts a ``SocketServer`` bound to the
    configured address/port, optionally writes the bound port to a port file,
    optionally sets the process title, and then blocks in ``StreamMux.loop()``.
    """

    _pid: Optional[int]
    _sock_port: Optional[int]
    _port_fname: Optional[str]
    _address: Optional[str]
    _debug: bool
    _sock_server: Optional[SocketServer]
    _startup_debug_enabled: bool

    def __init__(
        self,
        sock_port: Optional[int] = None,
        port_fname: Optional[str] = None,
        address: Optional[str] = None,
        pid: Optional[int] = None,
        debug: bool = True,
    ) -> None:
        """Store the server configuration; no sockets are opened here.

        Args:
            sock_port: Port to bind; a falsy value is passed on as ``0``.
            port_fname: Path of the port file to write, or a falsy value to
                skip writing one.
            address: Address to bind; a falsy value falls back to
                ``"127.0.0.1"``.
            pid: Process id forwarded to the mux via ``set_pid`` when truthy,
                and embedded in the process title.
            debug: When true, configure root logging at DEBUG level on stdout.
        """
        self._sock_port = sock_port
        self._port_fname = port_fname
        self._address = address
        self._pid = pid
        self._debug = debug
        self._sock_server = None
        self._startup_debug_enabled = _startup_debug.is_enabled()

        if debug:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    def _inform_used_ports(self, sock_port: Optional[int]) -> None:
        """Write ``sock_port`` to the configured port file, if one was given."""
        if not self._port_fname:
            return
        pf = port_file.PortFile(sock_port=sock_port)
        pf.write(self._port_fname)

    def _start_sock(self, mux: StreamMux) -> int:
        """Create and start the socket server, returning the port it reports.

        Args:
            mux: Stream multiplexer handed to the ``SocketServer``.

        Returns:
            The value of ``SocketServer.port`` read after ``start()``.

        Raises:
            KeyboardInterrupt, Exception: Re-raised unchanged after
                ``mux.cleanup()`` has been called.
        """
        address: str = self._address or "127.0.0.1"
        # NOTE(review): port 0 presumably lets the OS choose a free port,
        # since the real port is read back from the server after start().
        port: int = self._sock_port or 0
        self._sock_server = SocketServer(mux=mux, address=address, port=port)
        try:
            self._sock_server.start()
            port = self._sock_server.port
            if self._pid:
                mux.set_pid(self._pid)
        except (KeyboardInterrupt, Exception):
            # Same handling for interrupts and ordinary errors: clean up the
            # mux, then let the caller see the original exception.
            mux.cleanup()
            raise
        return port

    def _stop_servers(self) -> None:
        """Stop the socket server if one has been created."""
        if self._sock_server:
            self._sock_server.stop()

    def _startup_debug_print(self, message: str) -> None:
        """Emit a startup-debug message when startup debugging is enabled."""
        if not self._startup_debug_enabled:
            return
        _startup_debug.print_message(message)

    def _setup_proctitle(self, sock_port: Optional[int]) -> None:
        """Set the process title to ``wandb-service(<ver>-<pid>-<transport>-<port>)``.

        Does nothing when the ``WANDB_X_DISABLE_SETPROCTITLE`` environment
        variable is set to a non-empty value, or when the optional
        ``setproctitle`` module is not available.
        """
        disable_setproctitle = os.environ.get("WANDB_X_DISABLE_SETPROCTITLE")
        if disable_setproctitle:
            return

        setproctitle = wandb.util.get_optional_module("setproctitle")
        if setproctitle:
            service_ver = 2
            pid = str(self._pid or 0)
            # "s" is used when a socket port is present, "g" otherwise.
            transport = "s" if sock_port else "g"
            port = sock_port or 0

            service_id = f"{service_ver}-{pid}-{transport}-{port}"
            proc_title = f"wandb-service({service_id})"
            self._startup_debug_print("before_setproctitle")
            setproctitle.setproctitle(proc_title)
            self._startup_debug_print("after_setproctitle")

    def serve(self) -> None:
        """Start the socket server and block in the mux loop until it returns.

        Once the socket server has started, it is always stopped on the way
        out, including when a later step or the loop itself raises; the
        exception still propagates to the caller.
        """
        mux = StreamMux()
        self._startup_debug_print("before_network")
        sock_port = self._start_sock(mux=mux)
        try:
            self._startup_debug_print("after_network")
            self._inform_used_ports(sock_port=sock_port)
            self._startup_debug_print("after_inform")
            self._setup_proctitle(sock_port=sock_port)
            self._startup_debug_print("before_loop")
            mux.loop()
        finally:
            # Without this, an exception after startup would skip the stop
            # call and leave the socket server running.
            self._stop_servers()
|
|