# meteo/analysis.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal, Sequence

import numpy as np
import pandas as pd

from .variables import Variable
from .season import SEASON_LABELS

MONTH_ORDER = list(range(1, 13))


def compute_correlation_matrix(
    df: pd.DataFrame,
    *,
    method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
    """
    Compute the correlation matrix between all numeric columns of the
    DataFrame.

    Caveat:
    - Wind direction is treated here as a scalar 0-360° variable, which is
      not ideal for a circular quantity. This can be refined later if
      needed (sin/cos representation).
    """
    numeric_df = df.select_dtypes(include=["number"])
    corr = numeric_df.corr(method=method)
    return corr


def compute_correlation_matrix_for_variables(
    df: pd.DataFrame,
    variables: Sequence[Variable],
    *,
    method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
    """
    Compute the correlation matrix for a subset of variables, in a
    well-defined order.

    Parameters
    ----------
    df :
        DataFrame containing the columns to analyse.
    variables :
        Sequence of Variable objects describing the columns to include.
    method :
        Pandas correlation method ("pearson", "spearman", ...).

    Returns
    -------
    DataFrame :
        Correlation matrix whose index and columns follow the order of
        `variables`, using the DataFrame column names
        (e.g. "temperature", "humidity", ...).
    """
    columns = [v.column for v in variables]
    missing = [c for c in columns if c not in df.columns]
    if missing:
        raise KeyError(f"Missing columns in the DataFrame: {missing!r}")

    numeric_df = df[columns].astype(float)
    corr = numeric_df.corr(method=method)
    # Enforce the requested ordering.
    corr = corr.loc[columns, columns]
    return corr

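# Usage sketch (illustrative, not part of the module API): synthetic data fed
# to compute_correlation_matrix. The column names are assumptions.
def _example_correlation_matrix() -> pd.DataFrame:
    rng = np.random.default_rng(0)
    idx = pd.date_range("2024-01-01", periods=1_000, freq="1min")
    temperature = 15 + 0.01 * rng.normal(size=len(idx)).cumsum()
    df = pd.DataFrame(
        {
            "temperature": temperature,
            # Humidity loosely anti-correlated with temperature.
            "humidity": 70 - 2 * temperature + rng.normal(size=len(idx)),
        },
        index=idx,
    )
    return compute_correlation_matrix(df, method="spearman")
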
""" if var_x.column not in df.columns or var_y.column not in df.columns: raise KeyError("Les colonnes demandées ne sont pas présentes dans le DataFrame.") series_x = df[var_x.column] series_y = df[var_y.column] lags = range(-max_lag_minutes, max_lag_minutes + 1, step_minutes) results: list[tuple[int, float]] = [] for lag in lags: # Y décalé de -lag : pour lag positif, on corrèle X(t) à Y(t + lag) shifted_y = series_y.shift(-lag) pair = pd.concat([series_x, shifted_y], axis=1).dropna() if pair.empty: corr = np.nan else: corr = pair.iloc[:, 0].corr(pair.iloc[:, 1], method=method) results.append((lag, corr)) lag_df = pd.DataFrame(results, columns=["lag_minutes", "correlation"]) lag_df = lag_df.set_index("lag_minutes") return lag_df def _ensure_datetime_index(df: pd.DataFrame) -> pd.DatetimeIndex: if not isinstance(df.index, pd.DatetimeIndex): raise TypeError("Cette fonction nécessite un DataFrame indexé par le temps.") return df.index @dataclass class DiurnalCycleStats: mean: pd.DataFrame median: pd.DataFrame quantile_low: pd.DataFrame | None quantile_high: pd.DataFrame | None quantile_low_level: float | None = None quantile_high_level: float | None = None @dataclass class BinnedStatistics: centers: np.ndarray intervals: pd.IntervalIndex counts: pd.Series mean: pd.DataFrame median: pd.DataFrame quantile_low: pd.DataFrame | None quantile_high: pd.DataFrame | None quantile_low_level: float | None = None quantile_high_level: float | None = None def compute_rolling_correlation_series( df: pd.DataFrame, var_x: Variable, var_y: Variable, *, window_minutes: int, min_valid_fraction: float = 0.6, step_minutes: int | None = None, method: Literal["pearson", "spearman"] = "pearson", ) -> pd.Series: """ Calcule la corrélation glissante X/Y sur une fenêtre temporelle. Retourne une série indexée par l'instant de fin de fenêtre. """ if not 0 < min_valid_fraction <= 1: raise ValueError("min_valid_fraction doit être dans l'intervalle ]0, 1].") for col in (var_x.column, var_y.column): if col not in df.columns: raise KeyError(f"Colonne absente du DataFrame : {col}") _ensure_datetime_index(df) pair = df[[var_x.column, var_y.column]].dropna().sort_index() if pair.empty: return pd.Series(dtype=float, name=f"{var_x.key}→{var_y.key}") window = f"{window_minutes}min" min_periods = max(1, int(window_minutes * min_valid_fraction)) if method not in {"pearson"}: raise NotImplementedError( "Les corrélations glissantes ne supportent actuellement que la méthode 'pearson'." ) rolling_corr = pair[var_x.column].rolling( window=window, min_periods=min_periods, ).corr(pair[var_y.column]) rolling_corr = rolling_corr.dropna() rolling_corr.name = f"{var_x.key}→{var_y.key}" if step_minutes and step_minutes > 1: rolling_corr = rolling_corr.resample(f"{step_minutes}min").mean().dropna() return rolling_corr def compute_rolling_correlations_for_pairs( df: pd.DataFrame, pairs: Sequence[tuple[Variable, Variable]], *, window_minutes: int, min_valid_fraction: float = 0.6, step_minutes: int | None = None, method: Literal["pearson", "spearman"] = "pearson", ) -> pd.DataFrame: """ Calcule les corrélations glissantes pour plusieurs paires et aligne les résultats dans un DataFrame (index temps, colonnes = 'x→y'). 
""" series_list: list[pd.Series] = [] for var_x, var_y in pairs: corr = compute_rolling_correlation_series( df=df, var_x=var_x, var_y=var_y, window_minutes=window_minutes, min_valid_fraction=min_valid_fraction, step_minutes=step_minutes, method=method, ) if not corr.empty: series_list.append(corr) if not series_list: return pd.DataFrame() result = pd.concat(series_list, axis=1) result = result.sort_index() return result def _infer_time_step(index: pd.DatetimeIndex) -> pd.Timedelta: diffs = index.to_series().diff().dropna() if diffs.empty: return pd.Timedelta(minutes=1) return diffs.median() def detect_threshold_events( series: pd.Series, *, threshold: float, min_duration: pd.Timedelta, min_gap: pd.Timedelta, ) -> list[tuple[pd.Timestamp, pd.Timestamp]]: """ Détecte des événements où `series > threshold` (après remplissage des NaN par False) durant au moins `min_duration`. Les événements séparés d'un intervalle < min_gap sont fusionnés. """ if not isinstance(series.index, pd.DatetimeIndex): raise TypeError("series doit être indexée par le temps.") mask = (series > threshold).fillna(False) if not mask.any(): return [] groups = (mask != mask.shift()).cumsum() time_step = _infer_time_step(series.index) raw_events: list[tuple[pd.Timestamp, pd.Timestamp]] = [] for group_id, group_mask in mask.groupby(groups): if not group_mask.iloc[0]: continue start = group_mask.index[0] end = group_mask.index[-1] + time_step duration = end - start if duration >= min_duration: raw_events.append((start, end)) if not raw_events: return [] merged: list[tuple[pd.Timestamp, pd.Timestamp]] = [] for start, end in raw_events: if not merged: merged.append((start, end)) continue prev_start, prev_end = merged[-1] if start - prev_end < min_gap: merged[-1] = (prev_start, max(prev_end, end)) else: merged.append((start, end)) return merged def build_event_aligned_segments( df: pd.DataFrame, events: Sequence[tuple[pd.Timestamp, pd.Timestamp]], columns: Sequence[str], *, window_before_minutes: int, window_after_minutes: int, resample_minutes: int = 1, ) -> pd.DataFrame: """ Extrait, pour chaque événement, les séries centrées sur son début et retourne un DataFrame MultiIndex (event_id, offset_minutes). """ if not events: return pd.DataFrame(columns=columns) index = _ensure_datetime_index(df) data = df[columns].sort_index() freq = pd.Timedelta(minutes=resample_minutes) if resample_minutes > 1: data = data.resample(freq).mean() before = pd.Timedelta(minutes=window_before_minutes) after = pd.Timedelta(minutes=window_after_minutes) segments: list[pd.DataFrame] = [] for event_id, (start, _end) in enumerate(events): window_start = start - before window_end = start + after window_index = pd.date_range(window_start, window_end, freq=freq) segment = data.reindex(window_index) if segment.empty: continue offsets = ((segment.index - start) / pd.Timedelta(minutes=1)).astype(float) multi_index = pd.MultiIndex.from_arrays( [np.full(len(segment), event_id), offsets], names=["event_id", "offset_minutes"], ) segment.index = multi_index segments.append(segment) if not segments: return pd.DataFrame(columns=columns) aligned = pd.concat(segments) return aligned def compute_diurnal_cycle_statistics( df: pd.DataFrame, variables: Sequence[Variable], *, quantiles: tuple[float, float] | None = (0.25, 0.75), ) -> DiurnalCycleStats: """ Agrège les variables par heure locale pour visualiser un cycle diurne moyen. 
""" _ensure_datetime_index(df) columns = [v.column for v in variables] grouped = df[columns].groupby(df.index.hour) mean_df = grouped.mean() median_df = grouped.median() quantile_low_df: pd.DataFrame | None = None quantile_high_df: pd.DataFrame | None = None q_low = q_high = None if quantiles is not None: q_low, q_high = quantiles if q_low is not None: quantile_low_df = grouped.quantile(q_low) if q_high is not None: quantile_high_df = grouped.quantile(q_high) return DiurnalCycleStats( mean=mean_df, median=median_df, quantile_low=quantile_low_df, quantile_high=quantile_high_df, quantile_low_level=q_low, quantile_high_level=q_high, ) def _format_speed_bin_labels(speed_bins: Sequence[float]) -> list[str]: labels: list[str] = [] for i in range(len(speed_bins) - 1): low = speed_bins[i] high = speed_bins[i + 1] if np.isinf(high): labels.append(f"≥{low:g}") else: labels.append(f"{low:g}–{high:g}") return labels def compute_wind_rose_distribution( df: pd.DataFrame, *, direction_sector_size: int = 30, speed_bins: Sequence[float] = (0, 10, 20, 30, 50, float("inf")), ) -> tuple[pd.DataFrame, list[str], float]: """ Regroupe la distribution vent/direction en secteurs angulaires et classes de vitesse. Retourne un DataFrame indexé par le début du secteur (en degrés) et colonnes = classes de vitesse (%). """ if direction_sector_size <= 0 or direction_sector_size > 180: raise ValueError("direction_sector_size doit être compris entre 1 et 180 degrés.") if "wind_speed" not in df.columns or "wind_direction" not in df.columns: raise KeyError("Le DataFrame doit contenir 'wind_speed' et 'wind_direction'.") data = df[["wind_speed", "wind_direction"]].dropna() if data.empty: return pd.DataFrame(), [], float(direction_sector_size) n_sectors = int(360 / direction_sector_size) direction = data["wind_direction"].to_numpy(dtype=float) % 360.0 sector_indices = np.floor(direction / direction_sector_size).astype(int) % n_sectors bins = list(speed_bins) if not np.isinf(bins[-1]): bins.append(float("inf")) labels = _format_speed_bin_labels(bins) speed_categories = pd.cut( data["wind_speed"], bins=bins, right=False, include_lowest=True, labels=labels, ) counts = ( pd.crosstab(sector_indices, speed_categories) .reindex(range(n_sectors), fill_value=0) .reindex(columns=labels, fill_value=0) ) total = counts.values.sum() frequencies = counts / total * 100.0 if total > 0 else counts.astype(float) frequencies.index = frequencies.index * direction_sector_size return frequencies, labels, float(direction_sector_size) def compute_daily_rainfall_totals( df: pd.DataFrame, *, rate_column: str = "rain_rate", ) -> pd.DataFrame: """ Convertit un taux de pluie (mm/h) en cumuls journaliers et cumulés. 
""" _ensure_datetime_index(df) if rate_column not in df.columns: raise KeyError(f"Colonne absente : {rate_column}") series = df[rate_column].fillna(0.0).sort_index() if series.empty: return pd.DataFrame(columns=["daily_total", "cumulative_total"]) time_step = _infer_time_step(series.index) diffs = series.index.to_series().diff() diffs = diffs.fillna(time_step) hours = diffs.dt.total_seconds() / 3600.0 rainfall_mm = series.to_numpy(dtype=float) * hours.to_numpy(dtype=float) rainfall_series = pd.Series(rainfall_mm, index=series.index) daily_totals = rainfall_series.resample("1D").sum() cumulative = daily_totals.cumsum() result = pd.DataFrame( { "daily_total": daily_totals, "cumulative_total": cumulative, } ) return result def compute_binned_statistics( df: pd.DataFrame, *, bin_source_column: str, target_columns: Sequence[str], bins: Sequence[float] | np.ndarray, min_count: int = 30, quantiles: tuple[float, float] | None = (0.25, 0.75), ) -> BinnedStatistics: """ Calcule des statistiques (mean/median/quantiles) pour plusieurs colonnes en regroupant les données selon des intervalles définis sur une colonne source. """ if bin_source_column not in df.columns: raise KeyError(f"Colonne source absente : {bin_source_column}") missing_targets = [col for col in target_columns if col not in df.columns] if missing_targets: raise KeyError(f"Colonnes cibles absentes : {missing_targets!r}") subset_cols = [bin_source_column, *target_columns] data = df[subset_cols].dropna(subset=[bin_source_column]) if data.empty: empty_interval_index = pd.IntervalIndex([]) empty_df = pd.DataFrame(columns=target_columns) empty_counts = pd.Series(dtype=int) return BinnedStatistics( centers=np.array([]), intervals=empty_interval_index, counts=empty_counts, mean=empty_df, median=empty_df, quantile_low=None, quantile_high=None, ) categories = pd.cut(data[bin_source_column], bins=bins, include_lowest=True) grouped = data.groupby(categories, observed=False) counts = grouped.size() valid_mask = counts >= max(1, min_count) valid_intervals = counts.index[valid_mask] if len(valid_intervals) == 0: empty_interval_index = pd.IntervalIndex([]) empty_df = pd.DataFrame(columns=target_columns) empty_counts = pd.Series(dtype=int) return BinnedStatistics( centers=np.array([]), intervals=empty_interval_index, counts=empty_counts, mean=empty_df, median=empty_df, quantile_low=None, quantile_high=None, ) interval_index = pd.IntervalIndex(valid_intervals) mean_df = grouped[target_columns].mean().loc[interval_index] median_df = grouped[target_columns].median().loc[interval_index] q_low = q_high = None quantile_low_df: pd.DataFrame | None = None quantile_high_df: pd.DataFrame | None = None if quantiles is not None: q_low, q_high = quantiles if q_low is not None: quantile_low_df = grouped[target_columns].quantile(q_low).loc[interval_index] if q_high is not None: quantile_high_df = grouped[target_columns].quantile(q_high).loc[interval_index] centers = np.array([interval.mid for interval in interval_index]) filtered_counts = counts.loc[interval_index] return BinnedStatistics( centers=centers, intervals=interval_index, counts=filtered_counts, mean=mean_df, median=median_df, quantile_low=quantile_low_df, quantile_high=quantile_high_df, quantile_low_level=q_low, quantile_high_level=q_high, ) def compute_rainfall_by_season( df: pd.DataFrame, *, rate_column: str = "rain_rate", season_column: str = "season", ) -> pd.DataFrame: """ Calcule la pluie totale par saison (mm) ainsi que le nombre d'heures pluvieuses. 
""" _ensure_datetime_index(df) for col in (rate_column, season_column): if col not in df.columns: raise KeyError(f"Colonne absente : {col}") data = df[[rate_column, season_column]].copy() data[rate_column] = data[rate_column].fillna(0.0) data = data.dropna(subset=[season_column]) if data.empty: return pd.DataFrame(columns=["total_rain_mm", "rainy_hours"]).astype(float) time_step = _infer_time_step(data.index) diffs = data.index.to_series().diff().fillna(time_step) hours = diffs.dt.total_seconds() / 3600.0 rainfall_mm = data[rate_column].to_numpy(dtype=float) * hours.to_numpy(dtype=float) data["rainfall_mm"] = rainfall_mm data["rainy_hours"] = (rainfall_mm > 0).astype(float) * hours.to_numpy(dtype=float) agg = data.groupby(season_column).agg( total_rain_mm=("rainfall_mm", "sum"), rainy_hours=("rainy_hours", "sum"), ) order = [season for season in SEASON_LABELS if season in agg.index] agg = agg.loc[order] return agg def filter_by_condition( df: pd.DataFrame, *, condition: pd.Series, ) -> pd.DataFrame: """ Renvoie une copie filtrée du DataFrame selon une condition booleenne alignée. """ mask = condition.reindex(df.index) mask = mask.fillna(False) return df.loc[mask] def compute_monthly_climatology( df: pd.DataFrame, *, columns: Sequence[str], ) -> pd.DataFrame: """ Moyenne par mois (1–12) pour les colonnes fournies. """ _ensure_datetime_index(df) missing = [col for col in columns if col not in df.columns] if missing: raise KeyError(f"Colonnes absentes : {missing}") grouped = df[list(columns)].groupby(df.index.month).mean() grouped = grouped.reindex(MONTH_ORDER) grouped.index.name = "month" return grouped def compute_monthly_means( df: pd.DataFrame, *, columns: Sequence[str], ) -> pd.DataFrame: """ Moyennes calendaire par mois (indexé sur la fin de mois). """ _ensure_datetime_index(df) missing = [col for col in columns if col not in df.columns] if missing: raise KeyError(f"Colonnes absentes : {missing}") monthly = df[list(columns)].resample("1ME").mean() return monthly.dropna(how="all") def compute_seasonal_hourly_profile( df: pd.DataFrame, *, value_column: str, season_column: str = "season", ) -> pd.DataFrame: """ Retourne une matrice (heures x saisons) contenant la moyenne d'une variable. """ _ensure_datetime_index(df) for col in (value_column, season_column): if col not in df.columns: raise KeyError(f"Colonne absente : {col}") subset = df[[value_column, season_column]].dropna() if subset.empty: return pd.DataFrame(index=range(24)) grouped = subset.groupby([season_column, subset.index.hour])[value_column].mean() pivot = grouped.unstack(season_column) pivot = pivot.reindex(index=range(24)) order = [season for season in SEASON_LABELS if season in pivot.columns] if order: pivot = pivot[order] pivot.index.name = "hour" return pivot def compute_monthly_daylight_hours( df: pd.DataFrame, *, illuminance_column: str = "illuminance", threshold_lux: float = 1000.0, ) -> pd.Series: """ Calcule la durée moyenne de luminosité (> threshold_lux) par mois (en heures par jour). 
""" _ensure_datetime_index(df) if illuminance_column not in df.columns: raise KeyError(f"Colonne absente : {illuminance_column}") subset = df[[illuminance_column]].dropna() if subset.empty: return pd.Series(dtype=float) time_step = _infer_time_step(subset.index) hours_per_step = time_step.total_seconds() / 3600.0 daylight_flag = (subset[illuminance_column] >= threshold_lux).astype(float) daylight_hours = daylight_flag * hours_per_step daily_hours = daylight_hours.resample("1D").sum() monthly_avg = daily_hours.resample("1ME").mean() return monthly_avg.dropna() def compute_mean_wind_components( df: pd.DataFrame, *, freq: str = "1M", ) -> pd.DataFrame: """ Calcule les composantes zonale (u) et méridienne (v) du vent pour une fréquence donnée. Retourne également la vitesse moyenne. """ if "wind_speed" not in df.columns or "wind_direction" not in df.columns: raise KeyError("Les colonnes 'wind_speed' et 'wind_direction' sont requises.") _ensure_datetime_index(df) subset = df[["wind_speed", "wind_direction"]].dropna() if subset.empty: return pd.DataFrame(columns=["u", "v", "speed"]) radians = np.deg2rad(subset["wind_direction"].to_numpy(dtype=float)) speed = subset["wind_speed"].to_numpy(dtype=float) u = speed * np.sin(radians) * -1 # composante est-ouest (positive vers l'est) v = speed * np.cos(radians) * -1 # composante nord-sud (positive vers le nord) vector_df = pd.DataFrame( { "u": u, "v": v, "speed": speed, }, index=subset.index, ) actual_freq = "1ME" if freq == "1M" else freq grouped = vector_df.resample(actual_freq).mean() return grouped.dropna(how="all")