"""Fonctions utilitaires pour préparer des variables dérivées simples.""" from __future__ import annotations from collections.abc import Mapping from dataclasses import dataclass from typing import Iterable, Sequence import numpy as np import pandas as pd # Valeurs par défaut pour rester aligné sur le pas de 10 minutes du CSV DEFAULT_BASE_FREQ_MINUTES = 10 # Les lags et fenêtres par défaut servent de garde-fous génériques pour un pas de 10 minutes. # Lorsque des lags spécifiques ont été identifiés (ex : matrices du chapitre 5), on peut # surcharger ces valeurs via FeatureSpec ou directement dans les fonctions. DEFAULT_LAGS_MINUTES: tuple[int, ...] = (10, 20, 30) DEFAULT_ROLLING_WINDOWS_MINUTES: tuple[int, ...] = (30, 60) @dataclass(frozen=True) class FeatureSpec: """Configuration minimale pour construire les variables dérivées.""" # Peut être une séquence (appliquée à toutes les colonnes) ou un mapping {col: [lags]} lags_minutes: Sequence[int] | Mapping[str, Sequence[int]] = DEFAULT_LAGS_MINUTES rolling_windows_minutes: Sequence[int] = DEFAULT_ROLLING_WINDOWS_MINUTES base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES def _steps_from_minutes(minutes: int, base_freq_minutes: int) -> int: """Convertit un horizon en minutes vers un nombre de pas (arrondi à l'entier).""" steps = int(round(minutes / base_freq_minutes)) if steps <= 0: raise ValueError("L'horizon en minutes doit représenter au moins un pas de temps.") return steps def add_time_features(df: pd.DataFrame) -> pd.DataFrame: """ Ajoute des composantes sin/cos pour l'heure et le jour de l'année. On évite ainsi le faux saut entre 23 h et 0 h (ou 31 décembre / 1er janvier). """ if not isinstance(df.index, pd.DatetimeIndex): raise TypeError("add_time_features attend un index temporel (DatetimeIndex).") out = df.copy() idx = out.index # Heure locale (fraction de la journée) hour_fraction = (idx.hour + idx.minute / 60.0 + idx.second / 3600.0) / 24.0 hour_angle = 2 * np.pi * hour_fraction out["time_hour_sin"] = np.sin(hour_angle) out["time_hour_cos"] = np.cos(hour_angle) # Jour de l'année (1..365/366) day_of_year = idx.dayofyear year_days = 366 if idx.is_leap_year.any() else 365 day_fraction = (day_of_year - 1) / year_days day_angle = 2 * np.pi * day_fraction out["time_dayofyear_sin"] = np.sin(day_angle) out["time_dayofyear_cos"] = np.cos(day_angle) return out def add_wind_components( df: pd.DataFrame, *, speed_col: str = "wind_speed", direction_col: str = "wind_direction", ) -> pd.DataFrame: """Ajoute les composantes cartésiennes du vent (u, v) pour éviter la discontinuité 0/360°.""" out = df.copy() missing = [col for col in (speed_col, direction_col) if col not in out.columns] if missing: raise KeyError(f"Colonnes manquantes pour le vent : {missing}") direction_rad = np.deg2rad(out[direction_col].astype(float)) speed = out[speed_col].astype(float) out["wind_u"] = speed * np.sin(direction_rad) out["wind_v"] = speed * np.cos(direction_rad) return out def add_lag_features( df: pd.DataFrame, columns: Iterable[str], *, lags_minutes: Sequence[int] | Mapping[str, Sequence[int]] = DEFAULT_LAGS_MINUTES, base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES, ) -> pd.DataFrame: """Ajoute des valeurs décalées dans le passé (lags) pour les colonnes données.""" out = df.copy() for col in columns: if col not in out.columns: raise KeyError(f"Colonne absente pour les lags : {col}") lags_for_col: Sequence[int] if isinstance(lags_minutes, Mapping): lags_for_col = lags_minutes.get(col, DEFAULT_LAGS_MINUTES) else: lags_for_col = lags_minutes for lag in lags_for_col: steps = _steps_from_minutes(lag, base_freq_minutes) out[f"{col}_lag_{lag}m"] = out[col].shift(steps) return out def add_delta_features( df: pd.DataFrame, columns: Iterable[str], *, delta_minutes: int = DEFAULT_BASE_FREQ_MINUTES, base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES, ) -> pd.DataFrame: """ Ajoute des variations récentes (delta) pour chaque colonne : valeur actuelle moins valeur à T - delta. Cela donne la pente locale (vitesse de variation) sur le dernier intervalle. """ out = df.copy() steps = _steps_from_minutes(delta_minutes, base_freq_minutes) for col in columns: if col not in out.columns: raise KeyError(f"Colonne absente pour les deltas : {col}") out[f"{col}_delta_{delta_minutes}m"] = out[col] - out[col].shift(steps) return out def add_rolling_features( df: pd.DataFrame, columns: Iterable[str], *, windows_minutes: Sequence[int] = DEFAULT_ROLLING_WINDOWS_MINUTES, base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES, stats: Sequence[str] = ("mean", "median"), ) -> pd.DataFrame: """Ajoute des statistiques glissantes (moyenne/médiane) sur les colonnes données.""" out = df.copy() for col in columns: if col not in out.columns: raise KeyError(f"Colonne absente pour les moyennes glissantes : {col}") for window in windows_minutes: steps = _steps_from_minutes(window, base_freq_minutes) rolling = out[col].rolling(window=steps, min_periods=max(1, steps // 2)) for stat in stats: if stat == "mean": out[f"{col}_rollmean_{window}m"] = rolling.mean() elif stat == "median": out[f"{col}_rollmed_{window}m"] = rolling.median() else: raise ValueError(f"Statistique glissante non supportée : {stat}") return out def build_feature_dataframe( df: pd.DataFrame, *, feature_spec: FeatureSpec | None = None, target_columns: Sequence[str] = ("temperature", "wind_speed", "rain_rate"), include_event_flags: bool = True, ) -> pd.DataFrame: """ Construit un DataFrame enrichi avec les variables dérivées essentielles. - Ajoute les composantes temporelles sin/cos. - Ajoute les composantes du vent (u, v) si disponibles. - Ajoute lags, deltas, moyennes glissantes pour les colonnes cibles fournies. - Optionnellement ajoute des indicateurs d'événements simples. """ spec = feature_spec or FeatureSpec() out = df.copy() out = add_time_features(out) # Vent en composantes u/v si les colonnes existent if "wind_speed" in out.columns and "wind_direction" in out.columns: out = add_wind_components(out) out = add_lag_features( out, columns=target_columns, lags_minutes=spec.lags_minutes, base_freq_minutes=spec.base_freq_minutes, ) out = add_delta_features( out, columns=target_columns, delta_minutes=spec.base_freq_minutes, base_freq_minutes=spec.base_freq_minutes, ) out = add_rolling_features( out, columns=target_columns, windows_minutes=spec.rolling_windows_minutes, base_freq_minutes=spec.base_freq_minutes, ) if include_event_flags: if "rain_rate" in out.columns: out["flag_rain_now"] = (out["rain_rate"] > 0).astype(int) if "wind_speed" in out.columns: out["flag_wind_strong"] = (out["wind_speed"] >= 30.0).astype(int) if "temperature" in out.columns: out["flag_hot"] = (out["temperature"] >= 30.0).astype(int) out["flag_cold"] = (out["temperature"] <= 0.0).astype(int) return out