File size: 5,351 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
from collections import defaultdict, deque
from itertools import filterfalse
def unique_everseen(iterable, key=None):
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
# copied from more_itertools 8.8
def always_iterable(obj, base_type=(str, bytes)):
"""If *obj* is iterable, return an iterator over its items::
>>> obj = (1, 2, 3)
>>> list(always_iterable(obj))
[1, 2, 3]
If *obj* is not iterable, return a one-item iterable containing *obj*::
>>> obj = 1
>>> list(always_iterable(obj))
[1]
If *obj* is ``None``, return an empty iterable:
>>> obj = None
>>> list(always_iterable(None))
[]
By default, binary and text strings are not considered iterable::
>>> obj = 'foo'
>>> list(always_iterable(obj))
['foo']
If *base_type* is set, objects for which ``isinstance(obj, base_type)``
returns ``True`` won't be considered iterable.
>>> obj = {'a': 1}
>>> list(always_iterable(obj)) # Iterate over the dict's keys
['a']
>>> list(always_iterable(obj, base_type=dict)) # Treat dicts as a unit
[{'a': 1}]
Set *base_type* to ``None`` to avoid any special handling and treat objects
Python considers iterable as iterable:
>>> obj = 'foo'
>>> list(always_iterable(obj, base_type=None))
['f', 'o', 'o']
"""
if obj is None:
return iter(())
if (base_type is not None) and isinstance(obj, base_type):
return iter((obj,))
try:
return iter(obj)
except TypeError:
return iter((obj,))
# Copied from more_itertools 10.3
class bucket:
"""Wrap *iterable* and return an object that buckets the iterable into
child iterables based on a *key* function.
>>> iterable = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'b3']
>>> s = bucket(iterable, key=lambda x: x[0]) # Bucket by 1st character
>>> sorted(list(s)) # Get the keys
['a', 'b', 'c']
>>> a_iterable = s['a']
>>> next(a_iterable)
'a1'
>>> next(a_iterable)
'a2'
>>> list(s['b'])
['b1', 'b2', 'b3']
The original iterable will be advanced and its items will be cached until
they are used by the child iterables. This may require significant storage.
By default, attempting to select a bucket to which no items belong will
exhaust the iterable and cache all values.
If you specify a *validator* function, selected buckets will instead be
checked against it.
>>> from itertools import count
>>> it = count(1, 2) # Infinite sequence of odd numbers
>>> key = lambda x: x % 10 # Bucket by last digit
>>> validator = lambda x: x in {1, 3, 5, 7, 9} # Odd digits only
>>> s = bucket(it, key=key, validator=validator)
>>> 2 in s
False
>>> list(s[2])
[]
"""
def __init__(self, iterable, key, validator=None):
self._it = iter(iterable)
self._key = key
self._cache = defaultdict(deque)
self._validator = validator or (lambda x: True)
def __contains__(self, value):
if not self._validator(value):
return False
try:
item = next(self[value])
except StopIteration:
return False
else:
self._cache[value].appendleft(item)
return True
def _get_values(self, value):
"""
Helper to yield items from the parent iterator that match *value*.
Items that don't match are stored in the local cache as they
are encountered.
"""
while True:
# If we've cached some items that match the target value, emit
# the first one and evict it from the cache.
if self._cache[value]:
yield self._cache[value].popleft()
# Otherwise we need to advance the parent iterator to search for
# a matching item, caching the rest.
else:
while True:
try:
item = next(self._it)
except StopIteration:
return
item_value = self._key(item)
if item_value == value:
yield item
break
elif self._validator(item_value):
self._cache[item_value].append(item)
def __iter__(self):
for item in self._it:
item_value = self._key(item)
if self._validator(item_value):
self._cache[item_value].append(item)
yield from self._cache.keys()
def __getitem__(self, value):
if not self._validator(value):
return iter(())
return self._get_values(value)
|