|
from __future__ import annotations |
|
|
|
import os |
|
import typing |
|
|
|
from ._models import Headers |
|
from ._types import CertTypes, HeaderTypes, TimeoutTypes |
|
from ._urls import URL |
|
|
|
if typing.TYPE_CHECKING: |
|
import ssl |
|
|
|
__all__ = ["Limits", "Proxy", "Timeout", "create_ssl_context"] |
|
|
|
|
|
class UnsetType:
    """
    Marker type for the `UNSET` sentinel.

    Lets a signature tell "argument not supplied" apart from an explicit
    `None`, which is itself a meaningful value for timeouts.
    """


# Single shared sentinel instance, used as the default for optional arguments.
UNSET = UnsetType()
|
|
|
|
|
def create_ssl_context(
    verify: ssl.SSLContext | str | bool = True,
    cert: CertTypes | None = None,
    trust_env: bool = True,
) -> ssl.SSLContext:
    """
    Build the `ssl.SSLContext` to use for TLS connections.

    **Parameters:**

    * **verify** - `True` for a default verifying context, `False` for a
      context with hostname checking and certificate verification disabled,
      an existing `ssl.SSLContext` to use as-is, or (deprecated) a path to a
      CA bundle file or a directory of CA certificates.
    * **cert** - (deprecated) client certificate to load into the context:
      either a single path, or a sequence of arguments that is unpacked into
      `SSLContext.load_cert_chain()`.
    * **trust_env** - when `verify` is `True`, allow the `SSL_CERT_FILE` and
      `SSL_CERT_DIR` environment variables to select the CA store.

    Returns the configured context. When `verify` is an `ssl.SSLContext` that
    same object is returned (and mutated if `cert` is given).

    Emits a `DeprecationWarning` for `verify=<str>` and for any truthy `cert`.
    """
    import ssl
    import warnings

    if verify is True:
        if trust_env and os.environ.get("SSL_CERT_FILE"):
            ctx = ssl.create_default_context(cafile=os.environ["SSL_CERT_FILE"])
        elif trust_env and os.environ.get("SSL_CERT_DIR"):
            ctx = ssl.create_default_context(capath=os.environ["SSL_CERT_DIR"])
        else:
            # certifi is only needed for the bundled default CA store, so it
            # is imported lazily rather than on every call.
            import certifi

            ctx = ssl.create_default_context(cafile=certifi.where())
    elif verify is False:
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname must be switched off before verify_mode can be
        # lowered to CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    elif isinstance(verify, str):
        message = (
            "`verify=<str>` is deprecated. "
            "Use `verify=ssl.create_default_context(cafile=...)` "
            "or `verify=ssl.create_default_context(capath=...)` instead."
        )
        # stacklevel=2 attributes the warning to the caller of this function.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        # Assign instead of returning early so that a `cert` argument is
        # still applied below, exactly as for every other `verify` form.
        if os.path.isdir(verify):
            ctx = ssl.create_default_context(capath=verify)
        else:
            ctx = ssl.create_default_context(cafile=verify)
    else:
        # Caller supplied a ready-made SSLContext.
        ctx = verify

    if cert:
        message = (
            "`cert=...` is deprecated. Use `verify=<ssl_context>` instead, "
            "with `.load_cert_chain()` to configure the certificate chain."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        if isinstance(cert, str):
            ctx.load_cert_chain(cert)
        else:
            ctx.load_cert_chain(*cert)

    return ctx
|
|
|
|
|
class Timeout:
    """
    Timeout configuration.

    Holds four independent limits: `connect`, `read`, `write` and `pool`.
    Each is a number of seconds, or `None` for no limit.

    **Usage**:

    Timeout(None)               # No timeouts.
    Timeout(5.0)                # 5s timeout on all operations.
    Timeout(None, connect=5.0)  # 5s timeout on connect, no other timeouts.
    Timeout(5.0, connect=10.0)  # 10s timeout on connect. 5s timeout elsewhere.
    Timeout(5.0, pool=None)     # No timeout on acquiring connection from pool.
                                # 5s timeout elsewhere.
    """

    def __init__(
        self,
        timeout: TimeoutTypes | UnsetType = UNSET,
        *,
        connect: None | float | UnsetType = UNSET,
        read: None | float | UnsetType = UNSET,
        write: None | float | UnsetType = UNSET,
        pool: None | float | UnsetType = UNSET,
    ) -> None:
        # Case 1: copy from an existing Timeout. Per-operation overrides are
        # not permitted alongside it.
        if isinstance(timeout, Timeout):
            assert connect is UNSET
            assert read is UNSET
            assert write is UNSET
            assert pool is UNSET
            source = timeout
            self.connect = source.connect
            self.read = source.read
            self.write = source.write
            self.pool = source.pool
            return

        # Case 2: a (connect, read[, write[, pool]]) tuple. Missing trailing
        # entries mean "no timeout" for that operation.
        if isinstance(timeout, tuple):
            size = len(timeout)
            self.connect = timeout[0]
            self.read = timeout[1]
            self.write = timeout[2] if size >= 3 else None
            self.pool = timeout[3] if size >= 4 else None
            return

        # Case 3: all four values given explicitly, so no default is needed.
        if (
            not isinstance(connect, UnsetType)
            and not isinstance(read, UnsetType)
            and not isinstance(write, UnsetType)
            and not isinstance(pool, UnsetType)
        ):
            self.connect = connect
            self.read = read
            self.write = write
            self.pool = pool
            return

        # Case 4: a default is required to fill whichever values are unset.
        if isinstance(timeout, UnsetType):
            raise ValueError(
                "httpx.Timeout must either include a default, or set all "
                "four parameters explicitly."
            )
        self.connect = connect if not isinstance(connect, UnsetType) else timeout
        self.read = read if not isinstance(read, UnsetType) else timeout
        self.write = write if not isinstance(write, UnsetType) else timeout
        self.pool = pool if not isinstance(pool, UnsetType) else timeout

    def as_dict(self) -> dict[str, float | None]:
        """Return the four timeouts as a plain dict keyed by operation name."""
        return {
            name: getattr(self, name)
            for name in ("connect", "read", "write", "pool")
        }

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is the same kind of object with equal timeouts."""
        if not isinstance(other, self.__class__):
            return False
        return (
            self.connect == other.connect
            and self.read == other.read
            and self.write == other.write
            and self.pool == other.pool
        )

    def __repr__(self) -> str:
        """Use the short `timeout=` form when all four values coincide."""
        class_name = self.__class__.__name__
        distinct_values = {self.connect, self.read, self.write, self.pool}
        if len(distinct_values) == 1:
            return f"{class_name}(timeout={self.connect})"
        details = (
            f"connect={self.connect}, "
            f"read={self.read}, write={self.write}, pool={self.pool}"
        )
        return f"{class_name}({details})"
|
|
|
|
|
class Limits:
    """
    Configuration for limits to various client behaviors.

    **Parameters:**

    * **max_connections** - The maximum number of concurrent connections that
      may be established.
    * **max_keepalive_connections** - Allow the connection pool to maintain
      keep-alive connections below this point. Should be less than or equal
      to `max_connections`.
    * **keepalive_expiry** - Time limit on idle keep-alive connections in
      seconds.
    """

    def __init__(
        self,
        *,
        max_connections: int | None = None,
        max_keepalive_connections: int | None = None,
        keepalive_expiry: float | None = 5.0,
    ) -> None:
        # Values are stored exactly as given; nothing is validated here.
        self.max_connections = max_connections
        self.max_keepalive_connections = max_keepalive_connections
        self.keepalive_expiry = keepalive_expiry

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is the same kind of object with equal limits."""
        if not isinstance(other, self.__class__):
            return False
        return (
            self.max_connections == other.max_connections
            and self.max_keepalive_connections == other.max_keepalive_connections
            and self.keepalive_expiry == other.keepalive_expiry
        )

    def __repr__(self) -> str:
        """Render as `ClassName(field=value, ...)` in declaration order."""
        field_names = (
            "max_connections",
            "max_keepalive_connections",
            "keepalive_expiry",
        )
        rendered = ", ".join(
            f"{name}={getattr(self, name)}" for name in field_names
        )
        return f"{self.__class__.__name__}({rendered})"
|
|
|
|
|
class Proxy:
    """
    Configuration for a single proxy: its URL, optional credentials, extra
    headers and an optional SSL context.
    """

    def __init__(
        self,
        url: URL | str,
        *,
        ssl_context: ssl.SSLContext | None = None,
        auth: tuple[str, str] | None = None,
        headers: HeaderTypes | None = None,
    ) -> None:
        proxy_url = URL(url)
        proxy_headers = Headers(headers)

        supported_schemes = ("http", "https", "socks5", "socks5h")
        if proxy_url.scheme not in supported_schemes:
            raise ValueError(f"Unknown scheme for proxy URL {proxy_url!r}")

        if proxy_url.username or proxy_url.password:
            # Credentials embedded in the URL replace any `auth` argument and
            # are then stripped from the stored URL.
            auth = (proxy_url.username, proxy_url.password)
            proxy_url = proxy_url.copy_with(username=None, password=None)

        self.url = proxy_url
        self.auth = auth
        self.headers = proxy_headers
        self.ssl_context = ssl_context

    @property
    def raw_auth(self) -> tuple[bytes, bytes] | None:
        """The `auth` pair UTF-8 encoded to bytes, or `None` when unset."""
        credentials = self.auth
        if credentials is None:
            return None
        return (credentials[0].encode("utf-8"), credentials[1].encode("utf-8"))

    def __repr__(self) -> str:
        """Show URL, auth and headers; the auth password is always masked."""
        masked_auth = (self.auth[0], "********") if self.auth else None

        pieces = [repr(str(self.url))]
        if masked_auth:
            pieces.append(f"auth={masked_auth!r}")
        if self.headers:
            pieces.append(f"headers={dict(self.headers)!r}")
        return "Proxy(" + ", ".join(pieces) + ")"
|
|
|
|
|
# Module-level defaults.

# 5 second timeout applied to connect, read, write and pool alike.
DEFAULT_TIMEOUT_CONFIG = Timeout(5.0)

# Up to 100 concurrent connections, at most 20 of them kept alive when idle.
DEFAULT_LIMITS = Limits(
    max_connections=100,
    max_keepalive_connections=20,
)

DEFAULT_MAX_REDIRECTS = 20
|
|