|
""" |
|
Custom transports, with nicely configured defaults. |
|
|
|
The following additional keyword arguments are currently supported by httpcore... |
|
|
|
* uds: str |
|
* local_address: str |
|
* retries: int |
|
|
|
Example usages... |
|
|
|
# Disable HTTP/2 on a single specific domain. |
|
mounts = { |
|
"all://": httpx.HTTPTransport(http2=True), |
|
"all://*example.org": httpx.HTTPTransport() |
|
} |
|
|
|
# Using advanced httpcore configuration, with connection retries. |
|
transport = httpx.HTTPTransport(retries=1) |
|
client = httpx.Client(transport=transport) |
|
|
|
# Using advanced httpcore configuration, with unix domain sockets. |
|
transport = httpx.HTTPTransport(uds="socket.uds") |
|
client = httpx.Client(transport=transport) |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import contextlib |
|
import typing |
|
from types import TracebackType |
|
|
|
if typing.TYPE_CHECKING: |
|
import ssl |
|
|
|
import httpx |
|
|
|
from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context |
|
from .._exceptions import ( |
|
ConnectError, |
|
ConnectTimeout, |
|
LocalProtocolError, |
|
NetworkError, |
|
PoolTimeout, |
|
ProtocolError, |
|
ProxyError, |
|
ReadError, |
|
ReadTimeout, |
|
RemoteProtocolError, |
|
TimeoutException, |
|
UnsupportedProtocol, |
|
WriteError, |
|
WriteTimeout, |
|
) |
|
from .._models import Request, Response |
|
from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream |
|
from .._urls import URL |
|
from .base import AsyncBaseTransport, BaseTransport |
|
|
|
# Type variables used to annotate `self` in `HTTPTransport.__enter__` and
# `AsyncHTTPTransport.__aenter__`, so that entering a subclass instance is
# typed as returning that subclass.
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")

# Accepted shapes for a single entry of the `socket_options` argument taken by
# both transports.
# NOTE(review): these presumably mirror the argument forms of
# `socket.setsockopt` - confirm against the httpcore documentation.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]

# Names exported by `from <this module> import *`.
__all__ = ["AsyncHTTPTransport", "HTTPTransport"]

# Mapping of httpcore exception classes to httpx exception classes.
# Starts out empty and is filled in by `map_httpcore_exceptions()` the first
# time that context manager is entered.
HTTPCORE_EXC_MAP: dict[type[Exception], type[httpx.HTTPError]] = {}
|
|
|
|
|
def _load_httpcore_exceptions() -> dict[type[Exception], type[httpx.HTTPError]]:
    """
    Build the table that translates httpcore exception classes to httpx ones.

    `httpcore` is imported inside the function, so the import only happens
    when the table is actually requested.

    Returns a new dict keyed by httpcore exception class, whose values are the
    httpx exception classes of the same name.
    """
    import httpcore

    translation: dict[type[Exception], type[httpx.HTTPError]] = {}

    # Timeouts.
    translation[httpcore.TimeoutException] = TimeoutException
    translation[httpcore.ConnectTimeout] = ConnectTimeout
    translation[httpcore.ReadTimeout] = ReadTimeout
    translation[httpcore.WriteTimeout] = WriteTimeout
    translation[httpcore.PoolTimeout] = PoolTimeout

    # Network level failures.
    translation[httpcore.NetworkError] = NetworkError
    translation[httpcore.ConnectError] = ConnectError
    translation[httpcore.ReadError] = ReadError
    translation[httpcore.WriteError] = WriteError

    # Proxy and protocol failures.
    translation[httpcore.ProxyError] = ProxyError
    translation[httpcore.UnsupportedProtocol] = UnsupportedProtocol
    translation[httpcore.ProtocolError] = ProtocolError
    translation[httpcore.LocalProtocolError] = LocalProtocolError
    translation[httpcore.RemoteProtocolError] = RemoteProtocolError

    return translation
|
|
|
|
|
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
    """
    Context manager that re-raises mapped exceptions as their httpx equivalent.

    The module-level `HTTPCORE_EXC_MAP` table is populated on first use.
    Any `Exception` raised inside the `with` body is checked against every
    entry of the table:

    * If no entry matches, the original exception propagates unchanged.
    * Otherwise the mapped class is instantiated with `str(exc)` as its
      message and raised with the original exception attached as its cause.
    """
    global HTTPCORE_EXC_MAP
    if not HTTPCORE_EXC_MAP:
        HTTPCORE_EXC_MAP = _load_httpcore_exceptions()

    try:
        yield
    except Exception as exc:
        chosen = None

        for source_type, target_type in HTTPCORE_EXC_MAP.items():
            # Several entries may match (a subclass also matches its parents).
            # Keep the most specific target: replace the current choice only
            # when the new target is a subclass of it.
            if isinstance(exc, source_type) and (
                chosen is None or issubclass(target_type, chosen)
            ):
                chosen = target_type

        if chosen is None:
            # Not an exception we know how to translate.
            raise

        raise chosen(str(exc)) from exc
|
|
|
|
|
class ResponseStream(SyncByteStream):
    """
    Byte stream wrapper around the iterable body of an httpcore response.

    Iteration happens inside `map_httpcore_exceptions()`, so mapped errors
    raised while reading the body are re-raised as httpx exceptions.
    """

    def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
        self._httpcore_stream = httpcore_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield each part of the wrapped stream, translating exceptions."""
        with map_httpcore_exceptions():
            for chunk in self._httpcore_stream:
                yield chunk

    def close(self) -> None:
        """Close the wrapped stream if it exposes a `close` attribute."""
        wrapped = self._httpcore_stream
        if hasattr(wrapped, "close"):
            wrapped.close()
