# beac_inflation_app / models.py
import streamlit as st  # <-- add this line
import time

import numpy as np
import pandas as pd
import tensorflow as tf
from pgmpy.estimators import MaximumLikelihoodEstimator
from pgmpy.factors.discrete import TabularCPD
from pgmpy.inference import VariableElimination
from pgmpy.models import BayesianNetwork
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import MinMaxScaler
from statsmodels.tsa.arima.model import ARIMA
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.models import Sequential

from data_processing import apply_scenarios, prepare_timeseries_data

def train_arima(data, country, start_date, end_date,
taux_directeur_change=0, pib_change=0, m2_change=0,
p=1, d=1, q=1):
"""Modèle ARIMA avec gestion d'erreurs"""
start_time = time.time()
try:
        # Validate parameters
p, d, q = int(p), int(d), int(q)
country_data = data[data['Pays'] == country]
filtered_data = country_data[
(country_data['Année'] >= str(start_date)) &
(country_data['Année'] <= str(end_date))
].sort_values('Année')
modified_data = apply_scenarios(
filtered_data,
float(taux_directeur_change),
float(pib_change),
float(m2_change)
)
model = ARIMA(modified_data["Taux d'inflation (%)"], order=(p, d, q))
model_fit = model.fit()
predictions = model_fit.predict(start=0, end=len(modified_data)-1)
results = pd.DataFrame({
'Année': modified_data['Année'],
'Inflation réelle': modified_data["Taux d'inflation (%)"],
'Inflation prédite': predictions
})
mae = mean_absolute_error(results['Inflation réelle'], results['Inflation prédite'])
rmse = np.sqrt(mean_squared_error(results['Inflation réelle'], results['Inflation prédite']))
r2 = r2_score(results['Inflation réelle'], results['Inflation prédite'])
return model_fit, results, {
'mae': mae,
'rmse': rmse,
'r2': r2,
'training_time': time.time() - start_time
}
    except Exception as e:
        raise ValueError(f"ARIMA error: {e}") from e
def train_mlp(data, country, start_date, end_date,
taux_directeur_change=0, pib_change=0, m2_change=0,
hidden_layers=2, neurons=50, epochs=100):
"""Réseau de neurones MLP avec validation des paramètres"""
start_time = time.time()
try:
        # Validate parameters
hidden_layers = max(1, int(hidden_layers))
neurons = max(1, int(neurons))
epochs = max(1, int(epochs))
country_data = data[data['Pays'] == country]
filtered_data = country_data[
(country_data['Année'] >= str(start_date)) &
(country_data['Année'] <= str(end_date))
].sort_values('Année')
modified_data = apply_scenarios(
filtered_data,
float(taux_directeur_change),
float(pib_change),
float(m2_change)
)
X = modified_data[[
"Masse monétaire (M2)",
"Croissance PIB (%)",
"Taux directeur",
"Balance commerciale",
"Taux de change FCFA/USD"
]].astype(np.float32)
y = modified_data["Taux d'inflation (%)"].astype(np.float32)
        # Chronological split (no shuffling for time series; random_state has
        # no effect when shuffle=False, so it is omitted)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )
model = MLPRegressor(
hidden_layer_sizes=tuple([neurons]*hidden_layers),
max_iter=epochs,
random_state=42,
early_stopping=True,
solver='adam',
activation='relu'
)
model.fit(X_train, y_train)
        # Predict over the full period (metrics below are therefore partly in-sample)
        y_pred = model.predict(X)
results = pd.DataFrame({
'Année': modified_data['Année'],
'Inflation réelle': y,
'Inflation prédite': y_pred
})
mae = mean_absolute_error(y, y_pred)
rmse = np.sqrt(mean_squared_error(y, y_pred))
r2 = r2_score(y, y_pred)
return model, results, {
'mae': mae,
'rmse': rmse,
'r2': r2,
'training_time': time.time() - start_time
}
    except Exception as e:
        raise ValueError(f"MLP error: {e}") from e
def train_lstm(data, country, start_date, end_date, lstm_units, epochs, look_back, *scenarios):
    """LSTM model on the inflation series. Note: the *scenarios arguments are
    accepted for interface consistency but are not applied here."""
    start_time = time.time()
    try:
        # 1. Load and check the data
country_data = data[data["Pays"] == country]
train_data = country_data[
(country_data["Année"] >= str(start_date)) &
(country_data["Année"] <= str(end_date))
].sort_values('Année')
        if len(train_data) < 2:
            raise ValueError(f"Insufficient data ({len(train_data)} points). At least 2 required.")
        # 2. Normalize the data to [0, 1]
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = train_data["Taux d'inflation (%)"].values.astype('float32')
dataset_normalized = scaler.fit_transform(dataset.reshape(-1, 1)).flatten()
        # 3. Build the look-back sequences (sliding windows)
dataX, dataY = [], []
for i in range(len(dataset_normalized) - look_back):
dataX.append(dataset_normalized[i:(i + look_back)])
dataY.append(dataset_normalized[i + look_back])
        if len(dataX) == 0:
            raise ValueError("No sequences created - reduce look_back")
X_train = np.array(dataX)
y_train = np.array(dataY)
X_train = np.reshape(X_train, (X_train.shape[0], look_back, 1))
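        # Illustration (assumed values): with 10 observations and look_back=3,
        # the loop above yields 7 windows, so X_train has shape (7, 3, 1) and
        # y_train has shape (7,): each window of 3 values predicts the next one.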
        # 4. Model configuration
model = Sequential()
model.add(LSTM(
units=int(lstm_units),
input_shape=(look_back, 1),
activation='tanh',
recurrent_activation='sigmoid',
kernel_initializer='glorot_uniform',
return_sequences=False
))
model.add(Dense(1))
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae'],
run_eagerly=False
)
        # 5. Training with an early-stopping callback (monitors training loss)
early_stop = EarlyStopping(
monitor='loss',
patience=5,
restore_best_weights=True
)
history = model.fit(
X_train, y_train,
epochs=int(epochs),
batch_size=1,
verbose=1,
callbacks=[early_stop],
shuffle=False
)
        # 6. Predict and invert the normalization
train_predict = model.predict(X_train)
train_predict = scaler.inverse_transform(train_predict).flatten()
y_train = scaler.inverse_transform(y_train.reshape(-1, 1)).flatten()
        results = pd.DataFrame({
            'Année': train_data['Année'].iloc[look_back:].values,
            'Inflation réelle': y_train,
            'Inflation prédite': train_predict
        })
        return model, results, {
            "mae": mean_absolute_error(y_train, train_predict),
            "rmse": np.sqrt(mean_squared_error(y_train, train_predict)),
            "r2": r2_score(y_train, train_predict),
            "training_time": time.time() - start_time
        }
except Exception as e:
error_details = {
'data_points': len(train_data) if 'train_data' in locals() else 0,
'look_back_used': look_back,
'sequences_created': len(dataX) if 'dataX' in locals() else 0
}
raise ValueError(f"Erreur LSTM: {str(e)}\nContexte: {error_details}")
def train_bayesian_network(data, country, start_date, end_date,
taux_directeur_change=0, pib_change=0, m2_change=0):
"""Version compatible avec toutes les versions de pgmpy"""
start_time = time.time()
try:
        # 1. Prepare the data
country_data = data[data['Pays'] == country]
filtered_data = country_data[
(country_data['Année'] >= str(start_date)) &
(country_data['Année'] <= str(end_date))
].sort_values('Année')
modified_data = apply_scenarios(filtered_data, taux_directeur_change, pib_change, m2_change)
df = modified_data.copy()
        # 2. Robust discretization (quantile bins, at most 5 per variable)
discretized_cols = {
"Taux d'inflation (%)": "Inflation",
"Masse monétaire (M2)": "M2",
"Croissance PIB (%)": "PIB",
"Taux directeur": "TauxDirecteur",
"Balance commerciale": "Balance",
"Taux de change FCFA/USD": "Change"
}
for src, dest in discretized_cols.items():
unique_vals = len(df[src].unique())
bins = min(5, unique_vals)
if bins > 1:
df[dest] = pd.qcut(df[src], q=bins, duplicates='drop', labels=False).astype(int)
else:
                df[dest] = 0  # all values identical: a single bin
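        # Illustration (assumed values): pd.qcut(pd.Series([1, 2, 3, 4, 5]),
        # q=5, labels=False) yields bins [0, 1, 2, 3, 4]; duplicates='drop'
        # merges bins whose quantile edges coincide, so fewer bins may remain.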
        # 3. Build the network structure (each driver is a parent of Inflation)
model = BayesianNetwork([
("M2", "Inflation"),
("PIB", "Inflation"),
("TauxDirecteur", "Inflation"),
("Balance", "Inflation"),
("Change", "Inflation")
])
        # 4. Fit the CPDs by maximum likelihood
model.fit(df, estimator=MaximumLikelihoodEstimator)
        # 5. CPD handling compatible with multiple pgmpy versions
for node in model.nodes():
cpd = model.get_cpds(node)
            # Version-agnostic access to the evidence variables
            if hasattr(cpd, 'variables'):
                evidence = [v for v in cpd.variables if v != node]
            else:
                evidence = cpd.get_evidence()
            evidence_card = [len(df[v].unique()) for v in evidence]
            # Manually renormalize the CPD columns (guards against NaNs)
            cpd_values = np.nan_to_num(cpd.values, nan=1e-10)
            normalized = cpd_values / (cpd_values.sum(axis=0) + 1e-10)
            # Rebuild the CPD. TabularCPD expects a 2-D array of shape
            # (variable_card, prod(evidence_card)), so the evidence axes of the
            # n-dimensional values array are flattened back into columns.
            variable_card = len(df[node].unique())
            new_cpd = TabularCPD(
                variable=node,
                variable_card=variable_card,
                values=normalized.reshape(variable_card, -1),
                evidence=evidence if evidence else None,
                evidence_card=evidence_card if evidence_card else None
            )
            model.remove_cpds(node)
            model.add_cpds(new_cpd)
        # 6. Inference and predictions
infer = VariableElimination(model)
predictions = []
for _, row in df.iterrows():
evidence = {
"M2": int(row["M2"]),
"PIB": int(row["PIB"]),
"TauxDirecteur": int(row["TauxDirecteur"]),
"Balance": int(row["Balance"]),
"Change": int(row["Change"])
}
result = infer.query(variables=["Inflation"], evidence=evidence)
            pred_value = result.values.argmax()
            # Map the most likely bin back to the original scale
            # (approximation: assumes 5 equal-width bins over the observed range)
            predictions.append(float(
                modified_data["Taux d'inflation (%)"].min() +
                (pred_value + 0.5) *
                (modified_data["Taux d'inflation (%)"].max() -
                 modified_data["Taux d'inflation (%)"].min()) / 5
            ))
        # 7. Assemble the results
results = pd.DataFrame({
'Année': modified_data['Année'],
'Inflation réelle': modified_data["Taux d'inflation (%)"],
'Inflation prédite': predictions
})
return model, results, {
'mae': mean_absolute_error(results['Inflation réelle'], results['Inflation prédite']),
'rmse': np.sqrt(mean_squared_error(results['Inflation réelle'], results['Inflation prédite'])),
'r2': r2_score(results['Inflation réelle'], results['Inflation prédite']),
'training_time': time.time() - start_time
}
    except Exception as e:
        error_details = {
            'data_samples': len(filtered_data) if 'filtered_data' in locals() else 0,
            'discretization_bins': {v: df[v].nunique() for v in discretized_cols.values() if v in df}
            if 'df' in locals() else None
        }
        raise ValueError(f"Bayesian network error: {e}\nContext: {error_details}") from e