# Listing metadata left by the file viewer: 346 lines, 12 KiB, Python

# scripts/run_first_models.py
from __future__ import annotations
from pathlib import Path
import sys
from typing import Iterable, Sequence
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge, Lasso, LogisticRegression
from sklearn.metrics import (
mean_absolute_error,
mean_squared_error,
f1_score,
precision_recall_curve,
roc_curve,
average_precision_score,
brier_score_loss,
)
from sklearn.preprocessing import StandardScaler
PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from meteo.dataset import load_raw_csv
from model.features import build_feature_dataframe, FeatureSpec, _steps_from_minutes
from model.splits import chronological_split
# --- Input / output locations ------------------------------------------------
# Raw weather CSV; a relative path, so it is resolved against the working directory.
CSV_PATH = Path("data/weather_minutely.csv")

# Outputs live next to the directory that contains this script.
_SCRIPT_DIR = Path(__file__).resolve().parent
DOC_DIR = _SCRIPT_DIR.parent
DATA_DIR = DOC_DIR / "data"
FIG_DIR = DOC_DIR / "figures"

# --- Modelling setup ---------------------------------------------------------
# Forecast horizons, in minutes.
HORIZONS_MINUTES: tuple[int, ...] = (10, 60, 360, 1440)
# Targets handled by the regression models.
CONTINUOUS_TARGETS: tuple[str, ...] = ("temperature", "wind_speed")
# Column that is binarised (> 0) for the rain classifier.
RAIN_TARGET: str = "rain_rate"

# Per-column lags (minutes) meant to come from the chapter 5 analyses. This is
# an example mapping in which every column currently shares the same values;
# columns absent from the mapping fall back to defaults.
_BASE_LAGS: tuple[int, ...] = (10, 20, 30)
DEFAULT_LAGS_BY_COL: dict[str, Sequence[int]] = {
    "temperature": _BASE_LAGS,
    "wind_speed": _BASE_LAGS,
    "rain_rate": _BASE_LAGS,
    "humidity": _BASE_LAGS,
    "pressure": _BASE_LAGS,
    "illuminance": _BASE_LAGS,
    "wind_direction": _BASE_LAGS,
    "sun_elevation": _BASE_LAGS,
}

# --- Correlation-based feature filtering -------------------------------------
# When enabled, only columns whose |correlation| with a target reaches the
# threshold are kept (plus the targets themselves).
USE_CORR_FILTER = True
CORR_THRESHOLD = 0.2
CORR_PATH = Path("docs/05 - Corrélations binaires avancées/data/correlation_matrix_lagged.csv")
LAG_MATRIX_PATH = Path("docs/05 - Corrélations binaires avancées/data/lag_matrix_minutes.csv")
def _align_target(
    df: pd.DataFrame,
    target_col: str,
    horizon_minutes: int,
    base_freq_minutes: int = 10,
) -> tuple[pd.DataFrame, pd.Series]:
    """Pair each feature row with the target value observed `horizon_minutes` later.

    Args:
        df: Feature frame that still contains `target_col`.
        target_col: Column to predict; it is removed from the returned features.
        horizon_minutes: Forecast horizon.
        base_freq_minutes: Passed to `_steps_from_minutes` together with the
            horizon (presumably the sampling period of `df` - confirm).

    Returns:
        `(X, y)` restricted to rows where every kept feature and the shifted
        target are non-null. `y` is named "target".
    """
    n_steps = _steps_from_minutes(horizon_minutes, base_freq_minutes)

    # Row i receives the target value found n_steps rows further down.
    future_target = df[target_col].shift(-n_steps).rename("target")

    # Only numeric/boolean predictors are kept (drops e.g. a textual "season").
    predictors = df.drop(columns=[target_col]).select_dtypes(include=["number", "bool"])

    complete = pd.concat([predictors, future_target], axis=1).dropna()
    y = complete.pop("target")
    return complete, y
def _load_correlation_and_lag() -> tuple[pd.DataFrame | None, pd.DataFrame | None]:
    """Load the correlation and lag matrices, using `None` for a missing file.

    Returns:
        `(corr_df, lag_df)` read from `CORR_PATH` and `LAG_MATRIX_PATH`, each
        with its first CSV column as index, or `None` when the file is absent.
    """

    def _read_if_present(path: Path) -> pd.DataFrame | None:
        if not path.exists():
            return None
        return pd.read_csv(path, index_col=0)

    return _read_if_present(CORR_PATH), _read_if_present(LAG_MATRIX_PATH)
