1

Visualisations saisonnières et cumul de pluie

This commit is contained in:
2025-11-17 22:30:31 +01:00
parent 01cf686af3
commit 45b6beac98
8 changed files with 340 additions and 1 deletions

View File

@@ -8,6 +8,7 @@ import numpy as np
import pandas as pd
from .variables import Variable
from .season import SEASON_LABELS
def compute_correlation_matrix(
@@ -559,3 +560,42 @@ def compute_binned_statistics(
quantile_low_level=q_low,
quantile_high_level=q_high,
)
def compute_rainfall_by_season(
    df: pd.DataFrame,
    *,
    rate_column: str = "rain_rate",
    season_column: str = "season",
) -> pd.DataFrame:
    """
    Aggregate rainfall amount and rainy duration per season.

    Each sample's rain rate is multiplied by the time elapsed since the
    previous sample (in hours) to obtain a rainfall amount; the first sample
    uses the step returned by ``_infer_time_step``. Missing rates count as
    0.0 and rows without a season are discarded.

    Args:
        df: Input frame. It is passed to ``_ensure_datetime_index`` first
            (helper defined elsewhere in this module; presumably it rejects
            frames without a DatetimeIndex -- confirm).
        rate_column: Column holding the rain rate. NOTE(review): the result
            is labelled in mm, which assumes the rate is in mm/h -- confirm.
        season_column: Column holding the season label of each row.

    Returns:
        A frame indexed by season with the columns ``total_rain_mm`` and
        ``rainy_hours``. Rows follow the order of ``SEASON_LABELS``; seasons
        not listed there are left out. An empty float frame with those two
        columns is returned when no usable row remains.

    Raises:
        KeyError: If ``rate_column`` or ``season_column`` is missing.
    """
    _ensure_datetime_index(df)

    required = (rate_column, season_column)
    missing = [name for name in required if name not in df.columns]
    if missing:
        # Report the first missing column, rate column before season column.
        raise KeyError(f"Colonne absente : {missing[0]}")

    subset = df[list(required)].copy()
    subset[rate_column] = subset[rate_column].fillna(0.0)
    subset = subset.dropna(subset=[season_column])
    if subset.empty:
        return pd.DataFrame(columns=["total_rain_mm", "rainy_hours"]).astype(float)

    # Duration attributed to each sample: gap to the previous timestamp, with
    # the inferred step standing in for the very first sample.
    nominal_step = _infer_time_step(subset.index)
    elapsed = subset.index.to_series().diff().fillna(nominal_step)
    elapsed_hours = (elapsed.dt.total_seconds() / 3600.0).to_numpy(dtype=float)

    amounts = subset[rate_column].to_numpy(dtype=float) * elapsed_hours
    subset["rainfall_mm"] = amounts
    # A sample contributes its duration only when it actually recorded rain.
    subset["rainy_hours"] = (amounts > 0).astype(float) * elapsed_hours

    per_season = subset.groupby(season_column).agg(
        total_rain_mm=("rainfall_mm", "sum"),
        rainy_hours=("rainy_hours", "sum"),
    )
    present = [label for label in SEASON_LABELS if label in per_season.index]
    return per_season.loc[present]

View File

@@ -12,6 +12,7 @@ import numpy as np
import pandas as pd
from .analysis import DiurnalCycleStats, BinnedStatistics
from .season import SEASON_LABELS
from .variables import Variable
@@ -596,6 +597,81 @@ def plot_diurnal_cycle(
return output_path.resolve()
def _save_seasonal_placeholder(output_path: Path, message: str) -> Path:
    """Save a figure that only shows ``message`` and return the resolved path."""
    fig, ax = plt.subplots()
    ax.text(0.5, 0.5, message, ha="center", va="center")
    ax.set_axis_off()
    fig.savefig(output_path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    return output_path.resolve()


def plot_seasonal_boxplots(
    df: pd.DataFrame,
    variables: Sequence[Variable],
    output_path: str | Path,
    *,
    season_column: str = "season",
    season_order: Sequence[str] | None = None,
    title: str | None = None,
) -> Path:
    """
    Draw per-season boxplots, one subplot row per variable, and save them.

    Args:
        df: Data containing ``season_column`` and each variable's column.
        variables: Variables to plot; their ``column``, ``label`` and ``unit``
            attributes are used. An empty sequence produces a placeholder
            figure instead of failing.
        output_path: Destination image file; parent directories are created.
        season_column: Column holding the season label of each row.
        season_order: Desired season order. Defaults to ``SEASON_LABELS``.
            In both cases only seasons present in the data are kept.
        title: Optional figure title.

    Returns:
        The resolved path of the written figure.

    Raises:
        KeyError: If ``season_column`` is not a column of ``df``.
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if season_column not in df.columns:
        raise KeyError(f"Colonne saison absente : {season_column}")

    available = df[season_column].dropna().unique()
    if season_order is None:
        season_order = [season for season in SEASON_LABELS if season in available]
    else:
        season_order = [season for season in season_order if season in available]

    if not season_order:
        return _save_seasonal_placeholder(
            output_path, "Aucune donnée saisonnière disponible."
        )

    n_vars = len(variables)
    if n_vars == 0:
        # plt.subplots(0, 1) raises ValueError; emit a placeholder instead.
        return _save_seasonal_placeholder(output_path, "Aucune variable à tracer.")

    # squeeze=False always returns a 2-D array, so one variable needs no
    # special casing.
    fig, axes_grid = plt.subplots(
        n_vars,
        1,
        figsize=(10, 3 * n_vars),
        sharex=True,
        squeeze=False,
    )
    axes = axes_grid[:, 0]

    colors = plt.get_cmap("Set3")(np.linspace(0.2, 0.8, len(season_order)))
    labels = [season.capitalize() for season in season_order]

    for ax, var in zip(axes, variables):
        data = [
            df.loc[df[season_column] == season, var.column].dropna().to_numpy()
            for season in season_order
        ]
        if not any(len(arr) > 0 for arr in data):
            ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
            ax.set_axis_off()
            continue

        # NOTE(review): recent Matplotlib releases rename ``labels`` to
        # ``tick_labels`` -- confirm against the project's pinned version.
        box = ax.boxplot(
            data,
            labels=labels,
            showfliers=False,  # outliers are not drawn
            patch_artist=True,  # required so the boxes can be filled
        )
        for patch, color in zip(box["boxes"], colors):
            patch.set_facecolor(color)
            patch.set_alpha(0.7)

        ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
        ax.set_ylabel(ylabel)
        ax.grid(True, linestyle=":", alpha=0.5)

    axes[-1].set_xlabel("Saison")

    if title:
        fig.suptitle(title)
        # Leave headroom for the suptitle.
        fig.tight_layout(rect=[0, 0, 1, 0.95])
    else:
        fig.tight_layout()

    fig.savefig(output_path, dpi=150)
    plt.close(fig)
    return output_path.resolve()
def plot_binned_profiles(
stats: BinnedStatistics,
variables: Sequence[Variable],
@@ -747,3 +823,69 @@ def plot_daily_rainfall_hyetograph(
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_rainfall_by_season(
    rainfall_df: pd.DataFrame,
    output_path: str | Path,
    *,
    title: str = "Pluie cumulée par saison",
) -> Path:
    """
    Plot cumulative rainfall per season as bars, with rainy hours overlaid.

    Args:
        rainfall_df: Frame indexed by season with a ``total_rain_mm`` column.
            When a ``rainy_hours`` column is present it is drawn as a line on
            a secondary y-axis.
        output_path: Destination image file; parent directories are created.
        title: Title of the axes.

    Returns:
        The resolved path of the written figure. An empty ``rainfall_df``
        yields a figure containing only an explanatory message.
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if rainfall_df.empty:
        fig, ax = plt.subplots()
        ax.text(0.5, 0.5, "Pas de données de pluie saisonnière.", ha="center", va="center")
        ax.set_axis_off()
        fig.savefig(output_path, dpi=150, bbox_inches="tight")
        plt.close(fig)
        return output_path.resolve()

    season_names = rainfall_df.index.tolist()
    positions = np.arange(len(season_names))
    totals = rainfall_df["total_rain_mm"].to_numpy(dtype=float)

    fig, rain_ax = plt.subplots(figsize=(9, 4))
    bar_container = rain_ax.bar(
        positions,
        totals,
        color="tab:blue",
        alpha=0.7,
        label="Pluie cumulée",
    )
    rain_ax.set_ylabel("Pluie cumulée (mm)")
    rain_ax.set_xlabel("Saison")
    rain_ax.set_xticks(positions)
    rain_ax.set_xticklabels([name.capitalize() for name in season_names])
    rain_ax.grid(True, axis="y", linestyle=":", alpha=0.5)

    # Print the rounded total just above each bar.
    for bar, total in zip(bar_container, totals):
        rain_ax.text(
            bar.get_x() + bar.get_width() / 2,
            bar.get_height(),
            f"{total:.0f}",
            ha="center",
            va="bottom",
            fontsize=8,
        )

    # Start from the bar entries; the optional line on the twin axis is
    # appended so that a single legend covers both axes.
    legend_handles, legend_labels = rain_ax.get_legend_handles_labels()
    if "rainy_hours" in rainfall_df.columns:
        hours_ax = rain_ax.twinx()
        hours_line = hours_ax.plot(
            positions,
            rainfall_df["rainy_hours"].to_numpy(dtype=float),
            color="tab:red",
            marker="o",
            label="Heures pluvieuses",
        )[0]
        hours_ax.set_ylabel("Heures pluvieuses")
        legend_handles.append(hours_line)
        legend_labels.append("Heures pluvieuses")

    if legend_handles:
        rain_ax.legend(legend_handles, legend_labels, loc="upper left")

    rain_ax.set_title(title)
    fig.tight_layout()
    fig.savefig(output_path, dpi=150)
    plt.close(fig)
    return output_path.resolve()

84
meteo/season.py Normal file
View File

@@ -0,0 +1,84 @@
# meteo/season.py
from __future__ import annotations
from typing import Iterable, Sequence
import numpy as np
import pandas as pd
# Season names; the values of MONTH_TO_SEASON_INDEX are positions in this array.
SEASON_LABELS = np.array(["winter", "spring", "summer", "autumn"])

# Calendar month number -> position in SEASON_LABELS, built from three-month
# groups: Dec-Feb -> 0, Mar-May -> 1, Jun-Aug -> 2, Sep-Nov -> 3.
MONTH_TO_SEASON_INDEX = {
    month: position
    for position, months in enumerate(
        ((12, 1, 2), (3, 4, 5), (6, 7, 8), (9, 10, 11))
    )
    for month in months
}
def _ensure_datetime_index(index: pd.Index) -> pd.DatetimeIndex:
    """
    Return ``index`` unchanged when it is a ``pd.DatetimeIndex``.

    Raises:
        TypeError: If ``index`` is any other kind of object.
    """
    if isinstance(index, pd.DatetimeIndex):
        return index
    raise TypeError("Cette fonction nécessite un DatetimeIndex.")
def _season_indices_for_month(months: np.ndarray, hemisphere: str) -> np.ndarray:
    """
    Map month numbers to positions in ``SEASON_LABELS``.

    Args:
        months: Month numbers that are keys of ``MONTH_TO_SEASON_INDEX``.
            May be empty.
        hemisphere: ``"south"`` shifts every position by two seasons; any
            other value leaves the table's positions as they are.

    Returns:
        An integer array with the same shape as ``months``.

    Raises:
        KeyError: If a month is not a key of ``MONTH_TO_SEASON_INDEX``.
    """
    month_values = np.asarray(months)
    # Explicit dict lookup instead of np.vectorize: vectorize without otypes
    # raises ValueError on size-0 input, which broke empty DatetimeIndexes.
    base_indices = np.fromiter(
        (MONTH_TO_SEASON_INDEX[int(month)] for month in month_values.ravel()),
        dtype=np.intp,
        count=month_values.size,
    ).reshape(month_values.shape)
    if hemisphere == "south":
        # Seasons are opposite across hemispheres: rotate by half the cycle.
        return (base_indices + 2) % len(SEASON_LABELS)
    return base_indices
def compute_season_series(
    index: pd.Index,
    *,
    hemisphere: str = "north",
    column_name: str = "season",
) -> pd.Series:
    """
    Build a Series giving the season label of every timestamp in ``index``.

    The label is derived from each timestamp's month; values are taken from
    ``SEASON_LABELS``.

    Args:
        index: Timestamps to label; must be a ``pd.DatetimeIndex``.
        hemisphere: ``"north"`` or ``"south"`` (case-insensitive).
        column_name: Name given to the returned Series.

    Returns:
        A Series of season labels indexed by ``index``.

    Raises:
        ValueError: If ``hemisphere`` is neither ``"north"`` nor ``"south"``.
        TypeError: If ``index`` is not a ``pd.DatetimeIndex``.
    """
    hemisphere = hemisphere.lower()
    if hemisphere not in ("north", "south"):
        raise ValueError("hemisphere doit valoir 'north' ou 'south'.")

    timestamps = _ensure_datetime_index(index)
    positions = _season_indices_for_month(timestamps.month.to_numpy(), hemisphere)
    return pd.Series(SEASON_LABELS[positions], index=timestamps, name=column_name)
def add_season_column(
    df: pd.DataFrame,
    *,
    hemisphere: str = "north",
    column_name: str = "season",
) -> pd.DataFrame:
    """
    Add a season column, computed from the frame's index, to ``df``.

    The frame is modified in place: the column is assigned on ``df`` itself
    and that same object is returned.

    Args:
        df: Frame whose index is passed to ``compute_season_series``.
        hemisphere: Forwarded to ``compute_season_series``.
        column_name: Name of the column to create or overwrite.

    Returns:
        ``df``, now carrying the season column.
    """
    df[column_name] = compute_season_series(
        df.index,
        hemisphere=hemisphere,
        column_name=column_name,
    )
    return df
def sort_season_labels(
    labels: Iterable[str],
    *,
    order: Sequence[str] | None = None,
) -> list[str]:
    """
    Return the reference seasons that appear in ``labels``, in reference order.

    Args:
        labels: Labels to keep; falsy entries (``None``, ``""``) are ignored
            and the rest are compared by their ``str`` form.
        order: Reference ordering. Defaults to ``SEASON_LABELS``.

    Returns:
        The entries of the reference ordering (as ``str``) whose value is
        among ``labels``. Labels absent from the reference are dropped.
    """
    ranking = SEASON_LABELS if order is None else order
    reference = list(map(str, ranking))

    wanted = set()
    for label in labels:
        if label:
            wanted.add(str(label))

    ordered = []
    for name in reference:
        if name in wanted:
            ordered.append(name)
    return ordered