1

Nouvelles visualisations exploratoires

This commit is contained in:
2025-11-17 21:57:13 +01:00
parent b72349a369
commit fd42a692d9
10 changed files with 679 additions and 2 deletions

View File

@@ -1,7 +1,7 @@
# meteo/analysis.py
from __future__ import annotations
from typing import Literal
from typing import Literal, Sequence
import numpy as np
import pandas as pd
@@ -115,3 +115,199 @@ def compute_lagged_correlation(
lag_df = lag_df.set_index("lag_minutes")
return lag_df
def _ensure_datetime_index(df: pd.DataFrame) -> pd.DatetimeIndex:
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError("Cette fonction nécessite un DataFrame indexé par le temps.")
return df.index
def compute_rolling_correlation_series(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
*,
window_minutes: int,
min_valid_fraction: float = 0.6,
step_minutes: int | None = None,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.Series:
"""
Calcule la corrélation glissante X/Y sur une fenêtre temporelle.
Retourne une série indexée par l'instant de fin de fenêtre.
"""
if not 0 < min_valid_fraction <= 1:
raise ValueError("min_valid_fraction doit être dans l'intervalle ]0, 1].")
for col in (var_x.column, var_y.column):
if col not in df.columns:
raise KeyError(f"Colonne absente du DataFrame : {col}")
_ensure_datetime_index(df)
pair = df[[var_x.column, var_y.column]].dropna().sort_index()
if pair.empty:
return pd.Series(dtype=float, name=f"{var_x.key}{var_y.key}")
window = f"{window_minutes}min"
min_periods = max(1, int(window_minutes * min_valid_fraction))
if method not in {"pearson"}:
raise NotImplementedError(
"Les corrélations glissantes ne supportent actuellement que la méthode 'pearson'."
)
rolling_corr = pair[var_x.column].rolling(
window=window,
min_periods=min_periods,
).corr(pair[var_y.column])
rolling_corr = rolling_corr.dropna()
rolling_corr.name = f"{var_x.key}{var_y.key}"
if step_minutes and step_minutes > 1:
rolling_corr = rolling_corr.resample(f"{step_minutes}min").mean().dropna()
return rolling_corr
def compute_rolling_correlations_for_pairs(
df: pd.DataFrame,
pairs: Sequence[tuple[Variable, Variable]],
*,
window_minutes: int,
min_valid_fraction: float = 0.6,
step_minutes: int | None = None,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule les corrélations glissantes pour plusieurs paires et aligne les
résultats dans un DataFrame (index temps, colonnes = 'x→y').
"""
series_list: list[pd.Series] = []
for var_x, var_y in pairs:
corr = compute_rolling_correlation_series(
df=df,
var_x=var_x,
var_y=var_y,
window_minutes=window_minutes,
min_valid_fraction=min_valid_fraction,
step_minutes=step_minutes,
method=method,
)
if not corr.empty:
series_list.append(corr)
if not series_list:
return pd.DataFrame()
result = pd.concat(series_list, axis=1)
result = result.sort_index()
return result
def _infer_time_step(index: pd.DatetimeIndex) -> pd.Timedelta:
diffs = index.to_series().diff().dropna()
if diffs.empty:
return pd.Timedelta(minutes=1)
return diffs.median()
def detect_threshold_events(
series: pd.Series,
*,
threshold: float,
min_duration: pd.Timedelta,
min_gap: pd.Timedelta,
) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
"""
Détecte des événements où `series > threshold` (après remplissage des NaN
par False) durant au moins `min_duration`. Les événements séparés d'un
intervalle < min_gap sont fusionnés.
"""
if not isinstance(series.index, pd.DatetimeIndex):
raise TypeError("series doit être indexée par le temps.")
mask = (series > threshold).fillna(False)
if not mask.any():
return []
groups = (mask != mask.shift()).cumsum()
time_step = _infer_time_step(series.index)
raw_events: list[tuple[pd.Timestamp, pd.Timestamp]] = []
for group_id, group_mask in mask.groupby(groups):
if not group_mask.iloc[0]:
continue
start = group_mask.index[0]
end = group_mask.index[-1] + time_step
duration = end - start
if duration >= min_duration:
raw_events.append((start, end))
if not raw_events:
return []
merged: list[tuple[pd.Timestamp, pd.Timestamp]] = []
for start, end in raw_events:
if not merged:
merged.append((start, end))
continue
prev_start, prev_end = merged[-1]
if start - prev_end < min_gap:
merged[-1] = (prev_start, max(prev_end, end))
else:
merged.append((start, end))
return merged
def build_event_aligned_segments(
df: pd.DataFrame,
events: Sequence[tuple[pd.Timestamp, pd.Timestamp]],
columns: Sequence[str],
*,
window_before_minutes: int,
window_after_minutes: int,
resample_minutes: int = 1,
) -> pd.DataFrame:
"""
Extrait, pour chaque événement, les séries centrées sur son début et
retourne un DataFrame MultiIndex (event_id, offset_minutes).
"""
if not events:
return pd.DataFrame(columns=columns)
index = _ensure_datetime_index(df)
data = df[columns].sort_index()
freq = pd.Timedelta(minutes=resample_minutes)
if resample_minutes > 1:
data = data.resample(freq).mean()
before = pd.Timedelta(minutes=window_before_minutes)
after = pd.Timedelta(minutes=window_after_minutes)
segments: list[pd.DataFrame] = []
for event_id, (start, _end) in enumerate(events):
window_start = start - before
window_end = start + after
window_index = pd.date_range(window_start, window_end, freq=freq)
segment = data.reindex(window_index)
if segment.empty:
continue
offsets = ((segment.index - start) / pd.Timedelta(minutes=1)).astype(float)
multi_index = pd.MultiIndex.from_arrays(
[np.full(len(segment), event_id), offsets],
names=["event_id", "offset_minutes"],
)
segment.index = multi_index
segments.append(segment)
if not segments:
return pd.DataFrame(columns=columns)
aligned = pd.concat(segments)
return aligned

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from pathlib import Path
from typing import Sequence
from typing import Callable, Sequence
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
@@ -168,6 +168,71 @@ def plot_scatter_pair(
return output_path.resolve()
def plot_hexbin_with_third_variable(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
var_color: Variable,
output_path: str | Path,
*,
gridsize: int = 60,
mincnt: int = 5,
reduce_func: Callable[[np.ndarray], float] | None = None,
reduce_func_label: str | None = None,
cmap: str = "viridis",
) -> Path:
"""
Trace une carte de densité hexbin où la couleur encode une 3e variable.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
reduce_func = reduce_func or np.mean
df_xyz = df[[var_x.column, var_y.column, var_color.column]].dropna()
if df_xyz.empty:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Pas de données valides pour cette combinaison.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax = plt.subplots()
hb = ax.hexbin(
df_xyz[var_x.column],
df_xyz[var_y.column],
C=df_xyz[var_color.column],
reduce_C_function=reduce_func,
gridsize=gridsize,
cmap=cmap,
mincnt=mincnt,
)
func_label = reduce_func_label or getattr(reduce_func, "__name__", "statistique")
colorbar_label = f"{func_label.capitalize()} de {var_color.label}"
cbar = fig.colorbar(hb, ax=ax)
cbar.set_label(colorbar_label)
ax.set_xlabel(f"{var_x.label} ({var_x.unit})")
ax.set_ylabel(f"{var_y.label} ({var_y.unit})")
ax.set_title(
f"{var_y.label} vs {var_x.label}\nCouleur : {func_label} de {var_color.label}"
)
ax.grid(False)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_lagged_correlation(
lag_df: pd.DataFrame,
var_x: Variable,
@@ -270,3 +335,141 @@ def plot_correlation_heatmap(
plt.close(fig)
return output_path.resolve()
def plot_rolling_correlation_heatmap(
rolling_corr: pd.DataFrame,
output_path: str | Path,
*,
cmap: str = "coolwarm",
vmin: float = -1.0,
vmax: float = 1.0,
time_tick_count: int = 6,
) -> Path:
"""
Visualise l'évolution de corrélations glissantes pour plusieurs paires.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if rolling_corr.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Aucune donnée de corrélation glissante.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
labels = list(rolling_corr.columns)
data = rolling_corr.to_numpy().T
height = max(3.0, 0.6 * len(labels))
fig, ax = plt.subplots(figsize=(10, height))
im = ax.imshow(data, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax)
ax.set_yticks(np.arange(len(labels)))
ax.set_yticklabels(labels)
if isinstance(rolling_corr.index, pd.DatetimeIndex):
times = rolling_corr.index
if len(times) > 1:
tick_idx = np.linspace(0, len(times) - 1, num=min(time_tick_count, len(times)), dtype=int)
else:
tick_idx = np.array([0])
tick_labels = [times[i].strftime("%Y-%m-%d\n%H:%M") for i in tick_idx]
else:
tick_idx = np.linspace(0, len(rolling_corr.index) - 1, num=min(time_tick_count, len(rolling_corr.index)), dtype=int)
tick_labels = [str(rolling_corr.index[i]) for i in tick_idx]
ax.set_xticks(tick_idx)
ax.set_xticklabels(tick_labels, rotation=30, ha="right")
ax.set_xlabel("Temps (fin de fenêtre)")
ax.set_ylabel("Paire de variables")
ax.set_title("Corrélations glissantes")
cbar = fig.colorbar(im, ax=ax)
cbar.set_label("Coefficient de corrélation")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_event_composite(
aligned_segments: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
quantiles: tuple[float, float] = (0.25, 0.75),
baseline_label: str = "Début de l'événement",
) -> Path:
"""
Trace les moyennes/médianes autour d'événements détectés avec éventail inter-quantiles.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if aligned_segments.empty:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Aucun événement aligné à tracer.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
if "offset_minutes" not in aligned_segments.index.names:
raise ValueError("aligned_segments doit avoir un niveau 'offset_minutes'.")
group = aligned_segments.groupby(level="offset_minutes")
mean_df = group.mean()
median_df = group.median()
q_low, q_high = quantiles
quantile_low = group.quantile(q_low) if q_low is not None else None
quantile_high = group.quantile(q_high) if q_high is not None else None
offsets = mean_df.index.to_numpy(dtype=float)
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
col = var.column
ax.axvline(0, color="black", linestyle="--", linewidth=1, label=baseline_label)
ax.plot(offsets, mean_df[col], color="tab:blue", label="Moyenne")
ax.plot(offsets, median_df[col], color="tab:orange", linestyle="--", label="Médiane")
if quantile_low is not None and quantile_high is not None:
ax.fill_between(
offsets,
quantile_low[col],
quantile_high[col],
color="tab:blue",
alpha=0.2,
label=f"IQR {int(q_low*100)}{int(q_high*100)}%",
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Minutes autour de l'événement")
axes[0].legend(loc="upper right")
total_events = len(aligned_segments.index.get_level_values("event_id").unique())
fig.suptitle(f"Composites autour d'événements ({total_events} occurrences)")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()