def _select_features_from_corr(
corr_df: pd.DataFrame | None,
targets: Sequence[str],
threshold: float,
) -> set[str]:
if corr_df is None:
return set()
selected: set[str] = set()
for target in targets:
if target not in corr_df.columns:
continue
corrs = corr_df[target].drop(labels=[target], errors="ignore")
strong = corrs[corrs.abs() >= threshold]
selected.update(strong.index.tolist())
return selected
def _build_lags_from_matrices(
lag_df: pd.DataFrame | None,
corr_df: pd.DataFrame | None,
selected_cols: Iterable[str],
default_lags: dict[str, Sequence[int]],
threshold: float,
) -> dict[str, Sequence[int]]:
"""
Combine lags par défaut et lags issus de la matrice de décalage si |corr| dépasse le seuil.
"""
mapping: dict[str, Sequence[int]] = {}
for col in selected_cols:
base = list(default_lags.get(col, (10, 20, 30)))
extra: set[int] = set()
if lag_df is not None and corr_df is not None and col in lag_df.index:
corrs = corr_df.loc[col]
for tgt, corr_val in corrs.items():
if tgt == col:
continue
if abs(corr_val) < threshold:
continue
lag_val = lag_df.loc[col, tgt]
if pd.notna(lag_val) and lag_val != 0:
extra.add(int(abs(round(float(lag_val)))))
merged = sorted({*base, *extra})
mapping[col] = merged
return mapping
def _scale_train_val_test(
    X_train: pd.DataFrame,
    X_val: pd.DataFrame,
    X_test: pd.DataFrame,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, StandardScaler]:
    """Standardise the three splits with statistics learned on the training split only.

    Returns:
        `(X_train_scaled, X_val_scaled, X_test_scaled, scaler)` where `scaler`
        is the fitted `StandardScaler`.
    """
    # Fit on train alone so no validation/test statistics leak into the scaling.
    scaler = StandardScaler().fit(X_train)
    train_scaled, val_scaled, test_scaled = (
        scaler.transform(split) for split in (X_train, X_val, X_test)
    )
    return train_scaled, val_scaled, test_scaled, scaler
def _regression_scores(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float]:
    """Return the MAE and RMSE of `y_pred` against `y_true` as plain floats.

    Returns:
        Dict with keys "mae" and "rmse".
    """
    mae = mean_absolute_error(y_true, y_pred)
    mse = mean_squared_error(y_true, y_pred)
    return {"mae": float(mae), "rmse": float(np.sqrt(mse))}
def _classification_scores(y_true: np.ndarray, proba: np.ndarray, threshold: float = 0.5) -> dict[str, float]:
    """Score predicted positive-class probabilities against binary labels.

    Args:
        y_true: Ground-truth binary labels.
        proba: Predicted probability of the positive class.
        threshold: Probability at or above which a sample is labelled positive
            (only used for the F1 score).

    Returns:
        Dict with keys "f1", "brier" and "ap" (average precision).
    """
    hard_labels = np.where(proba >= threshold, 1, 0)
    f1 = f1_score(y_true, hard_labels, zero_division=0)
    brier = brier_score_loss(y_true, proba)
    avg_precision = average_precision_score(y_true, proba)
    return {"f1": float(f1), "brier": float(brier), "ap": float(avg_precision)}
def run_regression_models(train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame) -> pd.DataFrame:
    """Fit Ridge and Lasso for every continuous target and horizon, and score them.

    Each split is aligned independently with `_align_target`, features are
    standardised on the training split, and both models are evaluated on the
    validation and test splits. A (target, horizon) pair is skipped when any
    split has no usable rows after alignment.

    Returns:
        One row per (target, horizon, model, split) with columns
        "target", "horizon_min", "model", "split", "mae" and "rmse".
    """
    records: list[dict[str, object]] = []

    for target_col in CONTINUOUS_TARGETS:
        for horizon in HORIZONS_MINUTES:
            X_tr, y_tr = _align_target(train_df, target_col, horizon)
            X_va, y_va = _align_target(val_df, target_col, horizon)
            X_te, y_te = _align_target(test_df, target_col, horizon)
            if y_tr.empty or y_va.empty or y_te.empty:
                continue

            X_tr_s, X_va_s, X_te_s, _ = _scale_train_val_test(X_tr, X_va, X_te)

            # Fresh estimators for every (target, horizon) pair.
            estimators = {
                "ridge": Ridge(alpha=1.0),
                "lasso": Lasso(alpha=0.001),
            }
            for model_name, estimator in estimators.items():
                estimator.fit(X_tr_s, y_tr)
                evaluation_sets = (
                    ("validation", X_va_s, y_va),
                    ("test", X_te_s, y_te),
                )
                for split_name, features, truth in evaluation_sets:
                    scores = _regression_scores(truth, estimator.predict(features))
                    records.append(
                        {
                            "target": target_col,
                            "horizon_min": horizon,
                            "model": model_name,
                            "split": split_name,
                            **scores,
                        }
                    )

    return pd.DataFrame(records)
