Spaces:
Running
Running
import json | |
import locale | |
import os | |
from pathlib import Path | |
import threading | |
from typing import Any | |
from uuid import uuid4 | |
import urllib3 | |
from loguru import logger | |
from app.models import const | |
urllib3.disable_warnings() | |
def get_response(status: int, data: Any = None, message: str = ""): | |
obj = { | |
"status": status, | |
} | |
if data: | |
obj["data"] = data | |
if message: | |
obj["message"] = message | |
return obj | |
def to_json(obj): | |
try: | |
# Define a helper function to handle different types of objects | |
def serialize(o): | |
# If the object is a serializable type, return it directly | |
if isinstance(o, (int, float, bool, str)) or o is None: | |
return o | |
# If the object is binary data, convert it to a base64-encoded string | |
elif isinstance(o, bytes): | |
return "*** binary data ***" | |
# If the object is a dictionary, recursively process each key-value pair | |
elif isinstance(o, dict): | |
return {k: serialize(v) for k, v in o.items()} | |
# If the object is a list or tuple, recursively process each element | |
elif isinstance(o, (list, tuple)): | |
return [serialize(item) for item in o] | |
# If the object is a custom type, attempt to return its __dict__ attribute | |
elif hasattr(o, "__dict__"): | |
return serialize(o.__dict__) | |
# Return None for other cases (or choose to raise an exception) | |
else: | |
return None | |
# Use the serialize function to process the input object | |
serialized_obj = serialize(obj) | |
# Serialize the processed object into a JSON string | |
return json.dumps(serialized_obj, ensure_ascii=False, indent=4) | |
except Exception: | |
return None | |
def get_uuid(remove_hyphen: bool = False): | |
u = str(uuid4()) | |
if remove_hyphen: | |
u = u.replace("-", "") | |
return u | |
def root_dir(): | |
return os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) | |
def storage_dir(sub_dir: str = "", create: bool = False): | |
d = os.path.join(root_dir(), "storage") | |
if sub_dir: | |
d = os.path.join(d, sub_dir) | |
if create and not os.path.exists(d): | |
os.makedirs(d) | |
return d | |
def resource_dir(sub_dir: str = ""): | |
d = os.path.join(root_dir(), "resource") | |
if sub_dir: | |
d = os.path.join(d, sub_dir) | |
return d | |
def task_dir(sub_dir: str = ""): | |
d = os.path.join(storage_dir(), "tasks") | |
if sub_dir: | |
d = os.path.join(d, sub_dir) | |
if not os.path.exists(d): | |
os.makedirs(d) | |
return d | |
def font_dir(sub_dir: str = ""): | |
d = resource_dir("fonts") | |
if sub_dir: | |
d = os.path.join(d, sub_dir) | |
if not os.path.exists(d): | |
os.makedirs(d) | |
return d | |
def song_dir(sub_dir: str = ""): | |
d = resource_dir("songs") | |
if sub_dir: | |
d = os.path.join(d, sub_dir) | |
if not os.path.exists(d): | |
os.makedirs(d) | |
return d | |
def public_dir(sub_dir: str = ""): | |
d = resource_dir("public") | |
if sub_dir: | |
d = os.path.join(d, sub_dir) | |
if not os.path.exists(d): | |
os.makedirs(d) | |
return d | |
def run_in_background(func, *args, **kwargs): | |
def run(): | |
try: | |
func(*args, **kwargs) | |
except Exception as e: | |
logger.error(f"run_in_background error: {e}") | |
thread = threading.Thread(target=run) | |
thread.start() | |
return thread | |
def time_convert_seconds_to_hmsm(seconds) -> str: | |
hours = int(seconds // 3600) | |
seconds = seconds % 3600 | |
minutes = int(seconds // 60) | |
milliseconds = int(seconds * 1000) % 1000 | |
seconds = int(seconds % 60) | |
return "{:02d}:{:02d}:{:02d},{:03d}".format(hours, minutes, seconds, milliseconds) | |
def text_to_srt(idx: int, msg: str, start_time: float, end_time: float) -> str: | |
start_time = time_convert_seconds_to_hmsm(start_time) | |
end_time = time_convert_seconds_to_hmsm(end_time) | |
srt = """%d | |
%s --> %s | |
%s | |
""" % ( | |
idx, | |
start_time, | |
end_time, | |
msg, | |
) | |
return srt | |
def str_contains_punctuation(word): | |
for p in const.PUNCTUATIONS: | |
if p in word: | |
return True | |
return False | |
def split_string_by_punctuations(s): | |
result = [] | |
txt = "" | |
previous_char = "" | |
next_char = "" | |
for i in range(len(s)): | |
char = s[i] | |
if char == "\n": | |
result.append(txt.strip()) | |
txt = "" | |
continue | |
if i > 0: | |
previous_char = s[i - 1] | |
if i < len(s) - 1: | |
next_char = s[i + 1] | |
if char == "." and previous_char.isdigit() and next_char.isdigit(): | |
# # In the case of "withdraw 10,000, charged at 2.5% fee", the dot in "2.5" should not be treated as a line break marker | |
txt += char | |
continue | |
if char not in const.PUNCTUATIONS: | |
txt += char | |
else: | |
result.append(txt.strip()) | |
txt = "" | |
result.append(txt.strip()) | |
# filter empty string | |
result = list(filter(None, result)) | |
return result | |
def md5(text): | |
import hashlib | |
return hashlib.md5(text.encode("utf-8")).hexdigest() | |
def get_system_locale(): | |
try: | |
loc = locale.getdefaultlocale() | |
# zh_CN, zh_TW return zh | |
# en_US, en_GB return en | |
language_code = loc[0].split("_")[0] | |
return language_code | |
except Exception: | |
return "en" | |
def load_locales(i18n_dir): | |
_locales = {} | |
for root, dirs, files in os.walk(i18n_dir): | |
for file in files: | |
if file.endswith(".json"): | |
lang = file.split(".")[0] | |
with open(os.path.join(root, file), "r", encoding="utf-8") as f: | |
_locales[lang] = json.loads(f.read()) | |
return _locales | |
def parse_extension(filename): | |
return Path(filename).suffix.lower().lstrip('.') | |