Spaces:
Running
Running
import streamlit as st # <-- Ajoutez cette ligne | |
import warnings | |
import pgmpy.factors.discrete | |
from pgmpy.factors.discrete import TabularCPD | |
from sklearn.preprocessing import MinMaxScaler | |
import numpy as np | |
import pandas as pd | |
import time | |
from sklearn.model_selection import train_test_split | |
from sklearn.neural_network import MLPRegressor | |
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score | |
from statsmodels.tsa.arima.model import ARIMA | |
import tensorflow as tf | |
from tensorflow.keras.models import Sequential | |
from tensorflow.keras.layers import LSTM, Dense | |
from tensorflow.keras.callbacks import EarlyStopping | |
from pgmpy.models import BayesianNetwork | |
from pgmpy.estimators import MaximumLikelihoodEstimator | |
from pgmpy.inference import VariableElimination | |
from data_processing import apply_scenarios, prepare_timeseries_data | |
def train_arima(data, country, start_date, end_date,
                taux_directeur_change=0, pib_change=0, m2_change=0,
                p=1, d=1, q=1):
    """Fit an ARIMA(p, d, q) model on one country's inflation series.

    Rows of ``data`` whose ``'Pays'`` equals ``country`` and whose ``'Année'``
    lies between ``str(start_date)`` and ``str(end_date)`` (inclusive) are
    sorted by ``'Année'`` and handed to ``apply_scenarios`` together with the
    three scenario changes (converted to ``float``). The model is fitted on
    the ``"Taux d'inflation (%)"`` column of the returned frame.

    Args:
        data: DataFrame providing at least the columns ``'Pays'``,
            ``'Année'`` and ``"Taux d'inflation (%)"``.
        country: Value matched against the ``'Pays'`` column.
        start_date: Lower bound of the period (compared as ``str``).
        end_date: Upper bound of the period (compared as ``str``).
        taux_directeur_change: Scenario value forwarded to ``apply_scenarios``.
        pib_change: Scenario value forwarded to ``apply_scenarios``.
        m2_change: Scenario value forwarded to ``apply_scenarios``.
        p: ARIMA order component, coerced with ``int``.
        d: ARIMA order component, coerced with ``int``.
        q: ARIMA order component, coerced with ``int``.

    Returns:
        A ``(model_fit, results, metrics)`` tuple: the fitted statsmodels
        results object, a DataFrame with the columns ``'Année'``,
        ``'Inflation réelle'`` and ``'Inflation prédite'``, and a dict with
        the keys ``'mae'``, ``'rmse'``, ``'r2'`` and ``'training_time'``
        (seconds elapsed inside this function).

    Raises:
        ValueError: For any failure; the message is prefixed with
            ``"Erreur ARIMA: "`` and the original exception is chained.
    """
    start_time = time.time()
    try:
        # Parameter validation: the order must be made of plain integers.
        order = (int(p), int(d), int(q))

        country_rows = data[data['Pays'] == country]
        # NOTE(review): the bounds are compared as strings, which presumably
        # means 'Année' is stored as text -- confirm against the data loader.
        after_start = country_rows['Année'] >= str(start_date)
        before_end = country_rows['Année'] <= str(end_date)
        filtered_data = country_rows[after_start & before_end].sort_values('Année')

        modified_data = apply_scenarios(
            filtered_data,
            float(taux_directeur_change),
            float(pib_change),
            float(m2_change),
        )
        # Fail early with a readable message instead of an opaque error
        # raised from inside the estimator.
        if len(modified_data) == 0:
            raise ValueError(
                f"Aucune donnée pour {country} entre {start_date} et {end_date}"
            )

        inflation = modified_data["Taux d'inflation (%)"]
        model_fit = ARIMA(inflation, order=order).fit()

        # In-sample predictions over the whole fitted range. They are turned
        # into a plain array so that they are paired with the observations by
        # position, never by index label.
        predicted = np.asarray(
            model_fit.predict(start=0, end=len(modified_data) - 1),
            dtype=float,
        )

        results = pd.DataFrame({
            'Année': modified_data['Année'],
            'Inflation réelle': inflation,
            'Inflation prédite': predicted,
        })

        actual = results['Inflation réelle']
        fitted = results['Inflation prédite']
        metrics = {
            'mae': mean_absolute_error(actual, fitted),
            'rmse': np.sqrt(mean_squared_error(actual, fitted)),
            'r2': r2_score(actual, fitted),
            'training_time': time.time() - start_time,
        }
        return model_fit, results, metrics
    except Exception as e:
        # Keep the historical ValueError contract, but chain the cause so the
        # original traceback is not lost.
        raise ValueError(f"Erreur ARIMA: {str(e)}") from e
