jamtur01's picture
Upload folder using huggingface_hub
9c6594c verified
"""InterfaceQueue - Derived from InterfaceShared using queues to send to internal thread.
See interface.py for how interface classes relate to each other.
"""
import logging
from multiprocessing.process import BaseProcess
from typing import TYPE_CHECKING, Optional
from wandb.sdk.mailbox import Mailbox
from .interface_shared import InterfaceShared
if TYPE_CHECKING:
from queue import Queue
from wandb.proto import wandb_internal_pb2 as pb
logger = logging.getLogger("wandb")
class InterfaceQueue(InterfaceShared):
def __init__(
self,
record_q: Optional["Queue[pb.Record]"] = None,
result_q: Optional["Queue[pb.Result]"] = None,
process: Optional[BaseProcess] = None,
mailbox: Optional[Mailbox] = None,
) -> None:
self.record_q = record_q
self.result_q = result_q
self._process = process
super().__init__(mailbox=mailbox)
def _publish(self, record: "pb.Record", local: Optional[bool] = None) -> None:
if self._process and not self._process.is_alive():
raise Exception("The wandb backend process has shutdown")
if local:
record.control.local = local
if self.record_q:
self.record_q.put(record)