diff --git a/docs/08 - Enrichissement du jeu de données.md b/docs/08 - Enrichissement du jeu de données.md
index 48f0bcd..4ac0043 100644
--- a/docs/08 - Enrichissement du jeu de données.md
+++ b/docs/08 - Enrichissement du jeu de données.md
@@ -1,3 +1,4 @@
 # Enrichissement du jeu de données
 
-- Élévation du soleil
+- Élévation du soleil (sun_elevation)
+- Saison météorologique (season)
diff --git a/figures/seasonal/rainfall_by_season.png b/figures/seasonal/rainfall_by_season.png
new file mode 100644
index 0000000..2e5c0ac
Binary files /dev/null and b/figures/seasonal/rainfall_by_season.png differ
diff --git a/figures/seasonal/seasonal_boxplots.png b/figures/seasonal/seasonal_boxplots.png
new file mode 100644
index 0000000..94624d8
Binary files /dev/null and b/figures/seasonal/seasonal_boxplots.png differ
diff --git a/meteo/analysis.py b/meteo/analysis.py
index d1debe3..01fdc38 100644
--- a/meteo/analysis.py
+++ b/meteo/analysis.py
@@ -8,6 +8,7 @@ import numpy as np
 import pandas as pd
 
 from .variables import Variable
+from .season import SEASON_LABELS
 
 
 def compute_correlation_matrix(
@@ -559,3 +560,64 @@ def compute_binned_statistics(
         quantile_low_level=q_low,
         quantile_high_level=q_high,
     )
+
+
+def compute_rainfall_by_season(
+    df: pd.DataFrame,
+    *,
+    rate_column: str = "rain_rate",
+    season_column: str = "season",
+) -> pd.DataFrame:
+    """
+    Compute total rainfall (mm) and rainy duration (hours) for each season.
+
+    Each sample's rate is multiplied by the time elapsed since the previous
+    sample. Missing rates count as 0 and rows without a season are dropped.
+
+    Args:
+        df: Time-indexed measurements holding both columns.
+        rate_column: Rain-rate column (presumably in mm/h - to be confirmed).
+        season_column: Column holding the season label of each row.
+
+    Returns:
+        A frame indexed by season, in ``SEASON_LABELS`` order, with float
+        columns ``total_rain_mm`` and ``rainy_hours``; it has no rows when
+        no sample carries a season.
+
+    Raises:
+        KeyError: If either column is missing from ``df``.
+    """
+    _ensure_datetime_index(df)
+
+    for col in (rate_column, season_column):
+        if col not in df.columns:
+            raise KeyError(f"Colonne absente : {col}")
+
+    data = df[[rate_column, season_column]].copy()
+    data[rate_column] = data[rate_column].fillna(0.0)
+    data = data.dropna(subset=[season_column])
+    if data.empty:
+        return pd.DataFrame(columns=["total_rain_mm", "rainy_hours"]).astype(float)
+
+    # NOTE(review): assumes _infer_time_step returns a timedelta-like value
+    # (it already served to fill a timedelta series) - confirm.
+    time_step = pd.Timedelta(_infer_time_step(data.index))
+    # The first sample has no predecessor, so it is credited one nominal step.
+    diffs = data.index.to_series().diff().fillna(time_step)
+    # Cap each interval at the nominal step: after an outage (or after rows
+    # were dropped above) the raw difference would otherwise credit the whole
+    # gap with the rate of the first sample that follows it.
+    diffs = diffs.clip(lower=pd.Timedelta(0), upper=time_step)
+    hours = diffs.dt.total_seconds().to_numpy(dtype=float) / 3600.0
+
+    rainfall_mm = data[rate_column].to_numpy(dtype=float) * hours
+    data["rainfall_mm"] = rainfall_mm
+    data["rainy_hours"] = np.where(rainfall_mm > 0, hours, 0.0)
+
+    agg = data.groupby(season_column).agg(
+        total_rain_mm=("rainfall_mm", "sum"),
+        rainy_hours=("rainy_hours", "sum"),
+    )
+
+    order = [season for season in SEASON_LABELS if season in agg.index]
+    return agg.loc[order]
diff --git a/meteo/plots.py b/meteo/plots.py
index 4b103b2..8909304 100644
--- a/meteo/plots.py
+++ b/meteo/plots.py
@@ -12,6 +12,7 @@ import numpy as np
 import pandas as pd
 
 from .analysis import DiurnalCycleStats, BinnedStatistics
+from .season import SEASON_LABELS
 from .variables import Variable
 
 
@@ -596,6 +597,105 @@ def plot_diurnal_cycle(
     return output_path.resolve()
 
 
+def plot_seasonal_boxplots(
+    df: pd.DataFrame,
+    variables: Sequence[Variable],
+    output_path: str | Path,
+    *,
+    season_column: str = "season",
+    season_order: Sequence[str] | None = None,
+    title: str | None = None,
+) -> Path:
+    """
+    Draw one row of per-season boxplots for each requested variable.
+
+    Seasons absent from ``df`` are skipped. When no season or no variable is
+    left to draw, a placeholder figure carrying a short message is written.
+
+    Args:
+        df: Measurements holding ``season_column`` and every ``var.column``.
+        variables: Variables to plot, one subplot row each.
+        output_path: Destination image; parent directories are created.
+        season_column: Column holding the season label of each row.
+        season_order: Display order; defaults to ``SEASON_LABELS``.
+        title: Optional figure title.
+
+    Returns:
+        The resolved path of the saved figure.
+
+    Raises:
+        KeyError: If ``season_column`` is missing from ``df``.
+    """
+    output_path = Path(output_path)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    if season_column not in df.columns:
+        raise KeyError(f"Colonne saison absente : {season_column}")
+
+    available = df[season_column].dropna().unique()
+    if season_order is None:
+        season_order = SEASON_LABELS
+    season_order = [season for season in season_order if season in available]
+
+    n_vars = len(variables)
+    # plt.subplots() rejects zero rows, so an empty variable list gets the
+    # same placeholder treatment as a dataset without any season.
+    if not season_order or n_vars == 0:
+        fig, ax = plt.subplots()
+        ax.text(0.5, 0.5, "Aucune donnée saisonnière disponible.", ha="center", va="center")
+        ax.set_axis_off()
+        fig.savefig(output_path, dpi=150, bbox_inches="tight")
+        plt.close(fig)
+        return output_path.resolve()
+
+    # squeeze=False always yields a 2-D array of axes, even for a single row.
+    fig, axes_grid = plt.subplots(
+        n_vars, 1, figsize=(10, 3 * n_vars), sharex=True, squeeze=False
+    )
+    axes = axes_grid[:, 0]
+
+    colors = plt.get_cmap("Set3")(np.linspace(0.2, 0.8, len(season_order)))
+    labels = [season.capitalize() for season in season_order]
+    positions = list(range(1, len(season_order) + 1))
+    # Row masks depend only on the season, so build them once for all variables.
+    masks = [df[season_column] == season for season in season_order]
+
+    for ax, var in zip(axes, variables):
+        data = [df.loc[mask, var.column].dropna().to_numpy() for mask in masks]
+        if not any(len(arr) > 0 for arr in data):
+            ax.text(0.5, 0.5, f"Aucune donnée pour {var.label}.", ha="center", va="center")
+            ax.set_axis_off()
+            continue
+
+        box = ax.boxplot(
+            data,
+            positions=positions,
+            showfliers=False,
+            patch_artist=True,
+        )
+        # Tick labels are set on the axis because boxplot's ``labels`` keyword
+        # is deprecated since Matplotlib 3.9 (renamed ``tick_labels``).
+        ax.set_xticks(positions)
+        ax.set_xticklabels(labels)
+        for patch, color in zip(box["boxes"], colors):
+            patch.set_facecolor(color)
+            patch.set_alpha(0.7)
+
+        ylabel = f"{var.label} ({var.unit})" if var.unit else var.label
+        ax.set_ylabel(ylabel)
+        ax.grid(True, linestyle=":", alpha=0.5)
+
+    axes[-1].set_xlabel("Saison")
+    if title:
+        fig.suptitle(title)
+        fig.tight_layout(rect=[0, 0, 1, 0.95])
+    else:
+        fig.tight_layout()
+    fig.savefig(output_path, dpi=150)
+    plt.close(fig)
+    return output_path.resolve()
+
+
 def plot_binned_profiles(
     stats: BinnedStatistics,
     variables: Sequence[Variable],
@@ -747,3 +847,69 @@ def plot_daily_rainfall_hyetograph(
     fig.savefig(output_path, dpi=150)
     plt.close(fig)
     return output_path.resolve()
+
+
+def plot_rainfall_by_season(
+    rainfall_df: pd.DataFrame,
+    output_path: str | Path,
+    *,
+    title: str = "Pluie cumulée par saison",
+) -> Path:
+    """
+    Plot cumulative rainfall per season as bars, plus rainy hours on a twin axis when that column exists; return the resolved output path.
+    """
+    output_path = Path(output_path)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    if rainfall_df.empty:  # nothing to plot: save a placeholder figure carrying a message
+        fig, ax = plt.subplots()
+        ax.text(0.5, 0.5, "Pas de données de pluie saisonnière.", ha="center", va="center")
+        ax.set_axis_off()
+        fig.savefig(output_path, dpi=150, bbox_inches="tight")
+        plt.close(fig)
+        return output_path.resolve()
+
+    seasons = rainfall_df.index.tolist()  # one bar per index entry, in index order
+    x = np.arange(len(seasons))
+    totals = rainfall_df["total_rain_mm"].to_numpy(dtype=float)
+
+    fig, ax1 = plt.subplots(figsize=(9, 4))
+    bars = ax1.bar(x, totals, color="tab:blue", alpha=0.7, label="Pluie cumulée")
+    ax1.set_ylabel("Pluie cumulée (mm)")
+    ax1.set_xlabel("Saison")
+    ax1.set_xticks(x)
+    ax1.set_xticklabels([season.capitalize() for season in seasons])
+    ax1.grid(True, axis="y", linestyle=":", alpha=0.5)
+
+    for rect, value in zip(bars, totals):  # write each total just above its bar
+        height = rect.get_height()
+        ax1.text(rect.get_x() + rect.get_width() / 2, height, f"{value:.0f}", ha="center", va="bottom", fontsize=8)
+
+    lines = []  # handles drawn on the twin axis, merged into the legend below
+    labels = []
+
+    if "rainy_hours" in rainfall_df.columns:
+        ax2 = ax1.twinx()  # second y-axis sharing the same x positions
+        rainy_hours = rainfall_df["rainy_hours"].to_numpy(dtype=float)
+        line = ax2.plot(
+            x,
+            rainy_hours,
+            color="tab:red",
+            marker="o",
+            label="Heures pluvieuses",
+        )[0]
+        ax2.set_ylabel("Heures pluvieuses")
+        lines.append(line)
+        labels.append("Heures pluvieuses")
+
+    handles, lbls = ax1.get_legend_handles_labels()
+    handles.extend(lines)  # a single legend on ax1 covers both axes
+    lbls.extend(labels)
+    if handles:
+        ax1.legend(handles, lbls, loc="upper left")
+
+    ax1.set_title(title)
+    fig.tight_layout()
+    fig.savefig(output_path, dpi=150)
+    plt.close(fig)
+    return output_path.resolve()
diff --git a/meteo/season.py
b/meteo/season.py
new file mode 100644
index 0000000..0ec5310
--- /dev/null
+++ b/meteo/season.py
@@ -0,0 +1,122 @@
+# meteo/season.py
+"""Meteorological season labels derived from timestamps."""
+from __future__ import annotations
+
+from typing import Iterable, Sequence
+
+import numpy as np
+import pandas as pd
+
+
+# The label order doubles as the default display order.
+SEASON_LABELS = np.array(["winter", "spring", "summer", "autumn"])
+# Northern-hemisphere mapping: month number -> position in SEASON_LABELS.
+MONTH_TO_SEASON_INDEX = {
+    12: 0,
+    1: 0,
+    2: 0,
+    3: 1,
+    4: 1,
+    5: 1,
+    6: 2,
+    7: 2,
+    8: 2,
+    9: 3,
+    10: 3,
+    11: 3,
+}
+# Same mapping as a dense array (slot 0 is January) for vectorised lookups.
+_SEASON_INDEX_BY_MONTH = np.array(
+    [MONTH_TO_SEASON_INDEX[month] for month in range(1, 13)], dtype=np.intp
+)
+
+
+def _ensure_datetime_index(index: pd.Index) -> pd.DatetimeIndex:
+    """Return ``index`` as is, or raise ``TypeError`` if it is not a DatetimeIndex."""
+    if not isinstance(index, pd.DatetimeIndex):
+        raise TypeError("Cette fonction nécessite un DatetimeIndex.")
+    return index
+
+
+def _season_indices_for_month(months: np.ndarray, hemisphere: str) -> np.ndarray:
+    """
+    Map month numbers (1-12) to positions in ``SEASON_LABELS``.
+
+    An array lookup is used instead of ``np.vectorize(dict.get)``, which
+    raises on empty input.
+    """
+    base_indices = _SEASON_INDEX_BY_MONTH[np.asarray(months, dtype=np.intp) - 1]
+    if hemisphere == "south":
+        # Southern seasons are the northern ones shifted by two quarters.
+        return (base_indices + 2) % len(SEASON_LABELS)
+    return base_indices
+
+
+def compute_season_series(
+    index: pd.Index,
+    *,
+    hemisphere: str = "north",
+    column_name: str = "season",
+) -> pd.Series:
+    """
+    Return the meteorological season label of every timestamp in ``index``.
+
+    Args:
+        index: Timestamps to classify; must be a ``DatetimeIndex`` without NaT.
+        hemisphere: ``"north"`` or ``"south"`` (case-insensitive).
+        column_name: Name given to the returned series.
+
+    Returns:
+        A series of string labels aligned on ``index``; no categorical dtype is
+        applied.
+
+    Raises:
+        TypeError: If ``index`` is not a ``DatetimeIndex``.
+        ValueError: If ``hemisphere`` is unknown or ``index`` contains NaT.
+    """
+    hemisphere = hemisphere.lower()
+    if hemisphere not in {"north", "south"}:
+        raise ValueError("hemisphere doit valoir 'north' ou 'south'.")
+
+    dt_index = _ensure_datetime_index(index)
+    if dt_index.hasnans:
+        # NaT has no month: fail with a clear message rather than deep in NumPy.
+        raise ValueError("L'index contient des valeurs NaT.")
+    month_array = dt_index.month.to_numpy()
+    season_indices = _season_indices_for_month(month_array, hemisphere)
+    labels = SEASON_LABELS[season_indices]
+    return pd.Series(labels, index=dt_index, name=column_name)
+
+
+def add_season_column(
+    df: pd.DataFrame,
+    *,
+    hemisphere: str = "north",
+    column_name: str = "season",
+) -> pd.DataFrame:
+    """
+    Add a season column (winter/spring/summer/autumn) to ``df``.
+
+    The frame is modified in place and also returned, so callers may ignore
+    the return value.
+    """
+    series = compute_season_series(df.index, hemisphere=hemisphere, column_name=column_name)
+    df[column_name] = series
+    return df
+
+
+def sort_season_labels(
+    labels: Iterable[str],
+    *,
+    order: Sequence[str] | None = None,
+) -> list[str]:
+    """
+    Return the entries of the reference order that occur in ``labels``.
+
+    Args:
+        labels: Season labels, possibly repeated or unordered; falsy ones are ignored.
+        order: Reference order; defaults to ``SEASON_LABELS``.
+    """
+    reference = [str(season) for season in (order if order is not None else SEASON_LABELS)]
+    label_set = {str(label) for label in labels if label}
+    return [season for season in reference if season in label_set]
diff --git a/scripts/make_minutely_dataset.py b/scripts/make_minutely_dataset.py
index ed51a20..2202e00 100644
--- a/scripts/make_minutely_dataset.py
+++ b/scripts/make_minutely_dataset.py
@@ -6,6 +6,7 @@ from pathlib import Path
 from meteo.dataset import load_raw_csv, resample_to_minutes
 from meteo.config import StationLocation
 from meteo.solar import add_solar_elevation_column
+from meteo.season import add_season_column
 
 
 FORMATTED_CSV_PATH = Path("data/weather_filled_1s.csv")
@@ -25,6 +26,7 @@ def main() -> None:
     df_min = resample_to_minutes(df_1s)
     print(f"Après resampling 60s : {len(df_min)} lignes")
 
+    hemisphere = "north"
     try:
         location = StationLocation.from_env(optional=True)
     except RuntimeError as exc:
@@ -32,6 +34,7 @@ def main() -> None:
         location = None
 
     if location is not None:
+        hemisphere = "south" if location.latitude < 0 else "north"
         print(
             f"Ajout de l'élévation solaire (lat={location.latitude}, lon={location.longitude}, "
             f"alt={location.elevation_m} m)..."
@@ -47,6 +50,9 @@ def main() -> None:
             "ℹ Coordonnées GPS non définies (STATION_LATITUDE / STATION_LONGITUDE). "
             "La colonne sun_elevation ne sera pas ajoutée."
         )
+        print("ℹ Saison : hypothèse par défaut = hémisphère nord. Définissez STATION_LATITUDE pour adapter.")
+
+    add_season_column(df_min, hemisphere=hemisphere)
 
     OUTPUT_CSV_PATH.parent.mkdir(parents=True, exist_ok=True)
     df_min.to_csv(OUTPUT_CSV_PATH, index_label="time")
diff --git a/scripts/plot_seasonal_overview.py b/scripts/plot_seasonal_overview.py
new file mode 100644
index 0000000..fe466a7
--- /dev/null
+++ b/scripts/plot_seasonal_overview.py
@@ -0,0 +1,78 @@
+# scripts/plot_seasonal_overview.py
+"""Generate the seasonal overview figures from the minutely dataset."""
+from __future__ import annotations
+
+from pathlib import Path
+
+import pandas as pd
+
+from meteo.dataset import load_raw_csv
+from meteo.variables import VARIABLES_BY_KEY
+from meteo.analysis import compute_rainfall_by_season
+from meteo.plots import plot_seasonal_boxplots, plot_rainfall_by_season
+from meteo.season import sort_season_labels, SEASON_LABELS
+
+
+CSV_PATH = Path("data/weather_minutely.csv")
+OUTPUT_DIR = Path("figures/seasonal")
+
+BOXPLOT_VARIABLES = ["temperature", "humidity", "pressure", "wind_speed"]
+
+
+def infer_season_order(df: pd.DataFrame) -> list[str]:
+    """Return the seasons found in ``df["season"]`` in reference order (all of them if none is found)."""
+    seasons = df["season"].dropna().unique()
+    order = sort_season_labels(seasons, order=SEASON_LABELS)
+    if not order:
+        # tolist() yields plain ``str`` items instead of NumPy string scalars.
+        order = SEASON_LABELS.tolist()
+    return order
+
+
+def main() -> None:
+    """Load the minutely dataset and write the seasonal figures to ``OUTPUT_DIR``."""
+    if not CSV_PATH.exists():
+        print(f"⚠ Fichier introuvable : {CSV_PATH}")
+        return
+
+    df = load_raw_csv(CSV_PATH)
+    print(f"Dataset minuté chargé : {CSV_PATH}")
+    print(f" Lignes : {len(df)}")
+    print(f" Colonnes : {list(df.columns)}")
+    print()
+
+    if "season" not in df.columns:
+        print("⚠ La colonne 'season' est absente. Relancez scripts.make_minutely_dataset.")
+        return
+
+    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
+    season_order = infer_season_order(df)
+    print(f"Saisons détectées : {season_order}")
+
+    variables = [VARIABLES_BY_KEY[key] for key in BOXPLOT_VARIABLES]
+    boxplot_path = OUTPUT_DIR / "seasonal_boxplots.png"
+    plot_seasonal_boxplots(
+        df=df,
+        variables=variables,
+        output_path=boxplot_path,
+        season_order=season_order,
+        title="Distribution des mesures par saison",
+    )
+    print(f"✔ Boxplots saisonniers : {boxplot_path}")
+
+    # The rainfall chart is optional: skip it rather than crash on a KeyError
+    # when the dataset carries no rain-rate column.
+    if "rain_rate" not in df.columns:
+        print("⚠ Colonne 'rain_rate' absente : graphique de pluie saisonnière ignoré.")
+        return
+
+    rainfall = compute_rainfall_by_season(df=df, rate_column="rain_rate", season_column="season")
+    rainfall_path = OUTPUT_DIR / "rainfall_by_season.png"
+    plot_rainfall_by_season(rainfall_df=rainfall, output_path=rainfall_path)
+    print(f"✔ Pluie saisonnière : {rainfall_path}")
+
+    print("✔ Tous les graphiques saisonniers ont été générés.")
+
+
+if __name__ == "__main__":
+    main()