|
"""Bucket of reusable internal utilities. |
|
|
|
This module should be kept as small as possible: functions that are only used in one place should be moved to that place.
|
""" |
|
|
|
from __future__ import annotations as _annotations |
|
|
|
import dataclasses |
|
import keyword |
|
import sys |
|
import typing |
|
import warnings |
|
import weakref |
|
from collections import OrderedDict, defaultdict, deque |
|
from collections.abc import Mapping |
|
from copy import deepcopy |
|
from functools import cached_property |
|
from inspect import Parameter |
|
from itertools import zip_longest |
|
from types import BuiltinFunctionType, CodeType, FunctionType, GeneratorType, LambdaType, ModuleType |
|
from typing import Any, Callable, Generic, TypeVar, overload |
|
|
|
from typing_extensions import TypeAlias, TypeGuard, deprecated |
|
|
|
from pydantic import PydanticDeprecatedSince211 |
|
|
|
from . import _repr, _typing_extra |
|
from ._import_utils import import_cached_base_model |
|
|
|
if typing.TYPE_CHECKING: |
|
MappingIntStrAny: TypeAlias = 'typing.Mapping[int, Any] | typing.Mapping[str, Any]' |
|
AbstractSetIntStr: TypeAlias = 'typing.AbstractSet[int] | typing.AbstractSet[str]' |
|
from ..main import BaseModel |
|
|
|
|
|
|
|
IMMUTABLE_NON_COLLECTIONS_TYPES: set[type[Any]] = { |
|
int, |
|
float, |
|
complex, |
|
str, |
|
bool, |
|
bytes, |
|
type, |
|
_typing_extra.NoneType, |
|
FunctionType, |
|
BuiltinFunctionType, |
|
LambdaType, |
|
weakref.ref, |
|
CodeType, |
|
|
|
|
|
|
|
ModuleType, |
|
NotImplemented.__class__, |
|
Ellipsis.__class__, |
|
} |
|
|
|
|
|
BUILTIN_COLLECTIONS: set[type[Any]] = { |
|
list, |
|
set, |
|
tuple, |
|
frozenset, |
|
dict, |
|
OrderedDict, |
|
defaultdict, |
|
deque, |
|
} |
|
|
|
|
|
def can_be_positional(param: Parameter) -> bool: |
|
"""Return whether the parameter accepts a positional argument. |
|
|
|
```python {test="skip" lint="skip"} |
|
def func(a, /, b, *, c): |
|
pass |
|
|
|
params = inspect.signature(func).parameters |
|
can_be_positional(params['a']) |
|
#> True |
|
can_be_positional(params['b']) |
|
#> True |
|
can_be_positional(params['c']) |
|
#> False |
|
``` |
|
""" |
|
return param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD) |
|
|
|
|
|
def sequence_like(v: Any) -> bool: |
|
return isinstance(v, (list, tuple, set, frozenset, GeneratorType, deque)) |
|
|
|
|
|
def lenient_isinstance(o: Any, class_or_tuple: type[Any] | tuple[type[Any], ...] | None) -> bool: |
|
try: |
|
return isinstance(o, class_or_tuple) |
|
except TypeError: |
|
return False |
|
|
|
|
|
def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool: |
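    """`issubclass` that returns `False` instead of raising `TypeError` when `cls` is not a class.

    Parametrized generic types are also treated as non-subclasses (returning `False`); any other
    `TypeError` raised by `issubclass` is propagated.
    """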
|
try: |
|
return isinstance(cls, type) and issubclass(cls, class_or_tuple) |
|
except TypeError: |
|
if isinstance(cls, _typing_extra.WithArgsTypes): |
|
return False |
|
raise |
|
|
|
|
|
def is_model_class(cls: Any) -> TypeGuard[type[BaseModel]]: |
|
"""Returns true if cls is a _proper_ subclass of BaseModel, and provides proper type-checking, |
|
unlike raw calls to lenient_issubclass. |
|
""" |
|
BaseModel = import_cached_base_model() |
|
|
|
return lenient_issubclass(cls, BaseModel) and cls is not BaseModel |
|
|
|
|
|
def is_valid_identifier(identifier: str) -> bool: |
|
"""Checks that a string is a valid identifier and not a Python keyword. |
|
:param identifier: The identifier to test. |
|
:return: True if the identifier is valid. |
|
""" |
|
return identifier.isidentifier() and not keyword.iskeyword(identifier) |
|
|
|
|
|
KeyType = TypeVar('KeyType') |
|
|
|
|
|
def deep_update(mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]) -> dict[KeyType, Any]: |
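    """Recursively update `mapping` with one or more `updating_mappings`, returning a new dict.

    Values that are dicts on both sides are merged key by key instead of being replaced wholesale;
    the input mappings are not mutated. For example:

    >>> deep_update({'a': {'x': 1}}, {'a': {'y': 2}, 'b': 3})
    {'a': {'x': 1, 'y': 2}, 'b': 3}
    """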
|
updated_mapping = mapping.copy() |
|
for updating_mapping in updating_mappings: |
|
for k, v in updating_mapping.items(): |
|
if k in updated_mapping and isinstance(updated_mapping[k], dict) and isinstance(v, dict): |
|
updated_mapping[k] = deep_update(updated_mapping[k], v) |
|
else: |
|
updated_mapping[k] = v |
|
return updated_mapping |
|
|
|
|
|
def update_not_none(mapping: dict[Any, Any], **update: Any) -> None: |
|
mapping.update({k: v for k, v in update.items() if v is not None}) |
|
|
|
|
|
T = TypeVar('T') |
|
|
|
|
|
def unique_list( |
|
input_list: list[T] | tuple[T, ...], |
|
*, |
|
name_factory: typing.Callable[[T], str] = str, |
|
) -> list[T]: |
|
"""Make a list unique while maintaining order. |
|
    If a later item has the same name as an existing one, it replaces the existing
    item in place (e.g. a model validator overridden in a subclass).
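
    For example, with a case-insensitive name factory, a later item replaces an
    earlier one with the same name:

    >>> unique_list(['a', 'B', 'b'], name_factory=str.lower)
    ['a', 'b']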
|
""" |
|
result: list[T] = [] |
|
result_names: list[str] = [] |
|
for v in input_list: |
|
v_name = name_factory(v) |
|
if v_name not in result_names: |
|
result_names.append(v_name) |
|
result.append(v) |
|
else: |
|
result[result_names.index(v_name)] = v |
|
|
|
return result |
|
|
|
|
|
class ValueItems(_repr.Representation): |
|
"""Class for more convenient calculation of excluded or included fields on values.""" |
|
|
|
__slots__ = ('_items', '_type') |
|
|
|
def __init__(self, value: Any, items: AbstractSetIntStr | MappingIntStrAny) -> None: |
|
items = self._coerce_items(items) |
|
|
|
if isinstance(value, (list, tuple)): |
|
items = self._normalize_indexes(items, len(value)) |
|
|
|
self._items: MappingIntStrAny = items |
|
|
|
def is_excluded(self, item: Any) -> bool: |
|
"""Check if item is fully excluded. |
|
|
|
:param item: key or index of a value |
|
""" |
|
return self.is_true(self._items.get(item)) |
|
|
|
def is_included(self, item: Any) -> bool: |
|
"""Check if value is contained in self._items. |
|
|
|
:param item: key or index of value |
|
""" |
|
return item in self._items |
|
|
|
def for_element(self, e: int | str) -> AbstractSetIntStr | MappingIntStrAny | None: |
|
""":param e: key or index of element on value |
|
:return: raw values for element if self._items is dict and contain needed element |
|
""" |
|
item = self._items.get(e) |
|
return item if not self.is_true(item) else None |
|
|
|
def _normalize_indexes(self, items: MappingIntStrAny, v_length: int) -> dict[int | str, Any]: |
|
""":param items: dict or set of indexes which will be normalized |
|
:param v_length: length of sequence indexes of which will be |
|
|
|
>>> self._normalize_indexes({0: True, -2: True, -1: True}, 4) |
|
{0: True, 2: True, 3: True} |
|
>>> self._normalize_indexes({'__all__': True}, 4) |
|
{0: True, 1: True, 2: True, 3: True} |
|
""" |
|
normalized_items: dict[int | str, Any] = {} |
|
all_items = None |
|
for i, v in items.items(): |
|
if not (isinstance(v, typing.Mapping) or isinstance(v, typing.AbstractSet) or self.is_true(v)): |
|
raise TypeError(f'Unexpected type of exclude value for index "{i}" {v.__class__}') |
|
if i == '__all__': |
|
all_items = self._coerce_value(v) |
|
continue |
|
if not isinstance(i, int): |
|
raise TypeError( |
|
'Excluding fields from a sequence of sub-models or dicts must be performed index-wise: ' |
|
'expected integer keys or keyword "__all__"' |
|
) |
|
normalized_i = v_length + i if i < 0 else i |
|
normalized_items[normalized_i] = self.merge(v, normalized_items.get(normalized_i)) |
|
|
|
if not all_items: |
|
return normalized_items |
|
if self.is_true(all_items): |
|
for i in range(v_length): |
|
normalized_items.setdefault(i, ...) |
|
return normalized_items |
|
for i in range(v_length): |
|
normalized_item = normalized_items.setdefault(i, {}) |
|
if not self.is_true(normalized_item): |
|
normalized_items[i] = self.merge(all_items, normalized_item) |
|
return normalized_items |
|
|
|
@classmethod |
|
def merge(cls, base: Any, override: Any, intersect: bool = False) -> Any: |
|
"""Merge a `base` item with an `override` item. |
|
|
|
Both `base` and `override` are converted to dictionaries if possible. |
|
        Sets are converted to dictionaries with the set's entries as keys and
|
Ellipsis as values. |
|
|
|
        Key-value pairs present in both `base` and `override` are merged
        recursively with this function; the remaining pairs are kept or
        dropped depending on `intersect`, as described below.
|
|
|
Merging takes place based on the "union" of keys if `intersect` is |
|
set to `False` (default) and on the intersection of keys if |
|
`intersect` is set to `True`. |
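
        A sketch of the behaviour (the values are illustrative; sets and `...` are coerced as described above):

        ```python {test="skip" lint="skip"}
        ValueItems.merge({'a': ..., 'b': ...}, {'b': {'c'}})
        #> {'a': Ellipsis, 'b': {'c': Ellipsis}}
        ValueItems.merge({'a': ..., 'b': ...}, {'b': {'c'}}, intersect=True)
        #> {'b': {'c': Ellipsis}}
        ```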
|
""" |
|
override = cls._coerce_value(override) |
|
base = cls._coerce_value(base) |
|
if override is None: |
|
return base |
|
if cls.is_true(base) or base is None: |
|
return override |
|
if cls.is_true(override): |
|
return base if intersect else override |
|
|
|
|
|
if intersect: |
|
merge_keys = [k for k in base if k in override] + [k for k in override if k in base] |
|
else: |
|
merge_keys = list(base) + [k for k in override if k not in base] |
|
|
|
merged: dict[int | str, Any] = {} |
|
for k in merge_keys: |
|
merged_item = cls.merge(base.get(k), override.get(k), intersect=intersect) |
|
if merged_item is not None: |
|
merged[k] = merged_item |
|
|
|
return merged |
|
|
|
@staticmethod |
|
def _coerce_items(items: AbstractSetIntStr | MappingIntStrAny) -> MappingIntStrAny: |
|
if isinstance(items, typing.Mapping): |
|
pass |
|
elif isinstance(items, typing.AbstractSet): |
|
items = dict.fromkeys(items, ...) |
|
else: |
|
class_name = getattr(items, '__class__', '???') |
|
raise TypeError(f'Unexpected type of exclude value {class_name}') |
|
return items |
|
|
|
@classmethod |
|
def _coerce_value(cls, value: Any) -> Any: |
|
if value is None or cls.is_true(value): |
|
return value |
|
return cls._coerce_items(value) |
|
|
|
@staticmethod |
|
def is_true(v: Any) -> bool: |
|
return v is True or v is ... |
|
|
|
def __repr_args__(self) -> _repr.ReprArgs: |
|
return [(None, self._items)] |
|
|
|
|
|
if typing.TYPE_CHECKING: |
|
|
|
def LazyClassAttribute(name: str, get_value: Callable[[], T]) -> T: ... |
|
|
|
else: |
|
|
|
class LazyClassAttribute: |
|
"""A descriptor exposing an attribute only accessible on a class (hidden from instances). |
|
|
|
The attribute is lazily computed and cached during the first access. |
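
        A rough usage sketch (`compute_value` is an illustrative placeholder, not a real helper):

        ```python {test="skip" lint="skip"}
        class Foo:
            bar = LazyClassAttribute('bar', lambda: compute_value())

        Foo.bar    # `compute_value()` is called on first access, then the result is cached
        Foo().bar  # raises AttributeError: 'bar' is a class-only attribute
        ```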
|
""" |
|
|
|
def __init__(self, name: str, get_value: Callable[[], Any]) -> None: |
|
self.name = name |
|
self.get_value = get_value |
|
|
|
@cached_property |
|
def value(self) -> Any: |
|
return self.get_value() |
|
|
|
        def __get__(self, instance: Any, owner: type[Any]) -> Any:
|
if instance is None: |
|
return self.value |
|
raise AttributeError(f'{self.name!r} attribute of {owner.__name__!r} is class-only') |
|
|
|
|
|
Obj = TypeVar('Obj') |
|
|
|
|
|
def smart_deepcopy(obj: Obj) -> Obj: |
|
"""Return type as is for immutable built-in types |
|
Use obj.copy() for built-in empty collections |
|
Use copy.deepcopy() for non-empty collections and unknown objects. |
|
""" |
|
obj_type = obj.__class__ |
|
if obj_type in IMMUTABLE_NON_COLLECTIONS_TYPES: |
|
return obj |
|
try: |
|
if not obj and obj_type in BUILTIN_COLLECTIONS: |
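            # fast path: an empty built-in collection only needs a shallow copy
            # (and the empty tuple is immutable, so it is returned as-is)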
|
|
|
return obj if obj_type is tuple else obj.copy() |
|
except (TypeError, ValueError, RuntimeError): |
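        # evaluating `not obj` can raise for objects with a custom `__bool__`/`__len__`
        # (e.g. some array-like types); fall back to a full deepcopy below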
|
|
|
pass |
|
|
|
return deepcopy(obj) |
|
|
|
|
|
_SENTINEL = object() |
|
|
|
|
|
def all_identical(left: typing.Iterable[Any], right: typing.Iterable[Any]) -> bool: |
|
"""Check that the items of `left` are the same objects as those in `right`. |
|
|
|
>>> a, b = object(), object() |
|
>>> all_identical([a, b, a], [a, b, a]) |
|
True |
|
    >>> all_identical([a, b, [a]], [a, b, [a]])  # the new list objects are "equal" but not "identical"
|
False |
|
""" |
|
for left_item, right_item in zip_longest(left, right, fillvalue=_SENTINEL): |
|
if left_item is not right_item: |
|
return False |
|
return True |
|
|
|
|
|
@dataclasses.dataclass(frozen=True) |
|
class SafeGetItemProxy: |
|
"""Wrapper redirecting `__getitem__` to `get` with a sentinel value as default |
|
|
|
This makes is safe to use in `operator.itemgetter` when some keys may be missing |
|
""" |
|
|
|
|
|
|
|
__slots__ = ('wrapped',) |
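    # note: `__slots__` is declared manually because `@dataclasses.dataclass(slots=True)`
    # is only available on Python 3.10+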
|
|
|
wrapped: Mapping[str, Any] |
|
|
|
def __getitem__(self, key: str, /) -> Any: |
|
return self.wrapped.get(key, _SENTINEL) |
|
|
|
|
|
|
|
|
|
|
|
if typing.TYPE_CHECKING: |
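        # declared for static type checkers only (e.g. so the proxy type-checks where a
        # `__contains__` check is expected); adds no runtime behaviour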
|
|
|
def __contains__(self, key: str, /) -> bool: |
|
return self.wrapped.__contains__(key) |
|
|
|
|
|
_ModelT = TypeVar('_ModelT', bound='BaseModel') |
|
_RT = TypeVar('_RT') |
|
|
|
|
|
class deprecated_instance_property(Generic[_ModelT, _RT]): |
|
"""A decorator exposing the decorated class method as a property, with a warning on instance access. |
|
|
|
This decorator takes a class method defined on the `BaseModel` class and transforms it into |
|
an attribute. The attribute can be accessed on both the class and instances of the class. If accessed |
|
via an instance, a deprecation warning is emitted stating that instance access will be removed in V3. |
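
    A rough usage sketch (`SomeClass`/`some_attr` are illustrative names, not the actual Pydantic usage):

    ```python {test="skip" lint="skip"}
    class SomeClass:
        @deprecated_instance_property
        @classmethod
        def some_attr(cls) -> int:
            return 1

    SomeClass.some_attr    # 1 -- no warning
    SomeClass().some_attr  # 1 -- emits a PydanticDeprecatedSince211 warning
    ```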
|
""" |
|
|
|
def __init__(self, fget: Callable[[type[_ModelT]], _RT], /) -> None: |
|
|
|
self.fget = fget |
|
|
|
@overload |
|
def __get__(self, instance: None, objtype: type[_ModelT]) -> _RT: ... |
|
@overload |
|
@deprecated( |
|
'Accessing this attribute on the instance is deprecated, and will be removed in Pydantic V3. ' |
|
'Instead, you should access this attribute from the model class.', |
|
category=None, |
|
) |
|
def __get__(self, instance: _ModelT, objtype: type[_ModelT]) -> _RT: ... |
|
def __get__(self, instance: _ModelT | None, objtype: type[_ModelT]) -> _RT: |
|
if instance is not None: |
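            # `classmethod` objects expose `__name__` directly on Python 3.10+;
            # on older versions it must be read from the wrapped function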
|
attr_name = self.fget.__name__ if sys.version_info >= (3, 10) else self.fget.__func__.__name__ |
|
warnings.warn( |
|
f'Accessing the {attr_name!r} attribute on the instance is deprecated. ' |
|
'Instead, you should access this attribute from the model class.', |
|
category=PydanticDeprecatedSince211, |
|
stacklevel=2, |
|
) |
|
return self.fget.__get__(instance, objtype)() |
|
|