""" Database Initialization Module Handles database creation, schema setup, and health checks """ import os import aiosqlite import logging from pathlib import Path import tempfile import atexit from typing import Dict, Any, List logger = logging.getLogger(__name__) class DatabaseInitializer: """Handles database initialization and health checks""" def __init__(self, db_path: str = None): # Check if we should use a temporary database use_temp_db = os.getenv("USE_TEMP_DATABASE", "false").lower() == "true" if db_path: self.db_path = db_path elif use_temp_db: # Create a temporary database file that gets cleaned up on exit temp_fd, temp_path = tempfile.mkstemp(suffix='.db', prefix='ai_tutor_') os.close(temp_fd) # Close the file descriptor, we'll reopen it with sqlite self.db_path = temp_path # Register cleanup on exit atexit.register(self._cleanup_temp_db) logger.info(f"Using temporary database: {self.db_path}") else: self.db_path = os.getenv("DATABASE_PATH", "/tmp/ai_tutor.db") self.schema_path = self._find_schema_file() def _cleanup_temp_db(self): """Clean up temporary database file""" try: if os.path.exists(self.db_path): os.unlink(self.db_path) logger.info(f"Cleaned up temporary database: {self.db_path}") except Exception as e: logger.error(f"Error cleaning up temporary database: {e}") def _find_schema_file(self) -> str: """Return the path to the schema.sql file. The schema.sql file is expected to be in the same directory as this script. """ schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql') if not os.path.exists(schema_path): raise FileNotFoundError(f"schema.sql not found at {schema_path}") return schema_path async def check_database_exists(self) -> bool: """Check if database file exists""" return os.path.exists(self.db_path) async def check_database_health(self) -> Dict[str, Any]: """Comprehensive database health check""" health_status = { "database_exists": False, "database_accessible": False, "schema_loaded": False, "tables_exist": False, "views_exist": False, "can_write": False, "record_count": {}, "errors": [] } try: # Check if database file exists health_status["database_exists"] = await self.check_database_exists() if not health_status["database_exists"]: health_status["errors"].append("Database file does not exist") return health_status # Try to connect to database async with aiosqlite.connect(self.db_path) as db: health_status["database_accessible"] = True # Check if required tables exist required_tables = ['metadata_extractions', 'curricula', 'learning_content', 'api_cache'] existing_tables = await self._get_existing_tables(db) missing_tables = [table for table in required_tables if table not in existing_tables] if missing_tables: health_status["errors"].append(f"Missing tables: {missing_tables}") else: health_status["tables_exist"] = True # Check if views exist required_views = ['user_learning_journeys', 'curriculum_content_status'] existing_views = await self._get_existing_views(db) missing_views = [view for view in required_views if view not in existing_views] if missing_views: health_status["errors"].append(f"Missing views: {missing_views}") else: health_status["views_exist"] = True # Test write capability try: await db.execute("CREATE TEMPORARY TABLE test_write (id INTEGER)") await db.execute("DROP TABLE test_write") health_status["can_write"] = True except Exception as e: health_status["errors"].append(f"Cannot write to database: {str(e)}") # Get record counts if health_status["tables_exist"]: for table in required_tables: try: async with db.execute(f"SELECT COUNT(*) FROM {table}") as cursor: count = await cursor.fetchone() health_status["record_count"][table] = count[0] if count else 0 except Exception as e: health_status["record_count"][table] = f"Error: {str(e)}" health_status["schema_loaded"] = ( health_status["tables_exist"] and health_status["views_exist"] ) except Exception as e: health_status["errors"].append(f"Database connection error: {str(e)}") return health_status async def _get_existing_tables(self, db: aiosqlite.Connection) -> List[str]: """Get list of existing tables""" async with db.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' """) as cursor: rows = await cursor.fetchall() return [row[0] for row in rows] async def _get_existing_views(self, db: aiosqlite.Connection) -> List[str]: """Get list of existing views""" async with db.execute(""" SELECT name FROM sqlite_master WHERE type='view' """) as cursor: rows = await cursor.fetchall() return [row[0] for row in rows] async def create_database(self) -> bool: """Create database file and initialize with schema""" try: logger.info(f"Creating database at: {self.db_path}") # Ensure directory exists db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) logger.info(f"Created directory: {db_dir}") # Create database and load schema async with aiosqlite.connect(self.db_path) as db: # Read schema file with open(self.schema_path, 'r') as f: schema = f.read() # Execute schema await db.executescript(schema) await db.commit() logger.info("Database created and schema loaded successfully") return True except Exception as e: logger.error(f"Error creating database: {str(e)}") return False async def initialize_database(self, force_recreate: bool = False) -> Dict[str, Any]: """Initialize database with comprehensive checks and creation""" result = { "success": False, "action_taken": "none", "health_check": {}, "errors": [] } try: # Check current database health health_check = await self.check_database_health() result["health_check"] = health_check # Determine if we need to create/recreate database needs_creation = ( not health_check["database_exists"] or not health_check["schema_loaded"] or force_recreate ) if needs_creation: if health_check["database_exists"] and force_recreate: # Backup existing database backup_path = f"{self.db_path}.backup" if os.path.exists(self.db_path): os.rename(self.db_path, backup_path) logger.info(f"Backed up existing database to: {backup_path}") result["action_taken"] = "recreated_with_backup" else: result["action_taken"] = "force_recreated" else: result["action_taken"] = "created" # Create database creation_success = await self.create_database() if not creation_success: result["errors"].append("Failed to create database") return result # Verify creation final_health = await self.check_database_health() result["health_check"] = final_health if final_health["schema_loaded"] and final_health["can_write"]: result["success"] = True logger.info("Database initialization completed successfully") else: result["errors"].append("Database created but health check failed") else: # Database exists and is healthy result["success"] = True result["action_taken"] = "already_exists" logger.info("Database already exists and is healthy") except Exception as e: error_msg = f"Database initialization error: {str(e)}" logger.error(error_msg) result["errors"].append(error_msg) return result async def repair_database(self) -> Dict[str, Any]: """Attempt to repair database issues""" result = { "success": False, "repairs_attempted": [], "errors": [] } try: health_check = await self.check_database_health() if not health_check["database_exists"]: # Database doesn't exist - create it creation_result = await self.initialize_database() result["repairs_attempted"].append("created_missing_database") result["success"] = creation_result["success"] result["errors"].extend(creation_result.get("errors", [])) return result # Database exists but has issues async with aiosqlite.connect(self.db_path) as db: # Check and repair missing tables if not health_check["tables_exist"]: with open(self.schema_path, 'r') as f: schema = f.read() await db.executescript(schema) await db.commit() result["repairs_attempted"].append("recreated_schema") # Verify repair final_health = await self.check_database_health() result["success"] = final_health["schema_loaded"] except Exception as e: error_msg = f"Database repair error: {str(e)}" logger.error(error_msg) result["errors"].append(error_msg) return result # Global instance db_initializer = DatabaseInitializer()