jamtur01's picture
Upload folder using huggingface_hub
9c6594c verified
import enum
import functools
import reprlib
import sys
from array import array
from collections.abc import (
ItemsView,
Iterable,
Iterator,
KeysView,
Mapping,
ValuesView,
)
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generic,
NoReturn,
Optional,
TypeVar,
Union,
cast,
overload,
)
from ._abc import MDArg, MultiMapping, MutableMultiMapping, SupportsKeys
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
class istr(str):
"""Case insensitive str."""
__is_istr__ = True
__istr_identity__: Optional[str] = None
_V = TypeVar("_V")
_T = TypeVar("_T")
_SENTINEL = enum.Enum("_SENTINEL", "sentinel")
sentinel = _SENTINEL.sentinel
_version = array("Q", [0])
class _Iter(Generic[_T]):
__slots__ = ("_size", "_iter")
def __init__(self, size: int, iterator: Iterator[_T]):
self._size = size
self._iter = iterator
def __iter__(self) -> Self:
return self
def __next__(self) -> _T:
return next(self._iter)
def __length_hint__(self) -> int:
return self._size
class _ViewBase(Generic[_V]):
def __init__(
self,
md: "MultiDict[_V]",
):
self._md = md
def __len__(self) -> int:
return len(self._md)
class _ItemsView(_ViewBase[_V], ItemsView[str, _V]):
def __contains__(self, item: object) -> bool:
if not isinstance(item, (tuple, list)) or len(item) != 2:
return False
key, value = item
try:
identity = self._md._identity(key)
except TypeError:
return False
hash_ = hash(identity)
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity and value == e.value:
return True
return False
def __iter__(self) -> _Iter[tuple[str, _V]]:
return _Iter(len(self), self._iter(self._md._version))
def _iter(self, version: int) -> Iterator[tuple[str, _V]]:
for e in self._md._keys.iter_entries():
if version != self._md._version:
raise RuntimeError("Dictionary changed during iteration")
yield self._md._key(e.key), e.value
@reprlib.recursive_repr()
def __repr__(self) -> str:
lst = []
for e in self._md._keys.iter_entries():
lst.append(f"'{e.key}': {e.value!r}")
body = ", ".join(lst)
return f"<{self.__class__.__name__}({body})>"
def _parse_item(
self, arg: Union[tuple[str, _V], _T]
) -> Optional[tuple[int, str, str, _V]]:
if not isinstance(arg, tuple):
return None
if len(arg) != 2:
return None
try:
identity = self._md._identity(arg[0])
return (hash(identity), identity, arg[0], arg[1])
except TypeError:
return None
def _tmp_set(self, it: Iterable[_T]) -> set[tuple[str, _V]]:
tmp = set()
for arg in it:
item = self._parse_item(arg)
if item is None:
continue
else:
tmp.add((item[1], item[3]))
return tmp
def __and__(self, other: Iterable[Any]) -> set[tuple[str, _V]]:
ret = set()
try:
it = iter(other)
except TypeError:
return NotImplemented
for arg in it:
item = self._parse_item(arg)
if item is None:
continue
hash_, identity, key, value = item
for slot, idx, e in self._md._keys.iter_hash(hash_):
e.hash = -1
if e.identity == identity and e.value == value:
ret.add((e.key, e.value))
self._md._keys.restore_hash(hash_)
return ret
def __rand__(self, other: Iterable[_T]) -> set[_T]:
ret = set()
try:
it = iter(other)
except TypeError:
return NotImplemented
for arg in it:
item = self._parse_item(arg)
if item is None:
continue
hash_, identity, key, value = item
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity and e.value == value:
ret.add(arg)
break
return ret
def __or__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
ret: set[Union[tuple[str, _V], _T]] = set(self)
try:
it = iter(other)
except TypeError:
return NotImplemented
for arg in it:
item: Optional[tuple[int, str, str, _V]] = self._parse_item(arg)
if item is None:
ret.add(arg)
continue
hash_, identity, key, value = item
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity and e.value == value: # pragma: no branch
break
else:
ret.add(arg)
return ret
def __ror__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
try:
ret: set[Union[tuple[str, _V], _T]] = set(other)
except TypeError:
return NotImplemented
tmp = self._tmp_set(ret)
for e in self._md._keys.iter_entries():
if (e.identity, e.value) not in tmp:
ret.add((e.key, e.value))
return ret
def __sub__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
ret: set[Union[tuple[str, _V], _T]] = set()
try:
it = iter(other)
except TypeError:
return NotImplemented
tmp = self._tmp_set(it)
for e in self._md._keys.iter_entries():
if (e.identity, e.value) not in tmp:
ret.add((e.key, e.value))
return ret
def __rsub__(self, other: Iterable[_T]) -> set[_T]:
ret: set[_T] = set()
try:
it = iter(other)
except TypeError:
return NotImplemented
for arg in it:
item = self._parse_item(arg)
if item is None:
ret.add(arg)
continue
hash_, identity, key, value = item
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity and e.value == value: # pragma: no branch
break
else:
ret.add(arg)
return ret
def __xor__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
try:
rgt = set(other)
except TypeError:
return NotImplemented
ret: set[Union[tuple[str, _V], _T]] = self - rgt
ret |= rgt - self
return ret
__rxor__ = __xor__
def isdisjoint(self, other: Iterable[tuple[str, _V]]) -> bool:
for arg in other:
item = self._parse_item(arg)
if item is None:
continue
hash_, identity, key, value = item
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity and e.value == value: # pragma: no branch
return False
return True
class _ValuesView(_ViewBase[_V], ValuesView[_V]):
def __contains__(self, value: object) -> bool:
for e in self._md._keys.iter_entries():
if e.value == value:
return True
return False
def __iter__(self) -> _Iter[_V]:
return _Iter(len(self), self._iter(self._md._version))
def _iter(self, version: int) -> Iterator[_V]:
for e in self._md._keys.iter_entries():
if version != self._md._version:
raise RuntimeError("Dictionary changed during iteration")
yield e.value
@reprlib.recursive_repr()
def __repr__(self) -> str:
lst = []
for e in self._md._keys.iter_entries():
lst.append(repr(e.value))
body = ", ".join(lst)
return f"<{self.__class__.__name__}({body})>"
class _KeysView(_ViewBase[_V], KeysView[str]):
def __contains__(self, key: object) -> bool:
if not isinstance(key, str):
return False
identity = self._md._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
return True
return False
def __iter__(self) -> _Iter[str]:
return _Iter(len(self), self._iter(self._md._version))
def _iter(self, version: int) -> Iterator[str]:
for e in self._md._keys.iter_entries():
if version != self._md._version:
raise RuntimeError("Dictionary changed during iteration")
yield self._md._key(e.key)
def __repr__(self) -> str:
lst = []
for e in self._md._keys.iter_entries():
lst.append(f"'{e.key}'")
body = ", ".join(lst)
return f"<{self.__class__.__name__}({body})>"
def __and__(self, other: Iterable[object]) -> set[str]:
ret = set()
try:
it = iter(other)
except TypeError:
return NotImplemented
for key in it:
if not isinstance(key, str):
continue
identity = self._md._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
ret.add(e.key)
break
return ret
def __rand__(self, other: Iterable[_T]) -> set[_T]:
ret = set()
try:
it = iter(other)
except TypeError:
return NotImplemented
for key in it:
if not isinstance(key, str):
continue
if key in self._md:
ret.add(key)
return cast(set[_T], ret)
def __or__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
ret: set[Union[str, _T]] = set(self)
try:
it = iter(other)
except TypeError:
return NotImplemented
for key in it:
if not isinstance(key, str):
ret.add(key)
continue
if key not in self._md:
ret.add(key)
return ret
def __ror__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
try:
ret: set[Union[str, _T]] = set(other)
except TypeError:
return NotImplemented
tmp = set()
for key in ret:
if not isinstance(key, str):
continue
identity = self._md._identity(key)
tmp.add(identity)
for e in self._md._keys.iter_entries():
if e.identity not in tmp:
ret.add(e.key)
return ret
def __sub__(self, other: Iterable[object]) -> set[str]:
ret = set(self)
try:
it = iter(other)
except TypeError:
return NotImplemented
for key in it:
if not isinstance(key, str):
continue
identity = self._md._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._md._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
ret.discard(e.key)
break
return ret
def __rsub__(self, other: Iterable[_T]) -> set[_T]:
try:
ret: set[_T] = set(other)
except TypeError:
return NotImplemented
for key in other:
if not isinstance(key, str):
continue
if key in self._md:
ret.discard(key) # type: ignore[arg-type]
return ret
def __xor__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
try:
rgt = set(other)
except TypeError:
return NotImplemented
ret: set[Union[str, _T]] = self - rgt # type: ignore[assignment]
ret |= rgt - self
return ret
__rxor__ = __xor__
def isdisjoint(self, other: Iterable[object]) -> bool:
for key in other:
if not isinstance(key, str):
continue
if key in self._md:
return False
return True
class _CSMixin:
_ci: ClassVar[bool] = False
def _key(self, key: str) -> str:
return key
def _identity(self, key: str) -> str:
if isinstance(key, str):
return key
else:
raise TypeError("MultiDict keys should be either str or subclasses of str")
class _CIMixin:
_ci: ClassVar[bool] = True
def _key(self, key: str) -> str:
if type(key) is istr:
return key
else:
return istr(key)
def _identity(self, key: str) -> str:
if isinstance(key, istr):
ret = key.__istr_identity__
if ret is None:
ret = key.title()
key.__istr_identity__ = ret
return ret
if isinstance(key, str):
return key.title()
else:
raise TypeError("MultiDict keys should be either str or subclasses of str")
def estimate_log2_keysize(n: int) -> int:
# 7 == HT_MINSIZE - 1
return (((n * 3 + 1) // 2) | 7).bit_length()
@dataclass
class _Entry(Generic[_V]):
hash: int
identity: str
key: str
value: _V
@dataclass
class _HtKeys(Generic[_V]): # type: ignore[misc]
LOG_MINSIZE: ClassVar[int] = 3
MINSIZE: ClassVar[int] = 8
PREALLOCATED_INDICES: ClassVar[dict[int, array]] = { # type: ignore[type-arg]
log2_size: array(
"b" if log2_size < 8 else "h", (-1 for i in range(1 << log2_size))
)
for log2_size in range(3, 10)
}
log2_size: int
usable: int
indices: array # type: ignore[type-arg] # in py3.9 array is not generic
entries: list[Optional[_Entry[_V]]]
@functools.cached_property
def nslots(self) -> int:
return 1 << self.log2_size
@functools.cached_property
def mask(self) -> int:
return self.nslots - 1
if sys.implementation.name != "pypy":
def __sizeof__(self) -> int:
return (
object.__sizeof__(self)
+ sys.getsizeof(self.indices)
+ sys.getsizeof(self.entries)
)
@classmethod
def new(cls, log2_size: int, entries: list[Optional[_Entry[_V]]]) -> Self:
size = 1 << log2_size
usable = (size << 1) // 3
if log2_size < 10:
indices = cls.PREALLOCATED_INDICES[log2_size].__copy__()
elif log2_size < 16:
indices = array("h", (-1 for i in range(size)))
elif log2_size < 32:
indices = array("l", (-1 for i in range(size)))
else: # pragma: no cover # don't test huge multidicts
indices = array("q", (-1 for i in range(size)))
ret = cls(
log2_size=log2_size,
usable=usable,
indices=indices,
entries=entries,
)
return ret
def clone(self) -> "_HtKeys[_V]":
entries = [
_Entry(e.hash, e.identity, e.key, e.value) if e is not None else None
for e in self.entries
]
return _HtKeys(
log2_size=self.log2_size,
usable=self.usable,
indices=self.indices.__copy__(),
entries=entries,
)
def build_indices(self, update: bool) -> None:
mask = self.mask
indices = self.indices
for idx, e in enumerate(self.entries):
assert e is not None
hash_ = e.hash
if update:
if hash_ == -1:
hash_ = hash(e.identity)
else:
assert hash_ != -1
i = hash_ & mask
perturb = hash_ & sys.maxsize
while indices[i] != -1:
perturb >>= 5
i = mask & (i * 5 + perturb + 1)
indices[i] = idx
def find_empty_slot(self, hash_: int) -> int:
mask = self.mask
indices = self.indices
i = hash_ & mask
perturb = hash_ & sys.maxsize
ix = indices[i]
while ix != -1:
perturb >>= 5
i = (i * 5 + perturb + 1) & mask
ix = indices[i]
return i
def iter_hash(self, hash_: int) -> Iterator[tuple[int, int, _Entry[_V]]]:
mask = self.mask
indices = self.indices
entries = self.entries
i = hash_ & mask
perturb = hash_ & sys.maxsize
ix = indices[i]
while ix != -1:
if ix != -2:
e = entries[ix]
if e.hash == hash_:
yield i, ix, e
perturb >>= 5
i = (i * 5 + perturb + 1) & mask
ix = indices[i]
def del_idx(self, hash_: int, idx: int) -> None:
mask = self.mask
indices = self.indices
i = hash_ & mask
perturb = hash_ & sys.maxsize
ix = indices[i]
while ix != idx:
perturb >>= 5
i = (i * 5 + perturb + 1) & mask
ix = indices[i]
indices[i] = -2
def iter_entries(self) -> Iterator[_Entry[_V]]:
return filter(None, self.entries)
def restore_hash(self, hash_: int) -> None:
mask = self.mask
indices = self.indices
entries = self.entries
i = hash_ & mask
perturb = hash_ & sys.maxsize
ix = indices[i]
while ix != -1:
if ix != -2:
entry = entries[ix]
if entry.hash == -1:
entry.hash = hash_
perturb >>= 5
i = (i * 5 + perturb + 1) & mask
ix = indices[i]
class MultiDict(_CSMixin, MutableMultiMapping[_V]):
"""Dictionary with the support for duplicate keys."""
__slots__ = ("_keys", "_used", "_version")
def __init__(self, arg: MDArg[_V] = None, /, **kwargs: _V):
self._used = 0
v = _version
v[0] += 1
self._version = v[0]
if not kwargs:
md = None
if isinstance(arg, MultiDictProxy):
md = arg._md
elif isinstance(arg, MultiDict):
md = arg
if md is not None and md._ci is self._ci:
self._from_md(md)
return
items = self._parse_args(arg, kwargs)
log2_size = estimate_log2_keysize(len(items))
if log2_size > 17: # pragma: no cover
# Don't overallocate really huge keys space in init
log2_size = 17
self._keys: _HtKeys[_V] = _HtKeys.new(log2_size, [])
self._extend_items(items)
def _from_md(self, md: "MultiDict[_V]") -> None:
# Copy everything as-is without compacting the new multidict,
# otherwise it requires reindexing
self._keys = md._keys.clone()
self._used = md._used
@overload
def getall(self, key: str) -> list[_V]: ...
@overload
def getall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
def getall(
self, key: str, default: Union[_T, _SENTINEL] = sentinel
) -> Union[list[_V], _T]:
"""Return a list of all values matching the key."""
identity = self._identity(key)
hash_ = hash(identity)
res = []
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
res.append(e.value)
e.hash = -1
self._keys.restore_hash(hash_)
if res:
return res
if not res and default is not sentinel:
return default
raise KeyError("Key not found: %r" % key)
@overload
def getone(self, key: str) -> _V: ...
@overload
def getone(self, key: str, default: _T) -> Union[_V, _T]: ...
def getone(
self, key: str, default: Union[_T, _SENTINEL] = sentinel
) -> Union[_V, _T]:
"""Get first value matching the key.
Raises KeyError if the key is not found and no default is provided.
"""
identity = self._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
return e.value
if default is not sentinel:
return default
raise KeyError("Key not found: %r" % key)
# Mapping interface #
def __getitem__(self, key: str) -> _V:
return self.getone(key)
@overload
def get(self, key: str, /) -> Union[_V, None]: ...
@overload
def get(self, key: str, /, default: _T) -> Union[_V, _T]: ...
def get(self, key: str, default: Union[_T, None] = None) -> Union[_V, _T, None]:
"""Get first value matching the key.
If the key is not found, returns the default (or None if no default is provided)
"""
return self.getone(key, default)
def __iter__(self) -> Iterator[str]:
return iter(self.keys())
def __len__(self) -> int:
return self._used
def keys(self) -> KeysView[str]:
"""Return a new view of the dictionary's keys."""
return _KeysView(self)
def items(self) -> ItemsView[str, _V]:
"""Return a new view of the dictionary's items *(key, value) pairs)."""
return _ItemsView(self)
def values(self) -> _ValuesView[_V]:
"""Return a new view of the dictionary's values."""
return _ValuesView(self)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Mapping):
return NotImplemented
if isinstance(other, MultiDictProxy):
return self == other._md
if isinstance(other, MultiDict):
lft = self._keys
rht = other._keys
if self._used != other._used:
return False
for e1, e2 in zip(lft.iter_entries(), rht.iter_entries()):
if e1.identity != e2.identity or e1.value != e2.value:
return False
return True
if self._used != len(other):
return False
for k, v in self.items():
nv = other.get(k, sentinel)
if v != nv:
return False
return True
def __contains__(self, key: object) -> bool:
if not isinstance(key, str):
return False
identity = self._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
return True
return False
@reprlib.recursive_repr()
def __repr__(self) -> str:
body = ", ".join(f"'{e.key}': {e.value!r}" for e in self._keys.iter_entries())
return f"<{self.__class__.__name__}({body})>"
if sys.implementation.name != "pypy":
def __sizeof__(self) -> int:
return object.__sizeof__(self) + sys.getsizeof(self._keys)
def __reduce__(self) -> tuple[type[Self], tuple[list[tuple[str, _V]]]]:
return (self.__class__, (list(self.items()),))
def add(self, key: str, value: _V) -> None:
identity = self._identity(key)
hash_ = hash(identity)
self._add_with_hash(_Entry(hash_, identity, key, value))
self._incr_version()
def copy(self) -> Self:
"""Return a copy of itself."""
cls = self.__class__
return cls(self)
__copy__ = copy
def extend(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None:
"""Extend current MultiDict with more values.
This method must be used instead of update.
"""
items = self._parse_args(arg, kwargs)
newsize = self._used + len(items)
self._resize(estimate_log2_keysize(newsize), False)
self._extend_items(items)
def _parse_args(
self,
arg: MDArg[_V],
kwargs: Mapping[str, _V],
) -> list[_Entry[_V]]:
identity_func = self._identity
if arg:
if isinstance(arg, MultiDictProxy):
arg = arg._md
if isinstance(arg, MultiDict):
if self._ci is not arg._ci:
items = []
for e in arg._keys.iter_entries():
identity = identity_func(e.key)
items.append(_Entry(hash(identity), identity, e.key, e.value))
else:
items = [
_Entry(e.hash, e.identity, e.key, e.value)
for e in arg._keys.iter_entries()
]
if kwargs:
for key, value in kwargs.items():
identity = identity_func(key)
items.append(_Entry(hash(identity), identity, key, value))
else:
if hasattr(arg, "keys"):
arg = cast(SupportsKeys[_V], arg)
arg = [(k, arg[k]) for k in arg.keys()]
if kwargs:
arg = list(arg)
arg.extend(list(kwargs.items()))
items = []
for pos, item in enumerate(arg):
if not len(item) == 2:
raise ValueError(
f"multidict update sequence element #{pos}"
f"has length {len(item)}; 2 is required"
)
identity = identity_func(item[0])
items.append(_Entry(hash(identity), identity, item[0], item[1]))
else:
items = []
for key, value in kwargs.items():
identity = identity_func(key)
items.append(_Entry(hash(identity), identity, key, value))
return items
def _extend_items(self, items: Iterable[_Entry[_V]]) -> None:
for e in items:
self._add_with_hash(e)
self._incr_version()
def clear(self) -> None:
"""Remove all items from MultiDict."""
self._used = 0
self._keys = _HtKeys.new(_HtKeys.LOG_MINSIZE, [])
self._incr_version()
# Mapping interface #
def __setitem__(self, key: str, value: _V) -> None:
identity = self._identity(key)
hash_ = hash(identity)
found = False
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
if not found:
e.key = key
e.value = value
e.hash = -1
found = True
self._incr_version()
elif e.hash != -1: # pragma: no branch
self._del_at(slot, idx)
if not found:
self._add_with_hash(_Entry(hash_, identity, key, value))
else:
self._keys.restore_hash(hash_)
def __delitem__(self, key: str) -> None:
found = False
identity = self._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
self._del_at(slot, idx)
found = True
if not found:
raise KeyError(key)
else:
self._incr_version()
@overload
def setdefault(
self: "MultiDict[Union[_T, None]]", key: str, default: None = None
) -> Union[_T, None]: ...
@overload
def setdefault(self, key: str, default: _V) -> _V: ...
def setdefault(self, key: str, default: Union[_V, None] = None) -> Union[_V, None]: # type: ignore[misc]
"""Return value for key, set value to default if key is not present."""
identity = self._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
return e.value
self.add(key, default) # type: ignore[arg-type]
return default
@overload
def popone(self, key: str) -> _V: ...
@overload
def popone(self, key: str, default: _T) -> Union[_V, _T]: ...
def popone(
self, key: str, default: Union[_T, _SENTINEL] = sentinel
) -> Union[_V, _T]:
"""Remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise
KeyError is raised.
"""
identity = self._identity(key)
hash_ = hash(identity)
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
value = e.value
self._del_at(slot, idx)
self._incr_version()
return value
if default is sentinel:
raise KeyError(key)
else:
return default
# Type checking will inherit signature for pop() if we don't confuse it here.
if not TYPE_CHECKING:
pop = popone
@overload
def popall(self, key: str) -> list[_V]: ...
@overload
def popall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
def popall(
self, key: str, default: Union[_T, _SENTINEL] = sentinel
) -> Union[list[_V], _T]:
"""Remove all occurrences of key and return the list of corresponding
values.
If key is not found, default is returned if given, otherwise
KeyError is raised.
"""
found = False
identity = self._identity(key)
hash_ = hash(identity)
ret = []
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
found = True
ret.append(e.value)
self._del_at(slot, idx)
self._incr_version()
if not found:
if default is sentinel:
raise KeyError(key)
else:
return default
else:
return ret
def popitem(self) -> tuple[str, _V]:
"""Remove and return an arbitrary (key, value) pair."""
if self._used <= 0:
raise KeyError("empty multidict")
pos = len(self._keys.entries) - 1
entry = self._keys.entries.pop()
while entry is None:
pos -= 1
entry = self._keys.entries.pop()
ret = self._key(entry.key), entry.value
self._keys.del_idx(entry.hash, pos)
self._used -= 1
self._incr_version()
return ret
def update(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None:
"""Update the dictionary from *other*, overwriting existing keys."""
items = self._parse_args(arg, kwargs)
newsize = self._used + len(items)
log2_size = estimate_log2_keysize(newsize)
if log2_size > 17: # pragma: no cover
# Don't overallocate really huge keys space in update,
# duplicate keys could reduce the resulting anount of entries
log2_size = 17
if log2_size > self._keys.log2_size:
self._resize(log2_size, False)
self._update_items(items)
def _update_items(self, items: list[_Entry[_V]]) -> None:
if not items:
return
for entry in items:
found = False
hash_ = entry.hash
identity = entry.identity
for slot, idx, e in self._keys.iter_hash(hash_):
if e.identity == identity: # pragma: no branch
if not found:
found = True
e.key = entry.key
e.value = entry.value
e.hash = -1
else:
self._del_at_for_upd(e)
if not found:
self._add_with_hash_for_upd(entry)
keys = self._keys
indices = keys.indices
entries = keys.entries
for slot in range(keys.nslots):
idx = indices[slot]
if idx >= 0:
e2 = entries[idx]
assert e2 is not None
if e2.key is None:
entries[idx] = None # type: ignore[unreachable]
indices[slot] = -2
self._used -= 1
if e2.hash == -1:
e2.hash = hash(e2.identity)
self._incr_version()
def _incr_version(self) -> None:
v = _version
v[0] += 1
self._version = v[0]
def _resize(self, log2_newsize: int, update: bool) -> None:
oldkeys = self._keys
newentries = self._used
if len(oldkeys.entries) == newentries:
entries = oldkeys.entries
else:
entries = [e for e in oldkeys.entries if e is not None]
newkeys: _HtKeys[_V] = _HtKeys.new(log2_newsize, entries)
newkeys.usable -= newentries
newkeys.build_indices(update)
self._keys = newkeys
def _add_with_hash(self, entry: _Entry[_V]) -> None:
if self._keys.usable <= 0:
self._resize((self._used * 3 | _HtKeys.MINSIZE - 1).bit_length(), False)
keys = self._keys
slot = keys.find_empty_slot(entry.hash)
keys.indices[slot] = len(keys.entries)
keys.entries.append(entry)
self._incr_version()
self._used += 1
keys.usable -= 1
def _add_with_hash_for_upd(self, entry: _Entry[_V]) -> None:
if self._keys.usable <= 0:
self._resize((self._used * 3 | _HtKeys.MINSIZE - 1).bit_length(), True)
keys = self._keys
slot = keys.find_empty_slot(entry.hash)
keys.indices[slot] = len(keys.entries)
entry.hash = -1
keys.entries.append(entry)
self._incr_version()
self._used += 1
keys.usable -= 1
def _del_at(self, slot: int, idx: int) -> None:
self._keys.entries[idx] = None
self._keys.indices[slot] = -2
self._used -= 1
def _del_at_for_upd(self, entry: _Entry[_V]) -> None:
entry.key = None # type: ignore[assignment]
entry.value = None # type: ignore[assignment]
class CIMultiDict(_CIMixin, MultiDict[_V]):
"""Dictionary with the support for duplicate case-insensitive keys."""
class MultiDictProxy(_CSMixin, MultiMapping[_V]):
"""Read-only proxy for MultiDict instance."""
__slots__ = ("_md",)
_md: MultiDict[_V]
def __init__(self, arg: Union[MultiDict[_V], "MultiDictProxy[_V]"]):
if not isinstance(arg, (MultiDict, MultiDictProxy)):
raise TypeError(
"ctor requires MultiDict or MultiDictProxy instance"
f", not {type(arg)}"
)
if isinstance(arg, MultiDictProxy):
self._md = arg._md
else:
self._md = arg
def __reduce__(self) -> NoReturn:
raise TypeError(f"can't pickle {self.__class__.__name__} objects")
@overload
def getall(self, key: str) -> list[_V]: ...
@overload
def getall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
def getall(
self, key: str, default: Union[_T, _SENTINEL] = sentinel
) -> Union[list[_V], _T]:
"""Return a list of all values matching the key."""
if default is not sentinel:
return self._md.getall(key, default)
else:
return self._md.getall(key)
@overload
def getone(self, key: str) -> _V: ...
@overload
def getone(self, key: str, default: _T) -> Union[_V, _T]: ...
def getone(
self, key: str, default: Union[_T, _SENTINEL] = sentinel
) -> Union[_V, _T]:
"""Get first value matching the key.
Raises KeyError if the key is not found and no default is provided.
"""
if default is not sentinel:
return self._md.getone(key, default)
else:
return self._md.getone(key)
# Mapping interface #
def __getitem__(self, key: str) -> _V:
return self.getone(key)
@overload
def get(self, key: str, /) -> Union[_V, None]: ...
@overload
def get(self, key: str, /, default: _T) -> Union[_V, _T]: ...
def get(self, key: str, default: Union[_T, None] = None) -> Union[_V, _T, None]:
"""Get first value matching the key.
If the key is not found, returns the default (or None if no default is provided)
"""
return self._md.getone(key, default)
def __iter__(self) -> Iterator[str]:
return iter(self._md.keys())
def __len__(self) -> int:
return len(self._md)
def keys(self) -> KeysView[str]:
"""Return a new view of the dictionary's keys."""
return self._md.keys()
def items(self) -> ItemsView[str, _V]:
"""Return a new view of the dictionary's items *(key, value) pairs)."""
return self._md.items()
def values(self) -> _ValuesView[_V]:
"""Return a new view of the dictionary's values."""
return self._md.values()
def __eq__(self, other: object) -> bool:
return self._md == other
def __contains__(self, key: object) -> bool:
return key in self._md
@reprlib.recursive_repr()
def __repr__(self) -> str:
body = ", ".join(f"'{k}': {v!r}" for k, v in self.items())
return f"<{self.__class__.__name__}({body})>"
def copy(self) -> MultiDict[_V]:
"""Return a copy of itself."""
return MultiDict(self._md)
class CIMultiDictProxy(_CIMixin, MultiDictProxy[_V]):
"""Read-only proxy for CIMultiDict instance."""
def __init__(self, arg: Union[MultiDict[_V], MultiDictProxy[_V]]):
if not isinstance(arg, (CIMultiDict, CIMultiDictProxy)):
raise TypeError(
"ctor requires CIMultiDict or CIMultiDictProxy instance"
f", not {type(arg)}"
)
super().__init__(arg)
def copy(self) -> CIMultiDict[_V]:
"""Return a copy of itself."""
return CIMultiDict(self._md)
def getversion(md: Union[MultiDict[object], MultiDictProxy[object]]) -> int:
if isinstance(md, MultiDictProxy):
md = md._md
elif not isinstance(md, MultiDict):
raise TypeError("Parameter should be multidict or proxy")
return md._version