# FILE: modules/model_manager_public.py import asyncio import logging import os import sys import time import threading from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError from typing import Dict, Any, Optional, List, Callable, Coroutine # --- Llama.cpp Python Backend Import & Debug --- logger_import_debug = logging.getLogger("ModelManager_ImportDebug") if not logger_import_debug.handlers: # Minimal setup for this specific logger _h = logging.StreamHandler(sys.stdout) _f = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') _h.setFormatter(_f) logger_import_debug.addHandler(_h) logger_import_debug.setLevel(logging.INFO) # --- Start of FIX: Define LlamaMock unconditionally and manage Llama variable --- class LlamaMock: def __init__(self, model_path: str = "mock_model_path", *args, **kwargs): # Use logger_import_debug as it's defined at this point logger_import_debug.error(f"LlamaMock initialized for model_path='{model_path}' with args: {args}, kwargs: {kwargs}. llama_cpp is not installed or importable.") self.model_path = model_path self.n_ctx_train = kwargs.get('n_ctx', 0) # Mock common attributes self.metadata = {} def create_chat_completion(self, messages: List[Dict[str,str]], *args, **kwargs) -> Dict[str, Any]: logger_import_debug.error(f"LlamaMock: create_chat_completion called for '{self.model_path}', but llama_cpp is unavailable.") return { "id": "chatcmpl-mock", "object": "chat.completion", "created": int(time.time()), "model": self.model_path, "choices": [{"index": 0, "message": {"role": "assistant", "content": f"[Error: Llama.cpp backend not available for {self.model_path}]"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} } def __call__(self, prompt: str, *args, **kwargs) -> Dict[str, Any]: # For older direct call style logger_import_debug.error(f"LlamaMock: __call__ used for '{self.model_path}', but llama_cpp is unavailable.") return { "id": "cmpl-mock", "object": "text_completion", "created": int(time.time()), "model": self.model_path, "choices": [{"text": f"[Error: Llama.cpp backend not available for {self.model_path}]", "index": 0, "logprobs": None, "finish_reason": "stop"}], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} } def get_metadata(self) -> Dict[str, Any]: return self.metadata def tokenizer(self) -> Any: class MockTokenizer: def encode(self, text: str) -> List[int]: return [0] * len(text) def decode(self, tokens: List[int]) -> str: return "".join([chr(t) for t in tokens if t < 256]) # Simplistic return MockTokenizer() # Initialize variables that will hold the class to be used Llama_imported_class: Any = LlamaMock # Default to mock LlamaCppError_imported_class: Any = Exception # Default to base Exception LLAMA_CPP_AVAILABLE: bool = False try: from llama_cpp import Llama as RealLlama, LlamaCppError as RealLlamaCppSpecificError Llama_imported_class = RealLlama LlamaCppError_imported_class = RealLlamaCppSpecificError LLAMA_CPP_AVAILABLE = True logger_import_debug.info("🎉 Final llama-cpp-python status: AVAILABLE. Using REAL Llama class. Error catching for Llama.cpp specifics will use 'LlamaCppSpecificError'.") except ImportError: logger_import_debug.warning("Could not import 'Llama' and 'LlamaCppSpecificError' from 'llama_cpp' directly. Trying simpler import...") try: from llama_cpp import Llama as RealLlamaOnly # Try again without specific error import (older versions) Llama_imported_class = RealLlamaOnly # LlamaCppError_imported_class remains Exception LLAMA_CPP_AVAILABLE = True logger_import_debug.warning("Note: 'LlamaCppError' not found at llama_cpp top-level. Using base 'Exception' for LlamaCpp errors.") logger_import_debug.info("🎉 Final llama-cpp-python status: AVAILABLE (LlamaCppError not found). Using REAL Llama class.") except ImportError: # Llama_imported_class remains LlamaMock, LLAMA_CPP_AVAILABLE remains False logger_import_debug.critical("CRITICAL FAILURE: Cannot import 'Llama' from 'llama_cpp': No module named 'llama_cpp'. Using LlamaMock. Model loading will fail for real models.") except Exception as e_import_general: # Llama_imported_class remains LlamaMock, LLAMA_CPP_AVAILABLE remains False logger_import_debug.critical(f"CRITICAL FAILURE: Unexpected error during 'llama_cpp' import: {e_import_general}. Using LlamaMock.", exc_info=True) # Assign to the names used throughout the rest of this module Llama = Llama_imported_class LlamaCppError = LlamaCppError_imported_class # --- End of FIX --- # --- End Llama.cpp Python Backend Import & Debug --- try: # Assume config_settings_public.py is in the same 'modules' directory or project root is in sys.path from modules.config_settings_public import ( MODEL_PATHS, MODEL_SPECIFIC_PARAMS, INFERENCE_PRESETS, N_CTX_FALLBACK, VERBOSE_LLAMA_CPP ) except ImportError as e_conf: logging.critical(f"CRITICAL IMPORT ERROR in ModelManager: Cannot import from config_settings_public: {e_conf}") # Provide minimal fallbacks if config cannot be loaded, though this is a critical failure MODEL_PATHS = {} MODEL_SPECIFIC_PARAMS = {"_default": {"n_ctx": 2048, "n_gpu_layers": -1, "verbose": False}} INFERENCE_PRESETS = {"default": {"temperature": 0.7}} N_CTX_FALLBACK = 2048 VERBOSE_LLAMA_CPP = False logger = logging.getLogger("ZOTHEOS_ModelManager") if not logger.handlers: handler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) # Timeout for model generation tasks when run in a thread GENERATION_THREAD_TIMEOUT_SECONDS = 300 # 5 minutes class ModelManager: def __init__(self, device_preference: Optional[str] = "cuda", max_model_count: int = 1, max_ram_models_gb: float = 8.0): self.device_preference = device_preference self.loaded_models: Dict[str, Any] = {} # Stores Llama instances self.model_load_status: Dict[str, str] = {} # "unloaded", "loading", "loaded", "error" self.model_ram_estimate_gb: Dict[str, float] = {} self.total_estimated_ram_gb: float = 0.0 self.max_model_count = max_model_count self.max_ram_models_gb = max_ram_models_gb self.model_load_order: List[str] = [] # Tracks order for LRU self.model_locks: Dict[str, asyncio.Lock] = {} # Per-model lock for loading self.executor = ThreadPoolExecutor(max_workers=max_model_count + 1, thread_name_prefix="LLM_Gen") # +1 for summarizer dev_log = "CPU (default)" if self.device_preference == "cuda": # Check NVIDIA drivers and CUDA toolkit availability (conceptual) try: import torch if torch.cuda.is_available(): dev_log = f"CUDA (preference): {torch.cuda.get_device_name(0)}" else: dev_log = "CUDA (preference, but torch.cuda.is_available()=False)" except ImportError: dev_log = "CUDA (preference, but PyTorch not found for detailed check)" except Exception as e_cuda_check: dev_log = f"CUDA (preference, error during torch check: {e_cuda_check})" # Actual CUDA usage is determined by n_gpu_layers > 0 during Llama init logger.info(f"🔥 MMgr init. Dev Preference: {dev_log}. MaxRAM Models: {self.max_ram_models_gb}GB. Max Count: {self.max_model_count}.") if not LLAMA_CPP_AVAILABLE: logger.critical("❌ Llama.cpp backend CRITICAL FAILURE: The 'Llama' class could not be imported from 'llama_cpp'. ModelManager cannot load GGUF models. Please check installation and logs from 'ModelManager_ImportDebug'.") logger.info(f"✅ MMGR Config: Max Models: {self.max_model_count}, Max RAM: {self.max_ram_models_gb} GB.") async def _ensure_model_loaded(self, model_name: str) -> bool: if model_name not in self.model_locks: self.model_locks[model_name] = asyncio.Lock() async with self.model_locks[model_name]: if self.model_load_status.get(model_name) == "loaded" and model_name in self.loaded_models: if model_name in self.model_load_order: self.model_load_order.remove(model_name) self.model_load_order.append(model_name) # Move to end of LRU return True if self.model_load_status.get(model_name) == "loading": logger.info(f"Model '{model_name}' is already being loaded by another task. Waiting...") while self.model_load_status.get(model_name) == "loading": await asyncio.sleep(0.5) return self.model_load_status.get(model_name) == "loaded" self.model_load_status[model_name] = "loading" logger.info(f"Attempting to load model '{model_name}'...") await self._evict_models_if_needed(new_model_name_to_load=model_name) model_instance, ram_gb = await asyncio.to_thread(self._load_model_from_disk, model_name) if model_instance: self.loaded_models[model_name] = model_instance self.model_ram_estimate_gb[model_name] = ram_gb self.total_estimated_ram_gb += ram_gb self.model_load_status[model_name] = "loaded" if model_name in self.model_load_order: self.model_load_order.remove(model_name) self.model_load_order.append(model_name) logger.info(f"✅ Model '{model_name}' loaded successfully. RAM used by this model: {ram_gb:.2f}GB. Total est. RAM: {self.total_estimated_ram_gb:.2f}GB.") return True else: self.model_load_status[model_name] = "error" logger.error(f"❌ Failed to load model '{model_name}'.") # Clean up RAM if an estimate was added prematurely or if partial load occurred if model_name in self.model_ram_estimate_gb: self.total_estimated_ram_gb -= self.model_ram_estimate_gb.pop(model_name) return False def _load_model_from_disk(self, model_name: str) -> tuple[Optional[Any], float]: # The check Llama == LlamaMock now works because LlamaMock is always defined if not LLAMA_CPP_AVAILABLE or Llama == LlamaMock: # Llama is the variable pointing to the class logger.error(f"Cannot load model '{model_name}': Llama.cpp backend not available (LLAMA_CPP_AVAILABLE={LLAMA_CPP_AVAILABLE}, Llama is LlamaMock: {Llama == LlamaMock}).") return None, 0.0 model_path = MODEL_PATHS.get(model_name) if not model_path or not os.path.exists(model_path): logger.error(f"Model path for '{model_name}' not found or invalid: {model_path}") return None, 0.0 specific_params = MODEL_SPECIFIC_PARAMS.get(model_name, {}) default_params = MODEL_SPECIFIC_PARAMS.get("_default", {}) load_params = default_params.copy() load_params.update(specific_params) load_params['model_path'] = model_path load_params.setdefault('verbose', VERBOSE_LLAMA_CPP) # Use global verbose if not set per model if 'n_ctx' not in load_params or load_params['n_ctx'] == 0: # Ensure n_ctx is valid load_params['n_ctx'] = N_CTX_FALLBACK logger.warning(f"n_ctx for model {model_name} was invalid or not found, using fallback: {N_CTX_FALLBACK}") logger.info(f"🔄 Loading '{model_name}' from: {model_path}") logger.debug(f"--- FINAL LOAD PARAMS FOR {model_name.upper()}: {load_params} ---") start_time = time.perf_counter() try: model_instance = Llama(**load_params) # Llama here is the variable assigned above load_time = time.perf_counter() - start_time logger.info(f"✅ Model '{model_name}' (Llama instance) initialized by Llama(...). Load time: {load_time:.2f}s.") ram_gb = os.path.getsize(model_path) / (1024**3) ram_gb_with_buffer = ram_gb * 1.5 + 1 # Heuristic: file size + 50% + 1GB for KV cache, etc. logger.info(f"Model '{model_name}' path: '{model_path}'. Est. RAM for this model (with buffer): {ram_gb_with_buffer:.2f}GB.") return model_instance, ram_gb_with_buffer except LlamaCppError as e_llama: # Catch specific LlamaCpp errors logger.error(f"❌ LlamaCppError loading model '{model_name}': {e_llama}", exc_info=True) except Exception as e: logger.error(f"❌ Generic error loading model '{model_name}': {e}", exc_info=True) return None, 0.0 async def _evict_models_if_needed(self, new_model_name_to_load: Optional[str] = None): new_model_ram_gb_estimate = 0.0 if new_model_name_to_load: model_path = MODEL_PATHS.get(new_model_name_to_load) if model_path and os.path.exists(model_path): new_model_ram_gb_estimate = (os.path.getsize(model_path) / (1024**3)) * 1.5 + 1 # Same heuristic # Check if eviction is needed: # 1. If we are at max model count AND the new model isn't already loaded. # 2. OR if adding the new model would exceed total RAM limit AND we have models loaded. while (len(self.loaded_models) >= self.max_model_count and (new_model_name_to_load not in self.loaded_models)) or \ (self.total_estimated_ram_gb + new_model_ram_gb_estimate > self.max_ram_models_gb and self.model_load_order): if not self.model_load_order: logger.warning("Eviction needed but model_load_order is empty. This should not happen if loaded_models is populated.") break model_to_evict = self.model_load_order.pop(0) # Evict LRU if model_to_evict in self.loaded_models: logger.warning(f"⚠️ Evicting model '{model_to_evict}' due to resource limits (Count: {len(self.loaded_models)}/{self.max_model_count}, RAM: {self.total_estimated_ram_gb:.2f}/{self.max_ram_models_gb:.2f}GB).") del self.loaded_models[model_to_evict] # Release model instance evicted_ram = self.model_ram_estimate_gb.pop(model_to_evict, 0) self.total_estimated_ram_gb -= evicted_ram self.model_load_status[model_to_evict] = "unloaded" logger.info(f"Model '{model_to_evict}' unloaded. RAM reclaimed: {evicted_ram:.2f}GB. Total est. RAM: {self.total_estimated_ram_gb:.2f}GB.") else: logger.warning(f"Model '{model_to_evict}' was in load order but not in loaded_models dict. Inconsistency detected.") def _generation_task_sync(self, model_instance: Any, prompt_messages: List[Dict[str,str]], gen_params: Dict[str, Any]) -> str: model_log_name = model_instance.model_path.split(os.sep)[-1] if hasattr(model_instance, 'model_path') else 'UnknownModel' logger.debug(f"--- [{model_log_name}] ENTERING _generation_task_sync ---") output_text = "[Error: Generation failed in thread]" try: completion = model_instance.create_chat_completion(messages=prompt_messages, **gen_params) if completion and "choices" in completion and completion["choices"]: message_content = completion["choices"][0].get("message", {}).get("content") output_text = message_content.strip() if message_content else "[No content in choice message]" else: output_text = "[No choices in completion or completion is empty]" logger.warning(f"[{model_log_name}] Malformed completion object: {completion}") logger.debug(f"✅ [{model_log_name}] Sync generation successful. Preview: {output_text[:100].replace(chr(10),' ')}...") except Exception as e: logger.error(f"❌ [{model_log_name}] Exception during model generation in thread: {e}", exc_info=True) output_text = f"[Error: Exception during generation - {type(e).__name__}: {str(e)[:100]}]" logger.debug(f"--- [{model_log_name}] EXITING _generation_task_sync ---") return output_text async def async_generation_wrapper(self, model_instance: Any, prompt_messages: List[Dict[str,str]], gen_params: Dict[str, Any], model_name_for_log: str) -> str: logger.debug(f"--- [{model_name_for_log}] ENTERING async_generation_wrapper ---") output_str = f"[Error: Model '{model_name_for_log}' instance not available for generation]" if not model_instance: logger.error(f"[{model_name_for_log}] Model instance is None in async_generation_wrapper.") return output_str try: output_str = await asyncio.wait_for( asyncio.to_thread(self._generation_task_sync, model_instance, prompt_messages, gen_params), timeout=GENERATION_THREAD_TIMEOUT_SECONDS + 10 ) except asyncio.TimeoutError: logger.error(f"❌ [{model_name_for_log}] Generation task TIMED OUT in asyncio.wait_for (>{GENERATION_THREAD_TIMEOUT_SECONDS + 10}s).") output_str = f"[Error: Generation timed out for model {model_name_for_log}]" except Exception as e: logger.error(f"❌ [{model_name_for_log}] Exception in async_generation_wrapper: {e}", exc_info=True) output_str = f"[Error: Async wrapper exception for model {model_name_for_log} - {type(e).__name__}]" logger.debug(f"--- [{model_name_for_log}] EXITING async_generation_wrapper ---") return output_str async def generate_with_model(self, model_name: str, prompt: str, preset_name: str = "default", system_prompt: Optional[str] = None) -> str: logger.info(f"--- [{model_name}] Received generation request. System prompt: {'Yes' if system_prompt else 'No'}. Preset: {preset_name} ---") # Check if Llama class is LlamaMock (meaning llama.cpp is not available) if not LLAMA_CPP_AVAILABLE or Llama == LlamaMock: logger.error(f"Cannot generate with model '{model_name}': Llama.cpp backend not available (LLAMA_CPP_AVAILABLE={LLAMA_CPP_AVAILABLE}, Llama is LlamaMock: {Llama == LlamaMock}).") return "[Error: Llama.cpp backend core ('Llama' class) not available or is mocked]" if not await self._ensure_model_loaded(model_name): return f"[Error: Model '{model_name}' could not be loaded or is not available.]" model_instance = self.loaded_models.get(model_name) if not model_instance: logger.error(f"Model '{model_name}' instance not found in loaded_models after successful _ensure_model_loaded. This is unexpected.") return f"[Error: Model '{model_name}' instance unexpectedly not found after load attempt.]" gen_params = INFERENCE_PRESETS.get(preset_name, INFERENCE_PRESETS.get("balanced", {"temperature": 0.7})).copy() # Fallback to "balanced" then hardcoded default prompt_messages = [] if system_prompt and system_prompt.strip(): prompt_messages.append({"role": "system", "content": system_prompt}) prompt_messages.append({"role": "user", "content": prompt}) logger.debug(f"[{model_name}] Calling async_generation_wrapper with messages: {prompt_messages}, params: {gen_params}") response_text = await self.async_generation_wrapper(model_instance, prompt_messages, gen_params, model_name) logger.info(f"--- [{model_name}] Generation complete. Response length: {len(response_text)} ---") return response_text def get_loaded_model_stats(self) -> Dict[str, Any]: return { "loaded_model_count": len(self.loaded_models), "total_ram_estimate_gb": round(self.total_estimated_ram_gb, 2), "max_ram_models_gb": self.max_ram_models_gb, "max_model_count": self.max_model_count, "models_in_memory": list(self.loaded_models.keys()), "load_order_lru": self.model_load_order, # LRU (oldest) to MRU (newest) "model_ram_details_gb": {name: round(ram, 2) for name, ram in self.model_ram_estimate_gb.items()}, "model_load_status": self.model_load_status, "llama_cpp_available": LLAMA_CPP_AVAILABLE, "llama_class_is_mock": Llama == LlamaMock } def shutdown(self): logger.info("🔌 Shutting down ModelManager...") self.executor.shutdown(wait=True, cancel_futures=True) # Cancel pending futures on shutdown self.loaded_models.clear() self.model_ram_estimate_gb.clear() self.model_load_order.clear() self.model_load_status.clear() self.total_estimated_ram_gb = 0 logger.info("✅ ModelManager shutdown complete.") # Example usage for direct testing async def main_model_manager_test(): # Configure logging for the test logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s') # Set specific loggers to higher levels if too verbose logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logger.info("--- ModelManager Test ---") if not MODEL_PATHS: logger.error("MODEL_PATHS is empty. Cannot run test. Check config_settings_public.py") return if Llama == LlamaMock: # Check if we are using the mock logger.warning("Llama.cpp is not available. Running test with LlamaMock. Responses will be mocked.") # Example: max 2 models, 10GB RAM limit mm = ModelManager(max_model_count=2, max_ram_models_gb=10.0) test_models_available = list(MODEL_PATHS.keys()) if not test_models_available: logger.error("No models defined in MODEL_PATHS for testing.") mm.shutdown() return # Select up to 3 models for testing, or fewer if not available model1_name = test_models_available[0] model2_name = test_models_available[1] if len(test_models_available) > 1 else model1_name model3_name = test_models_available[2] if len(test_models_available) > 2 else model1_name test_queries = [ (model1_name, "What is the capital of France?", "precise", "You are a helpful geography expert."), (model2_name, "Explain black holes simply.", "creative", None), (model1_name, "What is 1+1?", "precise", None), # Query model1 again ] if len(test_models_available) > 2 and model3_name not in [model1_name, model2_name]: # Only add model3 if it's distinct and available test_queries.append((model3_name, "Tell me a short joke.", "default", None)) for i, (model_name, query, preset, sys_prompt) in enumerate(test_queries): logger.info(f"\n--- Test Query {i+1}: Model '{model_name}', Preset '{preset}' ---") response = await mm.generate_with_model(model_name, query, preset_name=preset, system_prompt=sys_prompt) print(f"Response from '{model_name}': {response[:150]}...") # Print more of the response print(f"Stats after '{model_name}': {mm.get_loaded_model_stats()}") await asyncio.sleep(1) # Small delay between queries mm.shutdown() logger.info("--- ModelManager Test Complete ---") if __name__ == "__main__": # This allows testing ModelManager directly if needed # Ensure your MODEL_PATHS in config_settings_public.py point to valid GGUF files # To run: python -m modules.model_manager_public asyncio.run(main_model_manager_test())