"""Calculs statistiques liés aux corrélations (instantanées, décalées, glissantes).""" from __future__ import annotations from typing import Callable, Literal, Sequence import numpy as np import pandas as pd from meteo.variables import Variable from .core import _ensure_datetime_index __all__ = ['compute_correlation_matrix', 'compute_correlation_matrix_for_variables', 'compute_correlation_matrices_for_methods', 'compute_lagged_correlation', 'compute_rolling_correlation_series', 'compute_rolling_correlations_for_pairs', 'transform_correlation_matrix'] CorrelationMethod = Literal["pearson", "spearman", "kendall"] CorrelationTransform = Literal["identity", "absolute", "square"] def compute_correlation_matrix( df: pd.DataFrame, *, method: CorrelationMethod = "pearson", ) -> pd.DataFrame: """ Calcule la matrice de corrélation entre toutes les colonnes numériques du DataFrame. Attention : - La direction du vent est traitée ici comme une variable scalaire 0–360°, ce qui n'est pas idéal pour une analyse circulaire. On affinera plus tard si besoin (représentation en sin/cos). """ numeric_df = df.select_dtypes(include=["number"]) corr = numeric_df.corr(method=method) return corr def compute_correlation_matrix_for_variables( df: pd.DataFrame, variables: Sequence[Variable], *, method: CorrelationMethod = "pearson", ) -> pd.DataFrame: """ Calcule la matrice de corrélation pour un sous-ensemble de variables, dans un ordre bien défini. Paramètres ---------- df : DataFrame contenant les colonnes à analyser. variables : Séquence de Variable décrivant les colonnes à prendre en compte. method : Méthode de corrélation pandas (pearson, spearman, ...). Retour ------ DataFrame : Matrice de corrélation, index et colonnes dans le même ordre que `variables`, avec les colonnes pandas correspondant aux noms de colonnes du DataFrame (ex: "temperature", "humidity", ...). """ columns = [v.column for v in variables] missing = [c for c in columns if c not in df.columns] if missing: raise KeyError(f"Colonnes manquantes dans le DataFrame : {missing!r}") numeric_df = df[columns].astype(float) corr = numeric_df.corr(method=method) # On s'assure de l'ordre corr = corr.loc[columns, columns] return corr def transform_correlation_matrix( corr: pd.DataFrame, *, transform: CorrelationTransform | Callable[[pd.DataFrame], pd.DataFrame] = "identity", ) -> pd.DataFrame: """Applique une transformation générique sur une matrice de corrélation.""" if callable(transform): return transform(corr) if transform == "identity": return corr if transform == "absolute": return corr.abs() if transform == "square": return corr.pow(2) raise ValueError(f"Transformation de corrélation inconnue : {transform!r}") def compute_correlation_matrices_for_methods( df: pd.DataFrame, variables: Sequence[Variable], *, methods: Sequence[CorrelationMethod], transform: CorrelationTransform | Callable[[pd.DataFrame], pd.DataFrame] = "identity", ) -> dict[str, pd.DataFrame]: """Calcule plusieurs matrices de corrélation en une seule passe.""" if not methods: raise ValueError("La liste des méthodes de corrélation est vide.") matrices: dict[str, pd.DataFrame] = {} for method in methods: corr = compute_correlation_matrix_for_variables(df, variables, method=method) matrices[method] = transform_correlation_matrix(corr, transform=transform) return matrices def compute_lagged_correlation( df: pd.DataFrame, var_x: Variable, var_y: Variable, *, max_lag_minutes: int = 360, step_minutes: int = 10, method: Literal["pearson", "spearman"] = "pearson", ) -> pd.DataFrame: """ Calcule la corrélation entre deux variables pour une série de décalages temporels (lags). Convention : - lag > 0 : X "précède" Y de `lag` minutes. On corrèle X(t) avec Y(t + lag). - lag < 0 : Y "précède" X de |lag| minutes. On corrèle X(t) avec Y(t + lag), lag étant négatif. Implémentation : - On utilise un DataFrame avec les deux colonnes, puis on applique un `shift` sur Y. """ if var_x.column not in df.columns or var_y.column not in df.columns: raise KeyError("Les colonnes demandées ne sont pas présentes dans le DataFrame.") series_x = df[var_x.column] series_y = df[var_y.column] lags = range(-max_lag_minutes, max_lag_minutes + 1, step_minutes) results: list[tuple[int, float]] = [] for lag in lags: # Y décalé de -lag : pour lag positif, on corrèle X(t) à Y(t + lag) shifted_y = series_y.shift(-lag) pair = pd.concat([series_x, shifted_y], axis=1).dropna() if pair.empty: corr = np.nan else: corr = pair.iloc[:, 0].corr(pair.iloc[:, 1], method=method) results.append((lag, corr)) lag_df = pd.DataFrame(results, columns=["lag_minutes", "correlation"]) lag_df = lag_df.set_index("lag_minutes") return lag_df def compute_rolling_correlation_series( df: pd.DataFrame, var_x: Variable, var_y: Variable, *, window_minutes: int, min_valid_fraction: float = 0.6, step_minutes: int | None = None, method: Literal["pearson", "spearman"] = "pearson", ) -> pd.Series: """ Calcule la corrélation glissante X/Y sur une fenêtre temporelle. Retourne une série indexée par l'instant de fin de fenêtre. """ if not 0 < min_valid_fraction <= 1: raise ValueError("min_valid_fraction doit être dans l'intervalle ]0, 1].") for col in (var_x.column, var_y.column): if col not in df.columns: raise KeyError(f"Colonne absente du DataFrame : {col}") _ensure_datetime_index(df) pair = df[[var_x.column, var_y.column]].dropna().sort_index() if pair.empty: return pd.Series(dtype=float, name=f"{var_x.key}→{var_y.key}") window = f"{window_minutes}min" min_periods = max(1, int(window_minutes * min_valid_fraction)) if method not in {"pearson"}: raise NotImplementedError( "Les corrélations glissantes ne supportent actuellement que la méthode 'pearson'." ) rolling_corr = pair[var_x.column].rolling( window=window, min_periods=min_periods, ).corr(pair[var_y.column]) rolling_corr = rolling_corr.dropna() rolling_corr.name = f"{var_x.key}→{var_y.key}" if step_minutes and step_minutes > 1: rolling_corr = rolling_corr.resample(f"{step_minutes}min").mean().dropna() return rolling_corr def compute_rolling_correlations_for_pairs( df: pd.DataFrame, pairs: Sequence[tuple[Variable, Variable]], *, window_minutes: int, min_valid_fraction: float = 0.6, step_minutes: int | None = None, method: Literal["pearson", "spearman"] = "pearson", ) -> pd.DataFrame: """ Calcule les corrélations glissantes pour plusieurs paires et aligne les résultats dans un DataFrame (index temps, colonnes = 'x→y'). """ series_list: list[pd.Series] = [] for var_x, var_y in pairs: corr = compute_rolling_correlation_series( df=df, var_x=var_x, var_y=var_y, window_minutes=window_minutes, min_valid_fraction=min_valid_fraction, step_minutes=step_minutes, method=method, ) if not corr.empty: series_list.append(corr) if not series_list: return pd.DataFrame() result = pd.concat(series_list, axis=1) result = result.sort_index() return result