def run_rain_model(train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame) -> pd.DataFrame:
    """Fit a logistic regression predicting rain / no rain at every horizon.

    The rain target is binarised as `RAIN_TARGET > 0`. Each split is aligned
    independently with `_align_target` and features are standardised on the
    training split. A horizon is skipped when any split is empty after
    alignment, or when the binarised training target contains a single class
    (logistic regression cannot be fitted in that case).

    Returns:
        One row per (horizon, split) with columns "target" (always
        "rain_binary"), "horizon_min", "model", "split", "f1", "brier" and "ap".
    """
    rows: list[dict[str, object]] = []

    for horizon in HORIZONS_MINUTES:
        X_train, y_train = _align_target(train_df, RAIN_TARGET, horizon)
        X_val, y_val = _align_target(val_df, RAIN_TARGET, horizon)
        X_test, y_test = _align_target(test_df, RAIN_TARGET, horizon)
        if y_train.empty or y_val.empty or y_test.empty:
            continue

        y_train_bin = (y_train > 0).astype(int)
        # LogisticRegression.fit raises when only one class is present (e.g. a
        # completely dry training window); skip the horizon instead of
        # aborting the whole run.
        if y_train_bin.nunique() < 2:
            print(f"⚠ Pluie : une seule classe dans le train (horizon {horizon} min), modèle ignoré")
            continue

        X_train_s, X_val_s, X_test_s, _ = _scale_train_val_test(X_train, X_val, X_test)

        clf = LogisticRegression(max_iter=200)
        clf.fit(X_train_s, y_train_bin)

        evaluation_sets = (
            ("validation", X_val_s, y_val),
            ("test", X_test_s, y_test),
        )
        for split_name, features, y_split in evaluation_sets:
            y_bin = (y_split > 0).astype(int)
            # Column 1 holds the probability of the positive ("rain") class.
            proba = clf.predict_proba(features)[:, 1]
            rows.append(
                {
                    "target": "rain_binary",
                    "horizon_min": horizon,
                    "model": "logistic_regression",
                    "split": split_name,
                    **_classification_scores(y_bin, proba),
                }
            )

    return pd.DataFrame(rows)
def plot_regression_results(df: pd.DataFrame, output_path: Path) -> None:
    """Plot validation MAE against horizon, one subplot per target and one line per model.

    Args:
        df: Regression results with columns "target", "horizon_min", "model",
            "split" and "mae".
        output_path: Destination image file; its parent directory is created
            if needed.

    Nothing is drawn or written (a warning is printed instead) when `df` holds
    no validation rows.
    """
    # A results frame built from an empty row list has no columns at all, so
    # check before indexing "split".
    if df.empty or "split" not in df.columns:
        print("⚠ Aucun résultat de régression à tracer.")
        return
    df_val = df[df["split"] == "validation"]
    if df_val.empty:
        # plt.subplots(0, 1) would raise; there is nothing to plot anyway.
        print("⚠ Aucun résultat de régression à tracer.")
        return

    output_path.parent.mkdir(parents=True, exist_ok=True)
    targets = df_val["target"].unique()
    models = df_val["model"].unique()

    # squeeze=False always yields a 2-D array of axes, even for a single target.
    fig, axes_grid = plt.subplots(
        len(targets), 1, figsize=(8, 4 * len(targets)), sharex=True, squeeze=False
    )
    axes = axes_grid[:, 0]

    for ax, target in zip(axes, targets):
        sub = df_val[df_val["target"] == target]
        for model in models:
            line = sub[sub["model"] == model].sort_values("horizon_min")
            ax.plot(line["horizon_min"], line["mae"], marker="o", label=model)
        ax.set_title(f"MAE {target} (validation)")
        ax.set_ylabel("MAE")
        ax.grid(True, linestyle=":", alpha=0.4)

    # The x axis is shared, so only the bottom subplot is labelled.
    axes[-1].set_xlabel("Horizon (minutes)")
    axes[0].legend()
    fig.tight_layout()
    fig.savefig(output_path, dpi=150)
    plt.close(fig)
