1
donnees_meteo/model/features.py

216 lines
7.5 KiB
Python

"""Fonctions utilitaires pour préparer des variables dérivées simples."""
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Iterable, Sequence
import numpy as np
import pandas as pd
# Valeurs par défaut pour rester aligné sur le pas de 10 minutes du CSV
DEFAULT_BASE_FREQ_MINUTES = 10
# Les lags et fenêtres par défaut servent de garde-fous génériques pour un pas de 10 minutes.
# Lorsque des lags spécifiques ont été identifiés (ex : matrices du chapitre 5), on peut
# surcharger ces valeurs via FeatureSpec ou directement dans les fonctions.
DEFAULT_LAGS_MINUTES: tuple[int, ...] = (10, 20, 30)
DEFAULT_ROLLING_WINDOWS_MINUTES: tuple[int, ...] = (30, 60)
@dataclass(frozen=True)
class FeatureSpec:
"""Configuration minimale pour construire les variables dérivées."""
# Peut être une séquence (appliquée à toutes les colonnes) ou un mapping {col: [lags]}
lags_minutes: Sequence[int] | Mapping[str, Sequence[int]] = DEFAULT_LAGS_MINUTES
rolling_windows_minutes: Sequence[int] = DEFAULT_ROLLING_WINDOWS_MINUTES
base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES
def _steps_from_minutes(minutes: int, base_freq_minutes: int) -> int:
"""Convertit un horizon en minutes vers un nombre de pas (arrondi à l'entier)."""
steps = int(round(minutes / base_freq_minutes))
if steps <= 0:
raise ValueError("L'horizon en minutes doit représenter au moins un pas de temps.")
return steps
def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Ajoute des composantes sin/cos pour l'heure et le jour de l'année.
On évite ainsi le faux saut entre 23 h et 0 h (ou 31 décembre / 1er janvier).
"""
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError("add_time_features attend un index temporel (DatetimeIndex).")
out = df.copy()
idx = out.index
# Heure locale (fraction de la journée)
hour_fraction = (idx.hour + idx.minute / 60.0 + idx.second / 3600.0) / 24.0
hour_angle = 2 * np.pi * hour_fraction
out["time_hour_sin"] = np.sin(hour_angle)
out["time_hour_cos"] = np.cos(hour_angle)
# Jour de l'année (1..365/366)
day_of_year = idx.dayofyear
year_days = 366 if idx.is_leap_year.any() else 365
day_fraction = (day_of_year - 1) / year_days
day_angle = 2 * np.pi * day_fraction
out["time_dayofyear_sin"] = np.sin(day_angle)
out["time_dayofyear_cos"] = np.cos(day_angle)
return out
def add_wind_components(
df: pd.DataFrame,
*,
speed_col: str = "wind_speed",
direction_col: str = "wind_direction",
) -> pd.DataFrame:
"""Ajoute les composantes cartésiennes du vent (u, v) pour éviter la discontinuité 0/360°."""
out = df.copy()
missing = [col for col in (speed_col, direction_col) if col not in out.columns]
if missing:
raise KeyError(f"Colonnes manquantes pour le vent : {missing}")
direction_rad = np.deg2rad(out[direction_col].astype(float))
speed = out[speed_col].astype(float)
out["wind_u"] = speed * np.sin(direction_rad)
out["wind_v"] = speed * np.cos(direction_rad)
return out
def add_lag_features(
df: pd.DataFrame,
columns: Iterable[str],
*,
lags_minutes: Sequence[int] | Mapping[str, Sequence[int]] = DEFAULT_LAGS_MINUTES,
base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES,
) -> pd.DataFrame:
"""Ajoute des valeurs décalées dans le passé (lags) pour les colonnes données."""
out = df.copy()
for col in columns:
if col not in out.columns:
raise KeyError(f"Colonne absente pour les lags : {col}")
lags_for_col: Sequence[int]
if isinstance(lags_minutes, Mapping):
lags_for_col = lags_minutes.get(col, DEFAULT_LAGS_MINUTES)
else:
lags_for_col = lags_minutes
for lag in lags_for_col:
steps = _steps_from_minutes(lag, base_freq_minutes)
out[f"{col}_lag_{lag}m"] = out[col].shift(steps)
return out
def add_delta_features(
df: pd.DataFrame,
columns: Iterable[str],
*,
delta_minutes: int = DEFAULT_BASE_FREQ_MINUTES,
base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES,
) -> pd.DataFrame:
"""
Ajoute des variations récentes (delta) pour chaque colonne : valeur actuelle moins valeur à T - delta.
Cela donne la pente locale (vitesse de variation) sur le dernier intervalle.
"""
out = df.copy()
steps = _steps_from_minutes(delta_minutes, base_freq_minutes)
for col in columns:
if col not in out.columns:
raise KeyError(f"Colonne absente pour les deltas : {col}")
out[f"{col}_delta_{delta_minutes}m"] = out[col] - out[col].shift(steps)
return out
def add_rolling_features(
df: pd.DataFrame,
columns: Iterable[str],
*,
windows_minutes: Sequence[int] = DEFAULT_ROLLING_WINDOWS_MINUTES,
base_freq_minutes: int = DEFAULT_BASE_FREQ_MINUTES,
stats: Sequence[str] = ("mean", "median"),
) -> pd.DataFrame:
"""Ajoute des statistiques glissantes (moyenne/médiane) sur les colonnes données."""
out = df.copy()
for col in columns:
if col not in out.columns:
raise KeyError(f"Colonne absente pour les moyennes glissantes : {col}")
for window in windows_minutes:
steps = _steps_from_minutes(window, base_freq_minutes)
rolling = out[col].rolling(window=steps, min_periods=max(1, steps // 2))
for stat in stats:
if stat == "mean":
out[f"{col}_rollmean_{window}m"] = rolling.mean()
elif stat == "median":
out[f"{col}_rollmed_{window}m"] = rolling.median()
else:
raise ValueError(f"Statistique glissante non supportée : {stat}")
return out
def build_feature_dataframe(
df: pd.DataFrame,
*,
feature_spec: FeatureSpec | None = None,
target_columns: Sequence[str] = ("temperature", "wind_speed", "rain_rate"),
include_event_flags: bool = True,
) -> pd.DataFrame:
"""
Construit un DataFrame enrichi avec les variables dérivées essentielles.
- Ajoute les composantes temporelles sin/cos.
- Ajoute les composantes du vent (u, v) si disponibles.
- Ajoute lags, deltas, moyennes glissantes pour les colonnes cibles fournies.
- Optionnellement ajoute des indicateurs d'événements simples.
"""
spec = feature_spec or FeatureSpec()
out = df.copy()
out = add_time_features(out)
# Vent en composantes u/v si les colonnes existent
if "wind_speed" in out.columns and "wind_direction" in out.columns:
out = add_wind_components(out)
out = add_lag_features(
out,
columns=target_columns,
lags_minutes=spec.lags_minutes,
base_freq_minutes=spec.base_freq_minutes,
)
out = add_delta_features(
out,
columns=target_columns,
delta_minutes=spec.base_freq_minutes,
base_freq_minutes=spec.base_freq_minutes,
)
out = add_rolling_features(
out,
columns=target_columns,
windows_minutes=spec.rolling_windows_minutes,
base_freq_minutes=spec.base_freq_minutes,
)
if include_event_flags:
if "rain_rate" in out.columns:
out["flag_rain_now"] = (out["rain_rate"] > 0).astype(int)
if "wind_speed" in out.columns:
out["flag_wind_strong"] = (out["wind_speed"] >= 30.0).astype(int)
if "temperature" in out.columns:
out["flag_hot"] = (out["temperature"] >= 30.0).astype(int)
out["flag_cold"] = (out["temperature"] <= 0.0).astype(int)
return out