File size: 1,049 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
"""Router - handle message router (queue).
Router to manage responses from a queue.
"""
from __future__ import annotations
import queue
from typing import TYPE_CHECKING
from wandb.proto import wandb_internal_pb2 as pb
from wandb.sdk.mailbox import Mailbox
from .router import MessageRouter
if TYPE_CHECKING:
from queue import Queue
class MessageQueueRouter(MessageRouter):
    """Message router that uses a pair of queues as its transport.

    Outgoing records are put on the request queue; incoming results are
    taken from the response queue.
    """

    _request_queue: Queue[pb.Record]
    _response_queue: Queue[pb.Result]

    def __init__(
        self,
        request_queue: Queue[pb.Record],
        response_queue: Queue[pb.Result],
        mailbox: Mailbox | None = None,
    ) -> None:
        """Store both queues, then initialize the base router.

        Args:
            request_queue: Queue that `_send_message` puts records on.
            response_queue: Queue that `_read_message` takes results from.
            mailbox: Optional mailbox forwarded unchanged to the base
                class initializer.
        """
        # Both queues are bound before the base initializer runs, so they
        # already exist if the base class touches them during setup.
        self._request_queue, self._response_queue = request_queue, response_queue
        super().__init__(mailbox=mailbox)

    def _read_message(self) -> pb.Result | None:
        """Take the next result from the response queue.

        Blocks for up to one second waiting for an item.

        Returns:
            The dequeued result, or None when the queue stayed empty for
            the whole timeout.
        """
        try:
            return self._response_queue.get(timeout=1)
        except queue.Empty:
            # Nothing arrived within the timeout; report "no message".
            return None

    def _send_message(self, record: pb.Record) -> None:
        """Put `record` on the request queue."""
        self._request_queue.put(record)
|