|
"""Router - handle message router (queue). |
|
|
|
Router to manage responses from a queue. |
|
|
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import queue |
|
from typing import TYPE_CHECKING |
|
|
|
from wandb.proto import wandb_internal_pb2 as pb |
|
from wandb.sdk.mailbox import Mailbox |
|
|
|
from .router import MessageRouter |
|
|
|
if TYPE_CHECKING: |
|
from queue import Queue |
|
|
|
|
|
class MessageQueueRouter(MessageRouter): |
|
_request_queue: Queue[pb.Record] |
|
_response_queue: Queue[pb.Result] |
|
|
|
def __init__( |
|
self, |
|
request_queue: Queue[pb.Record], |
|
response_queue: Queue[pb.Result], |
|
mailbox: Mailbox | None = None, |
|
) -> None: |
|
self._request_queue = request_queue |
|
self._response_queue = response_queue |
|
super().__init__(mailbox=mailbox) |
|
|
|
def _read_message(self) -> pb.Result | None: |
|
try: |
|
msg = self._response_queue.get(timeout=1) |
|
except queue.Empty: |
|
return None |
|
return msg |
|
|
|
def _send_message(self, record: pb.Record) -> None: |
|
self._request_queue.put(record) |
|
|