Spaces:
Running
Running
import os | |
import tempfile | |
import importlib.util | |
from enum import Enum | |
from contextlib import contextmanager, AbstractContextManager | |
from functools import wraps | |
os.environ["HF_HUB_CACHE"] = "cache" | |
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" | |
import gradio as gr | |
from huggingface_hub import HfApi | |
from huggingface_hub import whoami | |
from huggingface_hub import ModelCard | |
from huggingface_hub import scan_cache_dir | |
from huggingface_hub import logging | |
from gradio_huggingfacehub_search import HuggingfaceHubSearch | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from textwrap import dedent | |
from typing import ( | |
Any, | |
Callable, | |
Dict, | |
Optional, | |
Tuple, | |
Type, | |
Union, | |
NamedTuple, | |
) | |
import mlx.nn as nn | |
import mlx_lm | |
from mlx_lm.utils import ( | |
load_config, | |
get_model_path, | |
) | |
import mlx_vlm | |
# Aliases copied from mlx-lm (mlx_lm/utils.py): a config "model_type" on the
# left is handled by the model module named on the right.
MODEL_REMAPPING_MLX_LM = dict(
    [
        ("mistral", "llama"),  # mistral is compatible with llama
        ("phi-msft", "phixtral"),
        ("falcon_mamba", "mamba"),
    ]
)

# Aliases copied from mlx-vlm (mlx_vlm/utils.py).
MODEL_REMAPPING_MLX_VLM = dict(
    [
        ("llava-qwen2", "llava_bunny"),
        ("bunny-llama", "llava_bunny"),
    ]
)

# Combined lookup table; on a key clash the mlx-vlm entry wins.
MODEL_REMAPPING = MODEL_REMAPPING_MLX_LM | MODEL_REMAPPING_MLX_VLM

# Token read from the environment; None when the variable is not set.
HF_TOKEN = os.getenv("HF_TOKEN")

# Quantization method label -> bit width.
# (Original author's note: unsure whether more entries are needed here.)
QUANT_PARAMS = {f"Q{bits}": bits for bits in (2, 3, 4, 6, 8)}
class RuntimeInfo(NamedTuple):
    """Description of one conversion backend, used as the value of a ``Runtime`` member."""

    # Display name of the backend (e.g. "MLX LM").
    name: str
    # Package name as written in the generated model card.
    package: str
    # Version string of the backend, also written into the generated model card.
    version: str
    # The backend's conversion entry point.
    convert_fn: Callable
    # Given the uploaded repo id, returns the usage snippet appended to the model card.
    usage_example: Callable[[str], str]
    # Name of the output format mentioned in the model card text.
    format: str = "MLX"
class Runtime(RuntimeInfo, Enum):
    """The conversion backends this app can use; each member's value is a ``RuntimeInfo``.

    ``usage_example`` is a callable that takes the uploaded repo id and returns
    the Markdown snippet appended to the generated model card.
    """

    MLX_LM = RuntimeInfo(
        name="MLX LM",
        package="mlx-lm",
        version=mlx_lm.__version__,
        convert_fn=mlx_lm.convert,
        # The embedded Python sample must keep the `if` body and the call
        # arguments indented, otherwise the snippet shown in the README is
        # not valid Python. `dedent` strips only the common leading margin.
        usage_example=lambda upload_repo: dedent(
            f"""
            ## Use with mlx
            ```bash
            pip install mlx-lm
            ```
            ```python
            from mlx_lm import load, generate
            model, tokenizer = load("{upload_repo}")
            prompt="hello"
            if hasattr(tokenizer, "apply_chat_template") and tokenizer.chat_template is not None:
                messages = [{{"role": "user", "content": prompt}}]
                prompt = tokenizer.apply_chat_template(
                    messages, tokenize=False, add_generation_prompt=True
                )
            response = generate(model, tokenizer, prompt=prompt, verbose=True)
            ```
            """
        ),
    )
    MLX_VLM = RuntimeInfo(
        name="MLX-VLM",
        package="mlx-vlm",
        version=mlx_vlm.__version__,
        convert_fn=mlx_vlm.convert,
        usage_example=lambda upload_repo: dedent(
            f"""
            ```bash
            pip install -U mlx-vlm
            ```
            ```bash
            python -m mlx_vlm.generate --model {upload_repo} --max-tokens 100 --temp 0.0 --prompt "Describe this image." --image <path_to_image>
            ```
            """
        ),
    )
