|
""" |
|
A fork of Python 3.6's stdlib queue (found in Python's 'cpython/Lib/queue.py') |
|
with Lock swapped out for RLock to avoid a deadlock while garbage collecting. |
|
|
|
https://github.com/python/cpython/blob/v3.6.12/Lib/queue.py |
|
|
|
|
|
See also |
|
https://codewithoutrules.com/2017/08/16/concurrency-python/ |
|
https://bugs.python.org/issue14976 |
|
https://github.com/sqlalchemy/sqlalchemy/blob/4eb747b61f0c1b1c25bdee3856d7195d10a0c227/lib/sqlalchemy/queue.py#L1 |
|
|
|
We also vendor the code to evade eventlet's broken monkeypatching, see |
|
https://github.com/getsentry/sentry-python/pull/484 |
|
|
|
|
|
Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, |
|
2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Python Software Foundation; |
|
|
|
All Rights Reserved |
|
|
|
|
|
PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2 |
|
-------------------------------------------- |
|
|
|
1. This LICENSE AGREEMENT is between the Python Software Foundation |
|
("PSF"), and the Individual or Organization ("Licensee") accessing and |
|
otherwise using this software ("Python") in source or binary form and |
|
its associated documentation. |
|
|
|
2. Subject to the terms and conditions of this License Agreement, PSF hereby |
|
grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce, |
|
analyze, test, perform and/or display publicly, prepare derivative works, |
|
distribute, and otherwise use Python alone or in any derivative version, |
|
provided, however, that PSF's License Agreement and PSF's notice of copyright, |
|
i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, |
|
2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Python Software Foundation; |
|
All Rights Reserved" are retained in Python alone or in any derivative version |
|
prepared by Licensee. |
|
|
|
3. In the event Licensee prepares a derivative work that is based on |
|
or incorporates Python or any part thereof, and wants to make |
|
the derivative work available to others as provided herein, then |
|
Licensee hereby agrees to include in any such work a brief summary of |
|
the changes made to Python. |
|
|
|
4. PSF is making Python available to Licensee on an "AS IS" |
|
basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR |
|
IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND |
|
DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS |
|
FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT |
|
INFRINGE ANY THIRD PARTY RIGHTS. |
|
|
|
5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON |
|
FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS |
|
A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, |
|
OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. |
|
|
|
6. This License Agreement will automatically terminate upon a material |
|
breach of its terms and conditions. |
|
|
|
7. Nothing in this License Agreement shall be deemed to create any |
|
relationship of agency, partnership, or joint venture between PSF and |
|
Licensee. This License Agreement does not grant permission to use PSF |
|
trademarks or trade name in a trademark sense to endorse or promote |
|
products or services of Licensee, or any third party. |
|
|
|
8. By copying, installing or otherwise using Python, Licensee |
|
agrees to be bound by the terms and conditions of this License |
|
Agreement. |
|
|
|
""" |
|
|
|
import threading

from collections import deque
from time import monotonic
from time import time

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any
|
|
|
__all__ = ["EmptyError", "FullError", "Queue"] |
|
|
|
|
|
class EmptyError(Exception):
    """Signal that no item could be taken from a Queue.

    Raised by Queue.get() when called with block=False (and therefore by
    Queue.get_nowait()) on an empty queue, and by a timed Queue.get() whose
    timeout elapses before an item becomes available.
    """
|
|
|
|
|
class FullError(Exception):
    """Signal that no item could be added to a bounded Queue.

    Raised by Queue.put() when called with block=False (and therefore by
    Queue.put_nowait()) on a full queue, and by a timed Queue.put() whose
    timeout elapses before a slot becomes free.
    """
|
|
|
|
|
class Queue:
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    Items are handed out in FIFO order. All state is guarded by a single
    re-entrant lock (``mutex``) shared by three condition variables; the
    module docstring explains that an RLock is used instead of a plain Lock
    to avoid a deadlock while garbage collecting.
    """

    def __init__(self, maxsize=0):
        """Set up the storage, the shared lock and its condition variables.

        :param maxsize: Upper bound on the number of queued items; a value
            <= 0 means the queue is unbounded.
        """
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating. It is shared
        # between the three conditions below, so entering any of them
        # acquires mutex and leaving releases it. It is re-entrant, so a
        # thread that already holds it can enter the queue again.
        self.mutex = threading.RLock()

        # Notified by put() after an item is added; get() waits on it.
        self.not_empty = threading.Condition(self.mutex)

        # Notified by get() after an item is removed; put() waits on it.
        self.not_full = threading.Condition(self.mutex)

        # Notified by task_done() when the unfinished-task count reaches
        # zero; join() waits on it.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes negative.
                    raise ValueError("task_done() called too many times")
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        with self.all_tasks_done:
            # Loop to re-check the counter after every wake-up.
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        """
        with self.mutex:
            return not self._qsize()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.

        An unbounded queue (maxsize <= 0) is never full.
        """
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the FullError exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the FullError exception ('timeout'
        is ignored in that case).

        'block' and 'timeout' are only consulted for a bounded queue
        (maxsize > 0); on an unbounded queue the item is always added
        immediately. Consequently the ValueError for a negative 'timeout'
        is only raised when the queue is bounded.
        """
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise FullError()
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Use the monotonic clock for the deadline so that a
                    # wall-clock adjustment during the wait can neither cut
                    # the timeout short nor extend it.
                    endtime = monotonic() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - monotonic()
                        if remaining <= 0.0:
                            raise FullError()
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the EmptyError exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the EmptyError exception ('timeout' is ignored
        in that case).

        Raises a ValueError if 'block' is true and 'timeout' is negative.
        """
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise EmptyError()
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Monotonic deadline: immune to wall-clock adjustments made
                # while this thread is waiting.
                endtime = monotonic() + timeout
                while not self._qsize():
                    remaining = endtime - monotonic()
                    if remaining <= 0.0:
                        raise EmptyError()
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the FullError exception.
        """
        return self.put(item, block=False)

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the EmptyError exception.
        """
        return self.get(block=False)

    # Storage hooks. Every public method goes through these four methods
    # instead of touching self.queue directly. _qsize, _put and _get are
    # only called by this class while the mutex is held; _init runs from
    # __init__ before the mutex exists.

    def _init(self, maxsize):
        """Create the underlying container (maxsize is not used here)."""
        self.queue = deque()

    def _qsize(self):
        """Return the number of items currently stored."""
        return len(self.queue)

    def _put(self, item):
        """Append an item at the tail of the container."""
        self.queue.append(item)

    def _get(self):
        """Remove and return the item at the head of the container."""
        return self.queue.popleft()
|
|