1
donnees_meteo/meteo/analysis/statistics.py
2025-11-18 09:01:34 +01:00

141 lines
4.4 KiB
Python

"""Statistiques descriptives utilisées par les tracés (cycle diurne, regroupements par bins)."""
from __future__ import annotations
from typing import Sequence
import numpy as np
import pandas as pd
from meteo.variables import Variable
from .core import BinnedStatistics, DiurnalCycleStats, _ensure_datetime_index
__all__ = ['compute_diurnal_cycle_statistics', 'compute_binned_statistics']
def compute_diurnal_cycle_statistics(
    df: pd.DataFrame,
    variables: Sequence[Variable],
    *,
    quantiles: tuple[float, float] | None = (0.25, 0.75),
) -> DiurnalCycleStats:
    """Aggregate the selected variables by hour of day to describe a mean diurnal cycle.

    Rows are grouped on ``df.index.hour``, so the hour is whatever the index
    itself reports. NOTE(review): the original French note calls this the
    "local hour" -- confirm the index is actually expressed in local time.

    Args:
        df: Input frame; its index must expose an ``hour`` attribute.
        variables: Variables to aggregate; the ``column`` attribute of each one
            names the column read from ``df``.
        quantiles: ``(low, high)`` quantile levels, or ``None`` to skip the
            quantile computation entirely. A ``None`` entry inside the pair
            skips only that bound.

    Returns:
        A ``DiurnalCycleStats`` carrying the per-hour mean and median frames,
        the optional low/high quantile frames, and the levels they were
        computed at (``None`` when not computed).
    """
    # Helper imported from .core; presumably rejects a non-datetime index
    # (TODO confirm). Its return value is not used here.
    _ensure_datetime_index(df)

    selected_columns = [variable.column for variable in variables]
    by_hour = df[selected_columns].groupby(df.index.hour)

    hourly_mean = by_hour.mean()
    hourly_median = by_hour.median()

    # With quantiles disabled, both levels and both frames stay None.
    lower_level, upper_level = quantiles if quantiles is not None else (None, None)
    lower_band: pd.DataFrame | None = (
        by_hour.quantile(lower_level) if lower_level is not None else None
    )
    upper_band: pd.DataFrame | None = (
        by_hour.quantile(upper_level) if upper_level is not None else None
    )

    return DiurnalCycleStats(
        mean=hourly_mean,
        median=hourly_median,
        quantile_low=lower_band,
        quantile_high=upper_band,
        quantile_low_level=lower_level,
        quantile_high_level=upper_level,
    )
def _empty_binned_statistics(target_columns: Sequence[str]) -> BinnedStatistics:
    """Build the placeholder result returned when no bin can be reported."""
    columns = list(target_columns)
    return BinnedStatistics(
        centers=np.array([]),
        intervals=pd.IntervalIndex([]),
        counts=pd.Series(dtype=int),
        # Two distinct frames: mutating ``mean`` must not silently alter ``median``.
        mean=pd.DataFrame(columns=columns),
        median=pd.DataFrame(columns=columns),
        quantile_low=None,
        quantile_high=None,
    )


def compute_binned_statistics(
    df: pd.DataFrame,
    *,
    bin_source_column: str,
    target_columns: Sequence[str],
    bins: Sequence[float] | np.ndarray,
    min_count: int = 30,
    quantiles: tuple[float, float] | None = (0.25, 0.75),
) -> BinnedStatistics:
    """Compute mean/median/quantile statistics of several columns per bin of a source column.

    Rows whose source value is missing are dropped, the remaining rows are
    assigned to intervals with ``pd.cut(..., include_lowest=True)``, and the
    statistics of every target column are computed per interval. Intervals
    holding fewer than ``max(1, min_count)`` rows are discarded, so an empty
    interval is never reported.

    Args:
        df: Input frame containing the source and target columns.
        bin_source_column: Column whose values decide the bin of each row.
        target_columns: Columns to aggregate inside each bin. Any sequence is
            accepted (list, tuple, ...).
        bins: Bin specification forwarded to ``pd.cut``.
        min_count: Minimum number of rows a bin needs in order to be kept.
        quantiles: ``(low, high)`` quantile levels, or ``None`` to skip the
            quantile computation. A ``None`` entry inside the pair skips only
            that bound.

    Returns:
        A ``BinnedStatistics`` restricted to the retained intervals. When no
        row or no interval survives, an empty result is returned with both
        quantile frames set to ``None``; the quantile level fields are not
        passed in that case and keep whatever default ``BinnedStatistics``
        declares.

    Raises:
        KeyError: If the source column or any target column is absent from ``df``.
    """
    if bin_source_column not in df.columns:
        raise KeyError(f"Colonne source absente : {bin_source_column}")

    # Normalise once to a list: pandas interprets a tuple passed to
    # ``GroupBy.__getitem__`` as a single key (and pandas 2.x rejects
    # multi-element tuples), so a tuple of column names would fail below.
    targets = list(target_columns)

    missing_targets = [col for col in targets if col not in df.columns]
    if missing_targets:
        raise KeyError(f"Colonnes cibles absentes : {missing_targets!r}")

    data = df[[bin_source_column, *targets]].dropna(subset=[bin_source_column])
    if data.empty:
        return _empty_binned_statistics(targets)

    categories = pd.cut(data[bin_source_column], bins=bins, include_lowest=True)
    # observed=False keeps every interval in the result, including empty ones,
    # so the size filter below decides which bins are kept.
    grouped = data.groupby(categories, observed=False)
    counts = grouped.size()

    # max(1, ...) guarantees empty bins are dropped even if min_count <= 0.
    valid_mask = counts >= max(1, min_count)
    valid_intervals = counts.index[valid_mask]
    if len(valid_intervals) == 0:
        return _empty_binned_statistics(targets)

    interval_index = pd.IntervalIndex(valid_intervals)
    grouped_targets = grouped[targets]

    mean_df = grouped_targets.mean().loc[interval_index]
    median_df = grouped_targets.median().loc[interval_index]

    q_low = q_high = None
    quantile_low_df: pd.DataFrame | None = None
    quantile_high_df: pd.DataFrame | None = None
    if quantiles is not None:
        q_low, q_high = quantiles
        if q_low is not None:
            quantile_low_df = grouped_targets.quantile(q_low).loc[interval_index]
        if q_high is not None:
            quantile_high_df = grouped_targets.quantile(q_high).loc[interval_index]

    centers = np.array([interval.mid for interval in interval_index])
    filtered_counts = counts.loc[interval_index]

    return BinnedStatistics(
        centers=centers,
        intervals=interval_index,
        counts=filtered_counts,
        mean=mean_df,
        median=median_df,
        quantile_low=quantile_low_df,
        quantile_high=quantile_high_df,
        quantile_low_level=q_low,
        quantile_high_level=q_high,
    )