"""Conversions et agrégations des mesures de pluie.""" from __future__ import annotations import numpy as np import pandas as pd from meteo.season import SEASON_LABELS from .core import _ensure_datetime_index, _infer_time_step __all__ = ['compute_daily_rainfall_totals', 'compute_rainfall_by_season'] def compute_daily_rainfall_totals( df: pd.DataFrame, *, rate_column: str = "rain_rate", ) -> pd.DataFrame: """ Convertit un taux de pluie (mm/h) en cumuls journaliers et cumulés. """ _ensure_datetime_index(df) if rate_column not in df.columns: raise KeyError(f"Colonne absente : {rate_column}") series = df[rate_column].fillna(0.0).sort_index() if series.empty: return pd.DataFrame(columns=["daily_total", "cumulative_total"]) time_step = _infer_time_step(series.index) diffs = series.index.to_series().diff() diffs = diffs.fillna(time_step) hours = diffs.dt.total_seconds() / 3600.0 rainfall_mm = series.to_numpy(dtype=float) * hours.to_numpy(dtype=float) rainfall_series = pd.Series(rainfall_mm, index=series.index) daily_totals = rainfall_series.resample("1D").sum() cumulative = daily_totals.cumsum() result = pd.DataFrame( { "daily_total": daily_totals, "cumulative_total": cumulative, } ) return result def compute_rainfall_by_season( df: pd.DataFrame, *, rate_column: str = "rain_rate", season_column: str = "season", ) -> pd.DataFrame: """ Calcule la pluie totale par saison (mm) ainsi que le nombre d'heures pluvieuses. """ _ensure_datetime_index(df) for col in (rate_column, season_column): if col not in df.columns: raise KeyError(f"Colonne absente : {col}") data = df[[rate_column, season_column]].copy() data[rate_column] = data[rate_column].fillna(0.0) data = data.dropna(subset=[season_column]) if data.empty: return pd.DataFrame(columns=["total_rain_mm", "rainy_hours"]).astype(float) time_step = _infer_time_step(data.index) diffs = data.index.to_series().diff().fillna(time_step) hours = diffs.dt.total_seconds() / 3600.0 rainfall_mm = data[rate_column].to_numpy(dtype=float) * hours.to_numpy(dtype=float) data["rainfall_mm"] = rainfall_mm data["rainy_hours"] = (rainfall_mm > 0).astype(float) * hours.to_numpy(dtype=float) agg = data.groupby(season_column).agg( total_rain_mm=("rainfall_mm", "sum"), rainy_hours=("rainy_hours", "sum"), ) order = [season for season in SEASON_LABELS if season in agg.index] agg = agg.loc[order] return agg