def train_mlp(data, country, start_date, end_date,
              taux_directeur_change=0, pib_change=0, m2_change=0,
              hidden_layers=2, neurons=50, epochs=100):
    """Fit a scikit-learn ``MLPRegressor`` predicting inflation from macro features.

    Rows of ``data`` for ``country`` whose ``'Année'`` lies between
    ``str(start_date)`` and ``str(end_date)`` (inclusive) are sorted by
    ``'Année'`` and handed to ``apply_scenarios`` with the three scenario
    changes (converted to ``float``). The regressor is fitted on the leading
    80 % of the resulting rows (no shuffling) and then used to predict every
    row of the selected period; the metrics are computed on all those rows.

    Args:
        data: DataFrame providing ``'Pays'``, ``'Année'``,
            ``"Taux d'inflation (%)"`` and the five feature columns listed in
            the body.
        country: Value matched against the ``'Pays'`` column.
        start_date: Lower bound of the period (compared as ``str``).
        end_date: Upper bound of the period (compared as ``str``).
        taux_directeur_change: Scenario value forwarded to ``apply_scenarios``.
        pib_change: Scenario value forwarded to ``apply_scenarios``.
        m2_change: Scenario value forwarded to ``apply_scenarios``.
        hidden_layers: Number of hidden layers, coerced to an int >= 1.
        neurons: Units per hidden layer, coerced to an int >= 1.
        epochs: Passed as ``max_iter``, coerced to an int >= 1.

    Returns:
        A ``(model, results, metrics)`` tuple: the fitted regressor, a
        DataFrame with the columns ``'Année'``, ``'Inflation réelle'`` and
        ``'Inflation prédite'``, and a dict with the keys ``'mae'``,
        ``'rmse'``, ``'r2'`` and ``'training_time'`` (seconds).

    Raises:
        ValueError: For any failure; the message is prefixed with
            ``"Erreur MLP: "`` and the original exception is chained.
    """
    start_time = time.time()
    feature_columns = [
        "Masse monétaire (M2)",
        "Croissance PIB (%)",
        "Taux directeur",
        "Balance commerciale",
        "Taux de change FCFA/USD",
    ]
    try:
        # Parameter validation: clamp every hyper-parameter to at least 1.
        hidden_layers = max(1, int(hidden_layers))
        neurons = max(1, int(neurons))
        epochs = max(1, int(epochs))

        country_rows = data[data['Pays'] == country]
        # NOTE(review): the bounds are compared as strings, which presumably
        # means 'Année' is stored as text -- confirm against the data loader.
        after_start = country_rows['Année'] >= str(start_date)
        before_end = country_rows['Année'] <= str(end_date)
        filtered_data = country_rows[after_start & before_end].sort_values('Année')

        modified_data = apply_scenarios(
            filtered_data,
            float(taux_directeur_change),
            float(pib_change),
            float(m2_change),
        )
        # Fail early with a readable message instead of an opaque error
        # raised from inside scikit-learn.
        if len(modified_data) == 0:
            raise ValueError(
                f"Aucune donnée pour {country} entre {start_date} et {end_date}"
            )

        X = modified_data[feature_columns].astype(np.float32)
        y = modified_data["Taux d'inflation (%)"].astype(np.float32)

        # Chronological split: the trailing 20 % is held out of the fit.
        # The held-out part is not needed afterwards because predictions and
        # metrics below cover the whole period. No random_state is passed:
        # it has no effect when shuffle=False.
        X_train, _, y_train, _ = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        model = MLPRegressor(
            hidden_layer_sizes=(neurons,) * hidden_layers,
            max_iter=epochs,
            random_state=42,
            early_stopping=True,
            solver='adam',
            activation='relu',
        )
        model.fit(X_train, y_train)

        # Predict every selected row, fitted and held-out alike.
        y_pred = model.predict(X)

        results = pd.DataFrame({
            'Année': modified_data['Année'],
            'Inflation réelle': y,
            'Inflation prédite': y_pred,
        })
        metrics = {
            'mae': mean_absolute_error(y, y_pred),
            'rmse': np.sqrt(mean_squared_error(y, y_pred)),
            'r2': r2_score(y, y_pred),
            'training_time': time.time() - start_time,
        }
        return model, results, metrics
    except Exception as e:
        # Keep the historical ValueError contract, but chain the cause so the
        # original traceback is not lost.
        raise ValueError(f"Erreur MLP: {str(e)}") from e
