
Compare commits


7 Commits

74 changed files with 3065 additions and 10 deletions

View File

@@ -2,3 +2,6 @@ INFLUXDB_URL=http://
INFLUXDB_TOKEN=
INFLUXDB_ORG=
INFLUXDB_BUCKET=weather
STATION_LATITUDE=
STATION_LONGITUDE=
STATION_ELEVATION=

View File

@@ -199,6 +199,7 @@ Top 10 of the longest gaps:
```
These gaps in the data can correspond to connection losses between the station and my network, a restart of my server (hardware or software), a reboot of the ISP box or of the wireless access point, and so on.
They can also correspond to the changes made in the previous scripts.
These scripts are interesting because they highlight indirect factors that contribute to the quality of the submitted data.
You can take every precaution, feel that everything has been handled, and reassure yourself because you rely on dependable tools, yet there will always be gaps in the data.

View File

@@ -0,0 +1,4 @@
# Dataset enrichment
- Sun elevation (sun_elevation)
- Meteorological season (season)
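
For instance, a minimal sketch (assuming the minutely dataset has already been regenerated with scripts/make_minutely_dataset, so the file below exists) of inspecting the two new columns:

```
import pandas as pd

# Load the enriched minutely dataset produced by scripts/make_minutely_dataset.
df = pd.read_csv("data/weather_minutely.csv", index_col="time", parse_dates=True)

# The two enrichment columns added by this changeset.
print(df[["sun_elevation", "season"]].head())
```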

[50 binary image files changed: 29 new figures added and 21 existing figures updated; previews and file sizes not shown]

View File

@@ -1,12 +1,16 @@
# meteo/analysis.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal, Sequence

import numpy as np
import pandas as pd

from .variables import Variable
from .season import SEASON_LABELS

MONTH_ORDER = list(range(1, 13))


def compute_correlation_matrix(
@@ -115,3 +119,627 @@ def compute_lagged_correlation(
    lag_df = lag_df.set_index("lag_minutes")
    return lag_df


def _ensure_datetime_index(df: pd.DataFrame) -> pd.DatetimeIndex:
    if not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("This function requires a time-indexed DataFrame.")
    return df.index


@dataclass
class DiurnalCycleStats:
    mean: pd.DataFrame
    median: pd.DataFrame
    quantile_low: pd.DataFrame | None
    quantile_high: pd.DataFrame | None
    quantile_low_level: float | None = None
    quantile_high_level: float | None = None


@dataclass
class BinnedStatistics:
    centers: np.ndarray
    intervals: pd.IntervalIndex
    counts: pd.Series
    mean: pd.DataFrame
    median: pd.DataFrame
    quantile_low: pd.DataFrame | None
    quantile_high: pd.DataFrame | None
    quantile_low_level: float | None = None
    quantile_high_level: float | None = None


def compute_rolling_correlation_series(
    df: pd.DataFrame,
    var_x: Variable,
    var_y: Variable,
    *,
    window_minutes: int,
    min_valid_fraction: float = 0.6,
    step_minutes: int | None = None,
    method: Literal["pearson", "spearman"] = "pearson",
) -> pd.Series:
    """
    Computes the rolling X/Y correlation over a time window.
    Returns a series indexed by the end of each window.
    """
    if not 0 < min_valid_fraction <= 1:
        raise ValueError("min_valid_fraction must be in the interval (0, 1].")
    for col in (var_x.column, var_y.column):
        if col not in df.columns:
            raise KeyError(f"Column missing from the DataFrame: {col}")
    _ensure_datetime_index(df)
    pair = df[[var_x.column, var_y.column]].dropna().sort_index()
    if pair.empty:
        return pd.Series(dtype=float, name=f"{var_x.key}→{var_y.key}")
    window = f"{window_minutes}min"
    min_periods = max(1, int(window_minutes * min_valid_fraction))
    if method not in {"pearson"}:
        raise NotImplementedError(
            "Rolling correlations currently only support the 'pearson' method."
        )
    rolling_corr = pair[var_x.column].rolling(
        window=window,
        min_periods=min_periods,
    ).corr(pair[var_y.column])
    rolling_corr = rolling_corr.dropna()
    rolling_corr.name = f"{var_x.key}→{var_y.key}"
    if step_minutes and step_minutes > 1:
        rolling_corr = rolling_corr.resample(f"{step_minutes}min").mean().dropna()
    return rolling_corr


def compute_rolling_correlations_for_pairs(
    df: pd.DataFrame,
    pairs: Sequence[tuple[Variable, Variable]],
    *,
    window_minutes: int,
    min_valid_fraction: float = 0.6,
    step_minutes: int | None = None,
    method: Literal["pearson", "spearman"] = "pearson",
) -> pd.DataFrame:
    """
    Computes the rolling correlations for several pairs and aligns the
    results in a DataFrame (time index, columns = 'x→y').
    """
    series_list: list[pd.Series] = []
    for var_x, var_y in pairs:
        corr = compute_rolling_correlation_series(
            df=df,
            var_x=var_x,
            var_y=var_y,
            window_minutes=window_minutes,
            min_valid_fraction=min_valid_fraction,
            step_minutes=step_minutes,
            method=method,
        )
        if not corr.empty:
            series_list.append(corr)
    if not series_list:
        return pd.DataFrame()
    result = pd.concat(series_list, axis=1)
    result = result.sort_index()
    return result


def _infer_time_step(index: pd.DatetimeIndex) -> pd.Timedelta:
    diffs = index.to_series().diff().dropna()
    if diffs.empty:
        return pd.Timedelta(minutes=1)
    return diffs.median()


def detect_threshold_events(
    series: pd.Series,
    *,
    threshold: float,
    min_duration: pd.Timedelta,
    min_gap: pd.Timedelta,
) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
    """
    Detects `series > threshold` events (after filling NaN with False)
    lasting at least `min_duration`. Events separated by an interval
    < min_gap are merged.
    """
    if not isinstance(series.index, pd.DatetimeIndex):
        raise TypeError("series must be time-indexed.")
    mask = (series > threshold).fillna(False)
    if not mask.any():
        return []
    groups = (mask != mask.shift()).cumsum()
    time_step = _infer_time_step(series.index)
    raw_events: list[tuple[pd.Timestamp, pd.Timestamp]] = []
    for group_id, group_mask in mask.groupby(groups):
        if not group_mask.iloc[0]:
            continue
        start = group_mask.index[0]
        end = group_mask.index[-1] + time_step
        duration = end - start
        if duration >= min_duration:
            raw_events.append((start, end))
    if not raw_events:
        return []
    merged: list[tuple[pd.Timestamp, pd.Timestamp]] = []
    for start, end in raw_events:
        if not merged:
            merged.append((start, end))
            continue
        prev_start, prev_end = merged[-1]
        if start - prev_end < min_gap:
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            merged.append((start, end))
    return merged


def build_event_aligned_segments(
    df: pd.DataFrame,
    events: Sequence[tuple[pd.Timestamp, pd.Timestamp]],
    columns: Sequence[str],
    *,
    window_before_minutes: int,
    window_after_minutes: int,
    resample_minutes: int = 1,
) -> pd.DataFrame:
    """
    Extracts, for each event, the series centred on its start and
    returns a MultiIndex DataFrame (event_id, offset_minutes).
    """
    if not events:
        return pd.DataFrame(columns=columns)
    index = _ensure_datetime_index(df)
    data = df[columns].sort_index()
    freq = pd.Timedelta(minutes=resample_minutes)
    if resample_minutes > 1:
        data = data.resample(freq).mean()
    before = pd.Timedelta(minutes=window_before_minutes)
    after = pd.Timedelta(minutes=window_after_minutes)
    segments: list[pd.DataFrame] = []
    for event_id, (start, _end) in enumerate(events):
        window_start = start - before
        window_end = start + after
        window_index = pd.date_range(window_start, window_end, freq=freq)
        segment = data.reindex(window_index)
        if segment.empty:
            continue
        offsets = ((segment.index - start) / pd.Timedelta(minutes=1)).astype(float)
        multi_index = pd.MultiIndex.from_arrays(
            [np.full(len(segment), event_id), offsets],
            names=["event_id", "offset_minutes"],
        )
        segment.index = multi_index
        segments.append(segment)
    if not segments:
        return pd.DataFrame(columns=columns)
    aligned = pd.concat(segments)
    return aligned


def compute_diurnal_cycle_statistics(
    df: pd.DataFrame,
    variables: Sequence[Variable],
    *,
    quantiles: tuple[float, float] | None = (0.25, 0.75),
) -> DiurnalCycleStats:
    """
    Aggregates the variables by local hour to visualise a mean diurnal cycle.
    """
    _ensure_datetime_index(df)
    columns = [v.column for v in variables]
    grouped = df[columns].groupby(df.index.hour)
    mean_df = grouped.mean()
    median_df = grouped.median()
    quantile_low_df: pd.DataFrame | None = None
    quantile_high_df: pd.DataFrame | None = None
    q_low = q_high = None
    if quantiles is not None:
        q_low, q_high = quantiles
        if q_low is not None:
            quantile_low_df = grouped.quantile(q_low)
        if q_high is not None:
            quantile_high_df = grouped.quantile(q_high)
    return DiurnalCycleStats(
        mean=mean_df,
        median=median_df,
        quantile_low=quantile_low_df,
        quantile_high=quantile_high_df,
        quantile_low_level=q_low,
        quantile_high_level=q_high,
    )


def _format_speed_bin_labels(speed_bins: Sequence[float]) -> list[str]:
    labels: list[str] = []
    for i in range(len(speed_bins) - 1):
        low = speed_bins[i]
        high = speed_bins[i + 1]
        if np.isinf(high):
            labels.append(f"≥{low:g}")
        else:
            labels.append(f"{low:g}–{high:g}")
    return labels


def compute_wind_rose_distribution(
    df: pd.DataFrame,
    *,
    direction_sector_size: int = 30,
    speed_bins: Sequence[float] = (0, 10, 20, 30, 50, float("inf")),
) -> tuple[pd.DataFrame, list[str], float]:
    """
    Groups the wind speed/direction distribution into angular sectors and speed classes.
    Returns a DataFrame indexed by sector start (in degrees) with one column per speed class (%).
    """
    if direction_sector_size <= 0 or direction_sector_size > 180:
        raise ValueError("direction_sector_size must be between 1 and 180 degrees.")
    if "wind_speed" not in df.columns or "wind_direction" not in df.columns:
        raise KeyError("The DataFrame must contain 'wind_speed' and 'wind_direction'.")
    data = df[["wind_speed", "wind_direction"]].dropna()
    if data.empty:
        return pd.DataFrame(), [], float(direction_sector_size)
    n_sectors = int(360 / direction_sector_size)
    direction = data["wind_direction"].to_numpy(dtype=float) % 360.0
    sector_indices = np.floor(direction / direction_sector_size).astype(int) % n_sectors
    bins = list(speed_bins)
    if not np.isinf(bins[-1]):
        bins.append(float("inf"))
    labels = _format_speed_bin_labels(bins)
    speed_categories = pd.cut(
        data["wind_speed"],
        bins=bins,
        right=False,
        include_lowest=True,
        labels=labels,
    )
    counts = (
        pd.crosstab(sector_indices, speed_categories)
        .reindex(range(n_sectors), fill_value=0)
        .reindex(columns=labels, fill_value=0)
    )
    total = counts.values.sum()
    frequencies = counts / total * 100.0 if total > 0 else counts.astype(float)
    frequencies.index = frequencies.index * direction_sector_size
    return frequencies, labels, float(direction_sector_size)


def compute_daily_rainfall_totals(
    df: pd.DataFrame,
    *,
    rate_column: str = "rain_rate",
) -> pd.DataFrame:
    """
    Converts a rain rate (mm/h) into daily and cumulative totals.
    """
    _ensure_datetime_index(df)
    if rate_column not in df.columns:
        raise KeyError(f"Missing column: {rate_column}")
    series = df[rate_column].fillna(0.0).sort_index()
    if series.empty:
        return pd.DataFrame(columns=["daily_total", "cumulative_total"])
    time_step = _infer_time_step(series.index)
    diffs = series.index.to_series().diff()
    diffs = diffs.fillna(time_step)
    hours = diffs.dt.total_seconds() / 3600.0
    rainfall_mm = series.to_numpy(dtype=float) * hours.to_numpy(dtype=float)
    rainfall_series = pd.Series(rainfall_mm, index=series.index)
    daily_totals = rainfall_series.resample("1D").sum()
    cumulative = daily_totals.cumsum()
    result = pd.DataFrame(
        {
            "daily_total": daily_totals,
            "cumulative_total": cumulative,
        }
    )
    return result


def compute_binned_statistics(
    df: pd.DataFrame,
    *,
    bin_source_column: str,
    target_columns: Sequence[str],
    bins: Sequence[float] | np.ndarray,
    min_count: int = 30,
    quantiles: tuple[float, float] | None = (0.25, 0.75),
) -> BinnedStatistics:
    """
    Computes statistics (mean/median/quantiles) for several columns by
    grouping the data into intervals defined on a source column.
    """
    if bin_source_column not in df.columns:
        raise KeyError(f"Missing source column: {bin_source_column}")
    missing_targets = [col for col in target_columns if col not in df.columns]
    if missing_targets:
        raise KeyError(f"Missing target columns: {missing_targets!r}")
    subset_cols = [bin_source_column, *target_columns]
    data = df[subset_cols].dropna(subset=[bin_source_column])
    if data.empty:
        empty_interval_index = pd.IntervalIndex([])
        empty_df = pd.DataFrame(columns=target_columns)
        empty_counts = pd.Series(dtype=int)
        return BinnedStatistics(
            centers=np.array([]),
            intervals=empty_interval_index,
            counts=empty_counts,
            mean=empty_df,
            median=empty_df,
            quantile_low=None,
            quantile_high=None,
        )
    categories = pd.cut(data[bin_source_column], bins=bins, include_lowest=True)
    grouped = data.groupby(categories, observed=False)
    counts = grouped.size()
    valid_mask = counts >= max(1, min_count)
    valid_intervals = counts.index[valid_mask]
    if len(valid_intervals) == 0:
        empty_interval_index = pd.IntervalIndex([])
        empty_df = pd.DataFrame(columns=target_columns)
        empty_counts = pd.Series(dtype=int)
        return BinnedStatistics(
            centers=np.array([]),
            intervals=empty_interval_index,
            counts=empty_counts,
            mean=empty_df,
            median=empty_df,
            quantile_low=None,
            quantile_high=None,
        )
    interval_index = pd.IntervalIndex(valid_intervals)
    mean_df = grouped[target_columns].mean().loc[interval_index]
    median_df = grouped[target_columns].median().loc[interval_index]
    q_low = q_high = None
    quantile_low_df: pd.DataFrame | None = None
    quantile_high_df: pd.DataFrame | None = None
    if quantiles is not None:
        q_low, q_high = quantiles
        if q_low is not None:
            quantile_low_df = grouped[target_columns].quantile(q_low).loc[interval_index]
        if q_high is not None:
            quantile_high_df = grouped[target_columns].quantile(q_high).loc[interval_index]
    centers = np.array([interval.mid for interval in interval_index])
    filtered_counts = counts.loc[interval_index]
    return BinnedStatistics(
        centers=centers,
        intervals=interval_index,
        counts=filtered_counts,
        mean=mean_df,
        median=median_df,
        quantile_low=quantile_low_df,
        quantile_high=quantile_high_df,
        quantile_low_level=q_low,
        quantile_high_level=q_high,
    )


def compute_rainfall_by_season(
    df: pd.DataFrame,
    *,
    rate_column: str = "rain_rate",
    season_column: str = "season",
) -> pd.DataFrame:
    """
    Computes the total rainfall per season (mm) and the number of rainy hours.
    """
    _ensure_datetime_index(df)
    for col in (rate_column, season_column):
        if col not in df.columns:
            raise KeyError(f"Missing column: {col}")
    data = df[[rate_column, season_column]].copy()
    data[rate_column] = data[rate_column].fillna(0.0)
    data = data.dropna(subset=[season_column])
    if data.empty:
        return pd.DataFrame(columns=["total_rain_mm", "rainy_hours"]).astype(float)
    time_step = _infer_time_step(data.index)
    diffs = data.index.to_series().diff().fillna(time_step)
    hours = diffs.dt.total_seconds() / 3600.0
    rainfall_mm = data[rate_column].to_numpy(dtype=float) * hours.to_numpy(dtype=float)
    data["rainfall_mm"] = rainfall_mm
    data["rainy_hours"] = (rainfall_mm > 0).astype(float) * hours.to_numpy(dtype=float)
    agg = data.groupby(season_column).agg(
        total_rain_mm=("rainfall_mm", "sum"),
        rainy_hours=("rainy_hours", "sum"),
    )
    order = [season for season in SEASON_LABELS if season in agg.index]
    agg = agg.loc[order]
    return agg


def filter_by_condition(
    df: pd.DataFrame,
    *,
    condition: pd.Series,
) -> pd.DataFrame:
    """
    Returns a filtered copy of the DataFrame according to an aligned boolean condition.
    """
    mask = condition.reindex(df.index)
    mask = mask.fillna(False)
    return df.loc[mask]


def compute_monthly_climatology(
    df: pd.DataFrame,
    *,
    columns: Sequence[str],
) -> pd.DataFrame:
    """
    Mean per calendar month (1–12) for the given columns.
    """
    _ensure_datetime_index(df)
    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise KeyError(f"Missing columns: {missing}")
    grouped = df[list(columns)].groupby(df.index.month).mean()
    grouped = grouped.reindex(MONTH_ORDER)
    grouped.index.name = "month"
    return grouped


def compute_monthly_means(
    df: pd.DataFrame,
    *,
    columns: Sequence[str],
) -> pd.DataFrame:
    """
    Calendar monthly means (indexed on the end of each month).
    """
    _ensure_datetime_index(df)
    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise KeyError(f"Missing columns: {missing}")
    monthly = df[list(columns)].resample("1ME").mean()
    return monthly.dropna(how="all")


def compute_seasonal_hourly_profile(
    df: pd.DataFrame,
    *,
    value_column: str,
    season_column: str = "season",
) -> pd.DataFrame:
    """
    Returns an (hours × seasons) matrix containing the mean of a variable.
    """
    _ensure_datetime_index(df)
    for col in (value_column, season_column):
        if col not in df.columns:
            raise KeyError(f"Missing column: {col}")
    subset = df[[value_column, season_column]].dropna()
    if subset.empty:
        return pd.DataFrame(index=range(24))
    grouped = subset.groupby([season_column, subset.index.hour])[value_column].mean()
    pivot = grouped.unstack(season_column)
    pivot = pivot.reindex(index=range(24))
    order = [season for season in SEASON_LABELS if season in pivot.columns]
    if order:
        pivot = pivot[order]
    pivot.index.name = "hour"
    return pivot


def compute_monthly_daylight_hours(
    df: pd.DataFrame,
    *,
    illuminance_column: str = "illuminance",
    threshold_lux: float = 1000.0,
) -> pd.Series:
    """
    Computes the mean daily duration of brightness (> threshold_lux) per month (in hours per day).
    """
    _ensure_datetime_index(df)
    if illuminance_column not in df.columns:
        raise KeyError(f"Missing column: {illuminance_column}")
    subset = df[[illuminance_column]].dropna()
    if subset.empty:
        return pd.Series(dtype=float)
    time_step = _infer_time_step(subset.index)
    hours_per_step = time_step.total_seconds() / 3600.0
    daylight_flag = (subset[illuminance_column] >= threshold_lux).astype(float)
    daylight_hours = daylight_flag * hours_per_step
    daily_hours = daylight_hours.resample("1D").sum()
    monthly_avg = daily_hours.resample("1ME").mean()
    return monthly_avg.dropna()


def compute_mean_wind_components(
    df: pd.DataFrame,
    *,
    freq: str = "1M",
) -> pd.DataFrame:
    """
    Computes the zonal (u) and meridional (v) wind components at a given frequency.
    Also returns the mean speed.
    """
    if "wind_speed" not in df.columns or "wind_direction" not in df.columns:
        raise KeyError("The 'wind_speed' and 'wind_direction' columns are required.")
    _ensure_datetime_index(df)
    subset = df[["wind_speed", "wind_direction"]].dropna()
    if subset.empty:
        return pd.DataFrame(columns=["u", "v", "speed"])
    radians = np.deg2rad(subset["wind_direction"].to_numpy(dtype=float))
    speed = subset["wind_speed"].to_numpy(dtype=float)
    u = speed * np.sin(radians) * -1  # east-west component (positive toward the east)
    v = speed * np.cos(radians) * -1  # north-south component (positive toward the north)
    vector_df = pd.DataFrame(
        {
            "u": u,
            "v": v,
            "speed": speed,
        },
        index=subset.index,
    )
    actual_freq = "1ME" if freq == "1M" else freq
    grouped = vector_df.resample(actual_freq).mean()
    return grouped.dropna(how="all")
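
To make the new helpers concrete, here is a minimal sketch on synthetic minutely data (values invented for illustration, not station data); it exercises detect_threshold_events and compute_daily_rainfall_totals and assumes the meteo package is importable:

```
import pandas as pd
from meteo.analysis import detect_threshold_events, compute_daily_rainfall_totals

# Two days of synthetic minutely data with a single one-hour shower at 2.5 mm/h.
index = pd.date_range("2024-06-01", periods=2 * 24 * 60, freq="1min")
rain = pd.Series(0.0, index=index, name="rain_rate")
rain.iloc[600:660] = 2.5

events = detect_threshold_events(
    rain,
    threshold=0.2,
    min_duration=pd.Timedelta(minutes=5),
    min_gap=pd.Timedelta(minutes=20),
)
print(events)  # one (start, end) tuple covering the simulated shower

# 60 minutes at 2.5 mm/h integrate to 2.5 mm on the first day.
totals = compute_daily_rainfall_totals(pd.DataFrame({"rain_rate": rain}), rate_column="rain_rate")
print(totals["daily_total"].round(2))
```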

View File

@ -65,3 +65,58 @@ class InfluxSettings:
org=org, # type: ignore[arg-type] org=org, # type: ignore[arg-type]
bucket=bucket, # type: ignore[arg-type] bucket=bucket, # type: ignore[arg-type]
) )
@dataclass(frozen=True)
class StationLocation:
"""
Décrit la position géographique de la station météo.
Utilisée pour les calculs astronomiques (ex: élévation du soleil).
"""
latitude: float
longitude: float
elevation_m: float = 0.0
@classmethod
def from_env(cls, *, optional: bool = False) -> Self | None:
"""
Charge les coordonnées GPS depuis les variables d'environnement :
- STATION_LATITUDE (obligatoire)
- STATION_LONGITUDE (obligatoire)
- STATION_ELEVATION (optionnelle, en mètres)
"""
load_dotenv()
lat = os.getenv("STATION_LATITUDE")
lon = os.getenv("STATION_LONGITUDE")
elev = os.getenv("STATION_ELEVATION")
if not lat or not lon:
if optional:
return None
raise RuntimeError(
"Les variables STATION_LATITUDE et STATION_LONGITUDE doivent être définies "
"pour calculer l'élévation solaire."
)
try:
latitude = float(lat)
longitude = float(lon)
elevation = float(elev) if elev else 0.0
except ValueError as exc:
raise RuntimeError(
"STATION_LATITUDE / STATION_LONGITUDE / STATION_ELEVATION doivent être des nombres valides."
) from exc
return cls(latitude=latitude, longitude=longitude, elevation_m=elevation)
def to_astral_observer_kwargs(self) -> dict[str, float]:
"""
Prépare les arguments attendus par astral.Observer.
"""
return {
"latitude": self.latitude,
"longitude": self.longitude,
"elevation": self.elevation_m,
}
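
A minimal usage sketch; the coordinates below are placeholders, not the station's actual position:

```
# In .env:
# STATION_LATITUDE=48.85
# STATION_LONGITUDE=2.35
# STATION_ELEVATION=35

from meteo.config import StationLocation

location = StationLocation.from_env(optional=True)
if location is not None:
    print(location.to_astral_observer_kwargs())
    # {'latitude': 48.85, 'longitude': 2.35, 'elevation': 35.0}
```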

File diff suppressed because it is too large.

meteo/season.py Normal file
View File

@@ -0,0 +1,84 @@
# meteo/season.py
from __future__ import annotations

from typing import Iterable, Sequence

import numpy as np
import pandas as pd

SEASON_LABELS = np.array(["winter", "spring", "summer", "autumn"])

MONTH_TO_SEASON_INDEX = {
    12: 0,
    1: 0,
    2: 0,
    3: 1,
    4: 1,
    5: 1,
    6: 2,
    7: 2,
    8: 2,
    9: 3,
    10: 3,
    11: 3,
}


def _ensure_datetime_index(index: pd.Index) -> pd.DatetimeIndex:
    if not isinstance(index, pd.DatetimeIndex):
        raise TypeError("This function requires a DatetimeIndex.")
    return index


def _season_indices_for_month(months: np.ndarray, hemisphere: str) -> np.ndarray:
    base_indices = np.vectorize(MONTH_TO_SEASON_INDEX.get)(months)
    if hemisphere == "south":
        return (base_indices + 2) % len(SEASON_LABELS)
    return base_indices


def compute_season_series(
    index: pd.Index,
    *,
    hemisphere: str = "north",
    column_name: str = "season",
) -> pd.Series:
    """
    Returns a categorical series giving the meteorological season of each timestamp.
    """
    hemisphere = hemisphere.lower()
    if hemisphere not in {"north", "south"}:
        raise ValueError("hemisphere must be 'north' or 'south'.")
    dt_index = _ensure_datetime_index(index)
    month_array = dt_index.month.to_numpy()
    season_indices = _season_indices_for_month(month_array, hemisphere)
    labels = SEASON_LABELS[season_indices]
    return pd.Series(labels, index=dt_index, name=column_name)


def add_season_column(
    df: pd.DataFrame,
    *,
    hemisphere: str = "north",
    column_name: str = "season",
) -> pd.DataFrame:
    """
    Adds a 'season' column (winter/spring/summer/autumn) to the DataFrame.
    """
    series = compute_season_series(df.index, hemisphere=hemisphere, column_name=column_name)
    df[column_name] = series
    return df


def sort_season_labels(
    labels: Iterable[str],
    *,
    order: Sequence[str] | None = None,
) -> list[str]:
    """
    Sorts the provided labels according to the default seasonal order.
    """
    reference = [str(season) for season in (order if order is not None else SEASON_LABELS)]
    label_set = {str(label) for label in labels if label}
    return [season for season in reference if season in label_set]
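
A quick sanity check of the season mapping (a sketch; the hemisphere shift is the interesting part):

```
import pandas as pd
from meteo.season import compute_season_series

index = pd.to_datetime(["2024-01-15", "2024-04-15", "2024-07-15", "2024-10-15"])
print(compute_season_series(index).tolist())
# ['winter', 'spring', 'summer', 'autumn']
print(compute_season_series(index, hemisphere="south").tolist())
# ['summer', 'autumn', 'winter', 'spring']
```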

meteo/solar.py Normal file
View File

@@ -0,0 +1,66 @@
# meteo/solar.py
from __future__ import annotations

import pandas as pd
from astral import Observer
from astral.sun import elevation


def _ensure_datetime_index(index: pd.Index) -> pd.DatetimeIndex:
    if not isinstance(index, pd.DatetimeIndex):
        raise TypeError("A DatetimeIndex is required to compute the sun elevation.")
    return index


def _prepare_index(index: pd.DatetimeIndex) -> pd.DatetimeIndex:
    """
    Returns a timezone-aware (UTC) version of the provided DatetimeIndex.
    """
    if index.tz is None:
        return index.tz_localize("UTC")
    return index.tz_convert("UTC")


def compute_solar_elevation_series(
    index: pd.Index,
    *,
    latitude: float,
    longitude: float,
    elevation_m: float = 0.0,
    series_name: str = "sun_elevation",
) -> pd.Series:
    """
    Computes the sun elevation (in degrees) for each timestamp of the index.
    """
    dt_index = _ensure_datetime_index(index)
    observer = Observer(latitude=latitude, longitude=longitude, elevation=elevation_m)
    utc_index = _prepare_index(dt_index)
    values = [
        float(elevation(observer, ts.to_pydatetime()))
        for ts in utc_index
    ]
    return pd.Series(values, index=dt_index, name=series_name)


def add_solar_elevation_column(
    df: pd.DataFrame,
    *,
    latitude: float,
    longitude: float,
    elevation_m: float = 0.0,
    column_name: str = "sun_elevation",
) -> pd.DataFrame:
    """
    Adds a `column_name` column containing the sun elevation in degrees.
    """
    series = compute_solar_elevation_series(
        df.index,
        latitude=latitude,
        longitude=longitude,
        elevation_m=elevation_m,
        series_name=column_name,
    )
    df[column_name] = series
    return df
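
A short usage sketch (placeholder coordinates; astral performs the actual astronomy):

```
import pandas as pd
from meteo.solar import compute_solar_elevation_series

index = pd.date_range("2024-06-21", periods=24, freq="1h", tz="UTC")
sun = compute_solar_elevation_series(index, latitude=48.85, longitude=2.35)
# Elevation is negative at night and peaks around local solar noon.
print(sun.round(1))
```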

View File

@@ -65,6 +65,12 @@ VARIABLES: List[Variable] = [
        label="Wind direction",
        unit="°",
    ),
    Variable(
        key="sun_elevation",
        column="sun_elevation",
        label="Sun elevation",
        unit="°",
    ),
]

VARIABLES_BY_KEY: Dict[str, Variable] = {v.key: v for v in VARIABLES}

View File

@@ -9,6 +9,9 @@ numpy
matplotlib
seaborn

# Astronomy / sun position
astral

# Statistical / ML models
scikit-learn
statsmodels

View File

@@ -4,6 +4,9 @@ from __future__ import annotations
from pathlib import Path

from meteo.dataset import load_raw_csv, resample_to_minutes
from meteo.config import StationLocation
from meteo.solar import add_solar_elevation_column
from meteo.season import add_season_column

FORMATTED_CSV_PATH = Path("data/weather_filled_1s.csv")
@@ -23,6 +26,34 @@ def main() -> None:
    df_min = resample_to_minutes(df_1s)
    print(f"After 60 s resampling: {len(df_min)} rows")

    hemisphere = "north"
    try:
        location = StationLocation.from_env(optional=True)
    except RuntimeError as exc:
        print(f"⚠ Invalid GPS coordinates: {exc}")
        location = None

    if location is not None:
        hemisphere = "south" if location.latitude < 0 else "north"
        print(
            f"Adding the sun elevation (lat={location.latitude}, lon={location.longitude}, "
            f"alt={location.elevation_m} m)..."
        )
        add_solar_elevation_column(
            df_min,
            latitude=location.latitude,
            longitude=location.longitude,
            elevation_m=location.elevation_m,
        )
    else:
        print(
            "  GPS coordinates not set (STATION_LATITUDE / STATION_LONGITUDE). "
            "The sun_elevation column will not be added."
        )
        print("  Season: assuming the northern hemisphere by default. Set STATION_LATITUDE to adjust.")

    add_season_column(df_min, hemisphere=hemisphere)

    OUTPUT_CSV_PATH.parent.mkdir(parents=True, exist_ok=True)
    df_min.to_csv(OUTPUT_CSV_PATH, index_label="time")
    print(f"✔ Minutely dataset written to: {OUTPUT_CSV_PATH.resolve()}")

View File

@@ -0,0 +1,213 @@
# scripts/plot_calendar_overview.py
from __future__ import annotations

from pathlib import Path
import calendar

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from meteo.dataset import load_raw_csv
from meteo.analysis import compute_daily_rainfall_totals
from meteo.plots import plot_calendar_heatmap, plot_weekday_profiles
from meteo.variables import VARIABLES_BY_KEY

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_DIR = Path("figures/calendar")

WEEKDAY_VARIABLE_KEYS = ["temperature", "humidity", "wind_speed", "illuminance"]


def _format_calendar_matrix(series: pd.Series, year: int, agg_label: str) -> pd.DataFrame:
    """
    Turns a daily series into a month × day (1-31) matrix.
    """
    start = pd.Timestamp(year=year, month=1, day=1, tz=series.index.tz)
    end = pd.Timestamp(year=year, month=12, day=31, tz=series.index.tz)
    filtered = series.loc[(series.index >= start) & (series.index <= end)]
    matrix = pd.DataFrame(
        np.nan,
        index=[calendar.month_name[m][:3] for m in range(1, 13)],
        columns=list(range(1, 32)),
    )
    for timestamp, value in filtered.items():
        month = timestamp.month
        day = timestamp.day
        matrix.at[calendar.month_name[month][:3], day] = value
    matrix.index.name = f"{agg_label} ({year})"
    return matrix


def compute_daily_mean(df: pd.DataFrame, column: str) -> pd.Series:
    return df[column].resample("1D").mean()


def plot_combined_calendar(
    matrices: dict[str, pd.DataFrame],
    output_path: Path,
    *,
    title: str,
) -> None:
    if not matrices:
        return
    n = len(matrices)
    fig, axes = plt.subplots(n, 1, figsize=(14, 4 * n), sharex=True)
    if n == 1:
        axes = [axes]
    for ax, (label, matrix) in zip(axes, matrices.items()):
        data = matrix.to_numpy(dtype=float)
        im = ax.imshow(data, aspect="auto", interpolation="nearest", cmap=matrix.attrs.get("cmap", "viridis"))
        ax.set_xticks(np.arange(matrix.shape[1]))
        ax.set_xticklabels(matrix.columns, rotation=90)
        ax.set_yticks(np.arange(matrix.shape[0]))
        ax.set_yticklabels(matrix.index)
        ax.set_ylabel(label)
        cbar = fig.colorbar(im, ax=ax)
        if matrix.attrs.get("colorbar_label"):
            cbar.set_label(matrix.attrs["colorbar_label"])
    axes[-1].set_xlabel("Day of month")
    fig.suptitle(title)
    fig.tight_layout(rect=[0, 0, 1, 0.97])
    fig.savefig(output_path, dpi=150)
    plt.close(fig)


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    if df.empty:
        print("⚠ Empty dataset.")
        return
    if not isinstance(df.index, pd.DatetimeIndex):
        print("⚠ The dataset must have a time index.")
        return

    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    latest_year = df.index.year.max()
    print(f"Year selected for the calendar: {latest_year}")

    daily_totals = compute_daily_rainfall_totals(df=df)
    daily_rain = daily_totals["daily_total"]
    rain_matrix = _format_calendar_matrix(daily_rain, latest_year, "Rain (mm)")
    rain_matrix.attrs["cmap"] = "Blues"
    rain_matrix.attrs["colorbar_label"] = "mm"
    rain_path = OUTPUT_DIR / f"calendar_rain_{latest_year}.png"
    plot_calendar_heatmap(
        matrix=rain_matrix,
        output_path=rain_path,
        title=f"Daily rainfall - {latest_year}",
        cmap="Blues",
        colorbar_label="mm",
    )
    print(f"✔ Rain heatmap {latest_year}: {rain_path}")

    daily_temp = compute_daily_mean(df, "temperature")
    temp_matrix = _format_calendar_matrix(daily_temp, latest_year, "Temperature (°C)")
    temp_matrix.attrs["cmap"] = "coolwarm"
    temp_matrix.attrs["colorbar_label"] = "°C"
    temp_path = OUTPUT_DIR / f"calendar_temperature_{latest_year}.png"
    plot_calendar_heatmap(
        matrix=temp_matrix,
        output_path=temp_path,
        title=f"Daily mean temperature - {latest_year}",
        cmap="coolwarm",
        colorbar_label="°C",
    )
    print(f"✔ Temperature heatmap {latest_year}: {temp_path}")

    matrices_for_combined = {
        "Rain (mm)": rain_matrix,
        "Temperature (°C)": temp_matrix,
    }

    if "pressure" in df.columns:
        daily_pressure = compute_daily_mean(df, "pressure")
        pressure_matrix = _format_calendar_matrix(daily_pressure, latest_year, "Pressure (hPa)")
        pressure_matrix.attrs["cmap"] = "Greens"
        pressure_matrix.attrs["colorbar_label"] = "hPa"
        pressure_path = OUTPUT_DIR / f"calendar_pressure_{latest_year}.png"
        plot_calendar_heatmap(
            matrix=pressure_matrix,
            output_path=pressure_path,
            title=f"Daily mean pressure - {latest_year}",
            cmap="Greens",
            colorbar_label="hPa",
        )
        print(f"✔ Pressure heatmap {latest_year}: {pressure_path}")
        matrices_for_combined["Pressure (hPa)"] = pressure_matrix

    if "illuminance" in df.columns:
        daily_lux = compute_daily_mean(df, "illuminance")
        lux_matrix = _format_calendar_matrix(daily_lux, latest_year, "Illuminance (lux)")
        lux_matrix.attrs["cmap"] = "YlOrBr"
        lux_matrix.attrs["colorbar_label"] = "lux"
        lux_path = OUTPUT_DIR / f"calendar_illuminance_{latest_year}.png"
        plot_calendar_heatmap(
            matrix=lux_matrix,
            output_path=lux_path,
            title=f"Daily mean illuminance - {latest_year}",
            cmap="YlOrBr",
            colorbar_label="lux",
        )
        print(f"✔ Illuminance heatmap {latest_year}: {lux_path}")
        matrices_for_combined["Illuminance (lux)"] = lux_matrix

    if "wind_speed" in df.columns:
        daily_wind = compute_daily_mean(df, "wind_speed")
        wind_matrix = _format_calendar_matrix(daily_wind, latest_year, "Wind (km/h)")
        wind_matrix.attrs["cmap"] = "Purples"
        wind_matrix.attrs["colorbar_label"] = "km/h"
        wind_path = OUTPUT_DIR / f"calendar_wind_{latest_year}.png"
        plot_calendar_heatmap(
            matrix=wind_matrix,
            output_path=wind_path,
            title=f"Mean wind speed - {latest_year}",
            cmap="Purples",
            colorbar_label="km/h",
        )
        print(f"✔ Wind heatmap {latest_year}: {wind_path}")
        matrices_for_combined["Wind (km/h)"] = wind_matrix

    combined_path = OUTPUT_DIR / f"calendar_combined_{latest_year}.png"
    plot_combined_calendar(
        matrices=matrices_for_combined,
        output_path=combined_path,
        title=f"Combined calendar {latest_year}",
    )
    print(f"✔ Combined calendar: {combined_path}")

    hourly = df[WEEKDAY_VARIABLE_KEYS].resample("1h").mean()
    weekday_stats = hourly.groupby(hourly.index.dayofweek).mean()
    weekday_path = OUTPUT_DIR / "weekday_profiles.png"
    variables = [VARIABLES_BY_KEY[key] for key in WEEKDAY_VARIABLE_KEYS]
    plot_weekday_profiles(
        weekday_df=weekday_stats,
        variables=variables,
        output_path=weekday_path,
        title="Mean profiles by day of week",
    )
    print(f"✔ Weekday profiles: {weekday_path}")

    print("✔ Calendar plots generated.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,46 @@
# scripts/plot_diurnal_cycle.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.variables import VARIABLES_BY_KEY
from meteo.analysis import compute_diurnal_cycle_statistics
from meteo.plots import plot_diurnal_cycle

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_PATH = Path("figures/diurnal_cycle/diurnal_cycle.png")

VARIABLE_KEYS = ["temperature", "humidity", "pressure", "wind_speed", "illuminance"]


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    variables = [VARIABLES_BY_KEY[key] for key in VARIABLE_KEYS]

    stats = compute_diurnal_cycle_statistics(
        df=df,
        variables=variables,
        quantiles=(0.25, 0.75),
    )

    output_path = plot_diurnal_cycle(
        stats=stats,
        variables=variables,
        output_path=OUTPUT_PATH,
    )
    print(f"✔ Diurnal cycle saved: {output_path}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,128 @@
# scripts/plot_hexbin_explorations.py
from __future__ import annotations

from pathlib import Path
from typing import Callable

import numpy as np

from meteo.dataset import load_raw_csv
from meteo.variables import VARIABLES_BY_KEY
from meteo.plots import plot_hexbin_with_third_variable

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_DIR = Path("figures/hexbin_explorations")

REDUCE_FUNCTIONS: dict[str, Callable[[np.ndarray], float]] = {
    "mean": np.mean,
    "median": np.median,
    "max": np.max,
}

REDUCE_LABEL_FR: dict[str, str] = {
    "mean": "mean",
    "median": "median",
    "max": "maximum",
}

# Each scenario illustrates either a well-known correlation
# or the absence of structure between variables.
HEXBIN_SCENARIOS: list[dict[str, object]] = [
    {
        "x": "temperature",
        "y": "humidity",
        "color": "rain_rate",
        "filename": "hexbin_temp_humidity_color_rain.png",
        "description": (
            "Highlight how relative humidity saturates when the temperature drops "
            "and how rain episodes sit within a narrow band."
        ),
        "reduce": "max",
        "gridsize": 50,
        "mincnt": 8,
    },
    {
        "x": "pressure",
        "y": "rain_rate",
        "color": "wind_speed",
        "filename": "hexbin_pressure_rain_color_wind.png",
        "description": (
            "Check whether gusts really accompany pressure drops. "
            "Expect many empty cells: the correlation is far from systematic."
        ),
        "reduce": "median",
        "gridsize": 45,
        "mincnt": 5,
    },
    {
        "x": "illuminance",
        "y": "humidity",
        "color": "temperature",
        "filename": "hexbin_lux_humidity_color_temp.png",
        "description": (
            "Explore the day/night cycle: humidity rises when illuminance falls, "
            "but that does not always imply a rapid temperature drop."
        ),
        "reduce": "mean",
        "gridsize": 55,
        "mincnt": 6,
    },
]


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    for scenario in HEXBIN_SCENARIOS:
        key_x = scenario["x"]
        key_y = scenario["y"]
        key_color = scenario["color"]
        var_x = VARIABLES_BY_KEY[key_x]
        var_y = VARIABLES_BY_KEY[key_y]
        var_color = VARIABLES_BY_KEY[key_color]
        filename = scenario["filename"]
        output_path = OUTPUT_DIR / filename
        reduce_name = scenario.get("reduce", "mean")
        reduce_func = REDUCE_FUNCTIONS.get(reduce_name, np.mean)
        reduce_label = REDUCE_LABEL_FR.get(reduce_name, reduce_name)
        gridsize = int(scenario.get("gridsize", 60))
        mincnt = int(scenario.get("mincnt", 5))
        description = scenario["description"]

        print(f"→ Hexbin {var_y.key} vs {var_x.key} (colour = {var_color.key})")
        print(f"  {description}")
        plot_hexbin_with_third_variable(
            df=df,
            var_x=var_x,
            var_y=var_y,
            var_color=var_color,
            output_path=output_path,
            gridsize=gridsize,
            mincnt=mincnt,
            reduce_func=reduce_func,
            reduce_func_label=reduce_label,
            cmap="magma",
        )
        print(f"  ✔ Plot saved: {output_path}")
        print()

    print("✔ All hexbin plots have been generated.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,64 @@
# scripts/plot_illuminance_focus.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.analysis import compute_seasonal_hourly_profile, compute_monthly_daylight_hours
from meteo.plots import plot_seasonal_hourly_profiles, plot_daylight_hours

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_DIR = Path("figures/illuminance")

DAYLIGHT_THRESHOLD_LUX = 1000.0


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    if "illuminance" not in df.columns:
        print("⚠ The 'illuminance' column is missing from the dataset.")
        return

    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    seasonal_profile = compute_seasonal_hourly_profile(
        df=df,
        value_column="illuminance",
        season_column="season",
    )
    seasonal_path = OUTPUT_DIR / "seasonal_diurnal_illuminance.png"
    plot_seasonal_hourly_profiles(
        profile_df=seasonal_profile,
        output_path=seasonal_path,
        title="Mean illuminance by hour and by season",
        ylabel="Illuminance (lux)",
    )
    print(f"✔ Seasonal illuminance profile: {seasonal_path}")

    daylight_hours = compute_monthly_daylight_hours(
        df=df,
        illuminance_column="illuminance",
        threshold_lux=DAYLIGHT_THRESHOLD_LUX,
    )
    daylight_path = OUTPUT_DIR / "monthly_daylight_hours.png"
    plot_daylight_hours(
        monthly_series=daylight_hours,
        output_path=daylight_path,
        title=f"Mean daily duration > {DAYLIGHT_THRESHOLD_LUX:.0f} lx",
    )
    print(f"✔ Monthly daylight duration: {daylight_path}")

    print("✔ Illuminance-focused plots generated.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,54 @@
# scripts/plot_monthly_patterns.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.variables import VARIABLES_BY_KEY
from meteo.analysis import compute_monthly_climatology, compute_monthly_means
from meteo.plots import plot_monthly_boxplots, plot_monthly_anomalies

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_DIR = Path("figures/monthly")

BOXPLOT_KEYS = ["temperature", "humidity", "pressure", "wind_speed", "illuminance"]
ANOMALY_KEYS = ["temperature", "humidity", "illuminance"]


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    box_vars = [VARIABLES_BY_KEY[key] for key in BOXPLOT_KEYS]
    boxplot_path = OUTPUT_DIR / "monthly_boxplots.png"
    plot_monthly_boxplots(df=df, variables=box_vars, output_path=boxplot_path)
    print(f"✔ Monthly boxplots: {boxplot_path}")

    anomaly_vars = [VARIABLES_BY_KEY[key] for key in ANOMALY_KEYS]
    monthly_means = compute_monthly_means(df=df, columns=[v.column for v in anomaly_vars])
    climatology = compute_monthly_climatology(df=df, columns=[v.column for v in anomaly_vars])
    anomaly_path = OUTPUT_DIR / "monthly_anomalies.png"
    plot_monthly_anomalies(
        monthly_means=monthly_means,
        climatology=climatology,
        variables=anomaly_vars,
        output_path=anomaly_path,
    )
    print(f"✔ Monthly anomalies: {anomaly_path}")

    print("✔ Monthly plots generated.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,85 @@
# scripts/plot_rain_event_composites.py
from __future__ import annotations

from pathlib import Path
from typing import Sequence

import pandas as pd

from meteo.dataset import load_raw_csv
from meteo.variables import Variable, VARIABLES_BY_KEY
from meteo.analysis import detect_threshold_events, build_event_aligned_segments
from meteo.plots import plot_event_composite

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_PATH = Path("figures/event_composites/rain_event_composites.png")

RAIN_THRESHOLD = 0.2  # mm/h: below this we consider that it is not really raining
MIN_EVENT_DURATION = 5  # minutes
MIN_EVENT_GAP = 20  # minutes required before counting a new event
WINDOW_BEFORE = 120  # minutes shown before the rain starts
WINDOW_AFTER = 240  # minutes after the onset

COMPOSITE_VARIABLE_KEYS: Sequence[str] = [
    "pressure",
    "temperature",
    "humidity",
    "wind_speed",
]


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    rain_series = df["rain_rate"]
    events = detect_threshold_events(
        rain_series,
        threshold=RAIN_THRESHOLD,
        min_duration=pd.Timedelta(minutes=MIN_EVENT_DURATION),
        min_gap=pd.Timedelta(minutes=MIN_EVENT_GAP),
    )
    if not events:
        print("⚠ No rain event detected with the current parameters.")
        return
    print(f"Number of events detected: {len(events)}")

    variables: list[Variable] = [VARIABLES_BY_KEY[key] for key in COMPOSITE_VARIABLE_KEYS]
    columns = [v.column for v in variables]

    aligned_segments = build_event_aligned_segments(
        df=df,
        events=events,
        columns=columns,
        window_before_minutes=WINDOW_BEFORE,
        window_after_minutes=WINDOW_AFTER,
        resample_minutes=1,
    )
    if aligned_segments.empty:
        print("⚠ The aligned segments are empty (missing period?).")
        return

    output_path = plot_event_composite(
        aligned_segments=aligned_segments,
        variables=variables,
        output_path=OUTPUT_PATH,
        quantiles=(0.2, 0.8),
        baseline_label="Rain onset",
    )
    print(f"✔ Rain composite plot saved: {output_path}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,41 @@
# scripts/plot_rain_hyetograph.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.analysis import compute_daily_rainfall_totals
from meteo.plots import plot_daily_rainfall_hyetograph

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_PATH = Path("figures/rainfall_hyetograph/daily_rainfall_hyetograph.png")


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    daily_totals = compute_daily_rainfall_totals(df=df, rate_column="rain_rate")
    if daily_totals.empty:
        print("⚠ No cumulative rainfall data to display.")
        return

    output_path = plot_daily_rainfall_hyetograph(
        daily_rain=daily_totals,
        output_path=OUTPUT_PATH,
    )
    print(f"✔ Daily hyetograph exported: {output_path}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,65 @@
# scripts/plot_rolling_correlation_heatmap.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.variables import VARIABLES_BY_KEY
from meteo.analysis import compute_rolling_correlations_for_pairs
from meteo.plots import plot_rolling_correlation_heatmap

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_PATH = Path("figures/rolling_correlations/rolling_correlation_heatmap.png")

ROLLING_PAIRS: list[tuple[str, str]] = [
    ("temperature", "humidity"),
    ("pressure", "rain_rate"),
    ("pressure", "wind_speed"),
    ("illuminance", "temperature"),
    ("humidity", "rain_rate"),
]

WINDOW_MINUTES = 180  # 3 hours, to observe synoptic-scale trends
STEP_MINUTES = 30  # keep only one sample every 30 minutes


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    pairs = [(VARIABLES_BY_KEY[a], VARIABLES_BY_KEY[b]) for a, b in ROLLING_PAIRS]

    rolling_df = compute_rolling_correlations_for_pairs(
        df=df,
        pairs=pairs,
        window_minutes=WINDOW_MINUTES,
        min_valid_fraction=0.7,
        step_minutes=STEP_MINUTES,
        method="pearson",
    )
    if rolling_df.empty:
        print("⚠ Unable to compute the rolling correlations (insufficient data).")
        return

    output_path = plot_rolling_correlation_heatmap(
        rolling_corr=rolling_df,
        output_path=OUTPUT_PATH,
        cmap="coolwarm",
        vmin=-1.0,
        vmax=1.0,
    )
    print(f"✔ Rolling correlation heatmap saved: {output_path}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,66 @@
# scripts/plot_seasonal_overview.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.variables import VARIABLES_BY_KEY
from meteo.analysis import compute_rainfall_by_season
from meteo.plots import plot_seasonal_boxplots, plot_rainfall_by_season
from meteo.season import sort_season_labels, SEASON_LABELS

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_DIR = Path("figures/seasonal")

BOXPLOT_VARIABLES = ["temperature", "humidity", "pressure", "wind_speed", "illuminance"]


def infer_season_order(df) -> list[str]:
    seasons = df["season"].dropna().unique()
    order = sort_season_labels(seasons, order=SEASON_LABELS)
    if not order:
        order = list(SEASON_LABELS)
    return order


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    if "season" not in df.columns:
        print("⚠ The 'season' column is missing. Re-run scripts.make_minutely_dataset.")
        return

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    season_order = infer_season_order(df)
    print(f"Seasons detected: {season_order}")

    variables = [VARIABLES_BY_KEY[key] for key in BOXPLOT_VARIABLES]
    boxplot_path = OUTPUT_DIR / "seasonal_boxplots.png"
    plot_seasonal_boxplots(
        df=df,
        variables=variables,
        output_path=boxplot_path,
        season_order=season_order,
        title="Distribution of the measurements by season",
    )
    print(f"✔ Seasonal boxplots: {boxplot_path}")

    rainfall = compute_rainfall_by_season(df=df, rate_column="rain_rate", season_column="season")
    rainfall_path = OUTPUT_DIR / "rainfall_by_season.png"
    plot_rainfall_by_season(rainfall_df=rainfall, output_path=rainfall_path)
    print(f"✔ Seasonal rainfall: {rainfall_path}")

    print("✔ All seasonal plots have been generated.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,128 @@
# scripts/plot_sun_elevation_relationships.py
from __future__ import annotations

from pathlib import Path

import numpy as np

from meteo.dataset import load_raw_csv
from meteo.variables import VARIABLES_BY_KEY
from meteo.analysis import compute_binned_statistics
from meteo.plots import plot_binned_profiles, plot_hexbin_with_third_variable
from meteo.config import StationLocation
from meteo.solar import add_solar_elevation_column

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_DIR = Path("figures/sun")


def ensure_sun_elevation(df):
    if "sun_elevation" in df.columns:
        return df
    print("  The 'sun_elevation' column is missing, trying to compute it on the fly.")
    location = StationLocation.from_env(optional=True)
    if location is None:
        print(
            "⚠ Unable to compute the sun elevation: set STATION_LATITUDE and STATION_LONGITUDE, "
            "then regenerate the dataset (scripts/make_minutely_dataset)."
        )
        return None
    print(
        f"→ Computing the sun elevation with lat={location.latitude}, lon={location.longitude}, "
        f"alt={location.elevation_m} m."
    )
    add_solar_elevation_column(
        df,
        latitude=location.latitude,
        longitude=location.longitude,
        elevation_m=location.elevation_m,
    )
    return df


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    df = ensure_sun_elevation(df)
    if df is None or "sun_elevation" not in df.columns:
        return

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    profile_keys = ["temperature", "humidity", "illuminance"]
    profile_vars = [VARIABLES_BY_KEY[key] for key in profile_keys]
    bins = np.arange(-90, 95, 5)  # 5° bins
    stats = compute_binned_statistics(
        df=df,
        bin_source_column="sun_elevation",
        target_columns=[v.column for v in profile_vars],
        bins=bins,
        min_count=100,
        quantiles=(0.2, 0.8),
    )
    profile_output = OUTPUT_DIR / "sun_elevation_profiles.png"
    plot_binned_profiles(
        stats=stats,
        variables=profile_vars,
        output_path=profile_output,
        xlabel="Sun elevation (°)",
        title="Mean profiles as a function of the sun elevation",
        show_counts=True,
    )
    print(f"✔ Sun vs variables profiles: {profile_output}")

    hexbin_scenarios = [
        {
            "x": "sun_elevation",
            "y": "illuminance",
            "color": "temperature",
            "filename": "hexbin_sun_elevation_vs_illuminance.png",
            "description": "Illuminance as a function of the sun elevation, colour = temperature.",
        },
        {
            "x": "sun_elevation",
            "y": "temperature",
            "color": "humidity",
            "filename": "hexbin_sun_elevation_vs_temperature.png",
            "description": "Temperature as a function of the elevation, colour = relative humidity.",
        },
    ]
    for scenario in hexbin_scenarios:
        var_x = VARIABLES_BY_KEY[scenario["x"]]
        var_y = VARIABLES_BY_KEY[scenario["y"]]
        var_color = VARIABLES_BY_KEY[scenario["color"]]
        output_path = OUTPUT_DIR / scenario["filename"]
        print(f"{scenario['description']}")
        plot_hexbin_with_third_variable(
            df=df,
            var_x=var_x,
            var_y=var_y,
            var_color=var_color,
            output_path=output_path,
            gridsize=60,
            mincnt=10,
            reduce_func_label="mean",
            cmap="cividis",
        )
        print(f"  ✔ Hexbin saved: {output_path}")

    print("✔ All sun-elevation plots have been produced.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,86 @@
# scripts/plot_wind_conditionals.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.analysis import (
    compute_wind_rose_distribution,
    filter_by_condition,
    compute_mean_wind_components,
)
from meteo.plots import plot_wind_rose, plot_wind_vector_series

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_DIR = Path("figures/wind_conditionals")

RAIN_THRESHOLD = 0.2  # mm/h


def _export_wind_rose(df, label: str, filename: str) -> None:
    if df.empty:
        print(f"⚠ No data for {label}.")
        return
    frequencies, speed_labels, sector_size = compute_wind_rose_distribution(
        df=df,
        direction_sector_size=30,
        speed_bins=(0, 5, 15, 30, 50, float("inf")),
    )
    if frequencies.empty:
        print(f"⚠ Unable to build the wind rose for {label}.")
        return
    output_path = OUTPUT_DIR / filename
    plot_wind_rose(
        frequencies=frequencies,
        speed_bin_labels=speed_labels,
        output_path=output_path,
        sector_size_deg=sector_size,
        cmap="plasma",
    )
    print(f"✔ Wind rose ({label}): {output_path}")


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    if df.empty:
        print("⚠ Empty dataset.")
        return

    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    if "rain_rate" not in df.columns:
        print("⚠ 'rain_rate' column missing.")
        return

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    rain_condition = df["rain_rate"].fillna(0.0) >= RAIN_THRESHOLD
    dry_condition = df["rain_rate"].fillna(0.0) < RAIN_THRESHOLD

    _export_wind_rose(df, "all conditions", "wind_rose_all.png")
    _export_wind_rose(filter_by_condition(df, condition=rain_condition), "rain", "wind_rose_rain.png")
    _export_wind_rose(filter_by_condition(df, condition=dry_condition), "dry weather", "wind_rose_dry.png")

    # Mean wind vectors per month
    vector_df = compute_mean_wind_components(df=df, freq="1M")
    vector_path = OUTPUT_DIR / "wind_vectors_monthly.png"
    plot_wind_vector_series(
        vector_df=vector_df,
        output_path=vector_path,
        title="Mean wind vectors (monthly)",
    )
    print(f"✔ Monthly wind vectors: {vector_path}")

    print("✔ Conditional wind/rain plots generated.")


if __name__ == "__main__":
    main()

scripts/plot_wind_rose.py Normal file
View File

@@ -0,0 +1,48 @@
# scripts/plot_wind_rose.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.analysis import compute_wind_rose_distribution
from meteo.plots import plot_wind_rose

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_PATH = Path("figures/wind_rose/wind_rose.png")


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    frequencies, labels, sector_size = compute_wind_rose_distribution(
        df=df,
        direction_sector_size=30,
        speed_bins=(0, 5, 15, 30, 50, float("inf")),
    )
    if frequencies.empty:
        print("⚠ Not enough data to build a wind rose.")
        return

    output_path = plot_wind_rose(
        frequencies=frequencies,
        speed_bin_labels=labels,
        output_path=OUTPUT_PATH,
        sector_size_deg=sector_size,
        cmap="plasma",
    )
    print(f"✔ Wind rose exported: {output_path}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,55 @@
# scripts/plot_wind_rose_rain.py
from __future__ import annotations

from pathlib import Path

from meteo.dataset import load_raw_csv
from meteo.analysis import compute_wind_rose_distribution
from meteo.plots import plot_wind_rose

CSV_PATH = Path("data/weather_minutely.csv")
OUTPUT_PATH = Path("figures/wind_rose/wind_rose_during_rain.png")

RAIN_THRESHOLD = 0.2  # mm/h, to consider that it is actually raining


def main() -> None:
    if not CSV_PATH.exists():
        print(f"⚠ File not found: {CSV_PATH}")
        return

    df = load_raw_csv(CSV_PATH)
    print(f"Minutely dataset loaded: {CSV_PATH}")
    print(f"  Rows   : {len(df)}")
    print(f"  Columns: {list(df.columns)}")
    print()

    rainy_df = df[df["rain_rate"].fillna(0.0) >= RAIN_THRESHOLD]
    print(f"Rows with rain ≥ {RAIN_THRESHOLD} mm/h: {len(rainy_df)}")
    if rainy_df.empty:
        print("⚠ No rain event exceeds this threshold, aborting.")
        return

    frequencies, labels, sector_size = compute_wind_rose_distribution(
        df=rainy_df,
        direction_sector_size=30,
        speed_bins=(0, 5, 15, 30, 50, float("inf")),
    )
    if frequencies.empty:
        print("⚠ Not enough data to build a wind rose during rain.")
        return

    output_path = plot_wind_rose(
        frequencies=frequencies,
        speed_bin_labels=labels,
        output_path=OUTPUT_PATH,
        sector_size_deg=sector_size,
        cmap="plasma",
    )
    print(f"✔ Wind rose during rain exported: {output_path}")


if __name__ == "__main__":
    main()