|
import enum |
|
import functools |
|
import reprlib |
|
import sys |
|
from array import array |
|
from collections.abc import ( |
|
ItemsView, |
|
Iterable, |
|
Iterator, |
|
KeysView, |
|
Mapping, |
|
ValuesView, |
|
) |
|
from dataclasses import dataclass |
|
from typing import ( |
|
TYPE_CHECKING, |
|
Any, |
|
ClassVar, |
|
Generic, |
|
NoReturn, |
|
Optional, |
|
TypeVar, |
|
Union, |
|
cast, |
|
overload, |
|
) |
|
|
|
from ._abc import MDArg, MultiMapping, MutableMultiMapping, SupportsKeys |
|
|
|
if sys.version_info >= (3, 11): |
|
from typing import Self |
|
else: |
|
from typing_extensions import Self |
|
|
|
|
|
class istr(str): |
|
"""Case insensitive str.""" |
|
|
|
__is_istr__ = True |
|
__istr_identity__: Optional[str] = None |
|
|
|
|
|
_V = TypeVar("_V") |
|
_T = TypeVar("_T") |
|
|
|
_SENTINEL = enum.Enum("_SENTINEL", "sentinel") |
|
sentinel = _SENTINEL.sentinel |
|
|
|
_version = array("Q", [0]) |
|
|
|
|
|
class _Iter(Generic[_T]): |
|
__slots__ = ("_size", "_iter") |
|
|
|
def __init__(self, size: int, iterator: Iterator[_T]): |
|
self._size = size |
|
self._iter = iterator |
|
|
|
def __iter__(self) -> Self: |
|
return self |
|
|
|
def __next__(self) -> _T: |
|
return next(self._iter) |
|
|
|
def __length_hint__(self) -> int: |
|
return self._size |
|
|
|
|
|
class _ViewBase(Generic[_V]): |
|
def __init__( |
|
self, |
|
md: "MultiDict[_V]", |
|
): |
|
self._md = md |
|
|
|
def __len__(self) -> int: |
|
return len(self._md) |
|
|
|
|
|
class _ItemsView(_ViewBase[_V], ItemsView[str, _V]): |
|
def __contains__(self, item: object) -> bool: |
|
if not isinstance(item, (tuple, list)) or len(item) != 2: |
|
return False |
|
key, value = item |
|
try: |
|
identity = self._md._identity(key) |
|
except TypeError: |
|
return False |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity and value == e.value: |
|
return True |
|
return False |
|
|
|
def __iter__(self) -> _Iter[tuple[str, _V]]: |
|
return _Iter(len(self), self._iter(self._md._version)) |
|
|
|
def _iter(self, version: int) -> Iterator[tuple[str, _V]]: |
|
for e in self._md._keys.iter_entries(): |
|
if version != self._md._version: |
|
raise RuntimeError("Dictionary changed during iteration") |
|
yield self._md._key(e.key), e.value |
|
|
|
@reprlib.recursive_repr() |
|
def __repr__(self) -> str: |
|
lst = [] |
|
for e in self._md._keys.iter_entries(): |
|
lst.append(f"'{e.key}': {e.value!r}") |
|
body = ", ".join(lst) |
|
return f"<{self.__class__.__name__}({body})>" |
|
|
|
def _parse_item( |
|
self, arg: Union[tuple[str, _V], _T] |
|
) -> Optional[tuple[int, str, str, _V]]: |
|
if not isinstance(arg, tuple): |
|
return None |
|
if len(arg) != 2: |
|
return None |
|
try: |
|
identity = self._md._identity(arg[0]) |
|
return (hash(identity), identity, arg[0], arg[1]) |
|
except TypeError: |
|
return None |
|
|
|
def _tmp_set(self, it: Iterable[_T]) -> set[tuple[str, _V]]: |
|
tmp = set() |
|
for arg in it: |
|
item = self._parse_item(arg) |
|
if item is None: |
|
continue |
|
else: |
|
tmp.add((item[1], item[3])) |
|
return tmp |
|
|
|
def __and__(self, other: Iterable[Any]) -> set[tuple[str, _V]]: |
|
ret = set() |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for arg in it: |
|
item = self._parse_item(arg) |
|
if item is None: |
|
continue |
|
hash_, identity, key, value = item |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
e.hash = -1 |
|
if e.identity == identity and e.value == value: |
|
ret.add((e.key, e.value)) |
|
self._md._keys.restore_hash(hash_) |
|
return ret |
|
|
|
def __rand__(self, other: Iterable[_T]) -> set[_T]: |
|
ret = set() |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for arg in it: |
|
item = self._parse_item(arg) |
|
if item is None: |
|
continue |
|
hash_, identity, key, value = item |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity and e.value == value: |
|
ret.add(arg) |
|
break |
|
return ret |
|
|
|
def __or__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]: |
|
ret: set[Union[tuple[str, _V], _T]] = set(self) |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for arg in it: |
|
item: Optional[tuple[int, str, str, _V]] = self._parse_item(arg) |
|
if item is None: |
|
ret.add(arg) |
|
continue |
|
hash_, identity, key, value = item |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity and e.value == value: |
|
break |
|
else: |
|
ret.add(arg) |
|
return ret |
|
|
|
def __ror__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]: |
|
try: |
|
ret: set[Union[tuple[str, _V], _T]] = set(other) |
|
except TypeError: |
|
return NotImplemented |
|
tmp = self._tmp_set(ret) |
|
|
|
for e in self._md._keys.iter_entries(): |
|
if (e.identity, e.value) not in tmp: |
|
ret.add((e.key, e.value)) |
|
return ret |
|
|
|
def __sub__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]: |
|
ret: set[Union[tuple[str, _V], _T]] = set() |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
tmp = self._tmp_set(it) |
|
|
|
for e in self._md._keys.iter_entries(): |
|
if (e.identity, e.value) not in tmp: |
|
ret.add((e.key, e.value)) |
|
|
|
return ret |
|
|
|
def __rsub__(self, other: Iterable[_T]) -> set[_T]: |
|
ret: set[_T] = set() |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for arg in it: |
|
item = self._parse_item(arg) |
|
if item is None: |
|
ret.add(arg) |
|
continue |
|
|
|
hash_, identity, key, value = item |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity and e.value == value: |
|
break |
|
else: |
|
ret.add(arg) |
|
return ret |
|
|
|
def __xor__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]: |
|
try: |
|
rgt = set(other) |
|
except TypeError: |
|
return NotImplemented |
|
ret: set[Union[tuple[str, _V], _T]] = self - rgt |
|
ret |= rgt - self |
|
return ret |
|
|
|
__rxor__ = __xor__ |
|
|
|
def isdisjoint(self, other: Iterable[tuple[str, _V]]) -> bool: |
|
for arg in other: |
|
item = self._parse_item(arg) |
|
if item is None: |
|
continue |
|
|
|
hash_, identity, key, value = item |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity and e.value == value: |
|
return False |
|
return True |
|
|
|
|
|
class _ValuesView(_ViewBase[_V], ValuesView[_V]): |
|
def __contains__(self, value: object) -> bool: |
|
for e in self._md._keys.iter_entries(): |
|
if e.value == value: |
|
return True |
|
return False |
|
|
|
def __iter__(self) -> _Iter[_V]: |
|
return _Iter(len(self), self._iter(self._md._version)) |
|
|
|
def _iter(self, version: int) -> Iterator[_V]: |
|
for e in self._md._keys.iter_entries(): |
|
if version != self._md._version: |
|
raise RuntimeError("Dictionary changed during iteration") |
|
yield e.value |
|
|
|
@reprlib.recursive_repr() |
|
def __repr__(self) -> str: |
|
lst = [] |
|
for e in self._md._keys.iter_entries(): |
|
lst.append(repr(e.value)) |
|
body = ", ".join(lst) |
|
return f"<{self.__class__.__name__}({body})>" |
|
|
|
|
|
class _KeysView(_ViewBase[_V], KeysView[str]): |
|
def __contains__(self, key: object) -> bool: |
|
if not isinstance(key, str): |
|
return False |
|
identity = self._md._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
return True |
|
return False |
|
|
|
def __iter__(self) -> _Iter[str]: |
|
return _Iter(len(self), self._iter(self._md._version)) |
|
|
|
def _iter(self, version: int) -> Iterator[str]: |
|
for e in self._md._keys.iter_entries(): |
|
if version != self._md._version: |
|
raise RuntimeError("Dictionary changed during iteration") |
|
yield self._md._key(e.key) |
|
|
|
def __repr__(self) -> str: |
|
lst = [] |
|
for e in self._md._keys.iter_entries(): |
|
lst.append(f"'{e.key}'") |
|
body = ", ".join(lst) |
|
return f"<{self.__class__.__name__}({body})>" |
|
|
|
def __and__(self, other: Iterable[object]) -> set[str]: |
|
ret = set() |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for key in it: |
|
if not isinstance(key, str): |
|
continue |
|
identity = self._md._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
ret.add(e.key) |
|
break |
|
return ret |
|
|
|
def __rand__(self, other: Iterable[_T]) -> set[_T]: |
|
ret = set() |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for key in it: |
|
if not isinstance(key, str): |
|
continue |
|
if key in self._md: |
|
ret.add(key) |
|
return cast(set[_T], ret) |
|
|
|
def __or__(self, other: Iterable[_T]) -> set[Union[str, _T]]: |
|
ret: set[Union[str, _T]] = set(self) |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for key in it: |
|
if not isinstance(key, str): |
|
ret.add(key) |
|
continue |
|
if key not in self._md: |
|
ret.add(key) |
|
return ret |
|
|
|
def __ror__(self, other: Iterable[_T]) -> set[Union[str, _T]]: |
|
try: |
|
ret: set[Union[str, _T]] = set(other) |
|
except TypeError: |
|
return NotImplemented |
|
|
|
tmp = set() |
|
for key in ret: |
|
if not isinstance(key, str): |
|
continue |
|
identity = self._md._identity(key) |
|
tmp.add(identity) |
|
|
|
for e in self._md._keys.iter_entries(): |
|
if e.identity not in tmp: |
|
ret.add(e.key) |
|
return ret |
|
|
|
def __sub__(self, other: Iterable[object]) -> set[str]: |
|
ret = set(self) |
|
try: |
|
it = iter(other) |
|
except TypeError: |
|
return NotImplemented |
|
for key in it: |
|
if not isinstance(key, str): |
|
continue |
|
identity = self._md._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._md._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
ret.discard(e.key) |
|
break |
|
return ret |
|
|
|
def __rsub__(self, other: Iterable[_T]) -> set[_T]: |
|
try: |
|
ret: set[_T] = set(other) |
|
except TypeError: |
|
return NotImplemented |
|
for key in other: |
|
if not isinstance(key, str): |
|
continue |
|
if key in self._md: |
|
ret.discard(key) |
|
return ret |
|
|
|
def __xor__(self, other: Iterable[_T]) -> set[Union[str, _T]]: |
|
try: |
|
rgt = set(other) |
|
except TypeError: |
|
return NotImplemented |
|
ret: set[Union[str, _T]] = self - rgt |
|
ret |= rgt - self |
|
return ret |
|
|
|
__rxor__ = __xor__ |
|
|
|
def isdisjoint(self, other: Iterable[object]) -> bool: |
|
for key in other: |
|
if not isinstance(key, str): |
|
continue |
|
if key in self._md: |
|
return False |
|
return True |
|
|
|
|
|
class _CSMixin: |
|
_ci: ClassVar[bool] = False |
|
|
|
def _key(self, key: str) -> str: |
|
return key |
|
|
|
def _identity(self, key: str) -> str: |
|
if isinstance(key, str): |
|
return key |
|
else: |
|
raise TypeError("MultiDict keys should be either str or subclasses of str") |
|
|
|
|
|
class _CIMixin: |
|
_ci: ClassVar[bool] = True |
|
|
|
def _key(self, key: str) -> str: |
|
if type(key) is istr: |
|
return key |
|
else: |
|
return istr(key) |
|
|
|
def _identity(self, key: str) -> str: |
|
if isinstance(key, istr): |
|
ret = key.__istr_identity__ |
|
if ret is None: |
|
ret = key.title() |
|
key.__istr_identity__ = ret |
|
return ret |
|
if isinstance(key, str): |
|
return key.title() |
|
else: |
|
raise TypeError("MultiDict keys should be either str or subclasses of str") |
|
|
|
|
|
def estimate_log2_keysize(n: int) -> int: |
|
|
|
return (((n * 3 + 1) // 2) | 7).bit_length() |
|
|
|
|
|
@dataclass |
|
class _Entry(Generic[_V]): |
|
hash: int |
|
identity: str |
|
key: str |
|
value: _V |
|
|
|
|
|
@dataclass |
|
class _HtKeys(Generic[_V]): |
|
LOG_MINSIZE: ClassVar[int] = 3 |
|
MINSIZE: ClassVar[int] = 8 |
|
PREALLOCATED_INDICES: ClassVar[dict[int, array]] = { |
|
log2_size: array( |
|
"b" if log2_size < 8 else "h", (-1 for i in range(1 << log2_size)) |
|
) |
|
for log2_size in range(3, 10) |
|
} |
|
|
|
log2_size: int |
|
usable: int |
|
|
|
indices: array |
|
entries: list[Optional[_Entry[_V]]] |
|
|
|
@functools.cached_property |
|
def nslots(self) -> int: |
|
return 1 << self.log2_size |
|
|
|
@functools.cached_property |
|
def mask(self) -> int: |
|
return self.nslots - 1 |
|
|
|
if sys.implementation.name != "pypy": |
|
|
|
def __sizeof__(self) -> int: |
|
return ( |
|
object.__sizeof__(self) |
|
+ sys.getsizeof(self.indices) |
|
+ sys.getsizeof(self.entries) |
|
) |
|
|
|
@classmethod |
|
def new(cls, log2_size: int, entries: list[Optional[_Entry[_V]]]) -> Self: |
|
size = 1 << log2_size |
|
usable = (size << 1) // 3 |
|
if log2_size < 10: |
|
indices = cls.PREALLOCATED_INDICES[log2_size].__copy__() |
|
elif log2_size < 16: |
|
indices = array("h", (-1 for i in range(size))) |
|
elif log2_size < 32: |
|
indices = array("l", (-1 for i in range(size))) |
|
else: |
|
indices = array("q", (-1 for i in range(size))) |
|
ret = cls( |
|
log2_size=log2_size, |
|
usable=usable, |
|
indices=indices, |
|
entries=entries, |
|
) |
|
return ret |
|
|
|
def clone(self) -> "_HtKeys[_V]": |
|
entries = [ |
|
_Entry(e.hash, e.identity, e.key, e.value) if e is not None else None |
|
for e in self.entries |
|
] |
|
|
|
return _HtKeys( |
|
log2_size=self.log2_size, |
|
usable=self.usable, |
|
indices=self.indices.__copy__(), |
|
entries=entries, |
|
) |
|
|
|
def build_indices(self, update: bool) -> None: |
|
mask = self.mask |
|
indices = self.indices |
|
for idx, e in enumerate(self.entries): |
|
assert e is not None |
|
hash_ = e.hash |
|
if update: |
|
if hash_ == -1: |
|
hash_ = hash(e.identity) |
|
else: |
|
assert hash_ != -1 |
|
i = hash_ & mask |
|
perturb = hash_ & sys.maxsize |
|
while indices[i] != -1: |
|
perturb >>= 5 |
|
i = mask & (i * 5 + perturb + 1) |
|
indices[i] = idx |
|
|
|
def find_empty_slot(self, hash_: int) -> int: |
|
mask = self.mask |
|
indices = self.indices |
|
i = hash_ & mask |
|
perturb = hash_ & sys.maxsize |
|
ix = indices[i] |
|
while ix != -1: |
|
perturb >>= 5 |
|
i = (i * 5 + perturb + 1) & mask |
|
ix = indices[i] |
|
return i |
|
|
|
def iter_hash(self, hash_: int) -> Iterator[tuple[int, int, _Entry[_V]]]: |
|
mask = self.mask |
|
indices = self.indices |
|
entries = self.entries |
|
i = hash_ & mask |
|
perturb = hash_ & sys.maxsize |
|
ix = indices[i] |
|
while ix != -1: |
|
if ix != -2: |
|
e = entries[ix] |
|
if e.hash == hash_: |
|
yield i, ix, e |
|
perturb >>= 5 |
|
i = (i * 5 + perturb + 1) & mask |
|
ix = indices[i] |
|
|
|
def del_idx(self, hash_: int, idx: int) -> None: |
|
mask = self.mask |
|
indices = self.indices |
|
i = hash_ & mask |
|
perturb = hash_ & sys.maxsize |
|
ix = indices[i] |
|
while ix != idx: |
|
perturb >>= 5 |
|
i = (i * 5 + perturb + 1) & mask |
|
ix = indices[i] |
|
indices[i] = -2 |
|
|
|
def iter_entries(self) -> Iterator[_Entry[_V]]: |
|
return filter(None, self.entries) |
|
|
|
def restore_hash(self, hash_: int) -> None: |
|
mask = self.mask |
|
indices = self.indices |
|
entries = self.entries |
|
i = hash_ & mask |
|
perturb = hash_ & sys.maxsize |
|
ix = indices[i] |
|
while ix != -1: |
|
if ix != -2: |
|
entry = entries[ix] |
|
if entry.hash == -1: |
|
entry.hash = hash_ |
|
perturb >>= 5 |
|
i = (i * 5 + perturb + 1) & mask |
|
ix = indices[i] |
|
|
|
|
|
class MultiDict(_CSMixin, MutableMultiMapping[_V]): |
|
"""Dictionary with the support for duplicate keys.""" |
|
|
|
__slots__ = ("_keys", "_used", "_version") |
|
|
|
def __init__(self, arg: MDArg[_V] = None, /, **kwargs: _V): |
|
self._used = 0 |
|
v = _version |
|
v[0] += 1 |
|
self._version = v[0] |
|
if not kwargs: |
|
md = None |
|
if isinstance(arg, MultiDictProxy): |
|
md = arg._md |
|
elif isinstance(arg, MultiDict): |
|
md = arg |
|
if md is not None and md._ci is self._ci: |
|
self._from_md(md) |
|
return |
|
|
|
items = self._parse_args(arg, kwargs) |
|
log2_size = estimate_log2_keysize(len(items)) |
|
if log2_size > 17: |
|
|
|
log2_size = 17 |
|
self._keys: _HtKeys[_V] = _HtKeys.new(log2_size, []) |
|
self._extend_items(items) |
|
|
|
def _from_md(self, md: "MultiDict[_V]") -> None: |
|
|
|
|
|
self._keys = md._keys.clone() |
|
self._used = md._used |
|
|
|
@overload |
|
def getall(self, key: str) -> list[_V]: ... |
|
@overload |
|
def getall(self, key: str, default: _T) -> Union[list[_V], _T]: ... |
|
def getall( |
|
self, key: str, default: Union[_T, _SENTINEL] = sentinel |
|
) -> Union[list[_V], _T]: |
|
"""Return a list of all values matching the key.""" |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
res = [] |
|
|
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
res.append(e.value) |
|
e.hash = -1 |
|
self._keys.restore_hash(hash_) |
|
|
|
if res: |
|
return res |
|
if not res and default is not sentinel: |
|
return default |
|
raise KeyError("Key not found: %r" % key) |
|
|
|
@overload |
|
def getone(self, key: str) -> _V: ... |
|
@overload |
|
def getone(self, key: str, default: _T) -> Union[_V, _T]: ... |
|
def getone( |
|
self, key: str, default: Union[_T, _SENTINEL] = sentinel |
|
) -> Union[_V, _T]: |
|
"""Get first value matching the key. |
|
|
|
Raises KeyError if the key is not found and no default is provided. |
|
""" |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
return e.value |
|
if default is not sentinel: |
|
return default |
|
raise KeyError("Key not found: %r" % key) |
|
|
|
|
|
|
|
def __getitem__(self, key: str) -> _V: |
|
return self.getone(key) |
|
|
|
@overload |
|
def get(self, key: str, /) -> Union[_V, None]: ... |
|
@overload |
|
def get(self, key: str, /, default: _T) -> Union[_V, _T]: ... |
|
def get(self, key: str, default: Union[_T, None] = None) -> Union[_V, _T, None]: |
|
"""Get first value matching the key. |
|
|
|
If the key is not found, returns the default (or None if no default is provided) |
|
""" |
|
return self.getone(key, default) |
|
|
|
def __iter__(self) -> Iterator[str]: |
|
return iter(self.keys()) |
|
|
|
def __len__(self) -> int: |
|
return self._used |
|
|
|
def keys(self) -> KeysView[str]: |
|
"""Return a new view of the dictionary's keys.""" |
|
return _KeysView(self) |
|
|
|
def items(self) -> ItemsView[str, _V]: |
|
"""Return a new view of the dictionary's items *(key, value) pairs).""" |
|
return _ItemsView(self) |
|
|
|
def values(self) -> _ValuesView[_V]: |
|
"""Return a new view of the dictionary's values.""" |
|
return _ValuesView(self) |
|
|
|
def __eq__(self, other: object) -> bool: |
|
if not isinstance(other, Mapping): |
|
return NotImplemented |
|
if isinstance(other, MultiDictProxy): |
|
return self == other._md |
|
if isinstance(other, MultiDict): |
|
lft = self._keys |
|
rht = other._keys |
|
if self._used != other._used: |
|
return False |
|
for e1, e2 in zip(lft.iter_entries(), rht.iter_entries()): |
|
if e1.identity != e2.identity or e1.value != e2.value: |
|
return False |
|
return True |
|
if self._used != len(other): |
|
return False |
|
for k, v in self.items(): |
|
nv = other.get(k, sentinel) |
|
if v != nv: |
|
return False |
|
return True |
|
|
|
def __contains__(self, key: object) -> bool: |
|
if not isinstance(key, str): |
|
return False |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
return True |
|
return False |
|
|
|
@reprlib.recursive_repr() |
|
def __repr__(self) -> str: |
|
body = ", ".join(f"'{e.key}': {e.value!r}" for e in self._keys.iter_entries()) |
|
return f"<{self.__class__.__name__}({body})>" |
|
|
|
if sys.implementation.name != "pypy": |
|
|
|
def __sizeof__(self) -> int: |
|
return object.__sizeof__(self) + sys.getsizeof(self._keys) |
|
|
|
def __reduce__(self) -> tuple[type[Self], tuple[list[tuple[str, _V]]]]: |
|
return (self.__class__, (list(self.items()),)) |
|
|
|
def add(self, key: str, value: _V) -> None: |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
self._add_with_hash(_Entry(hash_, identity, key, value)) |
|
self._incr_version() |
|
|
|
def copy(self) -> Self: |
|
"""Return a copy of itself.""" |
|
cls = self.__class__ |
|
return cls(self) |
|
|
|
__copy__ = copy |
|
|
|
def extend(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None: |
|
"""Extend current MultiDict with more values. |
|
|
|
This method must be used instead of update. |
|
""" |
|
items = self._parse_args(arg, kwargs) |
|
newsize = self._used + len(items) |
|
self._resize(estimate_log2_keysize(newsize), False) |
|
self._extend_items(items) |
|
|
|
def _parse_args( |
|
self, |
|
arg: MDArg[_V], |
|
kwargs: Mapping[str, _V], |
|
) -> list[_Entry[_V]]: |
|
identity_func = self._identity |
|
if arg: |
|
if isinstance(arg, MultiDictProxy): |
|
arg = arg._md |
|
if isinstance(arg, MultiDict): |
|
if self._ci is not arg._ci: |
|
items = [] |
|
for e in arg._keys.iter_entries(): |
|
identity = identity_func(e.key) |
|
items.append(_Entry(hash(identity), identity, e.key, e.value)) |
|
else: |
|
items = [ |
|
_Entry(e.hash, e.identity, e.key, e.value) |
|
for e in arg._keys.iter_entries() |
|
] |
|
if kwargs: |
|
for key, value in kwargs.items(): |
|
identity = identity_func(key) |
|
items.append(_Entry(hash(identity), identity, key, value)) |
|
else: |
|
if hasattr(arg, "keys"): |
|
arg = cast(SupportsKeys[_V], arg) |
|
arg = [(k, arg[k]) for k in arg.keys()] |
|
if kwargs: |
|
arg = list(arg) |
|
arg.extend(list(kwargs.items())) |
|
items = [] |
|
for pos, item in enumerate(arg): |
|
if not len(item) == 2: |
|
raise ValueError( |
|
f"multidict update sequence element #{pos}" |
|
f"has length {len(item)}; 2 is required" |
|
) |
|
identity = identity_func(item[0]) |
|
items.append(_Entry(hash(identity), identity, item[0], item[1])) |
|
else: |
|
items = [] |
|
for key, value in kwargs.items(): |
|
identity = identity_func(key) |
|
items.append(_Entry(hash(identity), identity, key, value)) |
|
|
|
return items |
|
|
|
def _extend_items(self, items: Iterable[_Entry[_V]]) -> None: |
|
for e in items: |
|
self._add_with_hash(e) |
|
self._incr_version() |
|
|
|
def clear(self) -> None: |
|
"""Remove all items from MultiDict.""" |
|
self._used = 0 |
|
self._keys = _HtKeys.new(_HtKeys.LOG_MINSIZE, []) |
|
self._incr_version() |
|
|
|
|
|
|
|
def __setitem__(self, key: str, value: _V) -> None: |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
found = False |
|
|
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
if not found: |
|
e.key = key |
|
e.value = value |
|
e.hash = -1 |
|
found = True |
|
self._incr_version() |
|
elif e.hash != -1: |
|
self._del_at(slot, idx) |
|
|
|
if not found: |
|
self._add_with_hash(_Entry(hash_, identity, key, value)) |
|
else: |
|
self._keys.restore_hash(hash_) |
|
|
|
def __delitem__(self, key: str) -> None: |
|
found = False |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
self._del_at(slot, idx) |
|
found = True |
|
if not found: |
|
raise KeyError(key) |
|
else: |
|
self._incr_version() |
|
|
|
@overload |
|
def setdefault( |
|
self: "MultiDict[Union[_T, None]]", key: str, default: None = None |
|
) -> Union[_T, None]: ... |
|
@overload |
|
def setdefault(self, key: str, default: _V) -> _V: ... |
|
def setdefault(self, key: str, default: Union[_V, None] = None) -> Union[_V, None]: |
|
"""Return value for key, set value to default if key is not present.""" |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
return e.value |
|
self.add(key, default) |
|
return default |
|
|
|
@overload |
|
def popone(self, key: str) -> _V: ... |
|
@overload |
|
def popone(self, key: str, default: _T) -> Union[_V, _T]: ... |
|
def popone( |
|
self, key: str, default: Union[_T, _SENTINEL] = sentinel |
|
) -> Union[_V, _T]: |
|
"""Remove specified key and return the corresponding value. |
|
|
|
If key is not found, d is returned if given, otherwise |
|
KeyError is raised. |
|
|
|
""" |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
value = e.value |
|
self._del_at(slot, idx) |
|
self._incr_version() |
|
return value |
|
if default is sentinel: |
|
raise KeyError(key) |
|
else: |
|
return default |
|
|
|
|
|
if not TYPE_CHECKING: |
|
pop = popone |
|
|
|
@overload |
|
def popall(self, key: str) -> list[_V]: ... |
|
@overload |
|
def popall(self, key: str, default: _T) -> Union[list[_V], _T]: ... |
|
def popall( |
|
self, key: str, default: Union[_T, _SENTINEL] = sentinel |
|
) -> Union[list[_V], _T]: |
|
"""Remove all occurrences of key and return the list of corresponding |
|
values. |
|
|
|
If key is not found, default is returned if given, otherwise |
|
KeyError is raised. |
|
|
|
""" |
|
found = False |
|
identity = self._identity(key) |
|
hash_ = hash(identity) |
|
ret = [] |
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
found = True |
|
ret.append(e.value) |
|
self._del_at(slot, idx) |
|
self._incr_version() |
|
|
|
if not found: |
|
if default is sentinel: |
|
raise KeyError(key) |
|
else: |
|
return default |
|
else: |
|
return ret |
|
|
|
def popitem(self) -> tuple[str, _V]: |
|
"""Remove and return an arbitrary (key, value) pair.""" |
|
if self._used <= 0: |
|
raise KeyError("empty multidict") |
|
|
|
pos = len(self._keys.entries) - 1 |
|
entry = self._keys.entries.pop() |
|
|
|
while entry is None: |
|
pos -= 1 |
|
entry = self._keys.entries.pop() |
|
|
|
ret = self._key(entry.key), entry.value |
|
self._keys.del_idx(entry.hash, pos) |
|
self._used -= 1 |
|
self._incr_version() |
|
return ret |
|
|
|
def update(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None: |
|
"""Update the dictionary from *other*, overwriting existing keys.""" |
|
items = self._parse_args(arg, kwargs) |
|
newsize = self._used + len(items) |
|
log2_size = estimate_log2_keysize(newsize) |
|
if log2_size > 17: |
|
|
|
|
|
log2_size = 17 |
|
if log2_size > self._keys.log2_size: |
|
self._resize(log2_size, False) |
|
self._update_items(items) |
|
|
|
def _update_items(self, items: list[_Entry[_V]]) -> None: |
|
if not items: |
|
return |
|
for entry in items: |
|
found = False |
|
hash_ = entry.hash |
|
identity = entry.identity |
|
for slot, idx, e in self._keys.iter_hash(hash_): |
|
if e.identity == identity: |
|
if not found: |
|
found = True |
|
e.key = entry.key |
|
e.value = entry.value |
|
e.hash = -1 |
|
else: |
|
self._del_at_for_upd(e) |
|
if not found: |
|
self._add_with_hash_for_upd(entry) |
|
|
|
keys = self._keys |
|
indices = keys.indices |
|
entries = keys.entries |
|
for slot in range(keys.nslots): |
|
idx = indices[slot] |
|
if idx >= 0: |
|
e2 = entries[idx] |
|
assert e2 is not None |
|
if e2.key is None: |
|
entries[idx] = None |
|
indices[slot] = -2 |
|
self._used -= 1 |
|
if e2.hash == -1: |
|
e2.hash = hash(e2.identity) |
|
|
|
self._incr_version() |
|
|
|
def _incr_version(self) -> None: |
|
v = _version |
|
v[0] += 1 |
|
self._version = v[0] |
|
|
|
def _resize(self, log2_newsize: int, update: bool) -> None: |
|
oldkeys = self._keys |
|
newentries = self._used |
|
|
|
if len(oldkeys.entries) == newentries: |
|
entries = oldkeys.entries |
|
else: |
|
entries = [e for e in oldkeys.entries if e is not None] |
|
newkeys: _HtKeys[_V] = _HtKeys.new(log2_newsize, entries) |
|
newkeys.usable -= newentries |
|
newkeys.build_indices(update) |
|
self._keys = newkeys |
|
|
|
def _add_with_hash(self, entry: _Entry[_V]) -> None: |
|
if self._keys.usable <= 0: |
|
self._resize((self._used * 3 | _HtKeys.MINSIZE - 1).bit_length(), False) |
|
keys = self._keys |
|
slot = keys.find_empty_slot(entry.hash) |
|
keys.indices[slot] = len(keys.entries) |
|
keys.entries.append(entry) |
|
self._incr_version() |
|
self._used += 1 |
|
keys.usable -= 1 |
|
|
|
def _add_with_hash_for_upd(self, entry: _Entry[_V]) -> None: |
|
if self._keys.usable <= 0: |
|
self._resize((self._used * 3 | _HtKeys.MINSIZE - 1).bit_length(), True) |
|
keys = self._keys |
|
slot = keys.find_empty_slot(entry.hash) |
|
keys.indices[slot] = len(keys.entries) |
|
entry.hash = -1 |
|
keys.entries.append(entry) |
|
self._incr_version() |
|
self._used += 1 |
|
keys.usable -= 1 |
|
|
|
def _del_at(self, slot: int, idx: int) -> None: |
|
self._keys.entries[idx] = None |
|
self._keys.indices[slot] = -2 |
|
self._used -= 1 |
|
|
|
def _del_at_for_upd(self, entry: _Entry[_V]) -> None: |
|
entry.key = None |
|
entry.value = None |
|
|
|
|
|
class CIMultiDict(_CIMixin, MultiDict[_V]): |
|
"""Dictionary with the support for duplicate case-insensitive keys.""" |
|
|
|
|
|
class MultiDictProxy(_CSMixin, MultiMapping[_V]): |
|
"""Read-only proxy for MultiDict instance.""" |
|
|
|
__slots__ = ("_md",) |
|
|
|
_md: MultiDict[_V] |
|
|
|
def __init__(self, arg: Union[MultiDict[_V], "MultiDictProxy[_V]"]): |
|
if not isinstance(arg, (MultiDict, MultiDictProxy)): |
|
raise TypeError( |
|
"ctor requires MultiDict or MultiDictProxy instance" |
|
f", not {type(arg)}" |
|
) |
|
if isinstance(arg, MultiDictProxy): |
|
self._md = arg._md |
|
else: |
|
self._md = arg |
|
|
|
def __reduce__(self) -> NoReturn: |
|
raise TypeError(f"can't pickle {self.__class__.__name__} objects") |
|
|
|
@overload |
|
def getall(self, key: str) -> list[_V]: ... |
|
@overload |
|
def getall(self, key: str, default: _T) -> Union[list[_V], _T]: ... |
|
def getall( |
|
self, key: str, default: Union[_T, _SENTINEL] = sentinel |
|
) -> Union[list[_V], _T]: |
|
"""Return a list of all values matching the key.""" |
|
if default is not sentinel: |
|
return self._md.getall(key, default) |
|
else: |
|
return self._md.getall(key) |
|
|
|
@overload |
|
def getone(self, key: str) -> _V: ... |
|
@overload |
|
def getone(self, key: str, default: _T) -> Union[_V, _T]: ... |
|
def getone( |
|
self, key: str, default: Union[_T, _SENTINEL] = sentinel |
|
) -> Union[_V, _T]: |
|
"""Get first value matching the key. |
|
|
|
Raises KeyError if the key is not found and no default is provided. |
|
""" |
|
if default is not sentinel: |
|
return self._md.getone(key, default) |
|
else: |
|
return self._md.getone(key) |
|
|
|
|
|
|
|
def __getitem__(self, key: str) -> _V: |
|
return self.getone(key) |
|
|
|
@overload |
|
def get(self, key: str, /) -> Union[_V, None]: ... |
|
@overload |
|
def get(self, key: str, /, default: _T) -> Union[_V, _T]: ... |
|
def get(self, key: str, default: Union[_T, None] = None) -> Union[_V, _T, None]: |
|
"""Get first value matching the key. |
|
|
|
If the key is not found, returns the default (or None if no default is provided) |
|
""" |
|
return self._md.getone(key, default) |
|
|
|
def __iter__(self) -> Iterator[str]: |
|
return iter(self._md.keys()) |
|
|
|
def __len__(self) -> int: |
|
return len(self._md) |
|
|
|
def keys(self) -> KeysView[str]: |
|
"""Return a new view of the dictionary's keys.""" |
|
return self._md.keys() |
|
|
|
def items(self) -> ItemsView[str, _V]: |
|
"""Return a new view of the dictionary's items *(key, value) pairs).""" |
|
return self._md.items() |
|
|
|
def values(self) -> _ValuesView[_V]: |
|
"""Return a new view of the dictionary's values.""" |
|
return self._md.values() |
|
|
|
def __eq__(self, other: object) -> bool: |
|
return self._md == other |
|
|
|
def __contains__(self, key: object) -> bool: |
|
return key in self._md |
|
|
|
@reprlib.recursive_repr() |
|
def __repr__(self) -> str: |
|
body = ", ".join(f"'{k}': {v!r}" for k, v in self.items()) |
|
return f"<{self.__class__.__name__}({body})>" |
|
|
|
def copy(self) -> MultiDict[_V]: |
|
"""Return a copy of itself.""" |
|
return MultiDict(self._md) |
|
|
|
|
|
class CIMultiDictProxy(_CIMixin, MultiDictProxy[_V]): |
|
"""Read-only proxy for CIMultiDict instance.""" |
|
|
|
def __init__(self, arg: Union[MultiDict[_V], MultiDictProxy[_V]]): |
|
if not isinstance(arg, (CIMultiDict, CIMultiDictProxy)): |
|
raise TypeError( |
|
"ctor requires CIMultiDict or CIMultiDictProxy instance" |
|
f", not {type(arg)}" |
|
) |
|
|
|
super().__init__(arg) |
|
|
|
def copy(self) -> CIMultiDict[_V]: |
|
"""Return a copy of itself.""" |
|
return CIMultiDict(self._md) |
|
|
|
|
|
def getversion(md: Union[MultiDict[object], MultiDictProxy[object]]) -> int: |
|
if isinstance(md, MultiDictProxy): |
|
md = md._md |
|
elif not isinstance(md, MultiDict): |
|
raise TypeError("Parameter should be multidict or proxy") |
|
return md._version |
|
|