"""Prep gradio API.""" # pylint: diable=invalid-name from typing import List, Optional, Union import os from pathlib import Path import multiprocessing as mp import numpy as np import psutil import ray import gradio as gr # import joblib import more_itertools as mit import numpy as np import psutil import ray from about_time import about_time from logzero import logger from radio_embed import radio_embed num_cpus_m = mp.cpu_count() num_cpus = psutil.cpu_count(logical=False) filename = "fangfang-en.txt" lines = Path(filename).read_text("utf8").splitlines() lst = [_.strip() for _ in lines if _.strip()] args_m = ["\n".join(elm) for elm in mit.divide(num_cpus_m, lst)] args = ["\n".join(elm) for elm in mit.divide(num_cpus, lst)] os.environ["TOKENIZERS_PARALLELISM"] = "false" if not ray.is_initialized(): ray.init(num_cpus=num_cpus) @ray.remote def ray_embed(text): """Embed text to d-512.""" return radio_embed(text) def test_pool(func, args_, num_procs=None): """Test mp.Pool.""" if num_procs is None: num_procs = num_cpus_m elif num_procs < 1: num_procs = num_cpus_m with mp.Pool(num_procs) as pool: ret = pool.map(func, args_) return ret def fn(type_, num_cpus): if type_ in ["mp_pool"]: with about_time() as dur: _ = test_pool(radio_embed, args, num_procs=num_cpus) return dur.duration_human with gr.Blocks() as blocks: with gr.Row(): type_ = gr.Radio( label="type", choices=[ "mp_pool", "joblib(loky)", "joblib(mp)", "ray", ], value="mp_pool", interactive=False, ) num_cpus_ = gr.Slider( label="num_procs", value=num_cpus_m, minimum=1, maximum=2 * num_cpus_m, step=1, ) btn = gr.Button("Go") out = gr.Textbox( label="time elapsed", value="", ) btn.click( fn=fn, inputs=[type_, num_cpus_], outputs=out, api_name="time", ) # gr.Markdown(f"{description}") blocks.launch(enable_queue=True)