freemt
6e17b85
"""Prep gradio API."""
# pylint: diable=invalid-name
from typing import List, Optional, Union
import os
from pathlib import Path
import multiprocessing as mp
import numpy as np
import psutil
import ray
import gradio as gr
# import joblib
import more_itertools as mit
import numpy as np
import psutil
import ray
from about_time import about_time
from logzero import logger
from radio_embed import radio_embed
num_cpus_m = mp.cpu_count()
num_cpus = psutil.cpu_count(logical=False)
filename = "fangfang-en.txt"
lines = Path(filename).read_text("utf8").splitlines()
lst = [_.strip() for _ in lines if _.strip()]
args_m = ["\n".join(elm) for elm in mit.divide(num_cpus_m, lst)]
args = ["\n".join(elm) for elm in mit.divide(num_cpus, lst)]
os.environ["TOKENIZERS_PARALLELISM"] = "false"
if not ray.is_initialized():
ray.init(num_cpus=num_cpus)
@ray.remote
def ray_embed(text):
"""Embed text to d-512."""
return radio_embed(text)
def test_pool(func, args_, num_procs=None):
"""Test mp.Pool."""
if num_procs is None:
num_procs = num_cpus_m
elif num_procs < 1:
num_procs = num_cpus_m
with mp.Pool(num_procs) as pool:
ret = pool.map(func, args_)
return ret
def fn(type_, num_cpus):
if type_ in ["mp_pool"]:
with about_time() as dur:
_ = test_pool(radio_embed, args, num_procs=num_cpus)
return dur.duration_human
with gr.Blocks() as blocks:
with gr.Row():
type_ = gr.Radio(
label="type",
choices=[
"mp_pool",
"joblib(loky)",
"joblib(mp)",
"ray",
],
value="mp_pool",
interactive=False,
)
num_cpus_ = gr.Slider(
label="num_procs",
value=num_cpus_m,
minimum=1,
maximum=2 * num_cpus_m,
step=1,
)
btn = gr.Button("Go")
out = gr.Textbox(
label="time elapsed",
value="",
)
btn.click(
fn=fn,
inputs=[type_, num_cpus_],
outputs=out,
api_name="time",
)
# gr.Markdown(f"{description}")
blocks.launch(enable_queue=True)