def list_files_in_folder(folder_path):
    """Return the names of the regular files located directly in ``folder_path``.

    Sub-directories are skipped and the listing is not recursive. The names
    are returned without the folder prefix, in ``os.listdir`` order.
    """

    def _is_regular_file(entry_name):
        return os.path.isfile(os.path.join(folder_path, entry_name))

    return list(filter(_is_regular_file, os.listdir(folder_path)))
def clear_hf_cache_space():
    """Delete every cached revision of every repo of type "model" in the HF cache."""
    cache_info = scan_cache_dir()
    # Flatten: one commit hash per revision, restricted to model repos.
    model_revision_hashes = [
        revision.commit_hash
        for cached_repo in cache_info.repos
        if cached_repo.repo_type == "model"
        for revision in cached_repo.revisions
    ]
    deletion_plan = cache_info.delete_revisions(*model_revision_hashes)
    deletion_plan.execute()
    print("Cache has been cleared")
def upload_to_hub(path, upload_repo, hf_path, oauth_token, runtime: Runtime):
    """Write a model card into ``path`` and upload the folder's files to the Hub.

    Args:
        path: Local folder holding the converted model files.
        upload_repo: Destination repo id; created if it does not exist.
        hf_path: Source repo id. Its model card is loaded as the starting
            point and it is recorded as the card's ``base_model``.
        oauth_token: Object whose ``.token`` attribute is used to authenticate
            both the card download and the upload.
        runtime: Backend that produced the files; supplies the format, package
            and version text and the usage example for the card.

    Only the regular files directly inside ``path`` are uploaded;
    sub-directories are not.
    """
    card = ModelCard.load(hf_path, token=oauth_token.token)

    # Always end up with both tags, whether or not the source card had any,
    # and never duplicate a tag the source card already carries.
    tags = list(card.data.tags or [])
    for tag in ("mlx", "mlx-my-repo"):
        if tag not in tags:
            tags.append(tag)
    card.data.tags = tags

    card.data.base_model = hf_path
    card.text = dedent(
        f"""
        # {upload_repo}
        The Model [{upload_repo}](https://huggingface.co/{upload_repo}) was converted to {runtime.format} format from [{hf_path}](https://huggingface.co/{hf_path}) using {runtime.package} version **{runtime.version}**.
        """
    ) + runtime.usage_example(upload_repo)
    # Save the card next to the weights so it is uploaded with them.
    card.save(os.path.join(path, "README.md"))

    logging.set_verbosity_info()
    api = HfApi(token=oauth_token.token)
    api.create_repo(repo_id=upload_repo, exist_ok=True)

    files = list_files_in_folder(path)
    print(files)
    for file in files:
        file_path = os.path.join(path, file)
        print(f"Uploading file: {file_path}")
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=file,
            repo_id=upload_repo,
        )
    print(f"Upload successful, go to https://huggingface.co/{upload_repo} for details.")
@contextmanager
def patch_strict_default_methods_ctx() -> AbstractContextManager[Callable[[Any, str], None]]:
    """
    Context manager to temporarily set the default value of the 'strict' arg to `False`
    for specified class methods.

    Does not affect explicit `strict=True`.
    (e.g. `def update(self, parameters: dict, strict: bool = True)`
    becomes `def update(self, parameters: dict, strict: bool = False)`)

    The replacement accepts `strict` as a keyword argument only; all other
    arguments are forwarded unchanged.

    Typical usage:

        with patch_strict_default_methods_ctx() as patch:
            patch(Foo, "bar")
            patch(Foo, "baz")
            patch(Bar, "foo")
            # Patched methods active here
        # Originals restored here

    Yields:
        `patch(cls, method_name)`: replaces `cls.method_name` with the wrapper
        until the `with` block exits (normally or through an exception).
    """
    # The return annotation describes what callers receive from the decorated
    # function (a context manager), not what the generator body yields.
    originals: Dict[Tuple[Type[Any], str], Callable] = {}

    def patch(cls: Any, method_name: str):
        key = (cls, method_name)
        if key in originals:
            # Already patched in this context; patching again would record the
            # wrapper as the "original" and make the restore step a no-op.
            return
        method = getattr(cls, method_name)
        originals[key] = method

        @wraps(method)  # keep the original name/docstring on the replacement
        def wrapper(self, *args, strict=False, **kwargs):
            return method(self, *args, strict=strict, **kwargs)

        setattr(cls, method_name, wrapper)

    try:
        yield patch
    finally:
        # Restore all patched methods
        for (cls, method_name), original in originals.items():
            setattr(cls, method_name, original)
        originals.clear()
