1

Ajout de visualisations classiques

This commit is contained in:
2025-11-17 22:07:28 +01:00
parent fd42a692d9
commit 268d5d65ec
10 changed files with 515 additions and 0 deletions

View File

@@ -1,6 +1,7 @@
# meteo/analysis.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Sequence
import numpy as np
@@ -123,6 +124,16 @@ def _ensure_datetime_index(df: pd.DataFrame) -> pd.DatetimeIndex:
return df.index
@dataclass
class DiurnalCycleStats:
mean: pd.DataFrame
median: pd.DataFrame
quantile_low: pd.DataFrame | None
quantile_high: pd.DataFrame | None
quantile_low_level: float | None = None
quantile_high_level: float | None = None
def compute_rolling_correlation_series(
df: pd.DataFrame,
var_x: Variable,
@@ -311,3 +322,137 @@ def build_event_aligned_segments(
aligned = pd.concat(segments)
return aligned
def compute_diurnal_cycle_statistics(
df: pd.DataFrame,
variables: Sequence[Variable],
*,
quantiles: tuple[float, float] | None = (0.25, 0.75),
) -> DiurnalCycleStats:
"""
Agrège les variables par heure locale pour visualiser un cycle diurne moyen.
"""
_ensure_datetime_index(df)
columns = [v.column for v in variables]
grouped = df[columns].groupby(df.index.hour)
mean_df = grouped.mean()
median_df = grouped.median()
quantile_low_df: pd.DataFrame | None = None
quantile_high_df: pd.DataFrame | None = None
q_low = q_high = None
if quantiles is not None:
q_low, q_high = quantiles
if q_low is not None:
quantile_low_df = grouped.quantile(q_low)
if q_high is not None:
quantile_high_df = grouped.quantile(q_high)
return DiurnalCycleStats(
mean=mean_df,
median=median_df,
quantile_low=quantile_low_df,
quantile_high=quantile_high_df,
quantile_low_level=q_low,
quantile_high_level=q_high,
)
def _format_speed_bin_labels(speed_bins: Sequence[float]) -> list[str]:
labels: list[str] = []
for i in range(len(speed_bins) - 1):
low = speed_bins[i]
high = speed_bins[i + 1]
if np.isinf(high):
labels.append(f"{low:g}")
else:
labels.append(f"{low:g}{high:g}")
return labels
def compute_wind_rose_distribution(
df: pd.DataFrame,
*,
direction_sector_size: int = 30,
speed_bins: Sequence[float] = (0, 10, 20, 30, 50, float("inf")),
) -> tuple[pd.DataFrame, list[str], float]:
"""
Regroupe la distribution vent/direction en secteurs angulaires et classes de vitesse.
Retourne un DataFrame indexé par le début du secteur (en degrés) et colonnes = classes de vitesse (%).
"""
if direction_sector_size <= 0 or direction_sector_size > 180:
raise ValueError("direction_sector_size doit être compris entre 1 et 180 degrés.")
if "wind_speed" not in df.columns or "wind_direction" not in df.columns:
raise KeyError("Le DataFrame doit contenir 'wind_speed' et 'wind_direction'.")
data = df[["wind_speed", "wind_direction"]].dropna()
if data.empty:
return pd.DataFrame(), [], float(direction_sector_size)
n_sectors = int(360 / direction_sector_size)
direction = data["wind_direction"].to_numpy(dtype=float) % 360.0
sector_indices = np.floor(direction / direction_sector_size).astype(int) % n_sectors
bins = list(speed_bins)
if not np.isinf(bins[-1]):
bins.append(float("inf"))
labels = _format_speed_bin_labels(bins)
speed_categories = pd.cut(
data["wind_speed"],
bins=bins,
right=False,
include_lowest=True,
labels=labels,
)
counts = (
pd.crosstab(sector_indices, speed_categories)
.reindex(range(n_sectors), fill_value=0)
.reindex(columns=labels, fill_value=0)
)
total = counts.values.sum()
frequencies = counts / total * 100.0 if total > 0 else counts.astype(float)
frequencies.index = frequencies.index * direction_sector_size
return frequencies, labels, float(direction_sector_size)
def compute_daily_rainfall_totals(
df: pd.DataFrame,
*,
rate_column: str = "rain_rate",
) -> pd.DataFrame:
"""
Convertit un taux de pluie (mm/h) en cumuls journaliers et cumulés.
"""
_ensure_datetime_index(df)
if rate_column not in df.columns:
raise KeyError(f"Colonne absente : {rate_column}")
series = df[rate_column].fillna(0.0).sort_index()
if series.empty:
return pd.DataFrame(columns=["daily_total", "cumulative_total"])
time_step = _infer_time_step(series.index)
diffs = series.index.to_series().diff()
diffs = diffs.fillna(time_step)
hours = diffs.dt.total_seconds() / 3600.0
rainfall_mm = series.to_numpy(dtype=float) * hours.to_numpy(dtype=float)
rainfall_series = pd.Series(rainfall_mm, index=series.index)
daily_totals = rainfall_series.resample("1D").sum()
cumulative = daily_totals.cumsum()
result = pd.DataFrame(
{
"daily_total": daily_totals,
"cumulative_total": cumulative,
}
)
return result

View File

@@ -6,9 +6,12 @@ from typing import Callable, Sequence
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
from matplotlib.ticker import FuncFormatter
import matplotlib.dates as mdates
import numpy as np
import pandas as pd
from .analysis import DiurnalCycleStats
from .variables import Variable
@@ -473,3 +476,180 @@ def plot_event_composite(
plt.close(fig)
return output_path.resolve()
def plot_wind_rose(
frequencies: pd.DataFrame,
speed_bin_labels: Sequence[str],
output_path: str | Path,
*,
sector_size_deg: float,
cmap: str = "viridis",
) -> Path:
"""
Trace une rose des vents empilée par classes de vitesses (en % du temps).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if frequencies.empty:
fig, ax = plt.subplots(subplot_kw={"projection": "polar"})
ax.text(0.5, 0.5, "Données de vent insuffisantes.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax = plt.subplots(subplot_kw={"projection": "polar"}, figsize=(6, 6))
cmap_obj = plt.get_cmap(cmap, len(speed_bin_labels))
colors = cmap_obj(np.linspace(0.2, 0.95, len(speed_bin_labels)))
angles = np.deg2rad(frequencies.index.to_numpy(dtype=float) + sector_size_deg / 2.0)
width = np.deg2rad(sector_size_deg)
bottom = np.zeros_like(angles, dtype=float)
for label, color in zip(speed_bin_labels, colors):
values = frequencies[label].to_numpy(dtype=float)
bars = ax.bar(
angles,
values,
width=width,
bottom=bottom,
color=color,
edgecolor="white",
linewidth=0.5,
align="center",
)
bottom += values
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
ax.set_xticks(np.deg2rad(np.arange(0, 360, 45)))
ax.set_xticklabels(["N", "NE", "E", "SE", "S", "SO", "O", "NO"])
max_radius = np.max(bottom)
ax.set_ylim(0, max(max_radius * 1.1, 1))
ax.yaxis.set_major_formatter(FuncFormatter(lambda val, _pos: f"{val:.0f}%"))
ax.set_title("Rose des vents (fréquence en %)")
legend_handles = [
plt.Line2D([0], [0], color=color, linewidth=6, label=label) for label, color in zip(speed_bin_labels, colors)
]
ax.legend(
handles=legend_handles,
loc="lower center",
bbox_to_anchor=(0.5, -0.15),
ncol=2,
title="Vitesses (km/h)",
)
fig.tight_layout()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
def plot_diurnal_cycle(
stats: DiurnalCycleStats,
variables: Sequence[Variable],
output_path: str | Path,
) -> Path:
"""
Trace les cycles diurnes moyens (moyenne/médiane + quantiles).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
hours = stats.mean.index.to_numpy(dtype=float)
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
col = var.column
ax.plot(hours, stats.mean[col], label="Moyenne", color="tab:blue")
ax.plot(hours, stats.median[col], label="Médiane", color="tab:orange", linestyle="--")
if stats.quantile_low is not None and stats.quantile_high is not None:
ax.fill_between(
hours,
stats.quantile_low[col],
stats.quantile_high[col],
color="tab:blue",
alpha=0.15,
label=(
f"Quantiles {int(stats.quantile_low_level * 100)}{int(stats.quantile_high_level * 100)}%"
if stats.quantile_low_level is not None and stats.quantile_high_level is not None
else "Quantiles"
),
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Heure locale")
axes[0].legend(loc="upper right")
axes[-1].set_xticks(range(0, 24, 2))
axes[-1].set_xlim(0, 23)
fig.suptitle("Cycle diurne moyen")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_daily_rainfall_hyetograph(
daily_rain: pd.DataFrame,
output_path: str | Path,
) -> Path:
"""
Affiche les cumuls quotidiens de pluie (barres) et le cumul annuel (ligne).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if daily_rain.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données de précipitations disponibles.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax1 = plt.subplots(figsize=(12, 5))
ax1.bar(
daily_rain.index,
daily_rain["daily_total"],
width=0.8,
color="tab:blue",
alpha=0.7,
label="Pluie quotidienne",
)
ax1.set_ylabel("Pluie quotidienne (mm)")
ax1.set_xlabel("Date")
ax1.grid(True, axis="y", linestyle=":", alpha=0.5)
ax2 = ax1.twinx()
ax2.plot(
daily_rain.index,
daily_rain["cumulative_total"],
color="tab:red",
linewidth=2,
label="Cumul annuel",
)
ax2.set_ylabel("Pluie cumulée (mm)")
locator = mdates.AutoDateLocator()
formatter = mdates.ConciseDateFormatter(locator)
ax1.xaxis.set_major_locator(locator)
ax1.xaxis.set_major_formatter(formatter)
lines_labels = [
(ax1.get_legend_handles_labels()),
(ax2.get_legend_handles_labels()),
]
lines, labels = [sum(lol, []) for lol in zip(*lines_labels)]
ax1.legend(lines, labels, loc="upper left")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()