Spaces:
Running
Running
import json | |
from typing import Dict | |
import redis | |
from app.controllers.manager.base_manager import TaskManager | |
from app.models.schema import VideoParams | |
from app.services import task as tm | |
FUNC_MAP = { | |
"start": tm.start, | |
# 'start_test': tm.start_test | |
} | |
class RedisTaskManager(TaskManager): | |
def __init__(self, max_concurrent_tasks: int, redis_url: str): | |
self.redis_client = redis.Redis.from_url(redis_url) | |
super().__init__(max_concurrent_tasks) | |
def create_queue(self): | |
return "task_queue" | |
def enqueue(self, task: Dict): | |
task_with_serializable_params = task.copy() | |
if "params" in task["kwargs"] and isinstance( | |
task["kwargs"]["params"], VideoParams | |
): | |
task_with_serializable_params["kwargs"]["params"] = task["kwargs"][ | |
"params" | |
].dict() | |
# 将函数对象转换为其名称 | |
task_with_serializable_params["func"] = task["func"].__name__ | |
self.redis_client.rpush(self.queue, json.dumps(task_with_serializable_params)) | |
def dequeue(self): | |
task_json = self.redis_client.lpop(self.queue) | |
if task_json: | |
task_info = json.loads(task_json) | |
# 将函数名称转换回函数对象 | |
task_info["func"] = FUNC_MAP[task_info["func"]] | |
if "params" in task_info["kwargs"] and isinstance( | |
task_info["kwargs"]["params"], dict | |
): | |
task_info["kwargs"]["params"] = VideoParams( | |
**task_info["kwargs"]["params"] | |
) | |
return task_info | |
return None | |
def is_queue_empty(self): | |
return self.redis_client.llen(self.queue) == 0 | |