159 lines
4.4 KiB
Python
159 lines
4.4 KiB
Python
# meteo/export.py
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Iterable, Literal
|
|
|
|
import pandas as pd
|
|
from influxdb_client import InfluxDBClient
|
|
|
|
from .station_config import SensorDefinition, StationConfig
|
|
|
|
|
|
def _build_sensor_flux_query(
|
|
bucket: str,
|
|
sensor: SensorDefinition,
|
|
start: str,
|
|
stop: str | None,
|
|
) -> str:
|
|
"""
|
|
Construit une requête Flux pour un capteur donné (measurement + entity_id).
|
|
|
|
On ne récupère que le champ `value`, qui contient la valeur physique,
|
|
et on laisse Influx nous renvoyer tous les points bruts (chez vous : 1 point / minute).
|
|
"""
|
|
stop_clause = f", stop: {stop}" if stop is not None else ""
|
|
|
|
return f"""
|
|
from(bucket: "{bucket}")
|
|
|> range(start: {start}{stop_clause})
|
|
|> filter(fn: (r) => r._measurement == "{sensor.measurement}")
|
|
|> filter(fn: (r) => r["entity_id"] == "{sensor.entity_id}")
|
|
|> filter(fn: (r) => r._field == "value")
|
|
|> keep(columns: ["_time", "_value"])
|
|
|> sort(columns: ["_time"])
|
|
"""
|
|
|
|
|
|
def fetch_sensor_series(
|
|
client: InfluxDBClient,
|
|
bucket: str,
|
|
sensor: SensorDefinition,
|
|
*,
|
|
start: str,
|
|
stop: str | None = None,
|
|
) -> pd.Series:
|
|
"""
|
|
Récupère la série temporelle d'un capteur unique sous forme de Series pandas.
|
|
|
|
Paramètres
|
|
----------
|
|
client :
|
|
Client InfluxDB déjà configuré.
|
|
bucket :
|
|
Nom du bucket.
|
|
sensor :
|
|
Définition du capteur (measurement + entity_id).
|
|
start, stop :
|
|
Intervalle temporel au format Flux (ex: "-7d", "2025-01-01T00:00:00Z").
|
|
|
|
Retour
|
|
------
|
|
Series pandas indexée par le temps, nommée `sensor.name`.
|
|
"""
|
|
query_api = client.query_api()
|
|
flux_query = _build_sensor_flux_query(bucket, sensor, start=start, stop=stop)
|
|
result = query_api.query_data_frame(flux_query)
|
|
|
|
if isinstance(result, list):
|
|
df = pd.concat(result, ignore_index=True)
|
|
else:
|
|
df = result
|
|
|
|
if df.empty:
|
|
# Série vide : on renvoie une Series vide avec le bon nom.
|
|
return pd.Series(name=sensor.name, dtype="float64")
|
|
|
|
df = df.rename(columns={"_time": "time", "_value": sensor.name})
|
|
df["time"] = pd.to_datetime(df["time"])
|
|
df = df.set_index("time").sort_index()
|
|
|
|
# On ne retourne qu'une seule colonne en tant que Series
|
|
series = df[sensor.name]
|
|
series.name = sensor.name
|
|
return series
|
|
|
|
|
|
def export_station_data(
|
|
client: InfluxDBClient,
|
|
bucket: str,
|
|
config: StationConfig,
|
|
*,
|
|
start: str,
|
|
stop: str | None = None,
|
|
output_path: str | Path = "data/weather_raw.csv",
|
|
file_format: Literal["csv", "parquet"] = "csv",
|
|
) -> Path:
|
|
"""
|
|
Exporte les données de la station météo vers un fichier (CSV ou Parquet).
|
|
|
|
Pour chaque capteur de `config.sensors`, cette fonction récupère la série
|
|
temporelle correspondante, puis assemble le tout dans un DataFrame unique.
|
|
|
|
Paramètres
|
|
----------
|
|
client :
|
|
Client InfluxDB.
|
|
bucket :
|
|
Nom du bucket à interroger (ex: "weather").
|
|
config :
|
|
Configuration de la station (liste de capteurs).
|
|
start, stop :
|
|
Intervalle temporel en syntaxe Flux (ex: "-30d", "2024-01-01T00:00:00Z").
|
|
Si `stop` est None, Influx utilisera `now()`.
|
|
output_path :
|
|
Chemin du fichier de sortie.
|
|
file_format :
|
|
"csv" ou "parquet".
|
|
|
|
Retour
|
|
------
|
|
Path :
|
|
Chemin absolu du fichier écrit.
|
|
"""
|
|
output_path = Path(output_path)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
series_list: list[pd.Series] = []
|
|
|
|
for sensor in config.sensors:
|
|
series = fetch_sensor_series(
|
|
client,
|
|
bucket,
|
|
sensor,
|
|
start=start,
|
|
stop=stop,
|
|
)
|
|
if not series.empty:
|
|
series_list.append(series)
|
|
else:
|
|
# On pourrait logger un warning ici si vous le souhaitez.
|
|
pass
|
|
|
|
if not series_list:
|
|
raise RuntimeError(
|
|
"Aucune donnée récupérée pour la station sur l'intervalle demandé."
|
|
)
|
|
|
|
# Assemblage des séries : jointure externe sur l'index temps.
|
|
df = pd.concat(series_list, axis=1).sort_index()
|
|
|
|
if file_format == "csv":
|
|
df.to_csv(output_path, index_label="time")
|
|
elif file_format == "parquet":
|
|
df.to_parquet(output_path)
|
|
else:
|
|
raise ValueError(f"Format de fichier non supporté : {file_format!r}")
|
|
|
|
return output_path.resolve()
|