"""InterfaceQueue - Derived from InterfaceShared, using queues to send records to the internal thread.

See interface.py for how interface classes relate to each other.
"""
|
|
|
import logging |
|
from multiprocessing.process import BaseProcess |
|
from typing import TYPE_CHECKING, Optional |
|
|
|
from wandb.sdk.mailbox import Mailbox |
|
|
|
from .interface_shared import InterfaceShared |
|
|
|
if TYPE_CHECKING: |
|
from queue import Queue |
|
|
|
from wandb.proto import wandb_internal_pb2 as pb |
|
|
|
|
|
logger = logging.getLogger("wandb") |
|
|
|
|
|
class InterfaceQueue(InterfaceShared):
    """Interface that publishes records by putting them on a queue.

    The constructor stores the queues and the optional process handle on the
    instance and then defers to ``InterfaceShared.__init__`` with the mailbox.

    Instance attributes set here:
        record_q: Queue that ``_publish`` puts outgoing records on. May be
            ``None``, in which case published records are not enqueued.
        result_q: Queue stored for results; it is not read in this class.
        _process: Optional process handle; when set, ``_publish`` refuses to
            send once the process is no longer alive.
    """

    def __init__(
        self,
        record_q: Optional["Queue[pb.Record]"] = None,
        result_q: Optional["Queue[pb.Result]"] = None,
        process: Optional[BaseProcess] = None,
        mailbox: Optional[Mailbox] = None,
    ) -> None:
        """Store the queues and process handle, then initialize the base class.

        Args:
            record_q: Queue for outgoing records, or ``None``.
            result_q: Queue for results, or ``None``.
            process: Process whose liveness gates publishing, or ``None``.
            mailbox: Forwarded unchanged to ``InterfaceShared.__init__``.
        """
        # Attributes are assigned before the base initializer runs, matching
        # the original statement order.
        # NOTE(review): presumably the base __init__ may rely on them -- confirm.
        self.record_q = record_q
        self.result_q = result_q
        self._process = process
        super().__init__(mailbox=mailbox)

    def _publish(self, record: "pb.Record", local: Optional[bool] = None) -> None:
        """Put ``record`` on the record queue.

        Args:
            record: The record to enqueue.
            local: When truthy, copied onto ``record.control.local`` before
                the record is enqueued. Falsy values leave the record as is.

        Raises:
            Exception: If a process handle was supplied and that process is
                no longer alive.
        """
        process = self._process
        # Compare against None explicitly so the check never depends on how
        # the handle object defines its own truthiness.
        if process is not None and not process.is_alive():
            raise Exception("The wandb backend process has shutdown")

        if local:
            record.control.local = local

        # Explicit None check: a queue-like object that implements __len__ or
        # __bool__ is falsy while empty, and a bare truthiness test would then
        # silently drop the record instead of enqueueing it.
        if self.record_q is not None:
            self.record_q.put(record)
|
|