172 lines
5.4 KiB
Python
172 lines
5.4 KiB
Python
# meteo/dataset.py
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Literal
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
def fill_missing_with_previous(df: pd.DataFrame) -> pd.DataFrame:
|
|
"""
|
|
Remplit les valeurs manquantes en propageant, pour chaque capteur, la
|
|
dernière valeur connue vers le bas (forward-fill).
|
|
|
|
C'est adapté au comportement de Home Assistant, qui n'écrit une nouvelle
|
|
valeur que lorsque l'état change.
|
|
|
|
- Les premières lignes, avant toute mesure pour un capteur donné,
|
|
resteront NaN ; on supprime les lignes qui sont NaN pour toutes
|
|
les colonnes.
|
|
"""
|
|
if not isinstance(df.index, pd.DatetimeIndex):
|
|
raise TypeError(
|
|
"fill_missing_with_previous nécessite un DataFrame avec un DatetimeIndex. "
|
|
"Utilisez d'abord load_raw_csv() ou imposez un index temporel."
|
|
)
|
|
|
|
df = df.sort_index()
|
|
|
|
# Propage la dernière valeur connue vers le bas
|
|
df_filled = df.ffill()
|
|
|
|
# Supprime les lignes vraiment vides (avant la première donnée)
|
|
df_filled = df_filled.dropna(how="all")
|
|
|
|
return df_filled
|
|
|
|
def _circular_mean_deg(series: pd.Series) -> float | np.floating | float("nan"):
|
|
"""
|
|
Calcule la moyenne d'un angle en degrés en tenant compte de la circularité.
|
|
|
|
Exemple : la moyenne de 350° et 10° = 0° (et pas 180°).
|
|
|
|
Retourne NaN si la série est vide ou entièrement NaN.
|
|
"""
|
|
values = series.dropna().to_numpy(dtype=float)
|
|
if values.size == 0:
|
|
return float("nan")
|
|
|
|
radians = np.deg2rad(values)
|
|
sin_mean = np.sin(radians).mean()
|
|
cos_mean = np.cos(radians).mean()
|
|
|
|
angle = np.rad2deg(np.arctan2(sin_mean, cos_mean))
|
|
if angle < 0:
|
|
angle += 360.0
|
|
return angle
|
|
|
|
|
|
def resample_to_minutes(df: pd.DataFrame) -> pd.DataFrame:
|
|
"""
|
|
Ramène un DataFrame indexé par le temps à une granularité d'une minute.
|
|
|
|
Hypothèses :
|
|
- Index = DatetimeIndex (par exemple issu de load_raw_csv sur le CSV "formaté").
|
|
- Colonnes attendues :
|
|
temperature, humidity, pressure, illuminance,
|
|
wind_speed, wind_direction, rain_rate
|
|
|
|
Agrégation :
|
|
- Température, humidité, pression, illuminance, vitesse de vent :
|
|
moyenne sur la minute.
|
|
- Direction du vent :
|
|
moyenne circulaire en degrés.
|
|
- rain_rate (mm/h) :
|
|
moyenne sur la minute (on reste sur un taux, on ne convertit pas en cumul).
|
|
"""
|
|
if not isinstance(df.index, pd.DatetimeIndex):
|
|
raise TypeError(
|
|
"resample_to_minutes nécessite un DataFrame avec un DatetimeIndex. "
|
|
"Utilisez load_raw_csv() pour charger le CSV."
|
|
)
|
|
|
|
# On définit une stratégie d'agrégation par colonne
|
|
agg = {
|
|
"temperature": "mean",
|
|
"humidity": "mean",
|
|
"pressure": "mean",
|
|
"illuminance": "mean",
|
|
"wind_speed": "mean",
|
|
"wind_direction": _circular_mean_deg,
|
|
"rain_rate": "mean",
|
|
}
|
|
|
|
df_minutely = df.resample("60s").agg(agg)
|
|
|
|
# On supprime les minutes où il n'y a vraiment aucune donnée
|
|
df_minutely = df_minutely.dropna(how="all")
|
|
|
|
return df_minutely
|
|
|
|
|
|
def load_raw_csv(path: str | Path) -> pd.DataFrame:
|
|
"""
|
|
Charge le CSV brut exporté depuis InfluxDB et retourne un DataFrame
|
|
indexé par le temps.
|
|
|
|
- La colonne `time` est lue comme texte.
|
|
- On la parse explicitement en ISO8601 (gère les microsecondes optionnelles).
|
|
- `time` devient l'index.
|
|
- Les lignes sont triées par ordre chronologique.
|
|
"""
|
|
csv_path = Path(path)
|
|
|
|
# On lit sans parsing automatique pour garder le contrôle
|
|
df = pd.read_csv(csv_path, dtype={"time": "string"})
|
|
|
|
if "time" not in df.columns:
|
|
raise ValueError(
|
|
f"Le fichier {csv_path} ne contient pas de colonne 'time'. "
|
|
"Ce fichier n'a probablement pas été généré par export_station_data."
|
|
)
|
|
|
|
# Parsing robuste des timestamps ISO8601 (gère 2025-...SS+00:00 et 2025-...SS.ffffff+00:00)
|
|
df["time"] = pd.to_datetime(df["time"], format="ISO8601")
|
|
|
|
df = df.set_index("time").sort_index()
|
|
|
|
# On vérifie qu'on a bien un DatetimeIndex
|
|
if not isinstance(df.index, pd.DatetimeIndex):
|
|
raise TypeError(
|
|
f"L'index du fichier {csv_path} n'est pas un DatetimeIndex après parsing ISO8601."
|
|
)
|
|
|
|
return df
|
|
|
|
|
|
def combine_close_observations(
|
|
df: pd.DataFrame,
|
|
*,
|
|
freq: str = "1s",
|
|
agg: Literal["mean", "median", "first", "last"] = "mean",
|
|
) -> pd.DataFrame:
|
|
"""
|
|
Combine les lignes dont les timestamps tombent dans la même fenêtre temporelle.
|
|
|
|
Typiquement, avec `freq="1s"`, toutes les mesures comprises entre
|
|
HH:MM:SS.000 et HH:MM:SS.999 seront regroupées en une seule ligne
|
|
datée HH:MM:SS.
|
|
"""
|
|
if not isinstance(df.index, pd.DatetimeIndex):
|
|
raise TypeError(
|
|
"combine_close_observations nécessite un DataFrame avec un DatetimeIndex. "
|
|
"Utilisez d'abord load_raw_csv() ou imposez un index temporel."
|
|
)
|
|
|
|
freq = freq.lower() # évite le FutureWarning sur 'S'
|
|
|
|
if agg == "mean":
|
|
df_resampled = df.resample(freq).mean()
|
|
elif agg == "median":
|
|
df_resampled = df.resample(freq).median()
|
|
elif agg == "first":
|
|
df_resampled = df.resample(freq).first()
|
|
elif agg == "last":
|
|
df_resampled = df.resample(freq).last()
|
|
else:
|
|
raise ValueError(f"Fonction d'agrégation non supportée : {agg!r}")
|
|
|
|
df_resampled = df_resampled.dropna(how="all")
|
|
return df_resampled
|