def train_lstm(data, country, start_date, end_date, lstm_units, epochs, look_back, *scenarios):
    """Fit a single-layer Keras LSTM on one country's inflation series.

    The ``"Taux d'inflation (%)"`` values of the rows for ``country`` whose
    ``'Année'`` lies between ``str(start_date)`` and ``str(end_date)``
    (inclusive, sorted by ``'Année'``) are scaled to [0, 1], cut into sliding
    windows of ``look_back`` consecutive values, and each window is used to
    predict the value that follows it.

    Args:
        data: DataFrame providing ``'Pays'``, ``'Année'`` and
            ``"Taux d'inflation (%)"``.
        country: Value matched against the ``'Pays'`` column.
        start_date: Lower bound of the period (compared as ``str``).
        end_date: Upper bound of the period (compared as ``str``).
        lstm_units: Number of LSTM units, coerced with ``int``.
        epochs: Maximum number of training epochs, coerced with ``int``.
        look_back: Window length, coerced to an int >= 1.
        *scenarios: Accepted but not used by this function.

    Returns:
        A ``(model, train_predict, metrics)`` tuple: the fitted Keras model,
        a 1-D array of in-sample predictions on the original scale (one per
        window, i.e. ``len(series) - look_back`` values), and a dict with the
        keys ``"mae"``, ``"rmse"``, ``"r2"`` and ``"training_time"`` (seconds).

    Raises:
        ValueError: For any failure; the message is prefixed with
            ``"Erreur LSTM: "``, carries a small context dict and chains the
            original exception.
    """
    start_time = time.time()
    # Pre-initialised so the error handler can report on them without
    # probing locals().
    train_data = None
    windows = []
    try:
        # Same coercion style as lstm_units / epochs: sliders may hand over
        # floats, and range() / shapes below require an int.
        look_back = max(1, int(look_back))

        # 1. Select and check the data
        country_data = data[data["Pays"] == country]
        after_start = country_data["Année"] >= str(start_date)
        before_end = country_data["Année"] <= str(end_date)
        train_data = country_data[after_start & before_end].sort_values('Année')
        if len(train_data) < 2:
            raise ValueError(f"Données insuffisantes ({len(train_data)} points). Minimum 2 requis.")

        # 2. Scale the series to [0, 1]
        scaler = MinMaxScaler(feature_range=(0, 1))
        series = train_data["Taux d'inflation (%)"].values.astype('float32')
        scaled = scaler.fit_transform(series.reshape(-1, 1)).flatten()

        # 3. Build the sliding windows and their next-step targets
        targets = []
        for start in range(len(scaled) - look_back):
            stop = start + look_back
            windows.append(scaled[start:stop])
            targets.append(scaled[stop])
        if not windows:
            raise ValueError("Aucune séquence créée - réduire look_back")

        X_train = np.array(windows)
        y_train = np.array(targets)
        # Keras LSTM input layout: (samples, timesteps, features)
        X_train = np.reshape(X_train, (X_train.shape[0], look_back, 1))

        # 4. Model definition
        model = Sequential()
        model.add(LSTM(
            units=int(lstm_units),
            input_shape=(look_back, 1),
            activation='tanh',
            recurrent_activation='sigmoid',
            kernel_initializer='glorot_uniform',
            return_sequences=False
        ))
        model.add(Dense(1))
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae'],
            run_eagerly=False
        )

        # 5. Training, stopped early when the training loss stalls
        early_stop = EarlyStopping(
            monitor='loss',
            patience=5,
            restore_best_weights=True
        )
        model.fit(
            X_train, y_train,
            epochs=int(epochs),
            batch_size=1,
            verbose=1,
            callbacks=[early_stop],
            shuffle=False
        )

        # 6. In-sample prediction, mapped back to the original scale
        train_predict = model.predict(X_train)
        train_predict = scaler.inverse_transform(train_predict).flatten()
        y_train = scaler.inverse_transform(y_train.reshape(-1, 1)).flatten()

        metrics = {
            "mae": mean_absolute_error(y_train, train_predict),
            "rmse": np.sqrt(mean_squared_error(y_train, train_predict)),
            "r2": r2_score(y_train, train_predict),
            # Added for parity with the other train_* functions.
            "training_time": time.time() - start_time,
        }
        # NOTE(review): the other train_* functions return a DataFrame
        # ('Année' / 'Inflation réelle' / 'Inflation prédite') as the second
        # element. An unreachable statement that built such a frame (and
        # referenced an undefined name) used to follow this return; it was
        # removed. The array return is kept because it is what callers
        # actually receive today -- confirm which shape the UI expects.
        return model, train_predict, metrics
    except Exception as e:
        error_details = {
            'data_points': len(train_data) if train_data is not None else 0,
            'look_back_used': look_back,
            'sequences_created': len(windows)
        }
        raise ValueError(f"Erreur LSTM: {str(e)}\nContexte: {error_details}") from e
