Spaces:
Sleeping
Sleeping
import os | |
import aiohttp | |
import asyncio | |
import requests | |
from tqdm import tqdm | |
from typing import Optional | |
from concurrent.futures import ThreadPoolExecutor | |
# ============================= | |
# Configuration | |
# ============================= | |
MODEL_DIR = os.getenv("MODEL_DIR", "models") | |
DEFAULT_MODEL_URL = os.getenv( | |
"DEFAULT_MODEL_URL", | |
"https://huggingface.co/TheBloke/CapybaraHermes-2.5-Mistral-7B-GGUF/resolve/main/capybarahermes-2.5-mistral-7b.Q5_K_S.gguf" | |
) | |
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN") # Optional: for gated repos | |
# ============================= | |
# Utilities | |
# ============================= | |
def extract_filename(url: str) -> str: | |
return url.split("/")[-1] | |
def list_available_models() -> list: | |
"""List all .gguf models in the model directory.""" | |
if not os.path.isdir(MODEL_DIR): | |
return [] | |
return [f for f in os.listdir(MODEL_DIR) if f.endswith(".gguf")] | |
def get_model_path(filename: str) -> str: | |
"""Get full path for a given model filename.""" | |
path = os.path.join(MODEL_DIR, filename) | |
if not os.path.exists(path): | |
raise FileNotFoundError(f"β οΈ Model not found at {path}") | |
return path | |
# ============================= | |
# Sync Model Download (fallback) | |
# ============================= | |
def download_model_if_missing(model_url: str = DEFAULT_MODEL_URL) -> Optional[str]: | |
os.makedirs(MODEL_DIR, exist_ok=True) | |
model_filename = extract_filename(model_url) | |
model_path = os.path.join(MODEL_DIR, model_filename) | |
if os.path.exists(model_path): | |
print(f"β Model already exists: {model_path}") | |
return model_path | |
headers = {} | |
if HUGGINGFACE_TOKEN: | |
headers["Authorization"] = f"Bearer {HUGGINGFACE_TOKEN}" | |
try: | |
print(f"β¬οΈ Downloading model from {model_url}") | |
with requests.get(model_url, headers=headers, stream=True, timeout=60) as r: | |
r.raise_for_status() | |
total = int(r.headers.get('content-length', 0)) | |
with open(model_path, 'wb') as file, tqdm( | |
total=total, unit='B', unit_scale=True, desc=model_filename | |
) as pbar: | |
for chunk in r.iter_content(chunk_size=8192): | |
file.write(chunk) | |
pbar.update(len(chunk)) | |
print(f"β Download complete: {model_path}") | |
return model_path | |
except requests.exceptions.RequestException as e: | |
print(f"β Failed to download model: {e}") | |
return None | |
# ============================= | |
# Optional Async Download (advanced) | |
# ============================= | |
async def _async_download_file(url: str, output_path: str, token: Optional[str] = None): | |
headers = {} | |
if token: | |
headers["Authorization"] = f"Bearer {token}" | |
async with aiohttp.ClientSession(headers=headers) as session: | |
async with session.get(url) as response: | |
response.raise_for_status() | |
total = int(response.headers.get("Content-Length", 0)) | |
with open(output_path, "wb") as f, tqdm( | |
total=total, unit="B", unit_scale=True, desc=os.path.basename(output_path) | |
) as pbar: | |
async for chunk in response.content.iter_chunked(1024): | |
f.write(chunk) | |
pbar.update(len(chunk)) | |
def download_model_async(model_url: str = DEFAULT_MODEL_URL) -> Optional[str]: | |
os.makedirs(MODEL_DIR, exist_ok=True) | |
model_filename = extract_filename(model_url) | |
model_path = os.path.join(MODEL_DIR, model_filename) | |
if os.path.exists(model_path): | |
print(f"β Model already exists: {model_path}") | |
return model_path | |
print(f"β¬οΈ [Async] Downloading model from {model_url} ...") | |
try: | |
asyncio.run(_async_download_file(model_url, model_path, token=HUGGINGFACE_TOKEN)) | |
print(f"β Async download complete: {model_path}") | |
return model_path | |
except Exception as e: | |
print(f"β Async download failed: {e}") | |
return None | |