# scripts/run_baselines.py from __future__ import annotations from pathlib import Path import sys from typing import Iterable import matplotlib.pyplot as plt import numpy as np import pandas as pd from sklearn.metrics import ( mean_absolute_error, mean_squared_error, precision_recall_fscore_support, brier_score_loss, ) PROJECT_ROOT = Path(__file__).resolve().parents[3] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from meteo.dataset import load_raw_csv from model.baselines import ( persistence_baseline, moving_average_baseline, hourly_climatology_baseline, ) from model.splits import chronological_split from model.features import _steps_from_minutes, DEFAULT_BASE_FREQ_MINUTES CSV_PATH = Path("data/weather_minutely.csv") DOC_DIR = Path(__file__).resolve().parent.parent DATA_DIR = DOC_DIR / "data" FIG_DIR = DOC_DIR / "figures" HORIZONS_MINUTES: tuple[int, ...] = (10, 60, 360, 1440) CONTINUOUS_TARGETS: tuple[str, ...] = ("temperature", "wind_speed") RAIN_TARGET: str = "rain_rate" MOVING_AVG_WINDOW_MINUTES = 60 def _ensure_columns(df: pd.DataFrame, columns: Iterable[str]) -> None: missing = [c for c in columns if c not in df.columns] if missing: raise KeyError(f"Colonnes manquantes dans le DataFrame : {missing}") def _regression_scores(y_true: pd.Series, y_pred: pd.Series) -> dict[str, float]: return { "mae": float(mean_absolute_error(y_true, y_pred)), "rmse": float(np.sqrt(mean_squared_error(y_true, y_pred))), } def _classification_scores(y_true: pd.Series, proba: pd.Series, threshold: float = 0.5) -> dict[str, float]: proba = proba.clip(0.0, 1.0) y_pred = (proba >= threshold).astype(int) precision, recall, f1, _ = precision_recall_fscore_support( y_true, y_pred, average="binary", zero_division=0 ) try: brier = float(brier_score_loss(y_true, proba)) except ValueError: brier = float("nan") return { "precision": float(precision), "recall": float(recall), "f1": float(f1), "brier": brier, } def evaluate_regression_baselines( series_train: pd.Series, series_eval: pd.Series, *, horizons: Iterable[int], ) -> pd.DataFrame: """ Évalue persistance, moyenne mobile et climatologie horaire sur un jeu (validation ou test). """ rows: list[dict[str, object]] = [] for horizon in horizons: # Persistance (évaluée sur le jeu cible uniquement) frame_persist = persistence_baseline(series_eval, horizon_minutes=horizon) reg_persist = _regression_scores(frame_persist["y_true"], frame_persist["y_pred"]) rows.append( { "target": series_eval.name, "horizon_min": horizon, "baseline": "persistance", "n_samples": len(frame_persist), **reg_persist, } ) # Moyenne mobile (évaluée sur le jeu cible uniquement) frame_ma = moving_average_baseline( series_eval, horizon_minutes=horizon, window_minutes=MOVING_AVG_WINDOW_MINUTES, ) reg_ma = _regression_scores(frame_ma["y_true"], frame_ma["y_pred"]) rows.append( { "target": series_eval.name, "horizon_min": horizon, "baseline": f"moyenne_mobile_{MOVING_AVG_WINDOW_MINUTES}m", "n_samples": len(frame_ma), **reg_ma, } ) # Climatologie horaire : nécessite l'heure de la cible (utilise la partie train) steps = _steps_from_minutes(horizon, DEFAULT_BASE_FREQ_MINUTES) y_true = series_eval.shift(-steps) y_true = y_true.dropna() preds = hourly_climatology_baseline( series_train, eval_index=y_true.index, horizon_minutes=horizon, ) preds = preds.loc[y_true.index] reg_clim = _regression_scores(y_true, preds) rows.append( { "target": series_eval.name, "horizon_min": horizon, "baseline": "climatologie_horaire", "n_samples": len(y_true), **reg_clim, } ) return pd.DataFrame(rows) def evaluate_rain_baselines( rain_train: pd.Series, rain_eval: pd.Series, *, horizons: Iterable[int], ) -> pd.DataFrame: """ Évalue des baselines pour la pluie (version binaire pluie oui/non). """ rows: list[dict[str, object]] = [] rain_train_bin = (rain_train > 0).astype(int) rain_eval_bin = (rain_eval > 0).astype(int) for horizon in horizons: steps = _steps_from_minutes(horizon, DEFAULT_BASE_FREQ_MINUTES) # Persistance frame_persist = persistence_baseline(rain_eval, horizon_minutes=horizon) y_true = (frame_persist["y_true"] > 0).astype(int) proba = (frame_persist["y_pred"] > 0).astype(float) cls_persist = _classification_scores(y_true, proba, threshold=0.5) rows.append( { "target": "rain", "horizon_min": horizon, "baseline": "persistance", "n_samples": len(y_true), **cls_persist, } ) # Moyenne mobile (prédiction binaire à partir du cumul moyen) frame_ma = moving_average_baseline( rain_eval, horizon_minutes=horizon, window_minutes=MOVING_AVG_WINDOW_MINUTES, ) y_true_ma = (frame_ma["y_true"] > 0).astype(int) proba_ma = (frame_ma["y_pred"] > 0).astype(float) cls_ma = _classification_scores(y_true_ma, proba_ma, threshold=0.5) rows.append( { "target": "rain", "horizon_min": horizon, "baseline": f"moyenne_mobile_{MOVING_AVG_WINDOW_MINUTES}m", "n_samples": len(y_true_ma), **cls_ma, } ) # Climatologie horaire (probabilité de pluie par heure) y_true_clim = rain_eval_bin.shift(-steps).dropna() proba_clim = hourly_climatology_baseline( rain_train_bin, eval_index=rain_eval_bin.index, horizon_minutes=horizon, ) proba_clim = proba_clim.loc[y_true_clim.index].fillna(0.0) cls_clim = _classification_scores(y_true_clim, proba_clim, threshold=0.5) rows.append( { "target": "rain", "horizon_min": horizon, "baseline": "climatologie_horaire", "n_samples": len(y_true_clim), **cls_clim, } ) return pd.DataFrame(rows) def _save_csv(df: pd.DataFrame, path: Path) -> None: path.parent.mkdir(parents=True, exist_ok=True) df.to_csv(path, index=False) def plot_regression_mae(reg_df: pd.DataFrame, output_path: Path) -> None: """Trace la MAE des baselines (validation) par horizon pour température et vent.""" output_path.parent.mkdir(parents=True, exist_ok=True) df = reg_df[reg_df["split"] == "validation"] fig, axes = plt.subplots(2, 1, figsize=(8, 6), sharex=True) targets = ["temperature", "wind_speed"] baselines = df["baseline"].unique() for ax, target in zip(axes, targets): sub = df[df["target"] == target] for baseline in baselines: line = sub[sub["baseline"] == baseline].sort_values("horizon_min") ax.plot(line["horizon_min"], line["mae"], marker="o", label=baseline) ax.set_title(f"MAE {target} (validation)") ax.set_ylabel("MAE") ax.grid(True, linestyle=":", alpha=0.4) axes[-1].set_xlabel("Horizon (minutes)") axes[0].legend() fig.tight_layout() fig.savefig(output_path, dpi=150) plt.close(fig) def plot_rain_scores(rain_df: pd.DataFrame, output_path: Path) -> None: """Trace F1 et Brier des baselines pluie (validation) par horizon.""" output_path.parent.mkdir(parents=True, exist_ok=True) df = rain_df[rain_df["split"] == "validation"] baselines = df["baseline"].unique() fig, axes = plt.subplots(2, 1, figsize=(8, 6), sharex=True) for metric, ax in zip(("f1", "brier"), axes): for baseline in baselines: line = df[df["baseline"] == baseline].sort_values("horizon_min") ax.plot(line["horizon_min"], line[metric], marker="o", label=baseline) ax.set_title(f"{metric.upper()} pluie (validation)" if metric == "f1" else "Brier pluie (validation)") ax.set_ylabel(metric.upper() if metric == "f1" else "Brier") ax.grid(True, linestyle=":", alpha=0.4) axes[-1].set_xlabel("Horizon (minutes)") axes[0].legend() fig.tight_layout() fig.savefig(output_path, dpi=150) plt.close(fig) def main() -> None: if not CSV_PATH.exists(): print(f"⚠ Fichier introuvable : {CSV_PATH}") return df = load_raw_csv(CSV_PATH) _ensure_columns(df, CONTINUOUS_TARGETS + (RAIN_TARGET,)) # Découpe temporelle sans fuite train_df, val_df, test_df = chronological_split(df, train_frac=0.7, val_frac=0.15) print(f"Dataset chargé : {CSV_PATH}") print(f" Train : {len(train_df)} lignes") print(f" Val : {len(val_df)} lignes") print(f" Test : {len(test_df)} lignes") print() # Évalue sur validation reg_val_rows: list[pd.DataFrame] = [] for target in CONTINUOUS_TARGETS: reg_val_rows.append( evaluate_regression_baselines( train_df[target], val_df[target], horizons=HORIZONS_MINUTES, ) ) reg_val = pd.concat(reg_val_rows, ignore_index=True) rain_val = evaluate_rain_baselines( rain_train=train_df[RAIN_TARGET], rain_eval=val_df[RAIN_TARGET], horizons=HORIZONS_MINUTES, ) # Évalue sur test reg_test_rows: list[pd.DataFrame] = [] for target in CONTINUOUS_TARGETS: reg_test_rows.append( evaluate_regression_baselines( train_df[target], test_df[target], horizons=HORIZONS_MINUTES, ) ) reg_test = pd.concat(reg_test_rows, ignore_index=True) rain_test = evaluate_rain_baselines( rain_train=train_df[RAIN_TARGET], rain_eval=test_df[RAIN_TARGET], horizons=HORIZONS_MINUTES, ) # Combine et sauvegarde en CSV reg_val["split"] = "validation" reg_test["split"] = "test" reg_all = pd.concat([reg_val, reg_test], ignore_index=True) rain_val["split"] = "validation" rain_test["split"] = "test" rain_all = pd.concat([rain_val, rain_test], ignore_index=True) DATA_DIR.mkdir(parents=True, exist_ok=True) _save_csv(reg_all, DATA_DIR / "baselines_regression.csv") _save_csv(rain_all, DATA_DIR / "baselines_rain.csv") # Figures (validation uniquement pour la lisibilité) FIG_DIR.mkdir(parents=True, exist_ok=True) plot_regression_mae(reg_all, FIG_DIR / "baselines_mae_validation.png") plot_rain_scores(rain_all, FIG_DIR / "baselines_rain_validation.png") print("=== Baselines validation (température / vent) ===") print(reg_val.to_string(index=False, float_format=lambda x: f"{x:.3f}")) print() print("=== Baselines validation (pluie binaire) ===") print(rain_val.to_string(index=False, float_format=lambda x: f"{x:.3f}")) print() print("=== Baselines test (température / vent) ===") print(reg_test.to_string(index=False, float_format=lambda x: f"{x:.3f}")) print() print("=== Baselines test (pluie binaire) ===") print(rain_test.to_string(index=False, float_format=lambda x: f"{x:.3f}")) if __name__ == "__main__": main()