def plot_rain_curves(df: pd.DataFrame, output_prefix: Path) -> None:
    """Placeholder for the validation PR/ROC curves of the binary rain model.

    No curve is drawn yet: plotting them requires the predicted probabilities,
    which are not stored in `df`, so they would have to be recomputed on the
    validation split. For now only the exported scores are relied upon and
    this helper merely makes sure the output directory exists.
    """
    output_prefix.parent.mkdir(parents=True, exist_ok=True)
def main() -> None:
    """Run the baseline pipeline: load data, build features, fit models, export results.

    Steps: load the raw CSV, optionally restrict the columns to those
    correlated with a target, build lagged features, split chronologically,
    fit the regression and rain models, write both result tables to
    `DATA_DIR`, save the validation MAE figure to `FIG_DIR`, and print the
    validation scores. Returns early with a warning when the CSV is missing.
    """
    if not CSV_PATH.exists():
        print(f"⚠ Fichier introuvable : {CSV_PATH}")
        return

    df_raw = load_raw_csv(CSV_PATH)
    print(f"Dataset chargé : {CSV_PATH}")

    corr_df, lag_df = _load_correlation_and_lag()
    all_targets = CONTINUOUS_TARGETS + (RAIN_TARGET,)
    correlated: set[str] = set()
    if USE_CORR_FILTER:
        correlated = _select_features_from_corr(corr_df, all_targets, CORR_THRESHOLD)

    # Numeric/boolean columns only.
    numeric_cols = df_raw.select_dtypes(include=["number", "bool"]).columns
    if USE_CORR_FILTER and correlated:
        # Keep the targets themselves plus every correlated column.
        wanted = correlated | set(all_targets)
        selected_cols = [col for col in numeric_cols if col in wanted]
    else:
        selected_cols = list(numeric_cols)

    lags_mapping = _build_lags_from_matrices(
        lag_df,
        corr_df,
        selected_cols,
        default_lags=DEFAULT_LAGS_BY_COL,
        threshold=CORR_THRESHOLD,
    )
    df_feat = build_feature_dataframe(
        df_raw[selected_cols],
        feature_spec=FeatureSpec(lags_minutes=lags_mapping),
        target_columns=selected_cols,
    )

    # Chronological split, so later observations never inform earlier ones.
    train_df, val_df, test_df = chronological_split(df_feat, train_frac=0.7, val_frac=0.15)
    print(f" Train : {len(train_df)} lignes")
    print(f" Val : {len(val_df)} lignes")
    print(f" Test : {len(test_df)} lignes")
    print()

    # Regressions (temperature / wind), then the binary rain classifier.
    reg_results = run_regression_models(train_df, val_df, test_df)
    rain_results = run_rain_model(train_df, val_df, test_df)

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    reg_path = DATA_DIR / "models_regression.csv"
    rain_path = DATA_DIR / "models_rain.csv"
    reg_results.to_csv(reg_path, index=False)
    rain_results.to_csv(rain_path, index=False)
    print(f"✔ Résultats régression sauvegardés : {reg_path}")
    print(f"✔ Résultats pluie sauvegardés : {rain_path}")

    # Figures. No ROC/PR curves are produced here; the F1/Brier/AP scores are
    # available in the exported table instead.
    FIG_DIR.mkdir(parents=True, exist_ok=True)
    plot_regression_results(reg_results, FIG_DIR / "models_mae_validation.png")

    def _three_decimals(value: float) -> str:
        return f"{value:.3f}"

    reg_validation = reg_results[reg_results["split"] == "validation"]
    rain_validation = rain_results[rain_results["split"] == "validation"]

    print("=== Scores régression (validation) ===")
    print(reg_validation.to_string(index=False, float_format=_three_decimals))
    print()
    print("=== Scores pluie (validation) ===")
    print(rain_validation.to_string(index=False, float_format=_three_decimals))
# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()