# FILE: modules/model_manager_public.py
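# Module docstring added for orientation; it only restates what the code below does.
"""ZOTHEOS ModelManager (public build).

Loads GGUF models via llama-cpp-python, keeps them in memory under a
configurable count/RAM budget with LRU eviction, and runs chat-completion
calls in a worker thread (via asyncio.to_thread) guarded by a timeout.
If llama_cpp cannot be imported, a LlamaMock stand-in is used so the
module stays importable and fails gracefully at generation time.
"""
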
import asyncio
import logging
import os
import sys
import time
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from typing import Dict, Any, Optional, List, Callable, Coroutine
# --- Llama.cpp Python Backend Import & Debug ---
logger_import_debug = logging.getLogger("ModelManager_ImportDebug")
if not logger_import_debug.handlers: # Minimal setup for this specific logger
    _h = logging.StreamHandler(sys.stdout)
    _f = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    _h.setFormatter(_f)
    logger_import_debug.addHandler(_h)
    logger_import_debug.setLevel(logging.INFO)
# --- Start of FIX: Define LlamaMock unconditionally and manage Llama variable ---
class LlamaMock:
    def __init__(self, model_path: str = "mock_model_path", *args, **kwargs):
        # Use logger_import_debug as it's defined at this point
        logger_import_debug.error(f"LlamaMock initialized for model_path='{model_path}' with args: {args}, kwargs: {kwargs}. llama_cpp is not installed or importable.")
        self.model_path = model_path
        self.n_ctx_train = kwargs.get('n_ctx', 0)  # Mock common attributes
        self.metadata = {}

    def create_chat_completion(self, messages: List[Dict[str, str]], *args, **kwargs) -> Dict[str, Any]:
        logger_import_debug.error(f"LlamaMock: create_chat_completion called for '{self.model_path}', but llama_cpp is unavailable.")
        return {
            "id": "chatcmpl-mock", "object": "chat.completion", "created": int(time.time()),
            "model": self.model_path,
            "choices": [{"index": 0, "message": {"role": "assistant", "content": f"[Error: Llama.cpp backend not available for {self.model_path}]"}, "finish_reason": "stop"}],
            "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        }

    def __call__(self, prompt: str, *args, **kwargs) -> Dict[str, Any]:  # For older direct call style
        logger_import_debug.error(f"LlamaMock: __call__ used for '{self.model_path}', but llama_cpp is unavailable.")
        return {
            "id": "cmpl-mock", "object": "text_completion", "created": int(time.time()),
            "model": self.model_path,
            "choices": [{"text": f"[Error: Llama.cpp backend not available for {self.model_path}]", "index": 0, "logprobs": None, "finish_reason": "stop"}],
            "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        }

    def get_metadata(self) -> Dict[str, Any]:
        return self.metadata

    def tokenizer(self) -> Any:
        class MockTokenizer:
            def encode(self, text: str) -> List[int]: return [0] * len(text)
            def decode(self, tokens: List[int]) -> str: return "".join([chr(t) for t in tokens if t < 256])  # Simplistic
        return MockTokenizer()

# Initialize variables that will hold the class to be used
Llama_imported_class: Any = LlamaMock # Default to mock
LlamaCppError_imported_class: Any = Exception # Default to base Exception
LLAMA_CPP_AVAILABLE: bool = False
try:
    from llama_cpp import Llama as RealLlama, LlamaCppError as RealLlamaCppSpecificError
    Llama_imported_class = RealLlama
    LlamaCppError_imported_class = RealLlamaCppSpecificError
    LLAMA_CPP_AVAILABLE = True
    logger_import_debug.info("Final llama-cpp-python status: AVAILABLE. Using REAL Llama class. Llama.cpp-specific errors will be caught as 'LlamaCppError'.")
except ImportError:
    logger_import_debug.warning("Could not import 'Llama' and 'LlamaCppError' from 'llama_cpp' directly. Trying simpler import...")
    try:
        from llama_cpp import Llama as RealLlamaOnly  # Try again without specific error import (older versions)
        Llama_imported_class = RealLlamaOnly
        # LlamaCppError_imported_class remains Exception
        LLAMA_CPP_AVAILABLE = True
        logger_import_debug.warning("Note: 'LlamaCppError' not found at llama_cpp top-level. Using base 'Exception' for LlamaCpp errors.")
        logger_import_debug.info("Final llama-cpp-python status: AVAILABLE (LlamaCppError not found). Using REAL Llama class.")
    except ImportError:
        # Llama_imported_class remains LlamaMock, LLAMA_CPP_AVAILABLE remains False
        logger_import_debug.critical("CRITICAL FAILURE: Cannot import 'Llama' from 'llama_cpp': No module named 'llama_cpp'. Using LlamaMock. Model loading will fail for real models.")
except Exception as e_import_general:
    # Llama_imported_class remains LlamaMock, LLAMA_CPP_AVAILABLE remains False
    logger_import_debug.critical(f"CRITICAL FAILURE: Unexpected error during 'llama_cpp' import: {e_import_general}. Using LlamaMock.", exc_info=True)
# Assign to the names used throughout the rest of this module
Llama = Llama_imported_class
LlamaCppError = LlamaCppError_imported_class
# --- End of FIX ---
# --- End Llama.cpp Python Backend Import & Debug ---
try:
    # Assume config_settings_public.py is in the same 'modules' directory or project root is in sys.path
    from modules.config_settings_public import (
        MODEL_PATHS, MODEL_SPECIFIC_PARAMS, INFERENCE_PRESETS, N_CTX_FALLBACK, VERBOSE_LLAMA_CPP
    )
except ImportError as e_conf:
logging.critical(f"CRITICAL IMPORT ERROR in ModelManager: Cannot import from config_settings_public: {e_conf}")
# Provide minimal fallbacks if config cannot be loaded, though this is a critical failure
MODEL_PATHS = {}
MODEL_SPECIFIC_PARAMS = {"_default": {"n_ctx": 2048, "n_gpu_layers": -1, "verbose": False}}
INFERENCE_PRESETS = {"default": {"temperature": 0.7}}
N_CTX_FALLBACK = 2048
VERBOSE_LLAMA_CPP = False
logger = logging.getLogger("ZOTHEOS_ModelManager")
if not logger.handlers:
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
# Timeout for model generation tasks when run in a thread
GENERATION_THREAD_TIMEOUT_SECONDS = 300 # 5 minutes
class ModelManager:
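    # Class docstring added to summarize behavior already present in the methods below.
    """Loads, caches, and serves llama.cpp (GGUF) models.

    Models are kept in an LRU-ordered cache bounded by max_model_count and
    max_ram_models_gb; generation runs in a worker thread (asyncio.to_thread)
    and is awaited with a timeout so one stuck model cannot block the event loop.
    """
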
    def __init__(self, device_preference: Optional[str] = "cuda", max_model_count: int = 1, max_ram_models_gb: float = 8.0):
        self.device_preference = device_preference
        self.loaded_models: Dict[str, Any] = {}  # Stores Llama instances
        self.model_load_status: Dict[str, str] = {}  # "unloaded", "loading", "loaded", "error"
        self.model_ram_estimate_gb: Dict[str, float] = {}
        self.total_estimated_ram_gb: float = 0.0
        self.max_model_count = max_model_count
        self.max_ram_models_gb = max_ram_models_gb
        self.model_load_order: List[str] = []  # Tracks order for LRU
        self.model_locks: Dict[str, asyncio.Lock] = {}  # Per-model lock for loading
        self.executor = ThreadPoolExecutor(max_workers=max_model_count + 1, thread_name_prefix="LLM_Gen")  # +1 for summarizer

        dev_log = "CPU (default)"
        if self.device_preference == "cuda":
            # Check NVIDIA drivers and CUDA toolkit availability (conceptual)
            try:
                import torch
                if torch.cuda.is_available():
                    dev_log = f"CUDA (preference): {torch.cuda.get_device_name(0)}"
                else:
                    dev_log = "CUDA (preference, but torch.cuda.is_available()=False)"
            except ImportError:
                dev_log = "CUDA (preference, but PyTorch not found for detailed check)"
            except Exception as e_cuda_check:
                dev_log = f"CUDA (preference, error during torch check: {e_cuda_check})"
            # Actual CUDA usage is determined by n_gpu_layers > 0 during Llama init

        logger.info(f"🔥 MMgr init. Dev Preference: {dev_log}. Max RAM for models: {self.max_ram_models_gb}GB. Max Count: {self.max_model_count}.")
        if not LLAMA_CPP_AVAILABLE:
            logger.critical("❌ Llama.cpp backend CRITICAL FAILURE: The 'Llama' class could not be imported from 'llama_cpp'. ModelManager cannot load GGUF models. Please check installation and logs from 'ModelManager_ImportDebug'.")
        logger.info(f"✅ MMGR Config: Max Models: {self.max_model_count}, Max RAM: {self.max_ram_models_gb} GB.")

    async def _ensure_model_loaded(self, model_name: str) -> bool:
        if model_name not in self.model_locks:
            self.model_locks[model_name] = asyncio.Lock()
        async with self.model_locks[model_name]:
            if self.model_load_status.get(model_name) == "loaded" and model_name in self.loaded_models:
                if model_name in self.model_load_order:
                    self.model_load_order.remove(model_name)
                self.model_load_order.append(model_name)  # Move to end of LRU
                return True
            if self.model_load_status.get(model_name) == "loading":
                logger.info(f"Model '{model_name}' is already being loaded by another task. Waiting...")
                while self.model_load_status.get(model_name) == "loading":
                    await asyncio.sleep(0.5)
                return self.model_load_status.get(model_name) == "loaded"
            self.model_load_status[model_name] = "loading"
            logger.info(f"Attempting to load model '{model_name}'...")
            await self._evict_models_if_needed(new_model_name_to_load=model_name)
            model_instance, ram_gb = await asyncio.to_thread(self._load_model_from_disk, model_name)
            if model_instance:
                self.loaded_models[model_name] = model_instance
                self.model_ram_estimate_gb[model_name] = ram_gb
                self.total_estimated_ram_gb += ram_gb
                self.model_load_status[model_name] = "loaded"
                if model_name in self.model_load_order:
                    self.model_load_order.remove(model_name)
                self.model_load_order.append(model_name)
                logger.info(f"✅ Model '{model_name}' loaded successfully. RAM used by this model: {ram_gb:.2f}GB. Total est. RAM: {self.total_estimated_ram_gb:.2f}GB.")
                return True
            else:
                self.model_load_status[model_name] = "error"
                logger.error(f"❌ Failed to load model '{model_name}'.")
                # Clean up RAM if an estimate was added prematurely or if partial load occurred
                if model_name in self.model_ram_estimate_gb:
                    self.total_estimated_ram_gb -= self.model_ram_estimate_gb.pop(model_name)
                return False

    def _load_model_from_disk(self, model_name: str) -> tuple[Optional[Any], float]:
        # The check Llama == LlamaMock now works because LlamaMock is always defined
        if not LLAMA_CPP_AVAILABLE or Llama == LlamaMock:  # Llama is the variable pointing to the class
            logger.error(f"Cannot load model '{model_name}': Llama.cpp backend not available (LLAMA_CPP_AVAILABLE={LLAMA_CPP_AVAILABLE}, Llama is LlamaMock: {Llama == LlamaMock}).")
            return None, 0.0
        model_path = MODEL_PATHS.get(model_name)
        if not model_path or not os.path.exists(model_path):
            logger.error(f"Model path for '{model_name}' not found or invalid: {model_path}")
            return None, 0.0
        specific_params = MODEL_SPECIFIC_PARAMS.get(model_name, {})
        default_params = MODEL_SPECIFIC_PARAMS.get("_default", {})
        load_params = default_params.copy()
        load_params.update(specific_params)
        load_params['model_path'] = model_path
        load_params.setdefault('verbose', VERBOSE_LLAMA_CPP)  # Use global verbose if not set per model
        if 'n_ctx' not in load_params or load_params['n_ctx'] == 0:  # Ensure n_ctx is valid
            load_params['n_ctx'] = N_CTX_FALLBACK
            logger.warning(f"n_ctx for model {model_name} was invalid or not found, using fallback: {N_CTX_FALLBACK}")
        logger.info(f"Loading '{model_name}' from: {model_path}")
        logger.debug(f"--- FINAL LOAD PARAMS FOR {model_name.upper()}: {load_params} ---")
        start_time = time.perf_counter()
        try:
            model_instance = Llama(**load_params)  # Llama here is the variable assigned above
            load_time = time.perf_counter() - start_time
            logger.info(f"✅ Model '{model_name}' (Llama instance) initialized by Llama(...). Load time: {load_time:.2f}s.")
            ram_gb = os.path.getsize(model_path) / (1024**3)
            ram_gb_with_buffer = ram_gb * 1.5 + 1  # Heuristic: file size + 50% + 1GB for KV cache, etc.
            logger.info(f"Model '{model_name}' path: '{model_path}'. Est. RAM for this model (with buffer): {ram_gb_with_buffer:.2f}GB.")
            return model_instance, ram_gb_with_buffer
        except LlamaCppError as e_llama:  # Catch specific LlamaCpp errors
            logger.error(f"❌ LlamaCppError loading model '{model_name}': {e_llama}", exc_info=True)
        except Exception as e:
            logger.error(f"❌ Generic error loading model '{model_name}': {e}", exc_info=True)
        return None, 0.0

    async def _evict_models_if_needed(self, new_model_name_to_load: Optional[str] = None):
        new_model_ram_gb_estimate = 0.0
        if new_model_name_to_load:
            model_path = MODEL_PATHS.get(new_model_name_to_load)
            if model_path and os.path.exists(model_path):
                new_model_ram_gb_estimate = (os.path.getsize(model_path) / (1024**3)) * 1.5 + 1  # Same heuristic
        # Check if eviction is needed:
        # 1. If we are at max model count AND the new model isn't already loaded.
        # 2. OR if adding the new model would exceed total RAM limit AND we have models loaded.
        while (len(self.loaded_models) >= self.max_model_count and (new_model_name_to_load not in self.loaded_models)) or \
              (self.total_estimated_ram_gb + new_model_ram_gb_estimate > self.max_ram_models_gb and self.model_load_order):
            if not self.model_load_order:
                logger.warning("Eviction needed but model_load_order is empty. This should not happen if loaded_models is populated.")
                break
            model_to_evict = self.model_load_order.pop(0)  # Evict LRU
            if model_to_evict in self.loaded_models:
                logger.warning(f"⚠️ Evicting model '{model_to_evict}' due to resource limits (Count: {len(self.loaded_models)}/{self.max_model_count}, RAM: {self.total_estimated_ram_gb:.2f}/{self.max_ram_models_gb:.2f}GB).")
                del self.loaded_models[model_to_evict]  # Release model instance
                evicted_ram = self.model_ram_estimate_gb.pop(model_to_evict, 0)
                self.total_estimated_ram_gb -= evicted_ram
                self.model_load_status[model_to_evict] = "unloaded"
                logger.info(f"Model '{model_to_evict}' unloaded. RAM reclaimed: {evicted_ram:.2f}GB. Total est. RAM: {self.total_estimated_ram_gb:.2f}GB.")
            else:
                logger.warning(f"Model '{model_to_evict}' was in load order but not in loaded_models dict. Inconsistency detected.")

    def _generation_task_sync(self, model_instance: Any, prompt_messages: List[Dict[str, str]], gen_params: Dict[str, Any]) -> str:
        model_log_name = model_instance.model_path.split(os.sep)[-1] if hasattr(model_instance, 'model_path') else 'UnknownModel'
        logger.debug(f"--- [{model_log_name}] ENTERING _generation_task_sync ---")
        output_text = "[Error: Generation failed in thread]"
        try:
            completion = model_instance.create_chat_completion(messages=prompt_messages, **gen_params)
            if completion and "choices" in completion and completion["choices"]:
                message_content = completion["choices"][0].get("message", {}).get("content")
                output_text = message_content.strip() if message_content else "[No content in choice message]"
            else:
                output_text = "[No choices in completion or completion is empty]"
                logger.warning(f"[{model_log_name}] Malformed completion object: {completion}")
            logger.debug(f"✅ [{model_log_name}] Sync generation successful. Preview: {output_text[:100].replace(chr(10), ' ')}...")
        except Exception as e:
            logger.error(f"❌ [{model_log_name}] Exception during model generation in thread: {e}", exc_info=True)
            output_text = f"[Error: Exception during generation - {type(e).__name__}: {str(e)[:100]}]"
        logger.debug(f"--- [{model_log_name}] EXITING _generation_task_sync ---")
        return output_text

    async def async_generation_wrapper(self, model_instance: Any, prompt_messages: List[Dict[str, str]], gen_params: Dict[str, Any], model_name_for_log: str) -> str:
        logger.debug(f"--- [{model_name_for_log}] ENTERING async_generation_wrapper ---")
        output_str = f"[Error: Model '{model_name_for_log}' instance not available for generation]"
        if not model_instance:
            logger.error(f"[{model_name_for_log}] Model instance is None in async_generation_wrapper.")
            return output_str
        try:
            output_str = await asyncio.wait_for(
                asyncio.to_thread(self._generation_task_sync, model_instance, prompt_messages, gen_params),
                timeout=GENERATION_THREAD_TIMEOUT_SECONDS + 10
            )
        except asyncio.TimeoutError:
            logger.error(f"❌ [{model_name_for_log}] Generation task TIMED OUT in asyncio.wait_for (>{GENERATION_THREAD_TIMEOUT_SECONDS + 10}s).")
            output_str = f"[Error: Generation timed out for model {model_name_for_log}]"
        except Exception as e:
            logger.error(f"❌ [{model_name_for_log}] Exception in async_generation_wrapper: {e}", exc_info=True)
            output_str = f"[Error: Async wrapper exception for model {model_name_for_log} - {type(e).__name__}]"
        logger.debug(f"--- [{model_name_for_log}] EXITING async_generation_wrapper ---")
        return output_str

    async def generate_with_model(self, model_name: str, prompt: str, preset_name: str = "default", system_prompt: Optional[str] = None) -> str:
        logger.info(f"--- [{model_name}] Received generation request. System prompt: {'Yes' if system_prompt else 'No'}. Preset: {preset_name} ---")
        # Check if Llama class is LlamaMock (meaning llama.cpp is not available)
        if not LLAMA_CPP_AVAILABLE or Llama == LlamaMock:
            logger.error(f"Cannot generate with model '{model_name}': Llama.cpp backend not available (LLAMA_CPP_AVAILABLE={LLAMA_CPP_AVAILABLE}, Llama is LlamaMock: {Llama == LlamaMock}).")
            return "[Error: Llama.cpp backend core ('Llama' class) not available or is mocked]"
        if not await self._ensure_model_loaded(model_name):
            return f"[Error: Model '{model_name}' could not be loaded or is not available.]"
        model_instance = self.loaded_models.get(model_name)
        if not model_instance:
            logger.error(f"Model '{model_name}' instance not found in loaded_models after successful _ensure_model_loaded. This is unexpected.")
            return f"[Error: Model '{model_name}' instance unexpectedly not found after load attempt.]"
        gen_params = INFERENCE_PRESETS.get(preset_name, INFERENCE_PRESETS.get("balanced", {"temperature": 0.7})).copy()  # Fallback to "balanced", then hardcoded default
        prompt_messages = []
        if system_prompt and system_prompt.strip():
            prompt_messages.append({"role": "system", "content": system_prompt})
        prompt_messages.append({"role": "user", "content": prompt})
        logger.debug(f"[{model_name}] Calling async_generation_wrapper with messages: {prompt_messages}, params: {gen_params}")
        response_text = await self.async_generation_wrapper(model_instance, prompt_messages, gen_params, model_name)
        logger.info(f"--- [{model_name}] Generation complete. Response length: {len(response_text)} ---")
        return response_text

    def get_loaded_model_stats(self) -> Dict[str, Any]:
        return {
            "loaded_model_count": len(self.loaded_models),
            "total_ram_estimate_gb": round(self.total_estimated_ram_gb, 2),
            "max_ram_models_gb": self.max_ram_models_gb,
            "max_model_count": self.max_model_count,
            "models_in_memory": list(self.loaded_models.keys()),
            "load_order_lru": self.model_load_order,  # LRU (oldest) to MRU (newest)
            "model_ram_details_gb": {name: round(ram, 2) for name, ram in self.model_ram_estimate_gb.items()},
            "model_load_status": self.model_load_status,
            "llama_cpp_available": LLAMA_CPP_AVAILABLE,
            "llama_class_is_mock": Llama == LlamaMock
        }

    def shutdown(self):
        logger.info("Shutting down ModelManager...")
        self.executor.shutdown(wait=True, cancel_futures=True)  # Cancel pending futures on shutdown
        self.loaded_models.clear()
        self.model_ram_estimate_gb.clear()
        self.model_load_order.clear()
        self.model_load_status.clear()
        self.total_estimated_ram_gb = 0
        logger.info("✅ ModelManager shutdown complete.")

# Example usage for direct testing
async def main_model_manager_test():
    # Configure logging for the test
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    # Set specific loggers to higher levels if too verbose
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)
    logger.info("--- ModelManager Test ---")
    if not MODEL_PATHS:
        logger.error("MODEL_PATHS is empty. Cannot run test. Check config_settings_public.py")
        return
    if Llama == LlamaMock:  # Check if we are using the mock
        logger.warning("Llama.cpp is not available. Running test with LlamaMock. Responses will be mocked.")
    # Example: max 2 models, 10GB RAM limit
    mm = ModelManager(max_model_count=2, max_ram_models_gb=10.0)
    test_models_available = list(MODEL_PATHS.keys())
    if not test_models_available:
        logger.error("No models defined in MODEL_PATHS for testing.")
        mm.shutdown()
        return
    # Select up to 3 models for testing, or fewer if not available
    model1_name = test_models_available[0]
    model2_name = test_models_available[1] if len(test_models_available) > 1 else model1_name
    model3_name = test_models_available[2] if len(test_models_available) > 2 else model1_name
    test_queries = [
        (model1_name, "What is the capital of France?", "precise", "You are a helpful geography expert."),
        (model2_name, "Explain black holes simply.", "creative", None),
        (model1_name, "What is 1+1?", "precise", None),  # Query model1 again
    ]
    if len(test_models_available) > 2 and model3_name not in [model1_name, model2_name]:  # Only add model3 if it's distinct and available
        test_queries.append((model3_name, "Tell me a short joke.", "default", None))
    for i, (model_name, query, preset, sys_prompt) in enumerate(test_queries):
        logger.info(f"\n--- Test Query {i+1}: Model '{model_name}', Preset '{preset}' ---")
        response = await mm.generate_with_model(model_name, query, preset_name=preset, system_prompt=sys_prompt)
        print(f"Response from '{model_name}': {response[:150]}...")  # Print more of the response
        print(f"Stats after '{model_name}': {mm.get_loaded_model_stats()}")
        await asyncio.sleep(1)  # Small delay between queries
    mm.shutdown()
    logger.info("--- ModelManager Test Complete ---")

if __name__ == "__main__":
    # This allows testing ModelManager directly if needed
    # Ensure your MODEL_PATHS in config_settings_public.py point to valid GGUF files
    # To run: python -m modules.model_manager_public
    asyncio.run(main_model_manager_test())