Spaces:
Running
Running
import threading | |
from typing import Any, Callable, Dict | |
class TaskManager: | |
def __init__(self, max_concurrent_tasks: int): | |
self.max_concurrent_tasks = max_concurrent_tasks | |
self.current_tasks = 0 | |
self.lock = threading.Lock() | |
self.queue = self.create_queue() | |
def create_queue(self): | |
raise NotImplementedError() | |
def add_task(self, func: Callable, *args: Any, **kwargs: Any): | |
with self.lock: | |
if self.current_tasks < self.max_concurrent_tasks: | |
print(f"add task: {func.__name__}, current_tasks: {self.current_tasks}") | |
self.execute_task(func, *args, **kwargs) | |
else: | |
print( | |
f"enqueue task: {func.__name__}, current_tasks: {self.current_tasks}" | |
) | |
self.enqueue({"func": func, "args": args, "kwargs": kwargs}) | |
def execute_task(self, func: Callable, *args: Any, **kwargs: Any): | |
thread = threading.Thread( | |
target=self.run_task, args=(func, *args), kwargs=kwargs | |
) | |
thread.start() | |
def run_task(self, func: Callable, *args: Any, **kwargs: Any): | |
try: | |
with self.lock: | |
self.current_tasks += 1 | |
func(*args, **kwargs) # call the function here, passing *args and **kwargs. | |
finally: | |
self.task_done() | |
def check_queue(self): | |
with self.lock: | |
if ( | |
self.current_tasks < self.max_concurrent_tasks | |
and not self.is_queue_empty() | |
): | |
task_info = self.dequeue() | |
func = task_info["func"] | |
args = task_info.get("args", ()) | |
kwargs = task_info.get("kwargs", {}) | |
self.execute_task(func, *args, **kwargs) | |
def task_done(self): | |
with self.lock: | |
self.current_tasks -= 1 | |
self.check_queue() | |
def enqueue(self, task: Dict): | |
raise NotImplementedError() | |
def dequeue(self): | |
raise NotImplementedError() | |
def is_queue_empty(self): | |
raise NotImplementedError() | |