|
import re |
|
import sys |
|
import warnings |
|
from collections.abc import Mapping, Sequence |
|
from enum import Enum |
|
from functools import _CacheInfo, lru_cache |
|
from ipaddress import ip_address |
|
from typing import TYPE_CHECKING, Any, NoReturn, TypedDict, TypeVar, Union, overload |
|
from urllib.parse import SplitResult, uses_relative |
|
|
|
import idna |
|
from multidict import MultiDict, MultiDictProxy |
|
from propcache.api import under_cached_property as cached_property |
|
|
|
from ._parse import ( |
|
USES_AUTHORITY, |
|
SplitURLType, |
|
make_netloc, |
|
query_to_pairs, |
|
split_netloc, |
|
split_url, |
|
unsplit_result, |
|
) |
|
from ._path import normalize_path, normalize_path_segments |
|
from ._query import ( |
|
Query, |
|
QueryVariable, |
|
SimpleQuery, |
|
get_str_query, |
|
get_str_query_from_iterable, |
|
get_str_query_from_sequence_iterable, |
|
) |
|
from ._quoters import ( |
|
FRAGMENT_QUOTER, |
|
FRAGMENT_REQUOTER, |
|
PATH_QUOTER, |
|
PATH_REQUOTER, |
|
PATH_SAFE_UNQUOTER, |
|
PATH_UNQUOTER, |
|
QS_UNQUOTER, |
|
QUERY_QUOTER, |
|
QUERY_REQUOTER, |
|
QUOTER, |
|
REQUOTER, |
|
UNQUOTER, |
|
human_quote, |
|
) |
|
|
|
DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443, "ftp": 21} |
|
USES_RELATIVE = frozenset(uses_relative) |
|
|
|
|
|
|
|
SCHEME_REQUIRES_HOST = frozenset(("http", "https", "ws", "wss", "ftp")) |
|
|
|
|
|
|
|
|
|
|
|
NOT_REG_NAME = re.compile( |
|
r""" |
|
# any character not in the unreserved or sub-delims sets, plus % |
|
# (validated with the additional check for pct-encoded sequences below) |
|
[^a-z0-9\-._~!$&'()*+,;=%] |
|
| |
|
# % only allowed if it is part of a pct-encoded |
|
# sequence of 2 hex digits. |
|
%(?![0-9a-f]{2}) |
|
""", |
|
re.VERBOSE, |
|
) |
|
|
|
_T = TypeVar("_T") |
|
|
|
if sys.version_info >= (3, 11): |
|
from typing import Self |
|
else: |
|
Self = Any |
|
|
|
|
|
class UndefinedType(Enum): |
|
"""Singleton type for use with not set sentinel values.""" |
|
|
|
_singleton = 0 |
|
|
|
|
|
UNDEFINED = UndefinedType._singleton |
|
|
|
|
|
class CacheInfo(TypedDict): |
|
"""Host encoding cache.""" |
|
|
|
idna_encode: _CacheInfo |
|
idna_decode: _CacheInfo |
|
ip_address: _CacheInfo |
|
host_validate: _CacheInfo |
|
encode_host: _CacheInfo |
|
|
|
|
|
class _InternalURLCache(TypedDict, total=False): |
|
_val: SplitURLType |
|
_origin: "URL" |
|
absolute: bool |
|
hash: int |
|
scheme: str |
|
raw_authority: str |
|
authority: str |
|
raw_user: Union[str, None] |
|
user: Union[str, None] |
|
raw_password: Union[str, None] |
|
password: Union[str, None] |
|
raw_host: Union[str, None] |
|
host: Union[str, None] |
|
host_subcomponent: Union[str, None] |
|
host_port_subcomponent: Union[str, None] |
|
port: Union[int, None] |
|
explicit_port: Union[int, None] |
|
raw_path: str |
|
path: str |
|
_parsed_query: list[tuple[str, str]] |
|
query: "MultiDictProxy[str]" |
|
raw_query_string: str |
|
query_string: str |
|
path_qs: str |
|
raw_path_qs: str |
|
raw_fragment: str |
|
fragment: str |
|
raw_parts: tuple[str, ...] |
|
parts: tuple[str, ...] |
|
parent: "URL" |
|
raw_name: str |
|
name: str |
|
raw_suffix: str |
|
suffix: str |
|
raw_suffixes: tuple[str, ...] |
|
suffixes: tuple[str, ...] |
|
|
|
|
|
def rewrite_module(obj: _T) -> _T: |
|
obj.__module__ = "yarl" |
|
return obj |
|
|
|
|
|
@lru_cache |
|
def encode_url(url_str: str) -> "URL": |
|
"""Parse unencoded URL.""" |
|
cache: _InternalURLCache = {} |
|
host: Union[str, None] |
|
scheme, netloc, path, query, fragment = split_url(url_str) |
|
if not netloc: |
|
host = "" |
|
else: |
|
if ":" in netloc or "@" in netloc or "[" in netloc: |
|
|
|
username, password, host, port = split_netloc(netloc) |
|
else: |
|
username = password = port = None |
|
host = netloc |
|
if host is None: |
|
if scheme in SCHEME_REQUIRES_HOST: |
|
msg = ( |
|
"Invalid URL: host is required for " |
|
f"absolute urls with the {scheme} scheme" |
|
) |
|
raise ValueError(msg) |
|
else: |
|
host = "" |
|
host = _encode_host(host, validate_host=False) |
|
|
|
cache["raw_host"] = host[1:-1] if "[" in host else host |
|
cache["explicit_port"] = port |
|
if password is None and username is None: |
|
|
|
netloc = host if port is None else f"{host}:{port}" |
|
cache["raw_user"] = None |
|
cache["raw_password"] = None |
|
else: |
|
raw_user = REQUOTER(username) if username else username |
|
raw_password = REQUOTER(password) if password else password |
|
netloc = make_netloc(raw_user, raw_password, host, port) |
|
cache["raw_user"] = raw_user |
|
cache["raw_password"] = raw_password |
|
|
|
if path: |
|
path = PATH_REQUOTER(path) |
|
if netloc and "." in path: |
|
path = normalize_path(path) |
|
if query: |
|
query = QUERY_REQUOTER(query) |
|
if fragment: |
|
fragment = FRAGMENT_REQUOTER(fragment) |
|
|
|
cache["scheme"] = scheme |
|
cache["raw_path"] = "/" if not path and netloc else path |
|
cache["raw_query_string"] = query |
|
cache["raw_fragment"] = fragment |
|
|
|
self = object.__new__(URL) |
|
self._scheme = scheme |
|
self._netloc = netloc |
|
self._path = path |
|
self._query = query |
|
self._fragment = fragment |
|
self._cache = cache |
|
return self |
|
|
|
|
|
@lru_cache |
|
def pre_encoded_url(url_str: str) -> "URL": |
|
"""Parse pre-encoded URL.""" |
|
self = object.__new__(URL) |
|
val = split_url(url_str) |
|
self._scheme, self._netloc, self._path, self._query, self._fragment = val |
|
self._cache = {} |
|
return self |
|
|
|
|
|
@lru_cache |
|
def build_pre_encoded_url( |
|
scheme: str, |
|
authority: str, |
|
user: Union[str, None], |
|
password: Union[str, None], |
|
host: str, |
|
port: Union[int, None], |
|
path: str, |
|
query_string: str, |
|
fragment: str, |
|
) -> "URL": |
|
"""Build a pre-encoded URL from parts.""" |
|
self = object.__new__(URL) |
|
self._scheme = scheme |
|
if authority: |
|
self._netloc = authority |
|
elif host: |
|
if port is not None: |
|
port = None if port == DEFAULT_PORTS.get(scheme) else port |
|
if user is None and password is None: |
|
self._netloc = host if port is None else f"{host}:{port}" |
|
else: |
|
self._netloc = make_netloc(user, password, host, port) |
|
else: |
|
self._netloc = "" |
|
self._path = path |
|
self._query = query_string |
|
self._fragment = fragment |
|
self._cache = {} |
|
return self |
|
|
|
|
|
def from_parts_uncached( |
|
scheme: str, netloc: str, path: str, query: str, fragment: str |
|
) -> "URL": |
|
"""Create a new URL from parts.""" |
|
self = object.__new__(URL) |
|
self._scheme = scheme |
|
self._netloc = netloc |
|
self._path = path |
|
self._query = query |
|
self._fragment = fragment |
|
self._cache = {} |
|
return self |
|
|
|
|
|
from_parts = lru_cache(from_parts_uncached) |
|
|
|
|
|
@rewrite_module |
|
class URL: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment") |
|
|
|
_cache: _InternalURLCache |
|
_scheme: str |
|
_netloc: str |
|
_path: str |
|
_query: str |
|
_fragment: str |
|
|
|
def __new__( |
|
cls, |
|
val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED, |
|
*, |
|
encoded: bool = False, |
|
strict: Union[bool, None] = None, |
|
) -> "URL": |
|
if strict is not None: |
|
warnings.warn("strict parameter is ignored") |
|
if type(val) is str: |
|
return pre_encoded_url(val) if encoded else encode_url(val) |
|
if type(val) is cls: |
|
return val |
|
if type(val) is SplitResult: |
|
if not encoded: |
|
raise ValueError("Cannot apply decoding to SplitResult") |
|
return from_parts(*val) |
|
if isinstance(val, str): |
|
return pre_encoded_url(str(val)) if encoded else encode_url(str(val)) |
|
if val is UNDEFINED: |
|
|
|
|
|
|
|
self = object.__new__(URL) |
|
self._scheme = self._netloc = self._path = self._query = self._fragment = "" |
|
self._cache = {} |
|
return self |
|
raise TypeError("Constructor parameter should be str") |
|
|
|
@classmethod |
|
def build( |
|
cls, |
|
*, |
|
scheme: str = "", |
|
authority: str = "", |
|
user: Union[str, None] = None, |
|
password: Union[str, None] = None, |
|
host: str = "", |
|
port: Union[int, None] = None, |
|
path: str = "", |
|
query: Union[Query, None] = None, |
|
query_string: str = "", |
|
fragment: str = "", |
|
encoded: bool = False, |
|
) -> "URL": |
|
"""Creates and returns a new URL""" |
|
|
|
if authority and (user or password or host or port): |
|
raise ValueError( |
|
'Can\'t mix "authority" with "user", "password", "host" or "port".' |
|
) |
|
if port is not None and not isinstance(port, int): |
|
raise TypeError(f"The port is required to be int, got {type(port)!r}.") |
|
if port and not host: |
|
raise ValueError('Can\'t build URL with "port" but without "host".') |
|
if query and query_string: |
|
raise ValueError('Only one of "query" or "query_string" should be passed') |
|
if ( |
|
scheme is None |
|
or authority is None |
|
or host is None |
|
or path is None |
|
or query_string is None |
|
or fragment is None |
|
): |
|
raise TypeError( |
|
'NoneType is illegal for "scheme", "authority", "host", "path", ' |
|
'"query_string", and "fragment" args, use empty string instead.' |
|
) |
|
|
|
if query: |
|
query_string = get_str_query(query) or "" |
|
|
|
if encoded: |
|
return build_pre_encoded_url( |
|
scheme, |
|
authority, |
|
user, |
|
password, |
|
host, |
|
port, |
|
path, |
|
query_string, |
|
fragment, |
|
) |
|
|
|
self = object.__new__(URL) |
|
self._scheme = scheme |
|
_host: Union[str, None] = None |
|
if authority: |
|
user, password, _host, port = split_netloc(authority) |
|
_host = _encode_host(_host, validate_host=False) if _host else "" |
|
elif host: |
|
_host = _encode_host(host, validate_host=True) |
|
else: |
|
self._netloc = "" |
|
|
|
if _host is not None: |
|
if port is not None: |
|
port = None if port == DEFAULT_PORTS.get(scheme) else port |
|
if user is None and password is None: |
|
self._netloc = _host if port is None else f"{_host}:{port}" |
|
else: |
|
self._netloc = make_netloc(user, password, _host, port, True) |
|
|
|
path = PATH_QUOTER(path) if path else path |
|
if path and self._netloc: |
|
if "." in path: |
|
path = normalize_path(path) |
|
if path[0] != "/": |
|
msg = ( |
|
"Path in a URL with authority should " |
|
"start with a slash ('/') if set" |
|
) |
|
raise ValueError(msg) |
|
|
|
self._path = path |
|
if not query and query_string: |
|
query_string = QUERY_QUOTER(query_string) |
|
self._query = query_string |
|
self._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment |
|
self._cache = {} |
|
return self |
|
|
|
def __init_subclass__(cls) -> NoReturn: |
|
raise TypeError(f"Inheriting a class {cls!r} from URL is forbidden") |
|
|
|
def __str__(self) -> str: |
|
if not self._path and self._netloc and (self._query or self._fragment): |
|
path = "/" |
|
else: |
|
path = self._path |
|
if (port := self.explicit_port) is not None and port == DEFAULT_PORTS.get( |
|
self._scheme |
|
): |
|
|
|
|
|
host = self.host_subcomponent |
|
netloc = make_netloc(self.raw_user, self.raw_password, host, None) |
|
else: |
|
netloc = self._netloc |
|
return unsplit_result(self._scheme, netloc, path, self._query, self._fragment) |
|
|
|
def __repr__(self) -> str: |
|
return f"{self.__class__.__name__}('{str(self)}')" |
|
|
|
def __bytes__(self) -> bytes: |
|
return str(self).encode("ascii") |
|
|
|
def __eq__(self, other: object) -> bool: |
|
if type(other) is not URL: |
|
return NotImplemented |
|
|
|
path1 = "/" if not self._path and self._netloc else self._path |
|
path2 = "/" if not other._path and other._netloc else other._path |
|
return ( |
|
self._scheme == other._scheme |
|
and self._netloc == other._netloc |
|
and path1 == path2 |
|
and self._query == other._query |
|
and self._fragment == other._fragment |
|
) |
|
|
|
def __hash__(self) -> int: |
|
if (ret := self._cache.get("hash")) is None: |
|
path = "/" if not self._path and self._netloc else self._path |
|
ret = self._cache["hash"] = hash( |
|
(self._scheme, self._netloc, path, self._query, self._fragment) |
|
) |
|
return ret |
|
|
|
def __le__(self, other: object) -> bool: |
|
if type(other) is not URL: |
|
return NotImplemented |
|
return self._val <= other._val |
|
|
|
def __lt__(self, other: object) -> bool: |
|
if type(other) is not URL: |
|
return NotImplemented |
|
return self._val < other._val |
|
|
|
def __ge__(self, other: object) -> bool: |
|
if type(other) is not URL: |
|
return NotImplemented |
|
return self._val >= other._val |
|
|
|
def __gt__(self, other: object) -> bool: |
|
if type(other) is not URL: |
|
return NotImplemented |
|
return self._val > other._val |
|
|
|
def __truediv__(self, name: str) -> "URL": |
|
if not isinstance(name, str): |
|
return NotImplemented |
|
return self._make_child((str(name),)) |
|
|
|
def __mod__(self, query: Query) -> "URL": |
|
return self.update_query(query) |
|
|
|
def __bool__(self) -> bool: |
|
return bool(self._netloc or self._path or self._query or self._fragment) |
|
|
|
def __getstate__(self) -> tuple[SplitResult]: |
|
return (tuple.__new__(SplitResult, self._val),) |
|
|
|
def __setstate__( |
|
self, state: Union[tuple[SplitURLType], tuple[None, _InternalURLCache]] |
|
) -> None: |
|
if state[0] is None and isinstance(state[1], dict): |
|
|
|
val = state[1]["_val"] |
|
else: |
|
unused: list[object] |
|
val, *unused = state |
|
self._scheme, self._netloc, self._path, self._query, self._fragment = val |
|
self._cache = {} |
|
|
|
def _cache_netloc(self) -> None: |
|
"""Cache the netloc parts of the URL.""" |
|
c = self._cache |
|
split_loc = split_netloc(self._netloc) |
|
c["raw_user"], c["raw_password"], c["raw_host"], c["explicit_port"] = split_loc |
|
|
|
def is_absolute(self) -> bool: |
|
"""A check for absolute URLs. |
|
|
|
Return True for absolute ones (having scheme or starting |
|
with //), False otherwise. |
|
|
|
Is is preferred to call the .absolute property instead |
|
as it is cached. |
|
""" |
|
return self.absolute |
|
|
|
def is_default_port(self) -> bool: |
|
"""A check for default port. |
|
|
|
Return True if port is default for specified scheme, |
|
e.g. 'http://python.org' or 'http://python.org:80', False |
|
otherwise. |
|
|
|
Return False for relative URLs. |
|
|
|
""" |
|
if (explicit := self.explicit_port) is None: |
|
|
|
|
|
|
|
return self._netloc != "" |
|
return explicit == DEFAULT_PORTS.get(self._scheme) |
|
|
|
def origin(self) -> "URL": |
|
"""Return an URL with scheme, host and port parts only. |
|
|
|
user, password, path, query and fragment are removed. |
|
|
|
""" |
|
|
|
return self._origin |
|
|
|
@cached_property |
|
def _val(self) -> SplitURLType: |
|
return (self._scheme, self._netloc, self._path, self._query, self._fragment) |
|
|
|
@cached_property |
|
def _origin(self) -> "URL": |
|
"""Return an URL with scheme, host and port parts only. |
|
|
|
user, password, path, query and fragment are removed. |
|
""" |
|
if not (netloc := self._netloc): |
|
raise ValueError("URL should be absolute") |
|
if not (scheme := self._scheme): |
|
raise ValueError("URL should have scheme") |
|
if "@" in netloc: |
|
encoded_host = self.host_subcomponent |
|
netloc = make_netloc(None, None, encoded_host, self.explicit_port) |
|
elif not self._path and not self._query and not self._fragment: |
|
return self |
|
return from_parts(scheme, netloc, "", "", "") |
|
|
|
def relative(self) -> "URL": |
|
"""Return a relative part of the URL. |
|
|
|
scheme, user, password, host and port are removed. |
|
|
|
""" |
|
if not self._netloc: |
|
raise ValueError("URL should be absolute") |
|
return from_parts("", "", self._path, self._query, self._fragment) |
|
|
|
@cached_property |
|
def absolute(self) -> bool: |
|
"""A check for absolute URLs. |
|
|
|
Return True for absolute ones (having scheme or starting |
|
with //), False otherwise. |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
return self._netloc != "" |
|
|
|
@cached_property |
|
def scheme(self) -> str: |
|
"""Scheme for absolute URLs. |
|
|
|
Empty string for relative URLs or URLs starting with // |
|
|
|
""" |
|
return self._scheme |
|
|
|
@cached_property |
|
def raw_authority(self) -> str: |
|
"""Encoded authority part of URL. |
|
|
|
Empty string for relative URLs. |
|
|
|
""" |
|
return self._netloc |
|
|
|
@cached_property |
|
def authority(self) -> str: |
|
"""Decoded authority part of URL. |
|
|
|
Empty string for relative URLs. |
|
|
|
""" |
|
return make_netloc(self.user, self.password, self.host, self.port) |
|
|
|
@cached_property |
|
def raw_user(self) -> Union[str, None]: |
|
"""Encoded user part of URL. |
|
|
|
None if user is missing. |
|
|
|
""" |
|
|
|
self._cache_netloc() |
|
return self._cache["raw_user"] |
|
|
|
@cached_property |
|
def user(self) -> Union[str, None]: |
|
"""Decoded user part of URL. |
|
|
|
None if user is missing. |
|
|
|
""" |
|
if (raw_user := self.raw_user) is None: |
|
return None |
|
return UNQUOTER(raw_user) |
|
|
|
@cached_property |
|
def raw_password(self) -> Union[str, None]: |
|
"""Encoded password part of URL. |
|
|
|
None if password is missing. |
|
|
|
""" |
|
self._cache_netloc() |
|
return self._cache["raw_password"] |
|
|
|
@cached_property |
|
def password(self) -> Union[str, None]: |
|
"""Decoded password part of URL. |
|
|
|
None if password is missing. |
|
|
|
""" |
|
if (raw_password := self.raw_password) is None: |
|
return None |
|
return UNQUOTER(raw_password) |
|
|
|
@cached_property |
|
def raw_host(self) -> Union[str, None]: |
|
"""Encoded host part of URL. |
|
|
|
None for relative URLs. |
|
|
|
When working with IPv6 addresses, use the `host_subcomponent` property instead |
|
as it will return the host subcomponent with brackets. |
|
""" |
|
|
|
|
|
self._cache_netloc() |
|
return self._cache["raw_host"] |
|
|
|
@cached_property |
|
def host(self) -> Union[str, None]: |
|
"""Decoded host part of URL. |
|
|
|
None for relative URLs. |
|
|
|
""" |
|
if (raw := self.raw_host) is None: |
|
return None |
|
if raw and raw[-1].isdigit() or ":" in raw: |
|
|
|
return raw |
|
return _idna_decode(raw) |
|
|
|
@cached_property |
|
def host_subcomponent(self) -> Union[str, None]: |
|
"""Return the host subcomponent part of URL. |
|
|
|
None for relative URLs. |
|
|
|
https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2 |
|
|
|
`IP-literal = "[" ( IPv6address / IPvFuture ) "]"` |
|
|
|
Examples: |
|
- `http://example.com:8080` -> `example.com` |
|
- `http://example.com:80` -> `example.com` |
|
- `https://127.0.0.1:8443` -> `127.0.0.1` |
|
- `https://[::1]:8443` -> `[::1]` |
|
- `http://[::1]` -> `[::1]` |
|
|
|
""" |
|
if (raw := self.raw_host) is None: |
|
return None |
|
return f"[{raw}]" if ":" in raw else raw |
|
|
|
@cached_property |
|
def host_port_subcomponent(self) -> Union[str, None]: |
|
"""Return the host and port subcomponent part of URL. |
|
|
|
Trailing dots are removed from the host part. |
|
|
|
This value is suitable for use in the Host header of an HTTP request. |
|
|
|
None for relative URLs. |
|
|
|
https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2 |
|
`IP-literal = "[" ( IPv6address / IPvFuture ) "]"` |
|
https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3 |
|
port = *DIGIT |
|
|
|
Examples: |
|
- `http://example.com:8080` -> `example.com:8080` |
|
- `http://example.com:80` -> `example.com` |
|
- `http://example.com.:80` -> `example.com` |
|
- `https://127.0.0.1:8443` -> `127.0.0.1:8443` |
|
- `https://[::1]:8443` -> `[::1]:8443` |
|
- `http://[::1]` -> `[::1]` |
|
|
|
""" |
|
if (raw := self.raw_host) is None: |
|
return None |
|
if raw[-1] == ".": |
|
|
|
|
|
|
|
|
|
|
|
raw = raw.rstrip(".") |
|
port = self.explicit_port |
|
if port is None or port == DEFAULT_PORTS.get(self._scheme): |
|
return f"[{raw}]" if ":" in raw else raw |
|
return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}" |
|
|
|
@cached_property |
|
def port(self) -> Union[int, None]: |
|
"""Port part of URL, with scheme-based fallback. |
|
|
|
None for relative URLs or URLs without explicit port and |
|
scheme without default port substitution. |
|
|
|
""" |
|
if (explicit_port := self.explicit_port) is not None: |
|
return explicit_port |
|
return DEFAULT_PORTS.get(self._scheme) |
|
|
|
@cached_property |
|
def explicit_port(self) -> Union[int, None]: |
|
"""Port part of URL, without scheme-based fallback. |
|
|
|
None for relative URLs or URLs without explicit port. |
|
|
|
""" |
|
self._cache_netloc() |
|
return self._cache["explicit_port"] |
|
|
|
@cached_property |
|
def raw_path(self) -> str: |
|
"""Encoded path of URL. |
|
|
|
/ for absolute URLs without path part. |
|
|
|
""" |
|
return self._path if self._path or not self._netloc else "/" |
|
|
|
@cached_property |
|
def path(self) -> str: |
|
"""Decoded path of URL. |
|
|
|
/ for absolute URLs without path part. |
|
|
|
""" |
|
return PATH_UNQUOTER(self._path) if self._path else "/" if self._netloc else "" |
|
|
|
@cached_property |
|
def path_safe(self) -> str: |
|
"""Decoded path of URL. |
|
|
|
/ for absolute URLs without path part. |
|
|
|
/ (%2F) and % (%25) are not decoded |
|
|
|
""" |
|
if self._path: |
|
return PATH_SAFE_UNQUOTER(self._path) |
|
return "/" if self._netloc else "" |
|
|
|
@cached_property |
|
def _parsed_query(self) -> list[tuple[str, str]]: |
|
"""Parse query part of URL.""" |
|
return query_to_pairs(self._query) |
|
|
|
@cached_property |
|
def query(self) -> "MultiDictProxy[str]": |
|
"""A MultiDictProxy representing parsed query parameters in decoded |
|
representation. |
|
|
|
Empty value if URL has no query part. |
|
|
|
""" |
|
return MultiDictProxy(MultiDict(self._parsed_query)) |
|
|
|
@cached_property |
|
def raw_query_string(self) -> str: |
|
"""Encoded query part of URL. |
|
|
|
Empty string if query is missing. |
|
|
|
""" |
|
return self._query |
|
|
|
@cached_property |
|
def query_string(self) -> str: |
|
"""Decoded query part of URL. |
|
|
|
Empty string if query is missing. |
|
|
|
""" |
|
return QS_UNQUOTER(self._query) if self._query else "" |
|
|
|
@cached_property |
|
def path_qs(self) -> str: |
|
"""Decoded path of URL with query.""" |
|
return self.path if not (q := self.query_string) else f"{self.path}?{q}" |
|
|
|
@cached_property |
|
def raw_path_qs(self) -> str: |
|
"""Encoded path of URL with query.""" |
|
if q := self._query: |
|
return f"{self._path}?{q}" if self._path or not self._netloc else f"/?{q}" |
|
return self._path if self._path or not self._netloc else "/" |
|
|
|
@cached_property |
|
def raw_fragment(self) -> str: |
|
"""Encoded fragment part of URL. |
|
|
|
Empty string if fragment is missing. |
|
|
|
""" |
|
return self._fragment |
|
|
|
@cached_property |
|
def fragment(self) -> str: |
|
"""Decoded fragment part of URL. |
|
|
|
Empty string if fragment is missing. |
|
|
|
""" |
|
return UNQUOTER(self._fragment) if self._fragment else "" |
|
|
|
@cached_property |
|
def raw_parts(self) -> tuple[str, ...]: |
|
"""A tuple containing encoded *path* parts. |
|
|
|
('/',) for absolute URLs if *path* is missing. |
|
|
|
""" |
|
path = self._path |
|
if self._netloc: |
|
return ("/", *path[1:].split("/")) if path else ("/",) |
|
if path and path[0] == "/": |
|
return ("/", *path[1:].split("/")) |
|
return tuple(path.split("/")) |
|
|
|
@cached_property |
|
def parts(self) -> tuple[str, ...]: |
|
"""A tuple containing decoded *path* parts. |
|
|
|
('/',) for absolute URLs if *path* is missing. |
|
|
|
""" |
|
return tuple(UNQUOTER(part) for part in self.raw_parts) |
|
|
|
@cached_property |
|
def parent(self) -> "URL": |
|
"""A new URL with last part of path removed and cleaned up query and |
|
fragment. |
|
|
|
""" |
|
path = self._path |
|
if not path or path == "/": |
|
if self._fragment or self._query: |
|
return from_parts(self._scheme, self._netloc, path, "", "") |
|
return self |
|
parts = path.split("/") |
|
return from_parts(self._scheme, self._netloc, "/".join(parts[:-1]), "", "") |
|
|
|
@cached_property |
|
def raw_name(self) -> str: |
|
"""The last part of raw_parts.""" |
|
parts = self.raw_parts |
|
if not self._netloc: |
|
return parts[-1] |
|
parts = parts[1:] |
|
return parts[-1] if parts else "" |
|
|
|
@cached_property |
|
def name(self) -> str: |
|
"""The last part of parts.""" |
|
return UNQUOTER(self.raw_name) |
|
|
|
@cached_property |
|
def raw_suffix(self) -> str: |
|
name = self.raw_name |
|
i = name.rfind(".") |
|
return name[i:] if 0 < i < len(name) - 1 else "" |
|
|
|
@cached_property |
|
def suffix(self) -> str: |
|
return UNQUOTER(self.raw_suffix) |
|
|
|
@cached_property |
|
def raw_suffixes(self) -> tuple[str, ...]: |
|
name = self.raw_name |
|
if name.endswith("."): |
|
return () |
|
name = name.lstrip(".") |
|
return tuple("." + suffix for suffix in name.split(".")[1:]) |
|
|
|
@cached_property |
|
def suffixes(self) -> tuple[str, ...]: |
|
return tuple(UNQUOTER(suffix) for suffix in self.raw_suffixes) |
|
|
|
def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL": |
|
""" |
|
add paths to self._path, accounting for absolute vs relative paths, |
|
keep existing, but do not create new, empty segments |
|
""" |
|
parsed: list[str] = [] |
|
needs_normalize: bool = False |
|
for idx, path in enumerate(reversed(paths)): |
|
|
|
last = idx == 0 |
|
if path and path[0] == "/": |
|
raise ValueError( |
|
f"Appending path {path!r} starting from slash is forbidden" |
|
) |
|
|
|
|
|
|
|
|
|
path = path if encoded else PATH_QUOTER(path) |
|
needs_normalize |= "." in path |
|
segments = path.split("/") |
|
segments.reverse() |
|
|
|
parsed += segments[1:] if not last and segments[0] == "" else segments |
|
|
|
if (path := self._path) and (old_segments := path.split("/")): |
|
|
|
|
|
old = old_segments[:-1] if old_segments[-1] == "" else old_segments |
|
old.reverse() |
|
parsed += old |
|
|
|
|
|
|
|
if (netloc := self._netloc) and parsed and parsed[-1] != "": |
|
parsed.append("") |
|
|
|
parsed.reverse() |
|
if not netloc or not needs_normalize: |
|
return from_parts(self._scheme, netloc, "/".join(parsed), "", "") |
|
|
|
path = "/".join(normalize_path_segments(parsed)) |
|
|
|
if path and path[0] != "/": |
|
path = f"/{path}" |
|
return from_parts(self._scheme, netloc, path, "", "") |
|
|
|
def with_scheme(self, scheme: str) -> "URL": |
|
"""Return a new URL with scheme replaced.""" |
|
|
|
if not isinstance(scheme, str): |
|
raise TypeError("Invalid scheme type") |
|
lower_scheme = scheme.lower() |
|
netloc = self._netloc |
|
if not netloc and lower_scheme in SCHEME_REQUIRES_HOST: |
|
msg = ( |
|
"scheme replacement is not allowed for " |
|
f"relative URLs for the {lower_scheme} scheme" |
|
) |
|
raise ValueError(msg) |
|
return from_parts(lower_scheme, netloc, self._path, self._query, self._fragment) |
|
|
|
def with_user(self, user: Union[str, None]) -> "URL": |
|
"""Return a new URL with user replaced. |
|
|
|
Autoencode user if needed. |
|
|
|
Clear user/password if user is None. |
|
|
|
""" |
|
|
|
if user is None: |
|
password = None |
|
elif isinstance(user, str): |
|
user = QUOTER(user) |
|
password = self.raw_password |
|
else: |
|
raise TypeError("Invalid user type") |
|
if not (netloc := self._netloc): |
|
raise ValueError("user replacement is not allowed for relative URLs") |
|
encoded_host = self.host_subcomponent or "" |
|
netloc = make_netloc(user, password, encoded_host, self.explicit_port) |
|
return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) |
|
|
|
def with_password(self, password: Union[str, None]) -> "URL": |
|
"""Return a new URL with password replaced. |
|
|
|
Autoencode password if needed. |
|
|
|
Clear password if argument is None. |
|
|
|
""" |
|
|
|
if password is None: |
|
pass |
|
elif isinstance(password, str): |
|
password = QUOTER(password) |
|
else: |
|
raise TypeError("Invalid password type") |
|
if not (netloc := self._netloc): |
|
raise ValueError("password replacement is not allowed for relative URLs") |
|
encoded_host = self.host_subcomponent or "" |
|
port = self.explicit_port |
|
netloc = make_netloc(self.raw_user, password, encoded_host, port) |
|
return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) |
|
|
|
def with_host(self, host: str) -> "URL": |
|
"""Return a new URL with host replaced. |
|
|
|
Autoencode host if needed. |
|
|
|
Changing host for relative URLs is not allowed, use .join() |
|
instead. |
|
|
|
""" |
|
|
|
if not isinstance(host, str): |
|
raise TypeError("Invalid host type") |
|
if not (netloc := self._netloc): |
|
raise ValueError("host replacement is not allowed for relative URLs") |
|
if not host: |
|
raise ValueError("host removing is not allowed") |
|
encoded_host = _encode_host(host, validate_host=True) if host else "" |
|
port = self.explicit_port |
|
netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port) |
|
return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) |
|
|
|
def with_port(self, port: Union[int, None]) -> "URL": |
|
"""Return a new URL with port replaced. |
|
|
|
Clear port to default if None is passed. |
|
|
|
""" |
|
|
|
if port is not None: |
|
if isinstance(port, bool) or not isinstance(port, int): |
|
raise TypeError(f"port should be int or None, got {type(port)}") |
|
if not (0 <= port <= 65535): |
|
raise ValueError(f"port must be between 0 and 65535, got {port}") |
|
if not (netloc := self._netloc): |
|
raise ValueError("port replacement is not allowed for relative URLs") |
|
encoded_host = self.host_subcomponent or "" |
|
netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port) |
|
return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) |
|
|
|
def with_path( |
|
self, |
|
path: str, |
|
*, |
|
encoded: bool = False, |
|
keep_query: bool = False, |
|
keep_fragment: bool = False, |
|
) -> "URL": |
|
"""Return a new URL with path replaced.""" |
|
netloc = self._netloc |
|
if not encoded: |
|
path = PATH_QUOTER(path) |
|
if netloc: |
|
path = normalize_path(path) if "." in path else path |
|
if path and path[0] != "/": |
|
path = f"/{path}" |
|
query = self._query if keep_query else "" |
|
fragment = self._fragment if keep_fragment else "" |
|
return from_parts(self._scheme, netloc, path, query, fragment) |
|
|
|
@overload |
|
def with_query(self, query: Query) -> "URL": ... |
|
|
|
@overload |
|
def with_query(self, **kwargs: QueryVariable) -> "URL": ... |
|
|
|
def with_query(self, *args: Any, **kwargs: Any) -> "URL": |
|
"""Return a new URL with query part replaced. |
|
|
|
Accepts any Mapping (e.g. dict, multidict.MultiDict instances) |
|
or str, autoencode the argument if needed. |
|
|
|
A sequence of (key, value) pairs is supported as well. |
|
|
|
It also can take an arbitrary number of keyword arguments. |
|
|
|
Clear query if None is passed. |
|
|
|
""" |
|
|
|
query = get_str_query(*args, **kwargs) or "" |
|
return from_parts_uncached( |
|
self._scheme, self._netloc, self._path, query, self._fragment |
|
) |
|
|
|
@overload |
|
def extend_query(self, query: Query) -> "URL": ... |
|
|
|
@overload |
|
def extend_query(self, **kwargs: QueryVariable) -> "URL": ... |
|
|
|
def extend_query(self, *args: Any, **kwargs: Any) -> "URL": |
|
"""Return a new URL with query part combined with the existing. |
|
|
|
This method will not remove existing query parameters. |
|
|
|
Example: |
|
>>> url = URL('http://example.com/?a=1&b=2') |
|
>>> url.extend_query(a=3, c=4) |
|
URL('http://example.com/?a=1&b=2&a=3&c=4') |
|
""" |
|
if not (new_query := get_str_query(*args, **kwargs)): |
|
return self |
|
if query := self._query: |
|
|
|
|
|
query += new_query if query[-1] == "&" else f"&{new_query}" |
|
else: |
|
query = new_query |
|
return from_parts_uncached( |
|
self._scheme, self._netloc, self._path, query, self._fragment |
|
) |
|
|
|
@overload |
|
def update_query(self, query: Query) -> "URL": ... |
|
|
|
@overload |
|
def update_query(self, **kwargs: QueryVariable) -> "URL": ... |
|
|
|
def update_query(self, *args: Any, **kwargs: Any) -> "URL": |
|
"""Return a new URL with query part updated. |
|
|
|
This method will overwrite existing query parameters. |
|
|
|
Example: |
|
>>> url = URL('http://example.com/?a=1&b=2') |
|
>>> url.update_query(a=3, c=4) |
|
URL('http://example.com/?a=3&b=2&c=4') |
|
""" |
|
in_query: Union[str, Mapping[str, QueryVariable], None] |
|
if kwargs: |
|
if args: |
|
msg = "Either kwargs or single query parameter must be present" |
|
raise ValueError(msg) |
|
in_query = kwargs |
|
elif len(args) == 1: |
|
in_query = args[0] |
|
else: |
|
raise ValueError("Either kwargs or single query parameter must be present") |
|
|
|
if in_query is None: |
|
query = "" |
|
elif not in_query: |
|
query = self._query |
|
elif isinstance(in_query, Mapping): |
|
qm: MultiDict[QueryVariable] = MultiDict(self._parsed_query) |
|
qm.update(in_query) |
|
query = get_str_query_from_sequence_iterable(qm.items()) |
|
elif isinstance(in_query, str): |
|
qstr: MultiDict[str] = MultiDict(self._parsed_query) |
|
qstr.update(query_to_pairs(in_query)) |
|
query = get_str_query_from_iterable(qstr.items()) |
|
elif isinstance(in_query, (bytes, bytearray, memoryview)): |
|
msg = "Invalid query type: bytes, bytearray and memoryview are forbidden" |
|
raise TypeError(msg) |
|
elif isinstance(in_query, Sequence): |
|
|
|
|
|
|
|
|
|
qs: MultiDict[SimpleQuery] = MultiDict(self._parsed_query) |
|
qs.update(in_query) |
|
query = get_str_query_from_iterable(qs.items()) |
|
else: |
|
raise TypeError( |
|
"Invalid query type: only str, mapping or " |
|
"sequence of (key, value) pairs is allowed" |
|
) |
|
return from_parts_uncached( |
|
self._scheme, self._netloc, self._path, query, self._fragment |
|
) |
|
|
|
def without_query_params(self, *query_params: str) -> "URL": |
|
"""Remove some keys from query part and return new URL.""" |
|
params_to_remove = set(query_params) & self.query.keys() |
|
if not params_to_remove: |
|
return self |
|
return self.with_query( |
|
tuple( |
|
(name, value) |
|
for name, value in self.query.items() |
|
if name not in params_to_remove |
|
) |
|
) |
|
|
|
def with_fragment(self, fragment: Union[str, None]) -> "URL": |
|
"""Return a new URL with fragment replaced. |
|
|
|
Autoencode fragment if needed. |
|
|
|
Clear fragment to default if None is passed. |
|
|
|
""" |
|
|
|
if fragment is None: |
|
raw_fragment = "" |
|
elif not isinstance(fragment, str): |
|
raise TypeError("Invalid fragment type") |
|
else: |
|
raw_fragment = FRAGMENT_QUOTER(fragment) |
|
if self._fragment == raw_fragment: |
|
return self |
|
return from_parts( |
|
self._scheme, self._netloc, self._path, self._query, raw_fragment |
|
) |
|
|
|
def with_name( |
|
self, |
|
name: str, |
|
*, |
|
keep_query: bool = False, |
|
keep_fragment: bool = False, |
|
) -> "URL": |
|
"""Return a new URL with name (last part of path) replaced. |
|
|
|
Query and fragment parts are cleaned up. |
|
|
|
Name is encoded if needed. |
|
|
|
""" |
|
|
|
if not isinstance(name, str): |
|
raise TypeError("Invalid name type") |
|
if "/" in name: |
|
raise ValueError("Slash in name is not allowed") |
|
name = PATH_QUOTER(name) |
|
if name in (".", ".."): |
|
raise ValueError(". and .. values are forbidden") |
|
parts = list(self.raw_parts) |
|
if netloc := self._netloc: |
|
if len(parts) == 1: |
|
parts.append(name) |
|
else: |
|
parts[-1] = name |
|
parts[0] = "" |
|
else: |
|
parts[-1] = name |
|
if parts[0] == "/": |
|
parts[0] = "" |
|
|
|
query = self._query if keep_query else "" |
|
fragment = self._fragment if keep_fragment else "" |
|
return from_parts(self._scheme, netloc, "/".join(parts), query, fragment) |
|
|
|
def with_suffix( |
|
self, |
|
suffix: str, |
|
*, |
|
keep_query: bool = False, |
|
keep_fragment: bool = False, |
|
) -> "URL": |
|
"""Return a new URL with suffix (file extension of name) replaced. |
|
|
|
Query and fragment parts are cleaned up. |
|
|
|
suffix is encoded if needed. |
|
""" |
|
if not isinstance(suffix, str): |
|
raise TypeError("Invalid suffix type") |
|
if suffix and not suffix[0] == "." or suffix == "." or "/" in suffix: |
|
raise ValueError(f"Invalid suffix {suffix!r}") |
|
name = self.raw_name |
|
if not name: |
|
raise ValueError(f"{self!r} has an empty name") |
|
old_suffix = self.raw_suffix |
|
suffix = PATH_QUOTER(suffix) |
|
name = name + suffix if not old_suffix else name[: -len(old_suffix)] + suffix |
|
if name in (".", ".."): |
|
raise ValueError(". and .. values are forbidden") |
|
parts = list(self.raw_parts) |
|
if netloc := self._netloc: |
|
if len(parts) == 1: |
|
parts.append(name) |
|
else: |
|
parts[-1] = name |
|
parts[0] = "" |
|
else: |
|
parts[-1] = name |
|
if parts[0] == "/": |
|
parts[0] = "" |
|
|
|
query = self._query if keep_query else "" |
|
fragment = self._fragment if keep_fragment else "" |
|
return from_parts(self._scheme, netloc, "/".join(parts), query, fragment) |
|
|
|
def join(self, url: "URL") -> "URL": |
|
"""Join URLs |
|
|
|
Construct a full (βabsoluteβ) URL by combining a βbase URLβ |
|
(self) with another URL (url). |
|
|
|
Informally, this uses components of the base URL, in |
|
particular the addressing scheme, the network location and |
|
(part of) the path, to provide missing components in the |
|
relative URL. |
|
|
|
""" |
|
if type(url) is not URL: |
|
raise TypeError("url should be URL") |
|
|
|
scheme = url._scheme or self._scheme |
|
if scheme != self._scheme or scheme not in USES_RELATIVE: |
|
return url |
|
|
|
|
|
if (join_netloc := url._netloc) and scheme in USES_AUTHORITY: |
|
return from_parts(scheme, join_netloc, url._path, url._query, url._fragment) |
|
|
|
orig_path = self._path |
|
if join_path := url._path: |
|
if join_path[0] == "/": |
|
path = join_path |
|
elif not orig_path: |
|
path = f"/{join_path}" |
|
elif orig_path[-1] == "/": |
|
path = f"{orig_path}{join_path}" |
|
else: |
|
|
|
|
|
|
|
|
|
path = "/".join([*self.parts[:-1], ""]) + join_path |
|
|
|
if orig_path[0] == "/": |
|
path = path[1:] |
|
path = normalize_path(path) if "." in path else path |
|
else: |
|
path = orig_path |
|
|
|
return from_parts( |
|
scheme, |
|
self._netloc, |
|
path, |
|
url._query if join_path or url._query else self._query, |
|
url._fragment if join_path or url._fragment else self._fragment, |
|
) |
|
|
|
def joinpath(self, *other: str, encoded: bool = False) -> "URL": |
|
"""Return a new URL with the elements in other appended to the path.""" |
|
return self._make_child(other, encoded=encoded) |
|
|
|
def human_repr(self) -> str: |
|
"""Return decoded human readable string for URL representation.""" |
|
user = human_quote(self.user, "#/:?@[]") |
|
password = human_quote(self.password, "#/:?@[]") |
|
if (host := self.host) and ":" in host: |
|
host = f"[{host}]" |
|
path = human_quote(self.path, "#?") |
|
if TYPE_CHECKING: |
|
assert path is not None |
|
query_string = "&".join( |
|
"{}={}".format(human_quote(k, "#&+;="), human_quote(v, "#&+;=")) |
|
for k, v in self.query.items() |
|
) |
|
fragment = human_quote(self.fragment, "") |
|
if TYPE_CHECKING: |
|
assert fragment is not None |
|
netloc = make_netloc(user, password, host, self.explicit_port) |
|
return unsplit_result(self._scheme, netloc, path, query_string, fragment) |
|
|
|
|
|
_DEFAULT_IDNA_SIZE = 256 |
|
_DEFAULT_ENCODE_SIZE = 512 |
|
|
|
|
|
@lru_cache(_DEFAULT_IDNA_SIZE) |
|
def _idna_decode(raw: str) -> str: |
|
try: |
|
return idna.decode(raw.encode("ascii")) |
|
except UnicodeError: |
|
return raw.encode("ascii").decode("idna") |
|
|
|
|
|
@lru_cache(_DEFAULT_IDNA_SIZE) |
|
def _idna_encode(host: str) -> str: |
|
try: |
|
return idna.encode(host, uts46=True).decode("ascii") |
|
except UnicodeError: |
|
return host.encode("idna").decode("ascii") |
|
|
|
|
|
@lru_cache(_DEFAULT_ENCODE_SIZE) |
|
def _encode_host(host: str, validate_host: bool) -> str: |
|
"""Encode host part of URL.""" |
|
|
|
|
|
if host and (host[-1].isdigit() or ":" in host): |
|
raw_ip, sep, zone = host.partition("%") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
ip = ip_address(raw_ip) |
|
except ValueError: |
|
pass |
|
else: |
|
|
|
|
|
host = ip.compressed |
|
if ip.version == 6: |
|
return f"[{host}%{zone}]" if sep else f"[{host}]" |
|
return f"{host}%{zone}" if sep else host |
|
|
|
|
|
if host.isascii(): |
|
|
|
|
|
host = host.lower() |
|
if validate_host and (invalid := NOT_REG_NAME.search(host)): |
|
value, pos, extra = invalid.group(), invalid.start(), "" |
|
if value == "@" or (value == ":" and "@" in host[pos:]): |
|
|
|
extra = ( |
|
", if the value includes a username or password, " |
|
"use 'authority' instead of 'host'" |
|
) |
|
raise ValueError( |
|
f"Host {host!r} cannot contain {value!r} (at position {pos}){extra}" |
|
) from None |
|
return host |
|
|
|
return _idna_encode(host) |
|
|
|
|
|
@rewrite_module |
|
def cache_clear() -> None: |
|
"""Clear all LRU caches.""" |
|
_idna_encode.cache_clear() |
|
_idna_decode.cache_clear() |
|
_encode_host.cache_clear() |
|
|
|
|
|
@rewrite_module |
|
def cache_info() -> CacheInfo: |
|
"""Report cache statistics.""" |
|
return { |
|
"idna_encode": _idna_encode.cache_info(), |
|
"idna_decode": _idna_decode.cache_info(), |
|
"ip_address": _encode_host.cache_info(), |
|
"host_validate": _encode_host.cache_info(), |
|
"encode_host": _encode_host.cache_info(), |
|
} |
|
|
|
|
|
@rewrite_module |
|
def cache_configure( |
|
*, |
|
idna_encode_size: Union[int, None] = _DEFAULT_IDNA_SIZE, |
|
idna_decode_size: Union[int, None] = _DEFAULT_IDNA_SIZE, |
|
ip_address_size: Union[int, None, UndefinedType] = UNDEFINED, |
|
host_validate_size: Union[int, None, UndefinedType] = UNDEFINED, |
|
encode_host_size: Union[int, None, UndefinedType] = UNDEFINED, |
|
) -> None: |
|
"""Configure LRU cache sizes.""" |
|
global _idna_decode, _idna_encode, _encode_host |
|
|
|
|
|
if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED: |
|
warnings.warn( |
|
"cache_configure() no longer accepts the " |
|
"ip_address_size or host_validate_size arguments, " |
|
"they are used to set the encode_host_size instead " |
|
"and will be removed in the future", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
|
|
if encode_host_size is not None: |
|
for size in (ip_address_size, host_validate_size): |
|
if size is None: |
|
encode_host_size = None |
|
elif encode_host_size is UNDEFINED: |
|
if size is not UNDEFINED: |
|
encode_host_size = size |
|
elif size is not UNDEFINED: |
|
if TYPE_CHECKING: |
|
assert isinstance(size, int) |
|
assert isinstance(encode_host_size, int) |
|
encode_host_size = max(size, encode_host_size) |
|
if encode_host_size is UNDEFINED: |
|
encode_host_size = _DEFAULT_ENCODE_SIZE |
|
|
|
_encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__) |
|
_idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__) |
|
_idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__) |
|
|