# meteo/plots.py from __future__ import annotations from pathlib import Path from typing import Sequence import matplotlib.pyplot as plt from matplotlib.colors import Normalize import numpy as np import pandas as pd from .variables import Variable def plot_scatter_pair( df: pd.DataFrame, var_x: Variable, var_y: Variable, output_path: str | Path, *, sample_step: int = 10, color_by_time: bool = True, cmap: str = "viridis", ) -> Path: """ Trace un nuage de points (scatter) pour une paire de variables. - On sous-échantillonne les données avec `sample_step` (par exemple, 1 point sur 10) pour éviter un graphique illisible. - Si `color_by_time` vaut True et que l'index est temporel, les points sont colorés du plus ancien (sombre) au plus récent (clair). - Lorsque l'axe Y correspond à la direction du vent, on bascule sur un graphique polaire plus adapté (0° = Nord, sens horaire) avec un rayon normalisé : centre = valeur minimale, bord = maximale. """ output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) # On ne garde que les colonnes pertinentes et les lignes complètes df_pair = df[[var_x.column, var_y.column]].dropna() if sample_step > 1: df_pair = df_pair.iloc[::sample_step, :] use_polar = var_y.key == "wind_direction" if use_polar: fig, ax = plt.subplots(subplot_kw={"projection": "polar"}) else: fig, ax = plt.subplots() scatter_kwargs: dict = {"s": 5, "alpha": 0.5} colorbar_meta: dict | None = None if color_by_time and isinstance(df_pair.index, pd.DatetimeIndex): idx = df_pair.index timestamps = idx.view("int64") time_span = np.ptp(timestamps) norm = ( Normalize(vmin=timestamps.min(), vmax=timestamps.max()) if time_span > 0 else None ) scatter_kwargs |= {"c": timestamps, "cmap": cmap} if norm is not None: scatter_kwargs["norm"] = norm colorbar_meta = { "index": idx, "timestamps": timestamps, "time_span": time_span, } if use_polar: theta = np.deg2rad(df_pair[var_y.column].to_numpy(dtype=float) % 360.0) radius_raw = df_pair[var_x.column].to_numpy(dtype=float) if radius_raw.size == 0: radius = radius_raw value_min = value_max = float("nan") else: value_min = float(np.min(radius_raw)) value_max = float(np.max(radius_raw)) if np.isclose(value_min, value_max): radius = np.zeros_like(radius_raw) else: radius = (radius_raw - value_min) / (value_max - value_min) scatter = ax.scatter(theta, radius, **scatter_kwargs) cardinal_angles = np.deg2rad(np.arange(0, 360, 45)) cardinal_labels = ["N", "NE", "E", "SE", "S", "SO", "O", "NO"] ax.set_theta_zero_location("N") ax.set_theta_direction(-1) ax.set_xticks(cardinal_angles) ax.set_xticklabels(cardinal_labels) if radius_raw.size > 0: if np.isclose(value_min, value_max): radial_positions = [0.0] else: radial_positions = np.linspace(0.0, 1.0, num=5).tolist() if np.isclose(value_min, value_max): actual_values = [value_min] else: actual_values = [ value_min + pos * (value_max - value_min) for pos in radial_positions ] ax.set_yticks(radial_positions) ax.set_yticklabels([f"{val:.1f}" for val in actual_values]) ax.set_rlabel_position(225) ax.set_ylim(0.0, 1.0) unit_suffix = f" {var_x.unit}" if var_x.unit else "" ax.text( 0.5, -0.1, f"Centre = {value_min:.1f}{unit_suffix}, bord = {value_max:.1f}{unit_suffix}", transform=ax.transAxes, ha="center", va="top", fontsize=8, ) radial_label = f"{var_x.label} ({var_x.unit})" if var_x.unit else var_x.label ax.set_ylabel(radial_label, labelpad=20) else: scatter = ax.scatter( df_pair[var_x.column], df_pair[var_y.column], **scatter_kwargs, ) if colorbar_meta is not None: cbar = fig.colorbar(scatter, ax=ax) idx = colorbar_meta["index"] timestamps = colorbar_meta["timestamps"] time_span = colorbar_meta["time_span"] def _format_tick_label(ts: pd.Timestamp) -> str: base = f"{ts.strftime('%Y-%m-%d')}\n{ts.strftime('%H:%M')}" tz_name = ts.tzname() return f"{base} ({tz_name})" if tz_name else base if time_span > 0: tick_datetimes = pd.date_range(start=idx.min(), end=idx.max(), periods=5) tick_positions = tick_datetimes.view("int64") tick_labels = [_format_tick_label(ts) for ts in tick_datetimes] cbar.set_ticks(tick_positions) cbar.set_ticklabels(tick_labels) else: cbar.set_ticks([timestamps[0]]) ts = idx[0] cbar.set_ticklabels([_format_tick_label(ts)]) cbar.set_label("Temps (ancien → récent)") if use_polar: ax.set_title(f"{var_y.label} en fonction de {var_x.label}") else: ax.set_xlabel(f"{var_x.label} ({var_x.unit})") ax.set_ylabel(f"{var_y.label} ({var_y.unit})") ax.set_title(f"{var_y.label} en fonction de {var_x.label}") fig.tight_layout() fig.savefig(output_path, dpi=150) plt.close(fig) return output_path.resolve() def plot_lagged_correlation( lag_df: pd.DataFrame, var_x: Variable, var_y: Variable, output_path: str | Path, ) -> Path: """ Trace la corrélation en fonction du lag (en minutes) entre deux variables. """ output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) plt.figure() plt.plot(lag_df.index, lag_df["correlation"]) plt.axvline(0, linestyle="--") # lag = 0 plt.xlabel("Décalage (minutes)\n(lag > 0 : X précède Y)") plt.ylabel("Corrélation") plt.title(f"Corrélation décalée : {var_x.label} → {var_y.label}") plt.grid(True) plt.tight_layout() plt.savefig(output_path, dpi=150) plt.close() return output_path.resolve() def plot_correlation_heatmap( corr: pd.DataFrame, variables: Sequence[Variable], output_path: str | Path, *, annotate: bool = True, ) -> Path: """ Trace une heatmap de la matrice de corrélation. Paramètres ---------- corr : Matrice de corrélation (index et colonnes doivent correspondre aux noms de colonnes des variables). variables : Liste de Variable, dans l'ordre où elles doivent apparaître. output_path : Chemin du fichier image à écrire. annotate : Si True, affiche la valeur numérique dans chaque case. """ output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) columns = [v.column for v in variables] labels = [v.label for v in variables] # On aligne la matrice sur l'ordre désiré corr = corr.loc[columns, columns] data = corr.to_numpy() fig, ax = plt.subplots() im = ax.imshow(data, vmin=-1.0, vmax=1.0) # Ticks et labels ax.set_xticks(np.arange(len(labels))) ax.set_yticks(np.arange(len(labels))) ax.set_xticklabels(labels, rotation=45, ha="right") ax.set_yticklabels(labels) # Axe en haut/bas selon préférence (ici on laisse en bas) ax.set_title("Matrice de corrélation (coef. de Pearson)") # Barre de couleur cbar = plt.colorbar(im, ax=ax) cbar.set_label("Corrélation") # Annotation des cases if annotate: n = data.shape[0] for i in range(n): for j in range(n): if i == j: text = "—" else: val = data[i, j] if np.isnan(val): text = "" else: text = f"{val:.2f}" ax.text( j, i, text, ha="center", va="center", ) plt.tight_layout() plt.savefig(output_path, dpi=150) plt.close(fig) return output_path.resolve()