def _bn_discretize(series, max_bins=5):
    """Return integer quantile-bin codes for ``series`` (0 for a constant series)."""
    n_bins = min(max_bins, series.nunique(dropna=False))
    if n_bins > 1:
        return pd.qcut(series, q=n_bins, duplicates='drop', labels=False).astype(int)
    # All values identical: a single bin.
    return pd.Series(0, index=series.index, dtype=int)


def _bn_normalized_cpd(cpd):
    """Rebuild ``cpd`` as a TabularCPD whose every column sums to 1."""
    parents = list(cpd.variables[1:])
    parent_cards = [int(card) for card in cpd.cardinality[1:]]

    # TabularCPD only accepts a 2-D table (variable_card x product of the
    # evidence cardinalities); ``cpd.values`` is N-dimensional, so the 2-D
    # view returned by get_values() is used instead.
    table = np.nan_to_num(np.asarray(cpd.get_values(), dtype=float), nan=0.0)
    column_totals = table.sum(axis=0, keepdims=True)
    unobserved = column_totals <= 0
    # Columns with no usable mass (parent configuration never observed) get a
    # uniform distribution; the others are rescaled to sum to exactly 1.
    table = np.where(
        unobserved,
        1.0 / table.shape[0],
        table / np.where(unobserved, 1.0, column_totals),
    )

    return TabularCPD(
        variable=cpd.variable,
        variable_card=int(cpd.cardinality[0]),
        values=table,
        evidence=parents or None,
        evidence_card=parent_cards or None,
        # Keep the state labels learned during fitting so that evidence and
        # query results keep referring to the same bin codes.
        state_names={name: list(cpd.state_names[name]) for name in cpd.variables},
    )