def convert(
    hf_path: str,
    mlx_path: str = "mlx_model",
    quantize: bool = False,
    q_group_size: int = 64,
    q_bits: int = 4,
    dtype: Optional[str] = None,
    upload_repo: Optional[str] = None,
    revision: Optional[str] = None,
    dequantize: bool = False,
    quant_predicate: Optional[
        Union[Callable[[str, nn.Module, dict], Union[bool, dict]], str]
    ] = None,  # mlx-lm
    skip_vision: bool = False,  # mlx-vlm
    trust_remote_code: bool = True,  # mlx-vlm
) -> Runtime:
    """Convert a Hub model with mlx-lm or mlx-vlm, whichever supports its model type.

    The model's config is loaded and its ``model_type`` (after applying
    ``MODEL_REMAPPING``) is looked up as a module under ``mlx_lm.models`` and
    ``mlx_vlm.models``:

    * found only in mlx-lm  -> ``mlx_lm.convert``
    * found only in mlx-vlm -> ``mlx_vlm.convert``
    * found in both or in neither -> mlx-vlm is tried first, and on any
      exception mlx-lm is tried instead.

    When the mlx-vlm conversion raises ``ValueError`` it is retried once with
    ``nn.Module.load_weights`` / ``update`` / ``update_modules`` temporarily
    defaulting to ``strict=False``.

    Args:
        hf_path, mlx_path, quantize, q_group_size, q_bits, dtype, upload_repo,
        revision, dequantize: forwarded to whichever converter is used.
        quant_predicate: forwarded to ``mlx_lm.convert`` only.
        skip_vision, trust_remote_code: forwarded to ``mlx_vlm.convert`` only.

    Returns:
        The ``Runtime`` member for the backend whose conversion completed.
    """
    model_path = get_model_path(hf_path, revision=revision)

    def mlx_lm_convert():
        mlx_lm.convert(
            hf_path=hf_path,
            mlx_path=mlx_path,
            quantize=quantize,
            q_group_size=q_group_size,
            q_bits=q_bits,
            dtype=dtype,
            upload_repo=upload_repo,
            revision=revision,
            dequantize=dequantize,
            quant_predicate=quant_predicate,
        )

    def mlx_vlm_convert():
        def _mlx_vlm_convert():
            mlx_vlm.convert(
                hf_path=hf_path,
                mlx_path=mlx_path,
                quantize=quantize,
                q_group_size=q_group_size,
                q_bits=q_bits,
                dtype=dtype,
                upload_repo=upload_repo,
                revision=revision,
                dequantize=dequantize,
                skip_vision=skip_vision,
                trust_remote_code=trust_remote_code,
            )

        try:
            _mlx_vlm_convert()
        except ValueError as e:
            print(e)
            print("Error converting, try again with strict = False")
            with patch_strict_default_methods_ctx() as patch:
                patch(nn.Module, "load_weights")
                patch(nn.Module, "update")
                patch(nn.Module, "update_modules")
                # patched strict=False by default, try again
                _mlx_vlm_convert()

    config = load_config(model_path)
    model_type = config["model_type"]
    model_type = MODEL_REMAPPING.get(model_type, model_type)

    is_lm = importlib.util.find_spec(f"mlx_lm.models.{model_type}") is not None
    is_vlm = importlib.util.find_spec(f"mlx_vlm.models.{model_type}") is not None

    if is_lm and not is_vlm:
        mlx_lm_convert()
        runtime = Runtime.MLX_LM
    elif is_vlm and not is_lm:
        mlx_vlm_convert()
        runtime = Runtime.MLX_VLM
    else:
        # fallback in-case our MODEL_REMAPPING is outdated
        try:
            mlx_vlm_convert()
            runtime = Runtime.MLX_VLM
        except Exception as e:
            # Deliberate best-effort fallback, but leave a trace of why the
            # first attempt failed instead of discarding it silently.
            print(e)
            print("mlx-vlm conversion failed, falling back to mlx-lm")
            mlx_lm_convert()
            runtime = Runtime.MLX_LM
    return runtime
