import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense, Dropout from tensorflow.keras.optimizers import Adam from tensorflow.keras.callbacks import EarlyStopping def huber_loss(y_true, y_pred, delta=1.0): """ Función de pérdida Huber personalizada. """ error = y_true - y_pred is_small_error = tf.abs(error) <= delta small_error_loss = 0.5 * tf.square(error) big_error_loss = delta * (tf.abs(error) - 0.5 * delta) return tf.where(is_small_error, small_error_loss, big_error_loss) def create_sequences(data, time_steps): """ Función para crear secuencias de datos de entrada y salida. """ X, y = [], [] for i in range(len(data) - time_steps): X.append(data[i:(i + time_steps), :]) y.append(data[i + time_steps, 0]) return np.array(X), np.array(y) def build_lstm_model(time_steps, input_size): """ Función para construir el modelo LSTM mejorado. """ model = Sequential([ LSTM(100, return_sequences=True, input_shape=(time_steps, input_size)), Dropout(0.2), LSTM(100), Dropout(0.2), Dense(50), Dense(1) ]) optimizer = Adam(learning_rate=0.001) model.compile(optimizer=optimizer, loss=huber_loss) return model def train_model(model, X_train, y_train, X_val, y_val, epochs=200, batch_size=32): """ Función para entrenar el modelo LSTM mejorado. """ early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True) history = model.fit( X_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(X_val, y_val), callbacks=[early_stopping], verbose=1 ) return history def add_brownian_noise(X): brownian_noise = tf.random.normal(tf.shape(X), mean=0.0, stddev=0.1, dtype=tf.float32) return X + brownian_noise def evaluate_model(model, X_test, y_test): result = model.evaluate(X_test, y_test, verbose=0) print("Pérdida en el conjunto de prueba:", result) def print_model_info(model): print("\nCaracterísticas del modelo:") print("Número de capas:", len(model.layers)) model.summary() def calculate_accuracy(y_true, y_pred): tasa_acierto = mean_absolute_error(y_test, y_pred) print("Tasa de acierto (MAE):", tasa_acierto) def plot_predictions(y_test, y_pred, df): plt.figure(figsize=(10, 6)) plt.plot(y_test, label='Valor Real') plt.plot(y_pred, label='Predicción', alpha=0.7) plt.title("Comparación de Predicciones vs. Valoreseales") plt.xlabel("Índice") plt.ylabel("Precio Escalado") plt.legend() plt.show() # Hacer predicciones train_predict = model.predict(X_train) test_predict = model.predict(X_test) # Invertir las predicciones a la escala original train_predict = scaler.inverse_transform(np.hstack([train_predict, np.zeros((train_predict.shape[0], X.shape[2]-1))]))[:, 0] test_predict = scaler.inverse_transform(np.hstack([test_predict, np.zeros((test_predict.shape[0], X.shape[2]-1))]))[:, 0] y_train_inv = scaler.inverse_transform(np.hstack([y_train.reshape(-1, 1), np.zeros((y_train.shape[0], X.shape[2]-1))]))[:, 0] y_test_inv = scaler.inverse_transform(np.hstack([y_test.reshape(-1, 1), np.zeros((y_test.shape[0], X.shape[2]-1))]))[:, 0] # Calcular el error cuadrático medio (RMSE) train_rmse = np.sqrt(np.mean((train_predict - y_train_inv)**2)) test_rmse = np.sqrt(np.mean((test_predict - y_test_inv)**2)) print(f"RMSE en entrenamiento: {train_rmse}") print(f"RMSE en prueba: {test_rmse}")