
Refactoring

Richard Dern 2025-11-18 09:01:34 +01:00
parent b3d657deb9
commit 85da4e4931
25 changed files with 2407 additions and 2155 deletions

.gitignore

@@ -1,5 +1,4 @@
 .venv
 .env
 data
-scripts/__pycache__
-meteo/__pycache__
+__pycache__

meteo/analysis.py

@@ -1,745 +0,0 @@
(file deleted: its contents move into the new meteo/analysis/ package below)

meteo/analysis/__init__.py

@@ -0,0 +1,47 @@
"""Point d'entrée public regroupant les utilitaires analytiques de la librairie."""
from __future__ import annotations
from .core import BinnedStatistics, DiurnalCycleStats, MONTH_ORDER
from .correlations import (
compute_correlation_matrix,
compute_correlation_matrix_for_variables,
compute_lagged_correlation,
compute_rolling_correlation_series,
compute_rolling_correlations_for_pairs,
)
from .events import build_event_aligned_segments, detect_threshold_events
from .filters import filter_by_condition
from .rain import compute_daily_rainfall_totals, compute_rainfall_by_season
from .seasonal import (
compute_monthly_climatology,
compute_monthly_daylight_hours,
compute_monthly_means,
compute_seasonal_hourly_profile,
)
from .statistics import compute_binned_statistics, compute_diurnal_cycle_statistics
from .wind import compute_mean_wind_components, compute_wind_rose_distribution
__all__ = [
"BinnedStatistics",
"DiurnalCycleStats",
"MONTH_ORDER",
"compute_correlation_matrix",
"compute_correlation_matrix_for_variables",
"compute_lagged_correlation",
"compute_rolling_correlation_series",
"compute_rolling_correlations_for_pairs",
"build_event_aligned_segments",
"detect_threshold_events",
"filter_by_condition",
"compute_daily_rainfall_totals",
"compute_rainfall_by_season",
"compute_monthly_climatology",
"compute_monthly_daylight_hours",
"compute_monthly_means",
"compute_seasonal_hourly_profile",
"compute_binned_statistics",
"compute_diurnal_cycle_statistics",
"compute_mean_wind_components",
"compute_wind_rose_distribution",
]
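
For orientation, a minimal smoke test of the re-exported API (a sketch: it assumes only pandas and this package, and the column names are made up):

import pandas as pd
from meteo.analysis import compute_correlation_matrix

# Two perfectly anti-correlated toy columns on a minute-resolution index.
index = pd.date_range("2025-01-01", periods=100, freq="1min")
df = pd.DataFrame(
    {"temperature": range(100), "humidity": range(100, 0, -1)},
    index=index,
)
print(compute_correlation_matrix(df))  # off-diagonal coefficients are -1.0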

meteo/analysis/core.py

@@ -0,0 +1,55 @@
"""Structures et helpers communs pour les analyses météorologiques."""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
import pandas as pd
__all__ = ['MONTH_ORDER', 'DiurnalCycleStats', 'BinnedStatistics']
MONTH_ORDER = list(range(1, 13))
@dataclass
class DiurnalCycleStats:
"""Conteneur pour les statistiques agrégées par heure (moyenne, médiane et quantiles optionnels)."""
mean: pd.DataFrame
median: pd.DataFrame
quantile_low: pd.DataFrame | None
quantile_high: pd.DataFrame | None
quantile_low_level: float | None = None
quantile_high_level: float | None = None
@dataclass
class BinnedStatistics:
"""Structure englobant les résultats calculés sur des intervalles (bins) réguliers ou personnalisés."""
centers: np.ndarray
intervals: pd.IntervalIndex
counts: pd.Series
mean: pd.DataFrame
median: pd.DataFrame
quantile_low: pd.DataFrame | None
quantile_high: pd.DataFrame | None
quantile_low_level: float | None = None
quantile_high_level: float | None = None
def _ensure_datetime_index(df: pd.DataFrame) -> pd.DatetimeIndex:
"""Valide la présence d'un index temporel et le retourne pour uniformiser les traitements."""
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError("Cette fonction nécessite un DataFrame indexé par le temps.")
return df.index
def _infer_time_step(index: pd.DatetimeIndex) -> pd.Timedelta:
"""Estime la résolution temporelle représentative (médiane) d'un index daté."""
diffs = index.to_series().diff().dropna()
if diffs.empty:
return pd.Timedelta(minutes=1)
return diffs.median()
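
Taking the median, rather than the mean, keeps the inferred step robust to isolated gaps. A small sketch:

import pandas as pd
from meteo.analysis.core import _infer_time_step

# Three 1-minute intervals plus one 8-minute gap: the median step is still 1 minute.
index = pd.DatetimeIndex(
    ["2025-01-01 00:00", "2025-01-01 00:01", "2025-01-01 00:02",
     "2025-01-01 00:03", "2025-01-01 00:11"]
)
print(_infer_time_step(index))  # 0 days 00:01:00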

meteo/analysis/correlations.py

@@ -0,0 +1,201 @@
"""Calculs statistiques liés aux corrélations (instantanées, décalées, glissantes)."""
from __future__ import annotations
from typing import Literal, Sequence
import numpy as np
import pandas as pd
from meteo.variables import Variable
from .core import _ensure_datetime_index
__all__ = ['compute_correlation_matrix', 'compute_correlation_matrix_for_variables', 'compute_lagged_correlation', 'compute_rolling_correlation_series', 'compute_rolling_correlations_for_pairs']
def compute_correlation_matrix(
df: pd.DataFrame,
*,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule la matrice de corrélation entre toutes les colonnes numériques
du DataFrame.
Attention :
- La direction du vent est traitée ici comme une variable scalaire 0360°,
ce qui n'est pas idéal pour une analyse circulaire. On affinera plus tard
si besoin (représentation en sin/cos).
"""
numeric_df = df.select_dtypes(include=["number"])
corr = numeric_df.corr(method=method)
return corr
def compute_correlation_matrix_for_variables(
df: pd.DataFrame,
variables: Sequence[Variable],
*,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule la matrice de corrélation pour un sous-ensemble de variables,
dans un ordre bien défini.
Paramètres
----------
df :
DataFrame contenant les colonnes à analyser.
variables :
Séquence de Variable décrivant les colonnes à prendre en compte.
method :
Méthode de corrélation pandas (pearson, spearman, ...).
Retour
------
DataFrame :
Matrice de corrélation, index et colonnes dans le même ordre que
`variables`, avec les colonnes pandas correspondant aux noms de colonnes
du DataFrame (ex: "temperature", "humidity", ...).
"""
columns = [v.column for v in variables]
missing = [c for c in columns if c not in df.columns]
if missing:
raise KeyError(f"Colonnes manquantes dans le DataFrame : {missing!r}")
numeric_df = df[columns].astype(float)
corr = numeric_df.corr(method=method)
    # Enforce the requested order
corr = corr.loc[columns, columns]
return corr
def compute_lagged_correlation(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
*,
max_lag_minutes: int = 360,
step_minutes: int = 10,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule la corrélation entre deux variables pour une série de décalages
temporels (lags).
Convention :
- lag > 0 : X "précède" Y de `lag` minutes.
On corrèle X(t) avec Y(t + lag).
- lag < 0 : Y "précède" X de |lag| minutes.
On corrèle X(t) avec Y(t + lag), lag étant négatif.
Implémentation :
- On utilise un DataFrame avec les deux colonnes,
puis on applique un `shift` sur Y.
"""
if var_x.column not in df.columns or var_y.column not in df.columns:
raise KeyError("Les colonnes demandées ne sont pas présentes dans le DataFrame.")
series_x = df[var_x.column]
series_y = df[var_y.column]
lags = range(-max_lag_minutes, max_lag_minutes + 1, step_minutes)
results: list[tuple[int, float]] = []
for lag in lags:
        # Y shifted by -lag: for a positive lag, correlate X(t) with Y(t + lag)
shifted_y = series_y.shift(-lag)
pair = pd.concat([series_x, shifted_y], axis=1).dropna()
if pair.empty:
corr = np.nan
else:
corr = pair.iloc[:, 0].corr(pair.iloc[:, 1], method=method)
results.append((lag, corr))
lag_df = pd.DataFrame(results, columns=["lag_minutes", "correlation"])
lag_df = lag_df.set_index("lag_minutes")
return lag_df
def compute_rolling_correlation_series(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
*,
window_minutes: int,
min_valid_fraction: float = 0.6,
step_minutes: int | None = None,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.Series:
"""
Calcule la corrélation glissante X/Y sur une fenêtre temporelle.
Retourne une série indexée par l'instant de fin de fenêtre.
"""
if not 0 < min_valid_fraction <= 1:
raise ValueError("min_valid_fraction doit être dans l'intervalle ]0, 1].")
for col in (var_x.column, var_y.column):
if col not in df.columns:
raise KeyError(f"Colonne absente du DataFrame : {col}")
_ensure_datetime_index(df)
pair = df[[var_x.column, var_y.column]].dropna().sort_index()
if pair.empty:
        return pd.Series(dtype=float, name=f"{var_x.key}→{var_y.key}")
window = f"{window_minutes}min"
min_periods = max(1, int(window_minutes * min_valid_fraction))
if method not in {"pearson"}:
raise NotImplementedError(
"Les corrélations glissantes ne supportent actuellement que la méthode 'pearson'."
)
rolling_corr = pair[var_x.column].rolling(
window=window,
min_periods=min_periods,
).corr(pair[var_y.column])
rolling_corr = rolling_corr.dropna()
    rolling_corr.name = f"{var_x.key}→{var_y.key}"
if step_minutes and step_minutes > 1:
rolling_corr = rolling_corr.resample(f"{step_minutes}min").mean().dropna()
return rolling_corr
def compute_rolling_correlations_for_pairs(
df: pd.DataFrame,
pairs: Sequence[tuple[Variable, Variable]],
*,
window_minutes: int,
min_valid_fraction: float = 0.6,
step_minutes: int | None = None,
method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
"""
Calcule les corrélations glissantes pour plusieurs paires et aligne les
résultats dans un DataFrame (index temps, colonnes = 'x→y').
"""
series_list: list[pd.Series] = []
for var_x, var_y in pairs:
corr = compute_rolling_correlation_series(
df=df,
var_x=var_x,
var_y=var_y,
window_minutes=window_minutes,
min_valid_fraction=min_valid_fraction,
step_minutes=step_minutes,
method=method,
)
if not corr.empty:
series_list.append(corr)
if not series_list:
return pd.DataFrame()
result = pd.concat(series_list, axis=1)
result = result.sort_index()
return result
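
To illustrate the lag convention above, a sketch with a synthetic pair where Y is X delayed by 30 minutes (the Variable constructor call is a guess about its fields, inferred from the attributes used here):

import numpy as np
import pandas as pd
from meteo.analysis import compute_lagged_correlation
from meteo.variables import Variable

index = pd.date_range("2025-01-01", periods=1440, freq="1min")
x = pd.Series(np.sin(np.arange(1440) / 60.0), index=index)
df = pd.DataFrame({"x": x, "y": x.shift(30)})  # Y lags X by 30 minutes
var_x = Variable(key="x", column="x", label="X")  # hypothetical constructor call
var_y = Variable(key="y", column="y", label="Y")
lag_df = compute_lagged_correlation(df, var_x, var_y, max_lag_minutes=60, step_minutes=10)
print(lag_df["correlation"].idxmax())  # expected: 30 (X precedes Y by 30 minutes)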

meteo/analysis/events.py

@@ -0,0 +1,111 @@
"""Détection d'événements météorologiques et extraction de segments alignés."""
from __future__ import annotations
from typing import Sequence
import numpy as np
import pandas as pd
from .core import _ensure_datetime_index, _infer_time_step
__all__ = ['detect_threshold_events', 'build_event_aligned_segments']
def detect_threshold_events(
series: pd.Series,
*,
threshold: float,
min_duration: pd.Timedelta,
min_gap: pd.Timedelta,
) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
"""
Détecte des événements `series > threshold` (après remplissage des NaN
par False) durant au moins `min_duration`. Les événements séparés d'un
intervalle < min_gap sont fusionnés.
"""
if not isinstance(series.index, pd.DatetimeIndex):
raise TypeError("series doit être indexée par le temps.")
mask = (series > threshold).fillna(False)
if not mask.any():
return []
groups = (mask != mask.shift()).cumsum()
time_step = _infer_time_step(series.index)
raw_events: list[tuple[pd.Timestamp, pd.Timestamp]] = []
for group_id, group_mask in mask.groupby(groups):
if not group_mask.iloc[0]:
continue
start = group_mask.index[0]
end = group_mask.index[-1] + time_step
duration = end - start
if duration >= min_duration:
raw_events.append((start, end))
if not raw_events:
return []
merged: list[tuple[pd.Timestamp, pd.Timestamp]] = []
for start, end in raw_events:
if not merged:
merged.append((start, end))
continue
prev_start, prev_end = merged[-1]
if start - prev_end < min_gap:
merged[-1] = (prev_start, max(prev_end, end))
else:
merged.append((start, end))
return merged
def build_event_aligned_segments(
df: pd.DataFrame,
events: Sequence[tuple[pd.Timestamp, pd.Timestamp]],
columns: Sequence[str],
*,
window_before_minutes: int,
window_after_minutes: int,
resample_minutes: int = 1,
) -> pd.DataFrame:
"""
Extrait, pour chaque événement, les séries centrées sur son début et
retourne un DataFrame MultiIndex (event_id, offset_minutes).
"""
if not events:
return pd.DataFrame(columns=columns)
index = _ensure_datetime_index(df)
data = df[columns].sort_index()
freq = pd.Timedelta(minutes=resample_minutes)
if resample_minutes > 1:
data = data.resample(freq).mean()
before = pd.Timedelta(minutes=window_before_minutes)
after = pd.Timedelta(minutes=window_after_minutes)
segments: list[pd.DataFrame] = []
for event_id, (start, _end) in enumerate(events):
window_start = start - before
window_end = start + after
window_index = pd.date_range(window_start, window_end, freq=freq)
segment = data.reindex(window_index)
if segment.empty:
continue
offsets = ((segment.index - start) / pd.Timedelta(minutes=1)).astype(float)
multi_index = pd.MultiIndex.from_arrays(
[np.full(len(segment), event_id), offsets],
names=["event_id", "offset_minutes"],
)
segment.index = multi_index
segments.append(segment)
if not segments:
return pd.DataFrame(columns=columns)
aligned = pd.concat(segments)
return aligned
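
A minimal sketch of the detector on synthetic minute data: a single three-minute burst above the threshold yields one event, whose end is padded by one inferred time step:

import pandas as pd
from meteo.analysis import detect_threshold_events

index = pd.date_range("2025-01-01", periods=10, freq="1min")
series = pd.Series([0, 0, 5, 6, 7, 0, 0, 0, 0, 0], index=index, dtype=float)
events = detect_threshold_events(
    series,
    threshold=1.0,
    min_duration=pd.Timedelta(minutes=2),
    min_gap=pd.Timedelta(minutes=5),
)
print(events)  # [(Timestamp('2025-01-01 00:02:00'), Timestamp('2025-01-01 00:05:00'))]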

meteo/analysis/filters.py

@@ -0,0 +1,20 @@
"""Filtres simples appliqués aux DataFrames météo."""
from __future__ import annotations
import pandas as pd
__all__ = ['filter_by_condition']
def filter_by_condition(
df: pd.DataFrame,
*,
condition: pd.Series,
) -> pd.DataFrame:
"""
Renvoie une copie filtrée du DataFrame selon une condition booleenne alignée.
"""
mask = condition.reindex(df.index)
mask = mask.fillna(False)
return df.loc[mask]
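
The reindex step means the condition may come from a different index; timestamps missing from the condition are treated as False. For example:

import pandas as pd
from meteo.analysis import filter_by_condition

index = pd.date_range("2025-01-01", periods=4, freq="1h")
df = pd.DataFrame({"temperature": [1.0, 5.0, 9.0, 3.0]}, index=index)

# Keep only rows where the temperature exceeds 4 degrees.
warm = filter_by_condition(df, condition=df["temperature"] > 4.0)
print(len(warm))  # 2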

meteo/analysis/rain.py

@@ -0,0 +1,86 @@
"""Conversions et agrégations des mesures de pluie."""
from __future__ import annotations
import numpy as np
import pandas as pd
from meteo.season import SEASON_LABELS
from .core import _ensure_datetime_index, _infer_time_step
__all__ = ['compute_daily_rainfall_totals', 'compute_rainfall_by_season']
def compute_daily_rainfall_totals(
df: pd.DataFrame,
*,
rate_column: str = "rain_rate",
) -> pd.DataFrame:
"""
Convertit un taux de pluie (mm/h) en cumuls journaliers et cumulés.
"""
_ensure_datetime_index(df)
if rate_column not in df.columns:
raise KeyError(f"Colonne absente : {rate_column}")
series = df[rate_column].fillna(0.0).sort_index()
if series.empty:
return pd.DataFrame(columns=["daily_total", "cumulative_total"])
time_step = _infer_time_step(series.index)
diffs = series.index.to_series().diff()
diffs = diffs.fillna(time_step)
hours = diffs.dt.total_seconds() / 3600.0
rainfall_mm = series.to_numpy(dtype=float) * hours.to_numpy(dtype=float)
rainfall_series = pd.Series(rainfall_mm, index=series.index)
daily_totals = rainfall_series.resample("1D").sum()
cumulative = daily_totals.cumsum()
result = pd.DataFrame(
{
"daily_total": daily_totals,
"cumulative_total": cumulative,
}
)
return result
def compute_rainfall_by_season(
df: pd.DataFrame,
*,
rate_column: str = "rain_rate",
season_column: str = "season",
) -> pd.DataFrame:
"""
Calcule la pluie totale par saison (mm) ainsi que le nombre d'heures pluvieuses.
"""
_ensure_datetime_index(df)
for col in (rate_column, season_column):
if col not in df.columns:
raise KeyError(f"Colonne absente : {col}")
data = df[[rate_column, season_column]].copy()
data[rate_column] = data[rate_column].fillna(0.0)
data = data.dropna(subset=[season_column])
if data.empty:
return pd.DataFrame(columns=["total_rain_mm", "rainy_hours"]).astype(float)
time_step = _infer_time_step(data.index)
diffs = data.index.to_series().diff().fillna(time_step)
hours = diffs.dt.total_seconds() / 3600.0
rainfall_mm = data[rate_column].to_numpy(dtype=float) * hours.to_numpy(dtype=float)
data["rainfall_mm"] = rainfall_mm
data["rainy_hours"] = (rainfall_mm > 0).astype(float) * hours.to_numpy(dtype=float)
agg = data.groupby(season_column).agg(
total_rain_mm=("rainfall_mm", "sum"),
rainy_hours=("rainy_hours", "sum"),
)
order = [season for season in SEASON_LABELS if season in agg.index]
agg = agg.loc[order]
return agg
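
The conversion integrates the rate over each sampling interval: a constant 6 mm/h rate sampled every 10 minutes contributes 6 * 10/60 = 1 mm per sample. A sketch:

import pandas as pd
from meteo.analysis import compute_daily_rainfall_totals

# Six samples at 10-minute spacing, all on the same day.
index = pd.date_range("2025-01-01", periods=6, freq="10min")
df = pd.DataFrame({"rain_rate": [6.0] * 6}, index=index)
totals = compute_daily_rainfall_totals(df)
print(totals["daily_total"].iloc[0])  # 6.0 (mm for that day)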

meteo/analysis/seasonal.py

@@ -0,0 +1,102 @@
"""Outils de moyennage saisonnier/mensuel et de profils horaires."""
from __future__ import annotations
from typing import Sequence
import pandas as pd
from meteo.season import SEASON_LABELS
from .core import MONTH_ORDER, _ensure_datetime_index, _infer_time_step
__all__ = ['compute_monthly_climatology', 'compute_monthly_means', 'compute_seasonal_hourly_profile', 'compute_monthly_daylight_hours']
def compute_monthly_climatology(
df: pd.DataFrame,
*,
columns: Sequence[str],
) -> pd.DataFrame:
"""
Moyenne par mois (112) pour les colonnes fournies.
"""
_ensure_datetime_index(df)
missing = [col for col in columns if col not in df.columns]
if missing:
raise KeyError(f"Colonnes absentes : {missing}")
grouped = df[list(columns)].groupby(df.index.month).mean()
grouped = grouped.reindex(MONTH_ORDER)
grouped.index.name = "month"
return grouped
def compute_monthly_means(
df: pd.DataFrame,
*,
columns: Sequence[str],
) -> pd.DataFrame:
"""
Moyennes calendaire par mois (indexé sur la fin de mois).
"""
_ensure_datetime_index(df)
missing = [col for col in columns if col not in df.columns]
if missing:
raise KeyError(f"Colonnes absentes : {missing}")
monthly = df[list(columns)].resample("1ME").mean()
return monthly.dropna(how="all")
def compute_seasonal_hourly_profile(
df: pd.DataFrame,
*,
value_column: str,
season_column: str = "season",
) -> pd.DataFrame:
"""
Retourne une matrice (heures x saisons) contenant la moyenne d'une variable.
"""
_ensure_datetime_index(df)
for col in (value_column, season_column):
if col not in df.columns:
raise KeyError(f"Colonne absente : {col}")
subset = df[[value_column, season_column]].dropna()
if subset.empty:
return pd.DataFrame(index=range(24))
grouped = subset.groupby([season_column, subset.index.hour])[value_column].mean()
pivot = grouped.unstack(season_column)
pivot = pivot.reindex(index=range(24))
order = [season for season in SEASON_LABELS if season in pivot.columns]
if order:
pivot = pivot[order]
pivot.index.name = "hour"
return pivot
def compute_monthly_daylight_hours(
df: pd.DataFrame,
*,
illuminance_column: str = "illuminance",
threshold_lux: float = 1000.0,
) -> pd.Series:
"""
Calcule la durée moyenne de luminosité (> threshold_lux) par mois (en heures par jour).
"""
_ensure_datetime_index(df)
if illuminance_column not in df.columns:
raise KeyError(f"Colonne absente : {illuminance_column}")
subset = df[[illuminance_column]].dropna()
if subset.empty:
return pd.Series(dtype=float)
time_step = _infer_time_step(subset.index)
hours_per_step = time_step.total_seconds() / 3600.0
daylight_flag = (subset[illuminance_column] >= threshold_lux).astype(float)
daylight_hours = daylight_flag * hours_per_step
daily_hours = daylight_hours.resample("1D").sum()
monthly_avg = daily_hours.resample("1ME").mean()
return monthly_avg.dropna()
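
Unlike compute_monthly_means, the climatology folds all years onto a single month-of-year axis. A sketch over two synthetic years:

import numpy as np
import pandas as pd
from meteo.analysis import compute_monthly_climatology

index = pd.date_range("2023-01-01", "2024-12-31", freq="1D")
df = pd.DataFrame(
    {"temperature": np.sin(2 * np.pi * index.dayofyear / 365)},
    index=index,
)
clim = compute_monthly_climatology(df, columns=["temperature"])
print(clim.shape)  # (12, 1): one mean per calendar month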

meteo/analysis/statistics.py

@@ -0,0 +1,140 @@
"""Statistiques descriptives utilisées par les tracés (cycle diurne, regroupements par bins)."""
from __future__ import annotations
from typing import Sequence
import numpy as np
import pandas as pd
from meteo.variables import Variable
from .core import BinnedStatistics, DiurnalCycleStats, _ensure_datetime_index
__all__ = ['compute_diurnal_cycle_statistics', 'compute_binned_statistics']
def compute_diurnal_cycle_statistics(
df: pd.DataFrame,
variables: Sequence[Variable],
*,
quantiles: tuple[float, float] | None = (0.25, 0.75),
) -> DiurnalCycleStats:
"""
Agrège les variables par heure locale pour visualiser un cycle diurne moyen.
"""
_ensure_datetime_index(df)
columns = [v.column for v in variables]
grouped = df[columns].groupby(df.index.hour)
mean_df = grouped.mean()
median_df = grouped.median()
quantile_low_df: pd.DataFrame | None = None
quantile_high_df: pd.DataFrame | None = None
q_low = q_high = None
if quantiles is not None:
q_low, q_high = quantiles
if q_low is not None:
quantile_low_df = grouped.quantile(q_low)
if q_high is not None:
quantile_high_df = grouped.quantile(q_high)
return DiurnalCycleStats(
mean=mean_df,
median=median_df,
quantile_low=quantile_low_df,
quantile_high=quantile_high_df,
quantile_low_level=q_low,
quantile_high_level=q_high,
)
def compute_binned_statistics(
df: pd.DataFrame,
*,
bin_source_column: str,
target_columns: Sequence[str],
bins: Sequence[float] | np.ndarray,
min_count: int = 30,
quantiles: tuple[float, float] | None = (0.25, 0.75),
) -> BinnedStatistics:
"""
Calcule des statistiques (mean/median/quantiles) pour plusieurs colonnes
en regroupant les données selon des intervalles définis sur une colonne source.
"""
if bin_source_column not in df.columns:
raise KeyError(f"Colonne source absente : {bin_source_column}")
missing_targets = [col for col in target_columns if col not in df.columns]
if missing_targets:
raise KeyError(f"Colonnes cibles absentes : {missing_targets!r}")
subset_cols = [bin_source_column, *target_columns]
data = df[subset_cols].dropna(subset=[bin_source_column])
if data.empty:
empty_interval_index = pd.IntervalIndex([])
empty_df = pd.DataFrame(columns=target_columns)
empty_counts = pd.Series(dtype=int)
return BinnedStatistics(
centers=np.array([]),
intervals=empty_interval_index,
counts=empty_counts,
mean=empty_df,
median=empty_df,
quantile_low=None,
quantile_high=None,
)
categories = pd.cut(data[bin_source_column], bins=bins, include_lowest=True)
grouped = data.groupby(categories, observed=False)
counts = grouped.size()
valid_mask = counts >= max(1, min_count)
valid_intervals = counts.index[valid_mask]
if len(valid_intervals) == 0:
empty_interval_index = pd.IntervalIndex([])
empty_df = pd.DataFrame(columns=target_columns)
empty_counts = pd.Series(dtype=int)
return BinnedStatistics(
centers=np.array([]),
intervals=empty_interval_index,
counts=empty_counts,
mean=empty_df,
median=empty_df,
quantile_low=None,
quantile_high=None,
)
interval_index = pd.IntervalIndex(valid_intervals)
mean_df = grouped[target_columns].mean().loc[interval_index]
median_df = grouped[target_columns].median().loc[interval_index]
q_low = q_high = None
quantile_low_df: pd.DataFrame | None = None
quantile_high_df: pd.DataFrame | None = None
if quantiles is not None:
q_low, q_high = quantiles
if q_low is not None:
quantile_low_df = grouped[target_columns].quantile(q_low).loc[interval_index]
if q_high is not None:
quantile_high_df = grouped[target_columns].quantile(q_high).loc[interval_index]
centers = np.array([interval.mid for interval in interval_index])
filtered_counts = counts.loc[interval_index]
return BinnedStatistics(
centers=centers,
intervals=interval_index,
counts=filtered_counts,
mean=mean_df,
median=median_df,
quantile_low=quantile_low_df,
quantile_high=quantile_high_df,
quantile_low_level=q_low,
quantile_high_level=q_high,
)
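
For instance, binning humidity by temperature into 5-degree classes while discarding sparse bins (a sketch with random data):

import numpy as np
import pandas as pd
from meteo.analysis import compute_binned_statistics

rng = np.random.default_rng(0)
df = pd.DataFrame({
    "temperature": rng.uniform(0, 30, size=1000),
    "humidity": rng.uniform(40, 100, size=1000),
})
stats = compute_binned_statistics(
    df,
    bin_source_column="temperature",
    target_columns=["humidity"],
    bins=np.arange(0, 35, 5),   # six 5-degree bins
    min_count=30,
)
print(stats.counts.sum())  # 1000: every bin is well populated here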

meteo/analysis/wind.py

@@ -0,0 +1,108 @@
"""Fonctions spécifiques aux analyses de vent (roses et composantes)."""
from __future__ import annotations
from typing import Sequence
import numpy as np
import pandas as pd
from .core import _ensure_datetime_index
__all__ = ['compute_wind_rose_distribution', 'compute_mean_wind_components']
def _format_speed_bin_labels(speed_bins: Sequence[float]) -> list[str]:
labels: list[str] = []
for i in range(len(speed_bins) - 1):
low = speed_bins[i]
high = speed_bins[i + 1]
        if np.isinf(high):
            labels.append(f"≥{low:g}")
        else:
            labels.append(f"{low:g}–{high:g}")
return labels
def compute_wind_rose_distribution(
df: pd.DataFrame,
*,
direction_sector_size: int = 30,
speed_bins: Sequence[float] = (0, 10, 20, 30, 50, float("inf")),
) -> tuple[pd.DataFrame, list[str], float]:
"""
Regroupe la distribution vent/direction en secteurs angulaires et classes de vitesse.
Retourne un DataFrame indexé par le début du secteur (en degrés) et colonnes = classes de vitesse (%).
"""
if direction_sector_size <= 0 or direction_sector_size > 180:
raise ValueError("direction_sector_size doit être compris entre 1 et 180 degrés.")
if "wind_speed" not in df.columns or "wind_direction" not in df.columns:
raise KeyError("Le DataFrame doit contenir 'wind_speed' et 'wind_direction'.")
data = df[["wind_speed", "wind_direction"]].dropna()
if data.empty:
return pd.DataFrame(), [], float(direction_sector_size)
n_sectors = int(360 / direction_sector_size)
direction = data["wind_direction"].to_numpy(dtype=float) % 360.0
sector_indices = np.floor(direction / direction_sector_size).astype(int) % n_sectors
bins = list(speed_bins)
if not np.isinf(bins[-1]):
bins.append(float("inf"))
labels = _format_speed_bin_labels(bins)
speed_categories = pd.cut(
data["wind_speed"],
bins=bins,
right=False,
include_lowest=True,
labels=labels,
)
counts = (
pd.crosstab(sector_indices, speed_categories)
.reindex(range(n_sectors), fill_value=0)
.reindex(columns=labels, fill_value=0)
)
total = counts.values.sum()
frequencies = counts / total * 100.0 if total > 0 else counts.astype(float)
frequencies.index = frequencies.index * direction_sector_size
return frequencies, labels, float(direction_sector_size)
def compute_mean_wind_components(
df: pd.DataFrame,
*,
freq: str = "1M",
) -> pd.DataFrame:
"""
Calcule les composantes zonale (u) et méridienne (v) du vent pour une fréquence donnée.
Retourne également la vitesse moyenne.
"""
if "wind_speed" not in df.columns or "wind_direction" not in df.columns:
raise KeyError("Les colonnes 'wind_speed' et 'wind_direction' sont requises.")
_ensure_datetime_index(df)
subset = df[["wind_speed", "wind_direction"]].dropna()
if subset.empty:
return pd.DataFrame(columns=["u", "v", "speed"])
radians = np.deg2rad(subset["wind_direction"].to_numpy(dtype=float))
speed = subset["wind_speed"].to_numpy(dtype=float)
    u = speed * np.sin(radians) * -1  # east-west component (positive toward the east)
    v = speed * np.cos(radians) * -1  # north-south component (positive toward the north)
vector_df = pd.DataFrame(
{
"u": u,
"v": v,
"speed": speed,
},
index=subset.index,
)
actual_freq = "1ME" if freq == "1M" else freq
grouped = vector_df.resample(actual_freq).mean()
return grouped.dropna(how="all")

@@ -100,14 +100,9 @@ class StationLocation:
"pour calculer l'élévation solaire."
)
try:
latitude = float(lat)
longitude = float(lon)
elevation = float(elev) if elev else 0.0
except ValueError as exc:
raise RuntimeError(
"STATION_LATITUDE / STATION_LONGITUDE / STATION_ELEVATION doivent être des nombres valides."
) from exc
latitude = float(lat)
longitude = float(lon)
elevation = float(elev) if elev else 0.0
return cls(latitude=latitude, longitude=longitude, elevation_m=elevation)

File diff suppressed because it is too large.

meteo/plots/__init__.py

@@ -0,0 +1,50 @@
from __future__ import annotations
from .base import export_plot_dataset
from .calendar import plot_calendar_heatmap, plot_weekday_profiles
from .correlations import (
plot_correlation_heatmap,
plot_lagged_correlation,
plot_rolling_correlation_heatmap,
)
from .rain import plot_daily_rainfall_hyetograph, plot_rainfall_by_season
from .relationships import (
plot_event_composite,
plot_hexbin_with_third_variable,
plot_scatter_pair,
)
from .seasonal_profiles import (
plot_daylight_hours,
plot_diurnal_cycle,
plot_seasonal_hourly_profiles,
)
from .seasonal_stats import (
plot_binned_profiles,
plot_monthly_anomalies,
plot_monthly_boxplots,
plot_seasonal_boxplots,
)
from .wind import plot_wind_rose, plot_wind_vector_series
__all__ = [
"export_plot_dataset",
"plot_calendar_heatmap",
"plot_weekday_profiles",
"plot_correlation_heatmap",
"plot_lagged_correlation",
"plot_rolling_correlation_heatmap",
"plot_daily_rainfall_hyetograph",
"plot_rainfall_by_season",
"plot_event_composite",
"plot_hexbin_with_third_variable",
"plot_scatter_pair",
"plot_daylight_hours",
"plot_diurnal_cycle",
"plot_seasonal_hourly_profiles",
"plot_binned_profiles",
"plot_monthly_anomalies",
"plot_monthly_boxplots",
"plot_seasonal_boxplots",
"plot_wind_rose",
"plot_wind_vector_series",
]

meteo/plots/base.py

@@ -0,0 +1,50 @@
"""Fonctions utilitaires pour exporter les jeux de données associés aux figures."""
from __future__ import annotations
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
__all__ = ["export_plot_dataset"]
def export_plot_dataset(data: Any, output_path: str | Path, *, suffix: str = ".csv") -> Path | None:
"""
Sauvegarde, en regard du fichier image exporté, les données brutes ayant servi à construire la figure.
"""
if data is None:
return None
output_path = Path(output_path)
dataset_path = output_path.with_suffix(suffix)
dataset_path.parent.mkdir(parents=True, exist_ok=True)
def _normalize(value: Any, *, default_name: str = "value") -> pd.DataFrame:
if isinstance(value, pd.DataFrame):
return value.copy()
if isinstance(value, pd.Series):
return value.to_frame(name=value.name or default_name)
if isinstance(value, np.ndarray):
return pd.DataFrame(value)
return pd.DataFrame(value)
if isinstance(data, dict):
frames: list[pd.DataFrame] = []
for key, value in data.items():
if value is None:
continue
frame = _normalize(value, default_name=str(key))
frame = pd.concat({str(key): frame}, axis=1)
frames.append(frame)
if not frames:
return None
export_df = pd.concat(frames, axis=1)
else:
export_df = _normalize(data)
export_df.to_csv(dataset_path)
return dataset_path
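
The CSV lands next to the image path, sharing its stem, so each figure ships with its data. A sketch:

import pandas as pd
from meteo.plots import export_plot_dataset

df = pd.DataFrame({"value": [1, 2, 3]})
path = export_plot_dataset(df, "out/figure.png")  # writes out/figure.csv
print(path)  # out/figure.csv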

meteo/plots/calendar.py

@@ -0,0 +1,114 @@
"""Tracés orientés calendrier (heatmaps quotidiennes et profils hebdomadaires)."""
from __future__ import annotations
from pathlib import Path
from typing import Sequence
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from .base import export_plot_dataset
from meteo.variables import Variable
__all__ = ['plot_calendar_heatmap', 'plot_weekday_profiles']
def plot_calendar_heatmap(
matrix: pd.DataFrame,
output_path: str | Path,
*,
title: str,
cmap: str = "YlGnBu",
colorbar_label: str = "",
) -> Path:
"""
Affiche une heatmap calendrier (lignes = mois, colonnes = jours).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
export_plot_dataset(matrix, output_path)
if matrix.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données pour la heatmap.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax = plt.subplots(figsize=(14, 6))
data = matrix.to_numpy(dtype=float)
im = ax.imshow(data, aspect="auto", cmap=cmap, interpolation="nearest")
ax.set_xticks(np.arange(matrix.shape[1]))
ax.set_xticklabels(matrix.columns, rotation=90)
ax.set_yticks(np.arange(matrix.shape[0]))
ax.set_yticklabels(matrix.index)
ax.set_xlabel("Jour du mois")
ax.set_ylabel("Mois")
ax.set_title(title)
cbar = fig.colorbar(im, ax=ax)
if colorbar_label:
cbar.set_label(colorbar_label)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_weekday_profiles(
weekday_df: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
title: str,
) -> Path:
"""
Affiche les moyennes par jour de semaine pour plusieurs variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if weekday_df.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données hebdomadaires.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
export_plot_dataset(weekday_df, output_path)
weekday_labels = ["Lun", "Mar", "Mer", "Jeu", "Ven", "Sam", "Dim"]
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
x = np.arange(len(weekday_labels))
for ax, var in zip(axes, variables):
if var.column not in weekday_df.columns:
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
values = weekday_df[var.column].to_numpy(dtype=float)
ax.plot(x, values, marker="o", label=var.label)
ax.set_ylabel(f"{var.label} ({var.unit})" if var.unit else var.label)
ax.grid(True, linestyle=":", alpha=0.5)
ax.set_xticks(x)
ax.set_xticklabels(weekday_labels)
axes[-1].set_xlabel("Jour de semaine")
axes[0].legend(loc="upper right")
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()

meteo/plots/correlations.py

@@ -0,0 +1,182 @@
"""Visualisations d'indicateurs de corrélation (heatmaps et séries décalées)."""
from __future__ import annotations
from pathlib import Path
from typing import Sequence
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from .base import export_plot_dataset
from meteo.variables import Variable
__all__ = ['plot_lagged_correlation', 'plot_correlation_heatmap', 'plot_rolling_correlation_heatmap']
def plot_lagged_correlation(
lag_df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
output_path: str | Path,
) -> Path:
"""
Trace la corrélation en fonction du lag (en minutes) entre deux variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
export_plot_dataset(lag_df, output_path)
plt.figure()
plt.plot(lag_df.index, lag_df["correlation"])
plt.axvline(0, linestyle="--") # lag = 0
plt.xlabel("Décalage (minutes)\n(lag > 0 : X précède Y)")
plt.ylabel("Corrélation")
plt.title(f"Corrélation décalée : {var_x.label}{var_y.label}")
plt.grid(True)
plt.tight_layout()
plt.savefig(output_path, dpi=150)
plt.close()
return output_path.resolve()
def plot_correlation_heatmap(
corr: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
annotate: bool = True,
) -> Path:
"""
Trace une heatmap de la matrice de corrélation.
Paramètres
----------
corr :
Matrice de corrélation (index et colonnes doivent correspondre
aux noms de colonnes des variables).
variables :
Liste de Variable, dans l'ordre où elles doivent apparaître.
output_path :
Chemin du fichier image à écrire.
annotate :
Si True, affiche la valeur numérique dans chaque case.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
columns = [v.column for v in variables]
labels = [v.label for v in variables]
    # Align the matrix on the desired order
corr = corr.loc[columns, columns]
export_plot_dataset(corr, output_path)
data = corr.to_numpy()
fig, ax = plt.subplots()
im = ax.imshow(data, vmin=-1.0, vmax=1.0)
    # Ticks and labels
ax.set_xticks(np.arange(len(labels)))
ax.set_yticks(np.arange(len(labels)))
ax.set_xticklabels(labels, rotation=45, ha="right")
ax.set_yticklabels(labels)
    # Axis on top or bottom as preferred (kept at the bottom here)
ax.set_title("Matrice de corrélation (coef. de Pearson)")
    # Colour bar
cbar = plt.colorbar(im, ax=ax)
cbar.set_label("Corrélation")
    # Annotate the cells
if annotate:
n = data.shape[0]
for i in range(n):
for j in range(n):
if i == j:
text = ""
else:
val = data[i, j]
if np.isnan(val):
text = ""
else:
text = f"{val:.2f}"
ax.text(
j,
i,
text,
ha="center",
va="center",
)
plt.tight_layout()
plt.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_rolling_correlation_heatmap(
rolling_corr: pd.DataFrame,
output_path: str | Path,
*,
cmap: str = "coolwarm",
vmin: float = -1.0,
vmax: float = 1.0,
time_tick_count: int = 6,
) -> Path:
"""
Visualise l'évolution de corrélations glissantes pour plusieurs paires.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
export_plot_dataset(rolling_corr, output_path)
if rolling_corr.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Aucune donnée de corrélation glissante.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
labels = list(rolling_corr.columns)
data = rolling_corr.to_numpy().T
height = max(3.0, 0.6 * len(labels))
fig, ax = plt.subplots(figsize=(10, height))
im = ax.imshow(data, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax)
ax.set_yticks(np.arange(len(labels)))
ax.set_yticklabels(labels)
if isinstance(rolling_corr.index, pd.DatetimeIndex):
times = rolling_corr.index
if len(times) > 1:
tick_idx = np.linspace(0, len(times) - 1, num=min(time_tick_count, len(times)), dtype=int)
else:
tick_idx = np.array([0])
tick_labels = [times[i].strftime("%Y-%m-%d\n%H:%M") for i in tick_idx]
else:
tick_idx = np.linspace(0, len(rolling_corr.index) - 1, num=min(time_tick_count, len(rolling_corr.index)), dtype=int)
tick_labels = [str(rolling_corr.index[i]) for i in tick_idx]
ax.set_xticks(tick_idx)
ax.set_xticklabels(tick_labels, rotation=30, ha="right")
ax.set_xlabel("Temps (fin de fenêtre)")
ax.set_ylabel("Paire de variables")
ax.set_title("Corrélations glissantes")
cbar = fig.colorbar(im, ax=ax)
cbar.set_label("Coefficient de corrélation")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()

meteo/plots/rain.py

@@ -0,0 +1,142 @@
"""Graphiques consacrés aux cumuls de pluie et à leur répartition temporelle."""
from __future__ import annotations
from pathlib import Path
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from .base import export_plot_dataset
__all__ = ['plot_daily_rainfall_hyetograph', 'plot_rainfall_by_season']
def plot_daily_rainfall_hyetograph(
daily_rain: pd.DataFrame,
output_path: str | Path,
) -> Path:
"""
Affiche les cumuls quotidiens de pluie (barres) et le cumul annuel (ligne).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if daily_rain.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données de précipitations disponibles.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
export_plot_dataset(daily_rain, output_path)
fig, ax1 = plt.subplots(figsize=(12, 5))
ax1.bar(
daily_rain.index,
daily_rain["daily_total"],
width=0.8,
color="tab:blue",
alpha=0.7,
label="Pluie quotidienne",
)
ax1.set_ylabel("Pluie quotidienne (mm)")
ax1.set_xlabel("Date")
ax1.grid(True, axis="y", linestyle=":", alpha=0.5)
ax2 = ax1.twinx()
ax2.plot(
daily_rain.index,
daily_rain["cumulative_total"],
color="tab:red",
linewidth=2,
label="Cumul annuel",
)
ax2.set_ylabel("Pluie cumulée (mm)")
locator = mdates.AutoDateLocator()
formatter = mdates.ConciseDateFormatter(locator)
ax1.xaxis.set_major_locator(locator)
ax1.xaxis.set_major_formatter(formatter)
lines_labels = [
(ax1.get_legend_handles_labels()),
(ax2.get_legend_handles_labels()),
]
lines, labels = [sum(lol, []) for lol in zip(*lines_labels)]
ax1.legend(lines, labels, loc="upper left")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_rainfall_by_season(
rainfall_df: pd.DataFrame,
output_path: str | Path,
*,
title: str = "Pluie cumulée par saison",
) -> Path:
"""
    Plot cumulative rainfall per season along with the number of rainy hours.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if rainfall_df.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données de pluie saisonnière.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
export_plot_dataset(rainfall_df, output_path)
seasons = rainfall_df.index.tolist()
x = np.arange(len(seasons))
totals = rainfall_df["total_rain_mm"].to_numpy(dtype=float)
fig, ax1 = plt.subplots(figsize=(9, 4))
bars = ax1.bar(x, totals, color="tab:blue", alpha=0.7, label="Pluie cumulée")
ax1.set_ylabel("Pluie cumulée (mm)")
ax1.set_xlabel("Saison")
ax1.set_xticks(x)
ax1.set_xticklabels([season.capitalize() for season in seasons])
ax1.grid(True, axis="y", linestyle=":", alpha=0.5)
for rect, value in zip(bars, totals):
height = rect.get_height()
ax1.text(rect.get_x() + rect.get_width() / 2, height, f"{value:.0f}", ha="center", va="bottom", fontsize=8)
lines = []
labels = []
if "rainy_hours" in rainfall_df.columns:
ax2 = ax1.twinx()
rainy_hours = rainfall_df["rainy_hours"].to_numpy(dtype=float)
line = ax2.plot(
x,
rainy_hours,
color="tab:red",
marker="o",
label="Heures pluvieuses",
)[0]
ax2.set_ylabel("Heures pluvieuses")
lines.append(line)
labels.append("Heures pluvieuses")
handles, lbls = ax1.get_legend_handles_labels()
handles.extend(lines)
lbls.extend(labels)
if handles:
ax1.legend(handles, lbls, loc="upper left")
ax1.set_title(title)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
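# Usage sketch (illustrative, not part of the original file). The column names
# read by the two plotters are "daily_total"/"cumulative_total" and
# "total_rain_mm"/"rainy_hours"; the values below are synthetic.
#
#     days = pd.date_range("2025-01-01", periods=5, freq="D")
#     daily = pd.DataFrame({"daily_total": [0.0, 2.5, 8.1, 0.0, 4.4]}, index=days)
#     daily["cumulative_total"] = daily["daily_total"].cumsum()
#     plot_daily_rainfall_hyetograph(daily, "out/hyetograph.png")
#
#     seasonal = pd.DataFrame(
#         {"total_rain_mm": [210.0, 95.0, 180.0, 240.0],
#          "rainy_hours": [310.0, 120.0, 260.0, 350.0]},
#         index=["hiver", "printemps", "été", "automne"],
#     )
#     plot_rainfall_by_season(seasonal, "out/rain_by_season.png")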

@ -0,0 +1,345 @@
"""Fonctions de tracé pour comparer directement deux ou trois variables."""
from __future__ import annotations
from pathlib import Path
from typing import Callable, Sequence
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
import numpy as np
import pandas as pd
from .base import export_plot_dataset
from meteo.variables import Variable
__all__ = ['plot_scatter_pair', 'plot_hexbin_with_third_variable', 'plot_event_composite']
def plot_scatter_pair(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
output_path: str | Path,
*,
sample_step: int = 10,
color_by_time: bool = True,
cmap: str = "viridis",
) -> Path:
"""
    Draw a scatter plot for a pair of variables.

    - The data are subsampled with `sample_step` (for instance, keeping one
      point in ten) to avoid an unreadable plot.
    - If `color_by_time` is True and the index is a DatetimeIndex, the points
      are coloured from the oldest (dark) to the most recent (light).
    - When either axis corresponds to the wind direction, the plot switches to
      a better-suited polar chart (0° = North, clockwise) with a normalised
      radius: centre = minimum value, edge = maximum value.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
    # Keep only the relevant columns and the complete rows
df_pair = df[[var_x.column, var_y.column]].dropna()
if sample_step > 1:
df_pair = df_pair.iloc[::sample_step, :]
export_plot_dataset(df_pair, output_path)
direction_var: Variable | None = None
radial_var: Variable | None = None
direction_series: pd.Series | None = None
radial_series: pd.Series | None = None
if var_y.key == "wind_direction" and var_x.key != "wind_direction":
direction_var = var_y
direction_series = df_pair[var_y.column]
radial_var = var_x
radial_series = df_pair[var_x.column]
elif var_x.key == "wind_direction" and var_y.key != "wind_direction":
direction_var = var_x
direction_series = df_pair[var_x.column]
radial_var = var_y
radial_series = df_pair[var_y.column]
use_polar = direction_var is not None and radial_var is not None
if use_polar:
fig, ax = plt.subplots(subplot_kw={"projection": "polar"})
else:
fig, ax = plt.subplots()
scatter_kwargs: dict = {"s": 5, "alpha": 0.5}
colorbar_meta: dict | None = None
if color_by_time and isinstance(df_pair.index, pd.DatetimeIndex):
idx = df_pair.index
timestamps = idx.view("int64")
time_span = np.ptp(timestamps)
norm = (
Normalize(vmin=timestamps.min(), vmax=timestamps.max())
if time_span > 0
else None
)
scatter_kwargs |= {"c": timestamps, "cmap": cmap}
if norm is not None:
scatter_kwargs["norm"] = norm
colorbar_meta = {
"index": idx,
"timestamps": timestamps,
"time_span": time_span,
}
if use_polar:
assert direction_series is not None and radial_series is not None
assert direction_var is not None and radial_var is not None
theta = np.deg2rad(direction_series.to_numpy(dtype=float) % 360.0)
radius_raw = radial_series.to_numpy(dtype=float)
if radius_raw.size == 0:
radius = radius_raw
value_min = value_max = float("nan")
else:
value_min = float(np.min(radius_raw))
value_max = float(np.max(radius_raw))
if np.isclose(value_min, value_max):
radius = np.zeros_like(radius_raw)
else:
radius = (radius_raw - value_min) / (value_max - value_min)
scatter = ax.scatter(theta, radius, **scatter_kwargs)
cardinal_angles = np.deg2rad(np.arange(0, 360, 45))
cardinal_labels = ["N", "NE", "E", "SE", "S", "SO", "O", "NO"]
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
ax.set_xticks(cardinal_angles)
ax.set_xticklabels(cardinal_labels)
if radius_raw.size > 0:
            if np.isclose(value_min, value_max):
                radial_positions = [0.0]
                actual_values = [value_min]
            else:
                radial_positions = np.linspace(0.0, 1.0, num=5).tolist()
                actual_values = [
                    value_min + pos * (value_max - value_min)
                    for pos in radial_positions
                ]
            ax.set_yticks(radial_positions)
            ax.set_yticklabels([f"{val:.1f}" for val in actual_values])
ax.set_rlabel_position(225)
ax.set_ylim(0.0, 1.0)
unit_suffix = f" {radial_var.unit}" if radial_var.unit else ""
ax.text(
0.5,
-0.1,
f"Centre = {value_min:.1f}{unit_suffix}, bord = {value_max:.1f}{unit_suffix}",
transform=ax.transAxes,
ha="center",
va="top",
fontsize=8,
)
radial_label = f"{radial_var.label} ({radial_var.unit})" if radial_var.unit else radial_var.label
ax.set_ylabel(radial_label, labelpad=20)
else:
scatter = ax.scatter(
df_pair[var_x.column],
df_pair[var_y.column],
**scatter_kwargs,
)
if colorbar_meta is not None:
cbar = fig.colorbar(scatter, ax=ax)
idx = colorbar_meta["index"]
timestamps = colorbar_meta["timestamps"]
time_span = colorbar_meta["time_span"]
def _format_tick_label(ts: pd.Timestamp) -> str:
base = f"{ts.strftime('%Y-%m-%d')}\n{ts.strftime('%H:%M')}"
tz_name = ts.tzname()
return f"{base} ({tz_name})" if tz_name else base
if time_span > 0:
tick_datetimes = pd.date_range(start=idx.min(), end=idx.max(), periods=5)
tick_positions = tick_datetimes.view("int64")
tick_labels = [_format_tick_label(ts) for ts in tick_datetimes]
cbar.set_ticks(tick_positions)
cbar.set_ticklabels(tick_labels)
else:
cbar.set_ticks([timestamps[0]])
ts = idx[0]
cbar.set_ticklabels([_format_tick_label(ts)])
cbar.set_label("Temps (ancien → récent)")
if use_polar:
assert direction_var is not None and radial_var is not None
ax.set_title(f"{radial_var.label} en fonction de {direction_var.label}")
else:
ax.set_xlabel(f"{var_x.label} ({var_x.unit})")
ax.set_ylabel(f"{var_y.label} ({var_y.unit})")
ax.set_title(f"{var_y.label} en fonction de {var_x.label}")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
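# Usage sketch (illustrative; the Variable(...) constructor arguments shown are
# assumptions — only the .key/.column/.label/.unit attributes are read here):
#
#     temp = Variable(key="temperature", column="temperature", label="Température", unit="°C")
#     hum = Variable(key="humidity", column="humidity", label="Humidité", unit="%")
#     wdir = Variable(key="wind_direction", column="wind_direction", label="Direction du vent", unit="°")
#
#     # Cartesian scatter, one point in ten, coloured by time:
#     plot_scatter_pair(df, temp, hum, "out/temp_vs_hum.png", sample_step=10)
#     # Any pair involving wind_direction switches to the polar layout:
#     plot_scatter_pair(df, wdir, temp, "out/temp_vs_dir.png")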
def plot_hexbin_with_third_variable(
df: pd.DataFrame,
var_x: Variable,
var_y: Variable,
var_color: Variable,
output_path: str | Path,
*,
gridsize: int = 60,
mincnt: int = 5,
reduce_func: Callable[[np.ndarray], float] | None = None,
reduce_func_label: str | None = None,
cmap: str = "viridis",
) -> Path:
"""
    Draw a hexbin density map in which the colour encodes a third variable.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
reduce_func = reduce_func or np.mean
df_xyz = df[[var_x.column, var_y.column, var_color.column]].dropna()
export_plot_dataset(df_xyz, output_path)
if df_xyz.empty:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Pas de données valides pour cette combinaison.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
fig, ax = plt.subplots()
hb = ax.hexbin(
df_xyz[var_x.column],
df_xyz[var_y.column],
C=df_xyz[var_color.column],
reduce_C_function=reduce_func,
gridsize=gridsize,
cmap=cmap,
mincnt=mincnt,
)
func_label = reduce_func_label or getattr(reduce_func, "__name__", "statistique")
colorbar_label = f"{func_label.capitalize()} de {var_color.label}"
cbar = fig.colorbar(hb, ax=ax)
cbar.set_label(colorbar_label)
ax.set_xlabel(f"{var_x.label} ({var_x.unit})")
ax.set_ylabel(f"{var_y.label} ({var_y.unit})")
ax.set_title(
f"{var_y.label} vs {var_x.label}\nCouleur : {func_label} de {var_color.label}"
)
ax.grid(False)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
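# Usage sketch (illustrative): reusing the hypothetical `temp` and `hum`
# objects from the previous sketch, plus a third invented variable, and
# colouring each hexagon by the median rather than the default mean.
#
#     rain = Variable(key="rain_rate", column="rain_rate", label="Pluie", unit="mm/h")
#     plot_hexbin_with_third_variable(
#         df, temp, hum, rain, "out/hexbin.png",
#         reduce_func=np.median, reduce_func_label="médiane",
#     )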
def plot_event_composite(
aligned_segments: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
quantiles: tuple[float, float] = (0.25, 0.75),
baseline_label: str = "Début de l'événement",
) -> Path:
"""
    Plot the mean/median around detected events, with an inter-quantile band.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if aligned_segments.empty:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Aucun événement aligné à tracer.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
if "offset_minutes" not in aligned_segments.index.names:
raise ValueError("aligned_segments doit avoir un niveau 'offset_minutes'.")
group = aligned_segments.groupby(level="offset_minutes")
mean_df = group.mean()
median_df = group.median()
q_low, q_high = quantiles
quantile_low = group.quantile(q_low) if q_low is not None else None
quantile_high = group.quantile(q_high) if q_high is not None else None
export_plot_dataset(
{
"mean": mean_df,
"median": median_df,
"quantile_low": quantile_low,
"quantile_high": quantile_high,
},
output_path,
)
offsets = mean_df.index.to_numpy(dtype=float)
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
col = var.column
ax.axvline(0, color="black", linestyle="--", linewidth=1, label=baseline_label)
ax.plot(offsets, mean_df[col], color="tab:blue", label="Moyenne")
ax.plot(offsets, median_df[col], color="tab:orange", linestyle="--", label="Médiane")
if quantile_low is not None and quantile_high is not None:
ax.fill_between(
offsets,
quantile_low[col],
quantile_high[col],
color="tab:blue",
alpha=0.2,
                label=f"IQR {int(q_low*100)}–{int(q_high*100)}%",
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Minutes autour de l'événement")
axes[0].legend(loc="upper right")
total_events = len(aligned_segments.index.get_level_values("event_id").unique())
fig.suptitle(f"Composites autour d'événements ({total_events} occurrences)")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
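# Usage sketch (illustrative): `aligned_segments` must carry a MultiIndex with
# at least the levels "event_id" and "offset_minutes", as produced by the
# event-alignment step upstream; `temp` is the hypothetical Variable from the
# sketches above.
#
#     idx = pd.MultiIndex.from_product(
#         [range(3), range(-60, 61, 10)],
#         names=["event_id", "offset_minutes"],
#     )
#     segments = pd.DataFrame(
#         {"temperature": np.random.default_rng(0).normal(12.0, 1.0, len(idx))},
#         index=idx,
#     )
#     plot_event_composite(segments, [temp], "out/event_composite.png")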

View File

@ -0,0 +1,151 @@
"""Profils horaires/saisonniers liés à l'irradiance et aux cycles diurnes."""
from __future__ import annotations
from pathlib import Path
from typing import Sequence
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from .base import export_plot_dataset
from meteo.analysis import DiurnalCycleStats
from meteo.variables import Variable
__all__ = ['plot_diurnal_cycle', 'plot_seasonal_hourly_profiles', 'plot_daylight_hours']
def plot_diurnal_cycle(
stats: DiurnalCycleStats,
variables: Sequence[Variable],
output_path: str | Path,
) -> Path:
"""
    Plot the mean diurnal cycles (mean/median plus quantile band).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
export_plot_dataset(
{
"mean": stats.mean,
"median": stats.median,
"quantile_low": stats.quantile_low,
"quantile_high": stats.quantile_high,
},
output_path,
)
hours = stats.mean.index.to_numpy(dtype=float)
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
col = var.column
ax.plot(hours, stats.mean[col], label="Moyenne", color="tab:blue")
ax.plot(hours, stats.median[col], label="Médiane", color="tab:orange", linestyle="--")
if stats.quantile_low is not None and stats.quantile_high is not None:
ax.fill_between(
hours,
stats.quantile_low[col],
stats.quantile_high[col],
color="tab:blue",
alpha=0.15,
label=(
f"Quantiles {int(stats.quantile_low_level * 100)}{int(stats.quantile_high_level * 100)}%"
if stats.quantile_low_level is not None and stats.quantile_high_level is not None
else "Quantiles"
),
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Heure locale")
axes[0].legend(loc="upper right")
axes[-1].set_xticks(range(0, 24, 2))
axes[-1].set_xlim(0, 23)
fig.suptitle("Cycle diurne moyen")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_seasonal_hourly_profiles(
profile_df: pd.DataFrame,
output_path: str | Path,
*,
title: str,
ylabel: str,
) -> Path:
"""
    Mean hourly curves for each season.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if profile_df.empty or profile_df.isna().all().all():
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de profil saisonnier disponible.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
export_plot_dataset(profile_df, output_path)
hours = profile_df.index.to_numpy(dtype=float)
fig, ax = plt.subplots(figsize=(10, 4))
colors = plt.get_cmap("turbo")(np.linspace(0.1, 0.9, profile_df.shape[1]))
for color, season in zip(colors, profile_df.columns):
ax.plot(hours, profile_df[season], label=season.capitalize(), color=color)
ax.set_xlabel("Heure locale")
ax.set_ylabel(ylabel)
ax.set_title(title)
ax.grid(True, linestyle=":", alpha=0.5)
ax.legend()
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_daylight_hours(
monthly_series: pd.Series,
output_path: str | Path,
*,
title: str = "Durée moyenne de luminosité (> seuil)",
) -> Path:
"""
    Plot the average daily duration of brightness for each month.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if monthly_series.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données sur la luminosité.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
export_plot_dataset(monthly_series, output_path)
months = monthly_series.index
fig, ax = plt.subplots(figsize=(10, 4))
    # Bar width is in days on a date axis; the default 0.8 would be invisible at monthly spacing.
    ax.bar(months, monthly_series.values, width=20, color="goldenrod", alpha=0.8)
ax.set_ylabel("Heures de luminosité par jour")
ax.set_xlabel("Mois")
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(ax.xaxis.get_major_locator()))
ax.set_title(title)
ax.grid(True, axis="y", linestyle=":", alpha=0.5)
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
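# Usage sketch (illustrative) for the two profile plots. Hypothetical inputs:
# a 24-row DataFrame with one column per season, and a monthly Series of
# average daylight hours (synthetic values below).
#
#     hours = np.arange(24)
#     profiles = pd.DataFrame(
#         {"hiver": 200 * np.clip(np.sin((hours - 8) / 8 * np.pi), 0, None),
#          "été": 800 * np.clip(np.sin((hours - 5) / 14 * np.pi), 0, None)},
#         index=hours,
#     )
#     plot_seasonal_hourly_profiles(
#         profiles, "out/profils_saisonniers.png",
#         title="Irradiance moyenne par heure", ylabel="Irradiance (W/m²)",
#     )
#
#     months = pd.date_range("2025-01-01", periods=12, freq="MS")
#     daylight = pd.Series(
#         [8.5, 9.8, 11.5, 13.4, 15.0, 15.9, 15.5, 14.1, 12.3, 10.5, 9.0, 8.2],
#         index=months,
#     )
#     plot_daylight_hours(daylight, "out/daylight.png")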

@ -0,0 +1,351 @@
"""Visualisations statistiques agrégées par saison, mois ou intervalles spécialisés."""
from __future__ import annotations
import calendar
from pathlib import Path
from typing import Sequence
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from .base import export_plot_dataset
from meteo.analysis import BinnedStatistics, MONTH_ORDER
from meteo.season import SEASON_LABELS
from meteo.variables import Variable
__all__ = ['plot_seasonal_boxplots', 'plot_monthly_boxplots', 'plot_binned_profiles', 'plot_monthly_anomalies']
def plot_seasonal_boxplots(
df: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
season_column: str = "season",
season_order: Sequence[str] | None = None,
title: str | None = None,
) -> Path:
"""
    Draw per-season boxplots for a selection of variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if season_column not in df.columns:
raise KeyError(f"Colonne saison absente : {season_column}")
available = df[season_column].dropna().unique()
if season_order is None:
season_order = [season for season in SEASON_LABELS if season in available]
else:
season_order = [season for season in season_order if season in available]
if not season_order:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Aucune donnée saisonnière disponible.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
dataset_columns = [season_column] + [var.column for var in variables]
export_plot_dataset(df[dataset_columns], output_path)
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(10, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
colors = plt.get_cmap("Set3")(np.linspace(0.2, 0.8, len(season_order)))
labels = [season.capitalize() for season in season_order]
for ax, var in zip(axes, variables):
data = [
df.loc[df[season_column] == season, var.column].dropna().to_numpy()
for season in season_order
]
if not any(len(arr) > 0 for arr in data):
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
box = ax.boxplot(
data,
labels=labels,
showfliers=False,
patch_artist=True,
)
for patch, color in zip(box["boxes"], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Saison")
if title:
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.95])
else:
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
def plot_monthly_boxplots(
df: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
) -> Path:
"""
    Monthly boxplots (January through December) for several variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if not isinstance(df.index, pd.DatetimeIndex):
raise TypeError("plot_monthly_boxplots nécessite un DatetimeIndex.")
value_columns = [var.column for var in variables]
dataset = df[value_columns].copy()
dataset.insert(0, "month", df.index.month)
export_plot_dataset(dataset, output_path)
month_labels = [calendar.month_abbr[m].capitalize() for m in MONTH_ORDER]
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(12, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
for ax, var in zip(axes, variables):
data = [
df.loc[df.index.month == month, var.column].dropna().to_numpy()
for month in MONTH_ORDER
]
if not any(len(arr) > 0 for arr in data):
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
box = ax.boxplot(
data,
labels=month_labels,
showfliers=False,
patch_artist=True,
)
colors = plt.get_cmap("Spectral")(np.linspace(0.2, 0.8, len(data)))
for patch, color in zip(box["boxes"], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel("Mois")
fig.suptitle("Distribution mensuelle")
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
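# Usage sketch (illustrative): the seasonal variant needs a "season" column
# (see meteo.season), the monthly variant only a DatetimeIndex; `temp` and
# `hum` are hypothetical Variable objects as in the pair-plot sketches.
#
#     plot_seasonal_boxplots(df, [temp, hum], "out/box_saisons.png",
#                            title="Distributions saisonnières")
#     plot_monthly_boxplots(df, [temp, hum], "out/box_mensuels.png")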
def plot_binned_profiles(
stats: BinnedStatistics,
variables: Sequence[Variable],
output_path: str | Path,
*,
xlabel: str,
title: str,
show_counts: bool = False,
) -> Path:
"""
    Plot aggregated statistics of one or more variables as a function of bins.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if stats.centers.size == 0:
fig, ax = plt.subplots()
ax.text(
0.5,
0.5,
"Aucune donnée suffisante pour ces intervalles.",
ha="center",
va="center",
)
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
bin_summary = pd.DataFrame(
{
"bin_left": stats.intervals.left,
"bin_right": stats.intervals.right,
"center": stats.centers,
}
)
export_plot_dataset(
{
"bins": bin_summary,
"counts": stats.counts,
"mean": stats.mean,
"median": stats.median,
"quantile_low": stats.quantile_low,
"quantile_high": stats.quantile_high,
},
output_path,
)
base_axes = len(variables)
total_axes = base_axes + (1 if show_counts else 0)
fig, axes = plt.subplots(
total_axes,
1,
sharex=True,
figsize=(10, 3 * total_axes),
)
if total_axes == 1:
axes = [axes]
else:
axes = list(axes)
x_values = stats.centers
bin_widths = np.array([interval.length for interval in stats.intervals])
if show_counts:
count_ax = axes.pop(0)
count_ax.bar(
x_values,
stats.counts.to_numpy(dtype=float),
width=bin_widths,
color="lightgray",
edgecolor="gray",
align="center",
)
count_ax.set_ylabel("Nombre de points")
count_ax.grid(True, linestyle=":", alpha=0.4)
count_ax.set_title("Densité des observations par bin")
for ax, var in zip(axes, variables):
col = var.column
ax.plot(x_values, stats.mean[col], color="tab:blue", label="Moyenne")
ax.plot(x_values, stats.median[col], color="tab:orange", linestyle="--", label="Médiane")
if stats.quantile_low is not None and stats.quantile_high is not None:
ax.fill_between(
x_values,
stats.quantile_low[col],
stats.quantile_high[col],
color="tab:blue",
alpha=0.15,
label=(
f"Quantiles {int(stats.quantile_low_level * 100)}{int(stats.quantile_high_level * 100)}%"
if stats.quantile_low_level is not None and stats.quantile_high_level is not None
else "Quantiles"
),
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
axes[-1].set_xlabel(xlabel)
axes[0].legend(loc="upper right")
axes[-1].set_xlim(stats.intervals.left.min(), stats.intervals.right.max())
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
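# Usage sketch (illustrative): `stats` is a BinnedStatistics computed upstream
# in meteo.analysis (with .centers, .intervals, .counts, .mean, .median and the
# optional quantile frames); for example, temperature binned by wind speed:
#
#     plot_binned_profiles(
#         stats, [temp], "out/temp_par_vent.png",
#         xlabel="Vitesse du vent (km/h)",
#         title="Température par classe de vent",
#         show_counts=True,
#     )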
def plot_monthly_anomalies(
monthly_means: pd.DataFrame,
climatology: pd.DataFrame,
variables: Sequence[Variable],
output_path: str | Path,
*,
title: str = "Moyennes mensuelles vs climatologie",
) -> Path:
"""
    Compare observed monthly means with the climatology for several variables.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if monthly_means.empty or climatology.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données mensuelles disponibles.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
export_frames: list[pd.DataFrame] = []
n_vars = len(variables)
fig, axes = plt.subplots(n_vars, 1, figsize=(12, 3 * n_vars), sharex=True)
if n_vars == 1:
axes = [axes]
locator = mdates.AutoDateLocator()
formatter = mdates.ConciseDateFormatter(locator)
for ax, var in zip(axes, variables):
actual = monthly_means[var.column].dropna()
if actual.empty:
ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
ax.set_axis_off()
continue
months = actual.index.month
clim = climatology.loc[months, var.column].to_numpy(dtype=float)
anomaly = actual.to_numpy(dtype=float) - clim
clim_series = pd.Series(clim, index=actual.index, name="climatology")
frame = pd.DataFrame({"actual": actual, "climatology": clim_series})
frame["anomaly"] = frame["actual"] - frame["climatology"]
export_frames.append(pd.concat({var.column: frame}, axis=1))
ax.plot(actual.index, actual, color="tab:blue", label="Moyenne mensuelle")
ax.plot(actual.index, clim, color="tab:gray", linestyle="--", label="Climatologie")
ax.fill_between(
actual.index,
actual,
clim,
where=anomaly >= 0,
color="tab:blue",
alpha=0.15,
)
ax.fill_between(
actual.index,
actual,
clim,
where=anomaly < 0,
color="tab:red",
alpha=0.15,
)
ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
ax.set_ylabel(ylabel)
ax.grid(True, linestyle=":", alpha=0.5)
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)
if export_frames:
export_plot_dataset(pd.concat(export_frames, axis=1), output_path)
axes[-1].set_xlabel("Date")
axes[0].legend(loc="upper right")
fig.suptitle(title)
fig.tight_layout(rect=[0, 0, 1, 0.97])
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
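# Usage sketch (illustrative): monthly means resampled from the raw data and a
# climatology indexed by month number (1–12), built here from the same series.
#
#     monthly = df.resample("MS").mean()
#     clim = monthly.groupby(monthly.index.month).mean()
#     plot_monthly_anomalies(monthly, clim, [temp], "out/anomalies.png")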

meteo/plots/wind.py Normal file

@ -0,0 +1,145 @@
"""Tracés dédiés aux analyses du vent (roses et vecteurs agrégés)."""
from __future__ import annotations
from pathlib import Path
from typing import Sequence
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import numpy as np
import pandas as pd
from .base import export_plot_dataset
__all__ = ['plot_wind_rose', 'plot_wind_vector_series']
def plot_wind_rose(
frequencies: pd.DataFrame,
speed_bin_labels: Sequence[str],
output_path: str | Path,
*,
sector_size_deg: float,
cmap: str = "viridis",
) -> Path:
"""
    Draw a wind rose stacked by speed class (as a % of time).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if frequencies.empty:
fig, ax = plt.subplots(subplot_kw={"projection": "polar"})
ax.text(0.5, 0.5, "Données de vent insuffisantes.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
dataset = frequencies.copy()
dataset.insert(0, "sector_start_deg", frequencies.index)
dataset.insert(1, "sector_center_deg", frequencies.index + sector_size_deg / 2.0)
export_plot_dataset(dataset, output_path)
fig, ax = plt.subplots(subplot_kw={"projection": "polar"}, figsize=(6, 6))
cmap_obj = plt.get_cmap(cmap, len(speed_bin_labels))
colors = cmap_obj(np.linspace(0.2, 0.95, len(speed_bin_labels)))
angles = np.deg2rad(frequencies.index.to_numpy(dtype=float) + sector_size_deg / 2.0)
width = np.deg2rad(sector_size_deg)
bottom = np.zeros_like(angles, dtype=float)
for label, color in zip(speed_bin_labels, colors):
values = frequencies[label].to_numpy(dtype=float)
bars = ax.bar(
angles,
values,
width=width,
bottom=bottom,
color=color,
edgecolor="white",
linewidth=0.5,
align="center",
)
bottom += values
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
ax.set_xticks(np.deg2rad(np.arange(0, 360, 45)))
ax.set_xticklabels(["N", "NE", "E", "SE", "S", "SO", "O", "NO"])
max_radius = np.max(bottom)
ax.set_ylim(0, max(max_radius * 1.1, 1))
ax.yaxis.set_major_formatter(FuncFormatter(lambda val, _pos: f"{val:.0f}%"))
ax.set_title("Rose des vents (fréquence en %)")
legend_handles = [
plt.Line2D([0], [0], color=color, linewidth=6, label=label) for label, color in zip(speed_bin_labels, colors)
]
ax.legend(
handles=legend_handles,
loc="lower center",
bbox_to_anchor=(0.5, -0.15),
ncol=2,
title="Vitesses (km/h)",
)
fig.tight_layout()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
def plot_wind_vector_series(
vector_df: pd.DataFrame,
output_path: str | Path,
*,
title: str = "Vecteurs moyens du vent",
) -> Path:
"""
    Plot the mean wind components as arrows (u/v).
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if vector_df.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Pas de données de vent.", ha="center", va="center")
ax.set_axis_off()
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return output_path.resolve()
export_plot_dataset(vector_df, output_path)
times = vector_df.index
x = mdates.date2num(times)
u = vector_df["u"].to_numpy(dtype=float)
v = vector_df["v"].to_numpy(dtype=float)
speed = vector_df["speed"].to_numpy(dtype=float)
fig, ax = plt.subplots(figsize=(12, 4))
q = ax.quiver(
x,
np.zeros_like(x),
u,
v,
speed,
angles="xy",
scale_units="xy",
scale=1,
cmap="viridis",
)
ax.axhline(0, color="black", linewidth=0.5)
    v_limit = float(np.max(np.abs(v))) * 1.2 if np.any(v) else 1.0
    ax.set_ylim(-v_limit, v_limit)
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(ax.xaxis.get_major_locator()))
ax.set_ylabel("Composante nord (v)")
ax.set_xlabel("Date")
ax.set_title(title)
cbar = fig.colorbar(q, ax=ax)
cbar.set_label("Vitesse moyenne (km/h)")
fig.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
return output_path.resolve()
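# Usage sketch (illustrative). For the wind rose, rows are sector start angles
# in degrees and columns are speed classes whose values are frequencies in %;
# for the vector series, the columns "u", "v" and "speed" are required. All
# names and values below are invented.
#
#     sectors = np.arange(0.0, 360.0, 45.0)
#     freq = pd.DataFrame(
#         {"0-10": np.full(8, 5.0), "10-20": np.full(8, 4.0), "> 20": np.full(8, 3.5)},
#         index=sectors,
#     )
#     plot_wind_rose(freq, list(freq.columns), "out/rose.png", sector_size_deg=45.0)
#
#     idx = pd.date_range("2025-01-01", periods=10, freq="D")
#     vectors = pd.DataFrame(
#         {"u": np.random.default_rng(1).normal(0, 5, 10),
#          "v": np.random.default_rng(2).normal(0, 5, 10)},
#         index=idx,
#     )
#     vectors["speed"] = np.hypot(vectors["u"], vectors["v"])
#     plot_wind_vector_series(vectors, "out/vecteurs.png")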

@ -27,11 +27,7 @@ def main() -> None:
print(f"Après resampling 60s : {len(df_min)} lignes")
hemisphere = "north"
try:
location = StationLocation.from_env(optional=True)
except RuntimeError as exc:
print(f"⚠ Coordonnées GPS invalides : {exc}")
location = None
location = StationLocation.from_env(optional=True)
if location is not None:
hemisphere = "south" if location.latitude < 0 else "north"

@ -69,11 +69,7 @@ def iter_modules(selected: Iterable[str] | None) -> list[str]:
def run_module(module: str) -> bool:
cmd = [sys.executable, "-m", module]
print(f"\n=== {module} ===")
try:
result = subprocess.run(cmd, check=False)
except FileNotFoundError as exc: # pragma: no cover
print(f"⚠ Impossible de lancer {module} : {exc}")
return False
result = subprocess.run(cmd, check=False)
if result.returncode == 0:
print(f"{module} terminé avec succès.")

@ -34,10 +34,7 @@ def main() -> None:
print("✔ Ping OK")
print("→ Requête de test sur le bucket…")
try:
tables = test_basic_query(client, settings.bucket)
except InfluxDBError as exc:
raise SystemExit(f"Erreur lors de la requête Flux : {exc}") from exc
tables = test_basic_query(client, settings.bucket)
    # Print a concise summary
nb_tables = len(tables)