from __future__ import annotations import abc from typing import TYPE_CHECKING, Callable, Generic from typing_extensions import TypeVar, override # Necessary to break an import loop. if TYPE_CHECKING: from wandb.sdk.interface import interface _T = TypeVar("_T") _S = TypeVar("_S") class HandleAbandonedError(Exception): """The handle has no response and has been abandoned.""" class MailboxHandle(abc.ABC, Generic[_T]): """A thread-safe handle that allows waiting for a response to a request.""" def map(self, fn: Callable[[_T], _S]) -> MailboxHandle[_S]: """Returns a transformed handle. Methods on the returned handle call methods on this handle, but the response type is derived using the given function. Args: fn: A function to apply to this handle's result to get the new handle's result. The function should be pure and fast. """ return _MailboxMappedHandle(self, fn) @abc.abstractmethod def abandon(self) -> None: """Abandon the handle, indicating it will not receive a response.""" @abc.abstractmethod def cancel(self, iface: interface.InterfaceBase) -> None: """Cancel the handle, requesting any associated work to not complete. This automatically abandons the handle, as a response is no longer guaranteed. Args: iface: The interface on which to publish the cancel request. """ @abc.abstractmethod def check(self) -> _T | None: """Returns the response if it's ready.""" @abc.abstractmethod def wait_or(self, *, timeout: float | None) -> _T: """Wait for a response or a timeout. Args: timeout: A finite number of seconds or None to never time out. If less than or equal to zero, times out immediately unless the response is available. Returns: The response if it arrives before the timeout or has already arrived. Raises: TimeoutError: If the timeout is reached. HandleAbandonedError: If the handle becomes abandoned. """ @abc.abstractmethod async def wait_async(self, *, timeout: float | None) -> _T: """Wait for a response or timeout. This must run in an `asyncio` event loop. Args: timeout: A finite number of seconds or None to never time out. Returns: The response if it arrives before the timeout or has already arrived. Raises: TimeoutError: If the timeout is reached. HandleAbandonedError: If the handle becomes abandoned. """ class _MailboxMappedHandle(Generic[_S], MailboxHandle[_S]): """A mailbox handle whose result is derived from another handle.""" def __init__( self, handle: MailboxHandle[_T], fn: Callable[[_T], _S], ) -> None: self._handle = handle self._fn = fn @override def abandon(self) -> None: self._handle.abandon() @override def cancel(self, iface: interface.InterfaceBase) -> None: self._handle.cancel(iface) @override def check(self) -> _S | None: if response := self._handle.check(): return self._fn(response) else: return None @override def wait_or(self, *, timeout: float | None) -> _S: return self._fn(self._handle.wait_or(timeout=timeout)) @override async def wait_async(self, *, timeout: float | None) -> _S: response = await self._handle.wait_async(timeout=timeout) return self._fn(response)