1
donnees_meteo/meteo/plots.py

1245 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# meteo/plots.py
from __future__ import annotations
import calendar
from pathlib import Path
from typing import Callable, Sequence
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
from matplotlib.ticker import FuncFormatter
import matplotlib.dates as mdates
import numpy as np
import pandas as pd
from .analysis import DiurnalCycleStats, BinnedStatistics, MONTH_ORDER
from .season import SEASON_LABELS
from .variables import Variable
def plot_scatter_pair(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
output_path: str | Path,
*,
sample_step: int = 10,
color_by_time: bool = True,
cmap: str = "viridis",
) -> Path:
"""
Trace un nuage de points (scatter) pour une paire de variables.
- On sous-échantillonne les données avec `sample_step` (par exemple,
1 point sur 10) pour éviter un graphique illisible.
- Si `color_by_time` vaut True et que l'index est temporel, les points
sont colorés du plus ancien (sombre) au plus récent (clair).
- Lorsque l'axe Y correspond à la direction du vent, on bascule sur
un graphique polaire plus adapté (0° = Nord, sens horaire) avec
un rayon normalisé : centre = valeur minimale, bord = maximale.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
# On ne garde que les colonnes pertinentes et les lignes complètes
df_pair = df[[var_x.column, var_y.column]].dropna()
if sample_step > 1:
df_pair = df_pair.iloc[::sample_step, :]
use_polar = var_y.key == "wind_direction"
if use_polar:
fig, ax = plt.subplots(subplot_kw={"projection": "polar"})
else:
fig, ax = plt.subplots()
scatter_kwargs: dict = {"s": 5, "alpha": 0.5}
colorbar_meta: dict | None = None
if color_by_time and isinstance(df_pair.index, pd.DatetimeIndex):
idx = df_pair.index
timestamps = idx.view("int64")
time_span = np.ptp(timestamps)
norm = (
Normalize(vmin=timestamps.min(), vmax=timestamps.max())
if time_span > 0
else None
)
scatter_kwargs |= {"c": timestamps, "cmap": cmap}
if norm is not None:
scatter_kwargs["norm"] = norm
colorbar_meta = {
"index": idx,
"timestamps": timestamps,
"time_span": time_span,
}
if use_polar:
theta = np.deg2rad(df_pair[var_y.column].to_numpy(dtype=float) % 360.0)
radius_raw = df_pair[var_x.column].to_numpy(dtype=float)
if radius_raw.size == 0:
radius = radius_raw
value_min = value_max = float("nan")
else:
value_min = float(np.min(radius_raw))
value_max = float(np.max(radius_raw))
if np.isclose(value_min, value_max):
radius = np.zeros_like(radius_raw)
else:
radius = (radius_raw - value_min) / (value_max - value_min)
scatter = ax.scatter(theta, radius, **scatter_kwargs)
cardinal_angles = np.deg2rad(np.arange(0, 360, 45))
cardinal_labels = ["N", "NE", "E", "SE", "S", "SO", "O", "NO"]
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
ax.set_xticks(cardinal_angles)
ax.set_xticklabels(cardinal_labels)
if radius_raw.size > 0:
if np.isclose(value_min, value_max):
radial_positions = [0.0]
else:
radial_positions = np.linspace(0.0, 1.0, num=5).tolist()
if np.isclose(value_min, value_max):
actual_values = [value_min]
else:
actual_values = [
value_min + pos * (value_max - value_min)
for pos in radial_positions
]
ax.set_yticks(radial_positions)
ax.set_yticklabels([f"{val:.1f}" for val in actual_values])
ax.set_rlabel_position(225)
ax.set_ylim(0.0, 1.0)
unit_suffix = f" {var_x.unit}" if var_x.unit else ""
ax.text(
0.5,
-0.1,
f"Centre = {value_min:.1f}{unit_suffix}, bord = {value_max:.1f}{unit_suffix}",
transform=ax.transAxes,
ha="center",
va="top",
fontsize=8,
)
radial_label = f"{var_x.label} ({var_x.unit})" if var_x.unit else var_x.label
ax.set_ylabel(radial_label, labelpad=20)
else:
scatter = ax.scatter(
df_pair[var_x.column],
df_pair[var_y.column],
**scatter_kwargs,
)
if colorbar_meta is not None:
cbar = fig.colorbar(scatter, ax=ax)
idx = colorbar_meta["index"]
timestamps = colorbar_meta["timestamps"]
time_span = colorbar_meta["time_span"]
def _format_tick_label(ts: pd.Timestamp) -> str:
base = f"{ts.strftime('%Y-%m-%d')}\n{ts.strftime('%H:%M')}"
tz_name = ts.tzname()
return f"{base} ({tz_name})" if tz_name else base
if time_span > 0:
tick_datetimes = pd.date_range(start=idx.min(), end=idx.max(), periods=5)
tick_positions = tick_datetimes.view("int64")
tick_labels = [_format_tick_label(ts) for ts in tick_datetimes]
cbar.set_ticks(tick_positions)
cbar.set_ticklabels(tick_labels)
else:
cbar.set_ticks([timestamps[0]])
ts = idx[0]
cbar.set_ticklabels([_format_tick_label(ts)])
cbar.set_label("Temps (ancien → récent)")
if use_polar:
ax.set_title(f"{var_y.label} en fonction de {var_x.label}")
else:
ax.set_xlabel(f"{var_x.label} ({var_x.unit})")
ax.set_ylabel(f"{var_y.label} ({var_y.unit})")
ax.set_title(f"{var_y.label} en fonction de {var_x.label}")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_hexbin_with_third_variable(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
var_color: Variable,
output_path: str | Path,
*,
gridsize: int = 60,
mincnt: int = 5,
reduce_func: Callable[[np.ndarray], float] | None = None,
reduce_func_label: str | None = None,
cmap: str = "viridis",
) -> Path:
"""
Trace une carte de densité hexbin où la couleur encode une 3e variable.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
reduce_func = reduce_func or np.mean
df_xyz = df[[var_x.column, var_y.column, var_color.column]].dropna()
if df_xyz.empty:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Pas de données valides pour cette combinaison.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax = plt.subplots()
hb = ax.hexbin(
df_xyz[var_x.column],
df_xyz[var_y.column],
C=df_xyz[var_color.column],
reduce_C_function=reduce_func,
gridsize=gridsize,
cmap=cmap,
mincnt=mincnt,
)
func_label = reduce_func_label or getattr(reduce_func, "__name__", "statistique")
colorbar_label = f"{func_label.capitalize()} de {var_color.label}"
cbar = fig.colorbar(hb, ax=ax)
cbar.set_label(colorbar_label)
ax.set_xlabel(f"{var_x.label} ({var_x.unit})")
ax.set_ylabel(f"{var_y.label} ({var_y.unit})")
ax.set_title(
f"{var_y.label} vs {var_x.label}\nCouleur : {func_label} de {var_color.label}"
)
ax.grid(False)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_lagged_correlation(
lag_df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
output_path: str | Path,
) -> Path:
"""
Trace la corrélation en fonction du lag (en minutes) entre deux variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
plt.figure()
plt.plot(lag_df.index, lag_df["correlation"])
plt.axvline(0, linestyle="--") # lag = 0
plt.xlabel("Décalage (minutes)\n(lag > 0 : X précède Y)")
plt.ylabel("Corrélation")
plt.title(f"Corrélation décalée : {var_x.label}{var_y.label}")
plt.grid(True)
plt.tight_layout()
plt.savefig(output_path, dpi=150)
plt.close()
return output_path.resolve()
def plot_correlation_heatmap(
corr: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
annotate: bool = True,
) -> Path:
"""
Trace une heatmap de la matrice de corrélation.
Paramètres
----------
corr :
Matrice de corrélation (index et colonnes doivent correspondre
aux noms de colonnes des variables).
variables :
Liste de Variable, dans l'ordre où elles doivent apparaître.
output_path :
Chemin du fichier image à écrire.
annotate :
Si True, affiche la valeur numérique dans chaque case.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
columns = [v.column for v in variables]
labels = [v.label for v in variables]
# On aligne la matrice sur l'ordre désiré
corr = corr.loc[columns, columns]
data = corr.to_numpy()
fig, ax = plt.subplots()
im = ax.imshow(data, vmin=-1.0, vmax=1.0)
# Ticks et labels
ax.set_xticks(np.arange(len(labels)))
ax.set_yticks(np.arange(len(labels)))
ax.set_xticklabels(labels, rotation=45, ha="right")
ax.set_yticklabels(labels)
# Axe en haut/bas selon préférence (ici on laisse en bas)
ax.set_title("Matrice de corrélation (coef. de Pearson)")
# Barre de couleur
cbar = plt.colorbar(im, ax=ax)
cbar.set_label("Corrélation")
# Annotation des cases
if annotate:
n = data.shape[0]
for i in range(n):
for j in range(n):
if i == j:
text = ""
else:
val = data[i, j]
if np.isnan(val):
text = ""
else:
text = f"{val:.2f}"
ax.text(
j,
i,
text,
ha="center",
va="center",
)
plt.tight_layout()
plt.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_rolling_correlation_heatmap(
rolling_corr: pd.DataFrame,
output_path: str | Path,
*,
cmap: str = "coolwarm",
vmin: float = -1.0,
vmax: float = 1.0,
time_tick_count: int = 6,
) -> Path:
"""
Visualise l'évolution de corrélations glissantes pour plusieurs paires.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if rolling_corr.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Aucune donnée de corrélation glissante.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
labels = list(rolling_corr.columns)
data = rolling_corr.to_numpy().T
height = max(3.0, 0.6 * len(labels))
fig, ax = plt.subplots(figsize=(10, height))
im = ax.imshow(data, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax)
ax.set_yticks(np.arange(len(labels)))
ax.set_yticklabels(labels)
if isinstance(rolling_corr.index, pd.DatetimeIndex):
times = rolling_corr.index
if len(times) > 1:
tick_idx = np.linspace(0, len(times) - 1, num=min(time_tick_count, len(times)), dtype=int)
else:
tick_idx = np.array([0])
tick_labels = [times[i].strftime("%Y-%m-%d\n%H:%M") for i in tick_idx]
else:
tick_idx = np.linspace(0, len(rolling_corr.index) - 1, num=min(time_tick_count, len(rolling_corr.index)), dtype=int)
tick_labels = [str(rolling_corr.index[i]) for i in tick_idx]
ax.set_xticks(tick_idx)
ax.set_xticklabels(tick_labels, rotation=30, ha="right")
ax.set_xlabel("Temps (fin de fenêtre)")
ax.set_ylabel("Paire de variables")
ax.set_title("Corrélations glissantes")
cbar = fig.colorbar(im, ax=ax)
cbar.set_label("Coefficient de corrélation")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_event_composite(
aligned_segments: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
quantiles: tuple[float, float] = (0.25, 0.75),
baseline_label: str = "Début de l'événement",
) -> Path:
"""
Trace les moyennes/médianes autour d'événements détectés avec éventail inter-quantiles.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if aligned_segments.empty:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Aucun événement aligné à tracer.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
if "offset_minutes" not in aligned_segments.index.names:
raise ValueError("aligned_segments doit avoir un niveau 'offset_minutes'.")
group = aligned_segments.groupby(level="offset_minutes")
mean_df = group.mean()
median_df = group.median()
q_low, q_high = quantiles
quantile_low = group.quantile(q_low) if q_low is not None else None
quantile_high = group.quantile(q_high) if q_high is not None else None
offsets = mean_df.index.to_numpy(dtype=float)
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
col = var.column
ax.axvline(0, color="black", linestyle="--", linewidth=1, label=baseline_label)
ax.plot(offsets, mean_df[col], color="tab:blue", label="Moyenne")
ax.plot(offsets, median_df[col], color="tab:orange", linestyle="--", label="Médiane")
if quantile_low is not None and quantile_high is not None:
ax.fill_between(
offsets,
quantile_low[col],
quantile_high[col],
color="tab:blue",
alpha=0.2,
label=f"IQR {int(q_low*100)}{int(q_high*100)}%",
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Minutes autour de l'événement")
axes[0].legend(loc="upper right")
total_events = len(aligned_segments.index.get_level_values("event_id").unique())
fig.suptitle(f"Composites autour d'événements ({total_events} occurrences)")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_wind_rose(
frequencies: pd.DataFrame,
speed_bin_labels: Sequence[str],
output_path: str | Path,
*,
sector_size_deg: float,
cmap: str = "viridis",
) -> Path:
"""
Trace une rose des vents empilée par classes de vitesses (en % du temps).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if frequencies.empty:
fig, ax = plt.subplots(subplot_kw={"projection": "polar"})
ax.text(0.5, 0.5, "Données de vent insuffisantes.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax = plt.subplots(subplot_kw={"projection": "polar"}, figsize=(6, 6))
cmap_obj = plt.get_cmap(cmap, len(speed_bin_labels))
colors = cmap_obj(np.linspace(0.2, 0.95, len(speed_bin_labels)))
angles = np.deg2rad(frequencies.index.to_numpy(dtype=float) + sector_size_deg / 2.0)
width = np.deg2rad(sector_size_deg)
bottom = np.zeros_like(angles, dtype=float)
for label, color in zip(speed_bin_labels, colors):
values = frequencies[label].to_numpy(dtype=float)
bars = ax.bar(
angles,
values,
width=width,
bottom=bottom,
color=color,
edgecolor="white",
linewidth=0.5,
align="center",
)
bottom += values
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
ax.set_xticks(np.deg2rad(np.arange(0, 360, 45)))
ax.set_xticklabels(["N", "NE", "E", "SE", "S", "SO", "O", "NO"])
max_radius = np.max(bottom)
ax.set_ylim(0, max(max_radius * 1.1, 1))
ax.yaxis.set_major_formatter(FuncFormatter(lambda val, _pos: f"{val:.0f}%"))
ax.set_title("Rose des vents (fréquence en %)")
legend_handles = [
plt.Line2D([0], [0], color=color, linewidth=6, label=label) for label, color in zip(speed_bin_labels, colors)
]
ax.legend(
handles=legend_handles,
loc="lower center",
bbox_to_anchor=(0.5, -0.15),
ncol=2,
title="Vitesses (km/h)",
)
fig.tight_layout()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
def plot_diurnal_cycle(
stats: DiurnalCycleStats,
variables: Sequence[Variable],
output_path: str | Path,
) -> Path:
"""
Trace les cycles diurnes moyens (moyenne/médiane + quantiles).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
hours = stats.mean.index.to_numpy(dtype=float)
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
col = var.column
ax.plot(hours, stats.mean[col], label="Moyenne", color="tab:blue")
ax.plot(hours, stats.median[col], label="Médiane", color="tab:orange", linestyle="--")
if stats.quantile_low is not None and stats.quantile_high is not None:
ax.fill_between(
hours,
stats.quantile_low[col],
stats.quantile_high[col],
color="tab:blue",
alpha=0.15,
label=(
f"Quantiles {int(stats.quantile_low_level * 100)}{int(stats.quantile_high_level * 100)}%"
if stats.quantile_low_level is not None and stats.quantile_high_level is not None
else "Quantiles"
),
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Heure locale")
axes[0].legend(loc="upper right")
axes[-1].set_xticks(range(0, 24, 2))
axes[-1].set_xlim(0, 23)
fig.suptitle("Cycle diurne moyen")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_seasonal_boxplots(
df: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
season_column: str = "season",
season_order: Sequence[str] | None = None,
title: str | None = None,
) -> Path:
"""
Trace des boxplots par saison pour une sélection de variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if season_column not in df.columns:
raise KeyError(f"Colonne saison absente : {season_column}")
available = df[season_column].dropna().unique()
if season_order is None:
season_order = [season for season in SEASON_LABELS if season in available]
else:
season_order = [season for season in season_order if season in available]
if not season_order:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Aucune donnée saisonnière disponible.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
colors = plt.get_cmap("Set3")(np.linspace(0.2, 0.8, len(season_order)))
labels = [season.capitalize() for season in season_order]
for ax, var in zip(axes, variables):
data = [
df.loc[df[season_column] == season, var.column].dropna().to_numpy()
for season in season_order
]
if not any(len(arr) > 0 for arr in data):
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
box = ax.boxplot(
data,
labels=labels,
showfliers=False,
patch_artist=True,
)
for patch, color in zip(box["boxes"], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Saison")
if title:
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.95])
else:
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_monthly_boxplots(
df: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
) -> Path:
"""
Boxplots par mois (janvier → décembre) pour plusieurs variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError("plot_monthly_boxplots nécessite un DatetimeIndex.")
month_labels = [calendar.month_abbr[m].capitalize() for m in MONTH_ORDER]
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(12, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
data = [
df.loc[df.index.month == month, var.column].dropna().to_numpy()
for month in MONTH_ORDER
]
if not any(len(arr) > 0 for arr in data):
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
box = ax.boxplot(
data,
labels=month_labels,
showfliers=False,
patch_artist=True,
)
colors = plt.get_cmap("Spectral")(np.linspace(0.2, 0.8, len(data)))
for patch, color in zip(box["boxes"], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Mois")
fig.suptitle("Distribution mensuelle")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_binned_profiles(
stats: BinnedStatistics,
variables: Sequence[Variable],
output_path: str | Path,
*,
xlabel: str,
title: str,
show_counts: bool = False,
) -> Path:
"""
Trace les statistiques agrégées d'une ou plusieurs variables en fonction de bins.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if stats.centers.size == 0:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Aucune donnée suffisante pour ces intervalles.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
base_axes = len(variables)
total_axes = base_axes + (1 if show_counts else 0)
fig, axes = plt.subplots(
total_axes,
1,
sharex=True,
figsize=(10, 3 * total_axes),
)
if total_axes == 1:
axes = [axes]
else:
axes = list(axes)
x_values = stats.centers
bin_widths = np.array([interval.length for interval in stats.intervals])
if show_counts:
count_ax = axes.pop(0)
count_ax.bar(
x_values,
stats.counts.to_numpy(dtype=float),
width=bin_widths,
color="lightgray",
edgecolor="gray",
align="center",
)
count_ax.set_ylabel("Nombre de points")
count_ax.grid(True, linestyle=":", alpha=0.4)
count_ax.set_title("Densité des observations par bin")
for ax, var in zip(axes, variables):
col = var.column
ax.plot(x_values, stats.mean[col], color="tab:blue", label="Moyenne")
ax.plot(x_values, stats.median[col], color="tab:orange", linestyle="--", label="Médiane")
if stats.quantile_low is not None and stats.quantile_high is not None:
ax.fill_between(
x_values,
stats.quantile_low[col],
stats.quantile_high[col],
color="tab:blue",
alpha=0.15,
label=(
f"Quantiles {int(stats.quantile_low_level * 100)}{int(stats.quantile_high_level * 100)}%"
if stats.quantile_low_level is not None and stats.quantile_high_level is not None
else "Quantiles"
),
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel(xlabel)
axes[0].legend(loc="upper right")
axes[-1].set_xlim(stats.intervals.left.min(), stats.intervals.right.max())
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_daily_rainfall_hyetograph(
daily_rain: pd.DataFrame,
output_path: str | Path,
) -> Path:
"""
Affiche les cumuls quotidiens de pluie (barres) et le cumul annuel (ligne).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if daily_rain.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données de précipitations disponibles.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax1 = plt.subplots(figsize=(12, 5))
ax1.bar(
daily_rain.index,
daily_rain["daily_total"],
width=0.8,
color="tab:blue",
alpha=0.7,
label="Pluie quotidienne",
)
ax1.set_ylabel("Pluie quotidienne (mm)")
ax1.set_xlabel("Date")
ax1.grid(True, axis="y", linestyle=":", alpha=0.5)
ax2 = ax1.twinx()
ax2.plot(
daily_rain.index,
daily_rain["cumulative_total"],
color="tab:red",
linewidth=2,
label="Cumul annuel",
)
ax2.set_ylabel("Pluie cumulée (mm)")
locator = mdates.AutoDateLocator()
formatter = mdates.ConciseDateFormatter(locator)
ax1.xaxis.set_major_locator(locator)
ax1.xaxis.set_major_formatter(formatter)
lines_labels = [
(ax1.get_legend_handles_labels()),
(ax2.get_legend_handles_labels()),
]
lines, labels = [sum(lol, []) for lol in zip(*lines_labels)]
ax1.legend(lines, labels, loc="upper left")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_rainfall_by_season(
rainfall_df: pd.DataFrame,
output_path: str | Path,
*,
title: str = "Pluie cumulée par saison",
) -> Path:
"""
Affiche la pluie cumulée par saison ainsi que le nombre d'heures pluvieuses.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if rainfall_df.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données de pluie saisonnière.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
seasons = rainfall_df.index.tolist()
x = np.arange(len(seasons))
totals = rainfall_df["total_rain_mm"].to_numpy(dtype=float)
fig, ax1 = plt.subplots(figsize=(9, 4))
bars = ax1.bar(x, totals, color="tab:blue", alpha=0.7, label="Pluie cumulée")
ax1.set_ylabel("Pluie cumulée (mm)")
ax1.set_xlabel("Saison")
ax1.set_xticks(x)
ax1.set_xticklabels([season.capitalize() for season in seasons])
ax1.grid(True, axis="y", linestyle=":", alpha=0.5)
for rect, value in zip(bars, totals):
height = rect.get_height()
ax1.text(rect.get_x() + rect.get_width() / 2, height, f"{value:.0f}", ha="center", va="bottom", fontsize=8)
lines = []
labels = []
if "rainy_hours" in rainfall_df.columns:
ax2 = ax1.twinx()
rainy_hours = rainfall_df["rainy_hours"].to_numpy(dtype=float)
line = ax2.plot(
x,
rainy_hours,
color="tab:red",
marker="o",
label="Heures pluvieuses",
)[0]
ax2.set_ylabel("Heures pluvieuses")
lines.append(line)
labels.append("Heures pluvieuses")
handles, lbls = ax1.get_legend_handles_labels()
handles.extend(lines)
lbls.extend(labels)
if handles:
ax1.legend(handles, lbls, loc="upper left")
ax1.set_title(title)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_monthly_anomalies(
monthly_means: pd.DataFrame,
climatology: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
title: str = "Moyennes mensuelles vs climatologie",
) -> Path:
"""
Compare les moyennes mensuelles observées à la climatologie pour plusieurs variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if monthly_means.empty or climatology.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données mensuelles disponibles.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(12, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
locator = mdates.AutoDateLocator()
formatter = mdates.ConciseDateFormatter(locator)
for ax, var in zip(axes, variables):
actual = monthly_means[var.column].dropna()
if actual.empty:
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
months = actual.index.month
clim = climatology.loc[months, var.column].to_numpy(dtype=float)
anomaly = actual.to_numpy(dtype=float) - clim
ax.plot(actual.index, actual, color="tab:blue", label="Moyenne mensuelle")
ax.plot(actual.index, clim, color="tab:gray", linestyle="--", label="Climatologie")
ax.fill_between(
actual.index,
actual,
clim,
where=anomaly >= 0,
color="tab:blue",
alpha=0.15,
)
ax.fill_between(
actual.index,
actual,
clim,
where=anomaly < 0,
color="tab:red",
alpha=0.15,
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)
axes[-1].set_xlabel("Date")
axes[0].legend(loc="upper right")
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_wind_vector_series(
vector_df: pd.DataFrame,
output_path: str | Path,
*,
title: str = "Vecteurs moyens du vent",
) -> Path:
"""
Représente les composantes moyennes du vent sous forme de flèches (u/v).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if vector_df.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données de vent.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
times = vector_df.index
x = mdates.date2num(times)
u = vector_df["u"].to_numpy(dtype=float)
v = vector_df["v"].to_numpy(dtype=float)
speed = vector_df["speed"].to_numpy(dtype=float)
fig, ax = plt.subplots(figsize=(12, 4))
q = ax.quiver(
x,
np.zeros_like(x),
u,
v,
speed,
angles="xy",
scale_units="xy",
scale=1,
cmap="viridis",
)
ax.axhline(0, color="black", linewidth=0.5)
ax.set_ylim(-max(abs(v)) * 1.2 if np.any(v) else -1, max(abs(v)) * 1.2 if np.any(v) else 1)
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(ax.xaxis.get_major_locator()))
ax.set_ylabel("Composante nord (v)")
ax.set_xlabel("Date")
ax.set_title(title)
cbar = fig.colorbar(q, ax=ax)
cbar.set_label("Vitesse moyenne (km/h)")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_calendar_heatmap(
matrix: pd.DataFrame,
output_path: str | Path,
*,
title: str,
cmap: str = "YlGnBu",
colorbar_label: str = "",
) -> Path:
"""
Affiche une heatmap calendrier (lignes = mois, colonnes = jours).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if matrix.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données pour la heatmap.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax = plt.subplots(figsize=(14, 6))
data = matrix.to_numpy(dtype=float)
im = ax.imshow(data, aspect="auto", cmap=cmap, interpolation="nearest")
ax.set_xticks(np.arange(matrix.shape[1]))
ax.set_xticklabels(matrix.columns, rotation=90)
ax.set_yticks(np.arange(matrix.shape[0]))
ax.set_yticklabels(matrix.index)
ax.set_xlabel("Jour du mois")
ax.set_ylabel("Mois")
ax.set_title(title)
cbar = fig.colorbar(im, ax=ax)
if colorbar_label:
cbar.set_label(colorbar_label)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_weekday_profiles(
weekday_df: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
title: str,
) -> Path:
"""
Affiche les moyennes par jour de semaine pour plusieurs variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if weekday_df.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données hebdomadaires.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
weekday_labels = ["Lun", "Mar", "Mer", "Jeu", "Ven", "Sam", "Dim"]
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
x = np.arange(len(weekday_labels))
for ax, var in zip(axes, variables):
if var.column not in weekday_df.columns:
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
values = weekday_df[var.column].to_numpy(dtype=float)
ax.plot(x, values, marker="o", label=var.label)
ax.set_ylabel(f"{var.label} ({var.unit})" if var.unit else var.label)
ax.grid(True, linestyle=":", alpha=0.5)
ax.set_xticks(x)
ax.set_xticklabels(weekday_labels)
axes[-1].set_xlabel("Jour de semaine")
axes[0].legend(loc="upper right")
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_seasonal_hourly_profiles(
profile_df: pd.DataFrame,
output_path: str | Path,
*,
title: str,
ylabel: str,
) -> Path:
"""
Courbes moyennes par heure pour chaque saison.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if profile_df.empty or profile_df.isna().all().all():
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de profil saisonnier disponible.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
hours = profile_df.index.to_numpy(dtype=float)
fig, ax = plt.subplots(figsize=(10, 4))
colors = plt.get_cmap("turbo")(np.linspace(0.1, 0.9, profile_df.shape[1]))
for color, season in zip(colors, profile_df.columns):
ax.plot(hours, profile_df[season], label=season.capitalize(), color=color)
ax.set_xlabel("Heure locale")
ax.set_ylabel(ylabel)
ax.set_title(title)
ax.grid(True, linestyle=":", alpha=0.5)
ax.legend()
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_daylight_hours(
monthly_series: pd.Series,
output_path: str | Path,
*,
title: str = "Durée moyenne de luminosité (> seuil)",
) -> Path:
"""
Représente la durée moyenne quotidienne de luminosité par mois.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if monthly_series.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données sur la luminosité.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
months = monthly_series.index
fig, ax = plt.subplots(figsize=(10, 4))
ax.bar(months, monthly_series.values, color="goldenrod", alpha=0.8)
ax.set_ylabel("Heures de luminosité par jour")
ax.set_xlabel("Mois")
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(ax.xaxis.get_major_locator()))
ax.set_title(title)
ax.grid(True, axis="y", linestyle=":", alpha=0.5)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()