def train_bayesian_network(data, country, start_date, end_date,
                           taux_directeur_change=0, pib_change=0, m2_change=0):
    """Fit a discrete Bayesian network in which five macro variables point to inflation.

    Rows of ``data`` for ``country`` whose ``'Année'`` lies between
    ``str(start_date)`` and ``str(end_date)`` (inclusive) are sorted by
    ``'Année'`` and handed to ``apply_scenarios``. Inflation and the five
    explanatory columns are each discretised into at most five quantile bins,
    the network parameters are estimated by maximum likelihood, and every row
    is then predicted as the mean observed inflation of its most probable
    inflation bin.

    Args:
        data: DataFrame providing ``'Pays'``, ``'Année'`` and the six source
            columns listed in ``discretized_cols`` below.
        country: Value matched against the ``'Pays'`` column.
        start_date: Lower bound of the period (compared as ``str``).
        end_date: Upper bound of the period (compared as ``str``).
        taux_directeur_change: Scenario value forwarded to ``apply_scenarios``.
        pib_change: Scenario value forwarded to ``apply_scenarios``.
        m2_change: Scenario value forwarded to ``apply_scenarios``.

    Returns:
        A ``(model, results, metrics)`` tuple: the fitted network, a DataFrame
        with the columns ``'Année'``, ``'Inflation réelle'`` and
        ``'Inflation prédite'``, and a dict with the keys ``'mae'``,
        ``'rmse'``, ``'r2'`` and ``'training_time'`` (seconds).

    Raises:
        ValueError: For any failure; the message is prefixed with
            ``"Erreur Réseau Bayésien: "``, carries a small context dict and
            chains the original exception.
    """
    start_time = time.time()
    inflation_col = "Taux d'inflation (%)"
    target = "Inflation"
    # Source column -> name of its discretised counterpart (a network node).
    discretized_cols = {
        inflation_col: target,
        "Masse monétaire (M2)": "M2",
        "Croissance PIB (%)": "PIB",
        "Taux directeur": "TauxDirecteur",
        "Balance commerciale": "Balance",
        "Taux de change FCFA/USD": "Change"
    }
    parents = ["M2", "PIB", "TauxDirecteur", "Balance", "Change"]
    # Pre-initialised so the error handler can report on them safely.
    filtered_data = None
    df = None
    try:
        # 1. Data preparation
        country_data = data[data['Pays'] == country]
        after_start = country_data['Année'] >= str(start_date)
        before_end = country_data['Année'] <= str(end_date)
        filtered_data = country_data[after_start & before_end].sort_values('Année')
        modified_data = apply_scenarios(filtered_data, taux_directeur_change, pib_change, m2_change)
        if len(modified_data) == 0:
            raise ValueError(
                f"Aucune donnée pour {country} entre {start_date} et {end_date}"
            )
        df = modified_data.copy()

        # 2. Discretisation into quantile-bin codes
        for src, dest in discretized_cols.items():
            df[dest] = _bn_discretize(df[src])

        # 3. Network structure: every explanatory variable is a parent of
        #    the inflation node.
        model = BayesianNetwork([(parent, target) for parent in parents])

        # 4. Parameter learning, restricted to the columns that are nodes
        model.fit(df[[target] + parents], estimator=MaximumLikelihoodEstimator)

        # 5. Replace every learned CPD by a cleaned, column-normalised copy
        for node in list(model.nodes()):
            cleaned_cpd = _bn_normalized_cpd(model.get_cpds(node))
            model.remove_cpds(node)
            model.add_cpds(cleaned_cpd)

        # 6. Inference: most probable inflation bin for each row, converted
        #    back to a rate. The bins are quantile bins (unequal widths, and
        #    possibly fewer than five of them), so the representative value
        #    of a bin is the mean inflation actually observed in it.
        infer = VariableElimination(model)
        bin_means = df.groupby(target)[inflation_col].mean()
        predictions = []
        for record in df[parents].to_dict('records'):
            evidence = {name: int(code) for name, code in record.items()}
            posterior = infer.query(variables=[target], evidence=evidence)
            best_state = posterior.state_names[target][int(np.argmax(posterior.values))]
            predictions.append(float(bin_means.loc[best_state]))

        # 7. Results
        results = pd.DataFrame({
            'Année': modified_data['Année'],
            'Inflation réelle': modified_data[inflation_col],
            'Inflation prédite': predictions
        })
        actual = results['Inflation réelle']
        predicted = results['Inflation prédite']
        return model, results, {
            'mae': mean_absolute_error(actual, predicted),
            'rmse': np.sqrt(mean_squared_error(actual, predicted)),
            'r2': r2_score(actual, predicted),
            'training_time': time.time() - start_time
        }
    except Exception as e:
        # Only report the discretised columns that already exist, so that
        # building the context can never mask the original error.
        bins_created = None
        if df is not None:
            bins_created = {
                src: int(df[dest].nunique(dropna=False))
                for src, dest in discretized_cols.items()
                if dest in df.columns
            }
        error_details = {
            'data_samples': len(filtered_data) if filtered_data is not None else 0,
            'discretization_bins': bins_created
        }
        raise ValueError(f"Erreur Réseau Bayésien: {str(e)}\nContexte: {error_details}") from e