1
donnees_meteo/meteo/analysis/correlations.py
2025-11-18 09:01:34 +01:00

202 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Calculs statistiques liés aux corrélations (instantanées, décalées, glissantes)."""
from __future__ import annotations
from typing import Literal, Sequence
import numpy as np
import pandas as pd
from meteo.variables import Variable
from .core import _ensure_datetime_index
__all__ = ['compute_correlation_matrix', 'compute_correlation_matrix_for_variables', 'compute_lagged_correlation', 'compute_rolling_correlation_series', 'compute_rolling_correlations_for_pairs']
def compute_correlation_matrix(
df: pd.DataFrame,
*,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule la matrice de corrélation entre toutes les colonnes numériques
du DataFrame.
Attention :
- La direction du vent est traitée ici comme une variable scalaire 0360°,
ce qui n'est pas idéal pour une analyse circulaire. On affinera plus tard
si besoin (représentation en sin/cos).
"""
numeric_df = df.select_dtypes(include=["number"])
corr = numeric_df.corr(method=method)
return corr
def compute_correlation_matrix_for_variables(
df: pd.DataFrame,
variables: Sequence[Variable],
*,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule la matrice de corrélation pour un sous-ensemble de variables,
dans un ordre bien défini.
Paramètres
----------
df :
DataFrame contenant les colonnes à analyser.
variables :
Séquence de Variable décrivant les colonnes à prendre en compte.
method :
Méthode de corrélation pandas (pearson, spearman, ...).
Retour
------
DataFrame :
Matrice de corrélation, index et colonnes dans le même ordre que
`variables`, avec les colonnes pandas correspondant aux noms de colonnes
du DataFrame (ex: "temperature", "humidity", ...).
"""
columns = [v.column for v in variables]
missing = [c for c in columns if c not in df.columns]
if missing:
raise KeyError(f"Colonnes manquantes dans le DataFrame : {missing!r}")
numeric_df = df[columns].astype(float)
corr = numeric_df.corr(method=method)
# On s'assure de l'ordre
corr = corr.loc[columns, columns]
return corr
def compute_lagged_correlation(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
*,
max_lag_minutes: int = 360,
step_minutes: int = 10,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule la corrélation entre deux variables pour une série de décalages
temporels (lags).
Convention :
- lag > 0 : X "précède" Y de `lag` minutes.
On corrèle X(t) avec Y(t + lag).
- lag < 0 : Y "précède" X de |lag| minutes.
On corrèle X(t) avec Y(t + lag), lag étant négatif.
Implémentation :
- On utilise un DataFrame avec les deux colonnes,
puis on applique un `shift` sur Y.
"""
if var_x.column not in df.columns or var_y.column not in df.columns:
raise KeyError("Les colonnes demandées ne sont pas présentes dans le DataFrame.")
series_x = df[var_x.column]
series_y = df[var_y.column]
lags = range(-max_lag_minutes, max_lag_minutes + 1, step_minutes)
results: list[tuple[int, float]] = []
for lag in lags:
# Y décalé de -lag : pour lag positif, on corrèle X(t) à Y(t + lag)
shifted_y = series_y.shift(-lag)
pair = pd.concat([series_x, shifted_y], axis=1).dropna()
if pair.empty:
corr = np.nan
else:
corr = pair.iloc[:, 0].corr(pair.iloc[:, 1], method=method)
results.append((lag, corr))
lag_df = pd.DataFrame(results, columns=["lag_minutes", "correlation"])
lag_df = lag_df.set_index("lag_minutes")
return lag_df
def compute_rolling_correlation_series(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
*,
window_minutes: int,
min_valid_fraction: float = 0.6,
step_minutes: int | None = None,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.Series:
"""
Calcule la corrélation glissante X/Y sur une fenêtre temporelle.
Retourne une série indexée par l'instant de fin de fenêtre.
"""
if not 0 < min_valid_fraction <= 1:
raise ValueError("min_valid_fraction doit être dans l'intervalle ]0, 1].")
for col in (var_x.column, var_y.column):
if col not in df.columns:
raise KeyError(f"Colonne absente du DataFrame : {col}")
_ensure_datetime_index(df)
pair = df[[var_x.column, var_y.column]].dropna().sort_index()
if pair.empty:
return pd.Series(dtype=float, name=f"{var_x.key}{var_y.key}")
window = f"{window_minutes}min"
min_periods = max(1, int(window_minutes * min_valid_fraction))
if method not in {"pearson"}:
raise NotImplementedError(
"Les corrélations glissantes ne supportent actuellement que la méthode 'pearson'."
)
rolling_corr = pair[var_x.column].rolling(
window=window,
min_periods=min_periods,
).corr(pair[var_y.column])
rolling_corr = rolling_corr.dropna()
rolling_corr.name = f"{var_x.key}{var_y.key}"
if step_minutes and step_minutes > 1:
rolling_corr = rolling_corr.resample(f"{step_minutes}min").mean().dropna()
return rolling_corr
def compute_rolling_correlations_for_pairs(
df: pd.DataFrame,
pairs: Sequence[tuple[Variable, Variable]],
*,
window_minutes: int,
min_valid_fraction: float = 0.6,
step_minutes: int | None = None,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule les corrélations glissantes pour plusieurs paires et aligne les
résultats dans un DataFrame (index temps, colonnes = 'x→y').
"""
series_list: list[pd.Series] = []
for var_x, var_y in pairs:
corr = compute_rolling_correlation_series(
df=df,
var_x=var_x,
var_y=var_y,
window_minutes=window_minutes,
min_valid_fraction=min_valid_fraction,
step_minutes=step_minutes,
method=method,
)
if not corr.empty:
series_list.append(corr)
if not series_list:
return pd.DataFrame()
result = pd.concat(series_list, axis=1)
result = result.sort_index()
return result