def process_model(model_id, q_method, oauth_token: gr.OAuthToken | None):
    """Convert ``model_id`` to MLX and upload the result to the caller's namespace.

    Args:
        model_id: Source Hub repo id; the part after the last "/" becomes the
            base of the destination repo name.
        q_method: "FP16" for a float16 conversion, otherwise a key of
            ``QUANT_PARAMS`` selecting the quantization bit width.
        oauth_token: Login token of the current user, or None when there is
            no logged-in user.

    Returns:
        A ``(message, image_path)`` tuple: a link to the new repo with
        "llama.png" on success, or the error text with "error.png" when the
        conversion or upload fails.

    Raises:
        ValueError: if there is no usable login token.
    """
    # The annotation allows None, so check the object itself before `.token`;
    # otherwise an anonymous call fails with AttributeError instead of the
    # intended message.
    if oauth_token is None or oauth_token.token is None:
        raise ValueError("You must be logged in to use MLX-my-repo")

    model_name = model_id.split("/")[-1]
    username = whoami(oauth_token.token)["name"]

    try:
        if q_method == "FP16":
            upload_repo = f"{username}/{model_name}-mlx-fp16"
            convert_kwargs = {"quantize": False, "dtype": "float16"}
        else:
            q_bits = QUANT_PARAMS[q_method]
            upload_repo = f"{username}/{model_name}-mlx-{q_bits}Bit"
            convert_kwargs = {"quantize": True, "q_bits": q_bits}

        # TemporaryDirectory(dir=...) requires the parent directory to exist.
        os.makedirs("converted", exist_ok=True)
        with tempfile.TemporaryDirectory(dir="converted") as tmpdir:
            # The target directory must not exist
            mlx_path = os.path.join(tmpdir, "mlx")
            runtime = convert(model_id, mlx_path=mlx_path, **convert_kwargs)
            print("Conversion done")
            upload_to_hub(
                path=mlx_path,
                upload_repo=upload_repo,
                hf_path=model_id,
                oauth_token=oauth_token,
                runtime=runtime,
            )
            print("Upload done")
        return (
            f'Find your repo <a href="https://hf.co/{upload_repo}" target="_blank" style="text-decoration:underline">here</a>',
            "llama.png",
        )
    except Exception as e:
        # UI boundary: report the failure in the output panel instead of raising.
        return (f"Error: {e}", "error.png")
    finally:
        # Runs on success and on failure, so downloaded models never pile up.
        clear_hf_cache_space()
        print("Folder cleaned up successfully!")
css = """/* Custom CSS to allow scrolling */
.gradio-container {overflow-y: auto;}
"""

# Create Gradio interface
with gr.Blocks(css=css) as demo:
    gr.Markdown("You must be logged in to use MLX-my-repo.")
    gr.LoginButton(min_width=250)

    # Input 1: searchable Hub model picker.
    model_id = HuggingfaceHubSearch(
        label="Hub Model ID",
        placeholder="Search for model id on Huggingface",
        search_type="model",
    )

    # Input 2: conversion method, defaulting to 4-bit quantization.
    q_method = gr.Dropdown(
        choices=["FP16", "Q2", "Q3", "Q4", "Q6", "Q8"],
        label="Conversion Method",
        info="MLX conversion type (FP16 for float16, Q2–Q8 for quantized models)",
        value="Q4",
        filterable=False,
        visible=True,
    )

    # Outputs: the status message and the result picture from process_model.
    result_message = gr.Markdown(label="output")
    result_image = gr.Image(show_label=False)

    # NOTE(review): process_model's `oauth_token` parameter is not listed in
    # `inputs`; presumably Gradio supplies it from the type hint — confirm.
    iface = gr.Interface(
        fn=process_model,
        inputs=[model_id, q_method],
        outputs=[result_message, result_image],
        title="Create your own MLX Models, blazingly fast ⚡!",
        description="The space takes an HF repo as an input, converts it to MLX format (FP16 or quantized), and creates a Public/Private repo under your HF user namespace.",
        api_name=False,
    )
def restart_space():
    """Request a factory reboot of the "reach-vb/mlx-my-repo" Space, authenticating with HF_TOKEN."""
    hub_client = HfApi()
    hub_client.restart_space(
        repo_id="reach-vb/mlx-my-repo",
        token=HF_TOKEN,
        factory_reboot=True,
    )
# Schedule restart_space to run every 21600 seconds (6 hours).
scheduler = BackgroundScheduler()
scheduler.add_job(func=restart_space, trigger="interval", seconds=21600)
scheduler.start()

# Launch the interface: one job at a time by default, at most 5 queued requests.
queued_demo = demo.queue(default_concurrency_limit=1, max_size=5)
queued_demo.launch(debug=True, show_api=False)