|
|
|
|
|
class HTTPTransport(BaseTransport):
    """
    Synchronous transport that sends requests through an httpcore pool.

    The pool type is picked in `__init__` from the `proxy` argument:

    * no proxy                 -> `httpcore.ConnectionPool`
    * `http` / `https` proxy   -> `httpcore.HTTPProxy`
    * `socks5` / `socks5h`     -> `httpcore.SOCKSProxy`
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the underlying httpcore pool.

        * verify, cert, trust_env - forwarded to `create_ssl_context` to
          build the SSL context handed to the pool.
        * http1, http2 - forwarded to the pool.
        * limits - supplies `max_connections`, `max_keepalive_connections`
          and `keepalive_expiry` for the pool.
        * proxy - optional proxy; a `str` or `URL` is wrapped in `Proxy`.
        * uds, local_address, retries - only forwarded when no proxy is used.
        * socket_options - forwarded for direct and HTTP(S) proxy pools; not
          forwarded to the SOCKS proxy pool.

        Raises `ImportError` when a SOCKS proxy is requested but `socksio`
        cannot be imported, and `ValueError` for any other proxy scheme.
        """
        import httpcore

        if isinstance(proxy, (str, URL)):
            proxy = Proxy(url=proxy)
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        if proxy is None:
            # Direct connections: the only mode that accepts `uds`,
            # `local_address` and `retries`.
            self._pool = httpcore.ConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
            return

        proxy_scheme = proxy.url.scheme

        if proxy_scheme in ("http", "https"):
            http_proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.HTTPProxy(
                proxy_url=http_proxy_url,
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                ssl_context=ssl_context,
                proxy_ssl_context=proxy.ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
        elif proxy_scheme in ("socks5", "socks5h"):
            # The module is not used below; the import only checks that the
            # optional dependency is available so a helpful error is raised.
            try:
                import socksio
            except ImportError:
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            socks_proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.SOCKSProxy(
                proxy_url=socks_proxy_url,
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
        else:
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
                f" but got {proxy_scheme!r}."
            )

    def __enter__(self: T) -> T:
        """Enter the underlying pool and return this transport."""
        # `T` keeps the return type equal to the subclass being entered.
        self._pool.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool, translating mapped httpcore exceptions."""
        with map_httpcore_exceptions():
            self._pool.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the resulting `Response`.

        The request is converted to an `httpcore.Request`, and the httpcore
        response body is wrapped in a `ResponseStream`. Mapped httpcore
        exceptions raised while sending are re-raised as httpx exceptions.
        """
        assert isinstance(request.stream, SyncByteStream)
        import httpcore

        core_url = httpcore.URL(
            scheme=request.url.raw_scheme,
            host=request.url.raw_host,
            port=request.url.port,
            target=request.url.raw_path,
        )
        core_request = httpcore.Request(
            method=request.method,
            url=core_url,
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            core_response = self._pool.handle_request(core_request)

        assert isinstance(core_response.stream, typing.Iterable)

        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=ResponseStream(core_response.stream),
            extensions=core_response.extensions,
        )

    def close(self) -> None:
        """Close the underlying pool."""
        self._pool.close()
|
|
|
|
|
class AsyncResponseStream(AsyncByteStream):
    """
    Byte stream wrapper around the async-iterable body of an httpcore response.

    Iteration happens inside `map_httpcore_exceptions()`, so mapped errors
    raised while reading the body are re-raised as httpx exceptions.
    """

    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
        self._httpcore_stream = httpcore_stream

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Yield each part of the wrapped stream, translating exceptions."""
        with map_httpcore_exceptions():
            async for chunk in self._httpcore_stream:
                yield chunk

    async def aclose(self) -> None:
        """Close the wrapped stream if it exposes an `aclose` attribute."""
        wrapped = self._httpcore_stream
        if hasattr(wrapped, "aclose"):
            await wrapped.aclose()
|
|
|
|
|
class AsyncHTTPTransport(AsyncBaseTransport):
    """
    Asynchronous transport that sends requests through an httpcore pool.

    The pool type is picked in `__init__` from the `proxy` argument:

    * no proxy                 -> `httpcore.AsyncConnectionPool`
    * `http` / `https` proxy   -> `httpcore.AsyncHTTPProxy`
    * `socks5` / `socks5h`     -> `httpcore.AsyncSOCKSProxy`
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the underlying httpcore async pool.

        * verify, cert, trust_env - forwarded to `create_ssl_context` to
          build the SSL context handed to the pool.
        * http1, http2 - forwarded to the pool.
        * limits - supplies `max_connections`, `max_keepalive_connections`
          and `keepalive_expiry` for the pool.
        * proxy - optional proxy; a `str` or `URL` is wrapped in `Proxy`.
        * uds, local_address, retries - only forwarded when no proxy is used.
        * socket_options - forwarded for direct and HTTP(S) proxy pools; not
          forwarded to the SOCKS proxy pool.

        Raises `ImportError` when a SOCKS proxy is requested but `socksio`
        cannot be imported, and `ValueError` for any other proxy scheme.
        """
        import httpcore

        proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        if proxy is None:
            # Direct connections: the only mode that accepts `uds`,
            # `local_address` and `retries`.
            self._pool = httpcore.AsyncConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("http", "https"):
            self._pool = httpcore.AsyncHTTPProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                proxy_ssl_context=proxy.ssl_context,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("socks5", "socks5h"):
            # The module is not used below; the import only checks that the
            # optional dependency is available so a helpful error is raised.
            try:
                import socksio
            except ImportError:
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.AsyncSOCKSProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
        else:
            # The second fragment must be an f-string so that the offending
            # scheme is interpolated into the message, matching the message
            # raised by `HTTPTransport`.
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
                f" but got {proxy.url.scheme!r}."
            )

    async def __aenter__(self: A) -> A:
        """Enter the underlying pool and return this transport."""
        # `A` keeps the return type equal to the subclass being entered.
        await self._pool.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool, translating mapped httpcore exceptions."""
        with map_httpcore_exceptions():
            await self._pool.__aexit__(exc_type, exc_value, traceback)

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the resulting `Response`.

        The request is converted to an `httpcore.Request`, and the httpcore
        response body is wrapped in an `AsyncResponseStream`. Mapped httpcore
        exceptions raised while sending are re-raised as httpx exceptions.
        """
        assert isinstance(request.stream, AsyncByteStream)
        import httpcore

        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            resp = await self._pool.handle_async_request(req)

        assert isinstance(resp.stream, typing.AsyncIterable)

        return Response(
            status_code=resp.status,
            headers=resp.headers,
            stream=AsyncResponseStream(resp.stream),
            extensions=resp.extensions,
        )

    async def aclose(self) -> None:
        """Close the underlying pool."""
        await self._pool.aclose()
|
|