File size: 4,314 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from __future__ import annotations

import logging
import secrets
import string
import threading

from wandb.proto import wandb_internal_pb2 as pb
from wandb.proto import wandb_server_pb2 as spb

from .mailbox_handle import MailboxHandle
from .response_handle import MailboxResponseHandle

_logger = logging.getLogger(__name__)


class MailboxClosedError(Exception):
    """Raised when a handle is requested from a mailbox that was closed.

    A closed mailbox delivers no further responses, so it cannot be used
    to wait for new ones.
    """


class Mailbox:
    """Matches service responses to requests.

    The mailbox can set an address on a server request and create a handle for
    waiting for a response to that record. Responses are delivered by calling
    `deliver()`. The `close()` method abandons all handles in case the
    service process becomes unreachable.

    All access to the internal handle table is guarded by a single lock.
    """

    # Characters and length used for randomly generated addresses.
    _ADDRESS_ALPHABET = string.ascii_lowercase + string.digits
    _ADDRESS_LENGTH = 12

    def __init__(self) -> None:
        # Maps an address to the handle waiting for its response.
        self._handles: dict[str, MailboxResponseHandle] = {}
        # Guards `_handles` and `_closed`.
        self._handles_lock = threading.Lock()
        # Set once by `close()`; no new handles may be created afterwards.
        self._closed = False

    def require_response(
        self,
        request: spb.ServerRequest | pb.Record,
    ) -> MailboxHandle[spb.ServerResponse]:
        """Set a response address on a request.

        Args:
            request: The request on which to set a request ID or mailbox slot.
                This is mutated. An address must not already be set.
                The request is left untouched if an exception is raised.

        Returns:
            A handle for waiting for the response to the request.

        Raises:
            ValueError: If the request already has an address.
            MailboxClosedError: If the mailbox has been closed, in which case
                no new responses are expected to be delivered and new handles
                cannot be created.
        """
        if isinstance(request, spb.ServerRequest):
            existing = request.request_id
        else:
            existing = request.control.mailbox_slot

        if existing:
            raise ValueError(f"Request already has an address ({existing})")

        # The closed check, the address generation, and the registration of
        # the handle happen under one lock acquisition so that the uniqueness
        # check in `_new_address()` cannot race with other threads.
        with self._handles_lock:
            if self._closed:
                raise MailboxClosedError()

            address = self._new_address()
            handle = MailboxResponseHandle(address)
            self._handles[address] = handle

        # Only stamp the request once a handle is registered, so a request
        # rejected above is not left with a dangling address.
        if isinstance(request, spb.ServerRequest):
            request.request_id = address
        else:
            request.control.mailbox_slot = address

        return handle

    def _new_address(self) -> str:
        """Returns an unused address for a request.

        Assumes `_handles_lock` is held, since this reads `_handles` to
        check that the generated address is not already taken.
        """
        while True:
            address = "".join(
                secrets.choice(self._ADDRESS_ALPHABET)
                for _ in range(self._ADDRESS_LENGTH)
            )

            # Being extra cautious. A collision will almost never happen.
            if address not in self._handles:
                return address

    def deliver(self, response: spb.ServerResponse) -> None:
        """Deliver a response from the service.

        If the response address is invalid, this does nothing.
        It is a no-op if the mailbox has been closed.

        Args:
            response: The response to hand to the waiting handle. Its
                `request_id` selects the handle.
        """
        address = response.request_id
        if not address:
            # Log which kind of response arrived without an address to make
            # the problem easier to track down.
            kind: str | None = response.WhichOneof("server_response_type")
            if kind == "result_communicate":
                result_type = response.result_communicate.WhichOneof("result_type")
                kind = f"result_communicate.{result_type}"

            _logger.error(f"Received response with no mailbox slot: {kind}")
            return

        with self._handles_lock:
            # NOTE: If the mailbox is closed, this returns None because
            # we clear the dict.
            handle = self._handles.pop(address, None)

        # It is not an error if there is no handle for the address:
        # handles can be abandoned if the result is no longer needed.
        # The handle is invoked outside the lock.
        if handle:
            handle.deliver(response)

    def close(self) -> None:
        """Indicate no further responses will be delivered.

        Abandons all handles. After this, `require_response()` raises
        `MailboxClosedError`.
        """
        with self._handles_lock:
            self._closed = True

            _logger.info(
                f"Closing mailbox, abandoning {len(self._handles)} handles.",
            )

            for handle in self._handles.values():
                handle.abandon()
            self._handles.clear()