1
donnees_meteo/meteo/influx_client.py
2025-11-17 02:00:28 +01:00

42 lines
1.1 KiB
Python

# meteo/influx_client.py
from __future__ import annotations
from typing import Any
from influxdb_client import InfluxDBClient
from .config import InfluxSettings
def create_influx_client(settings: InfluxSettings) -> InfluxDBClient:
"""
Crée et retourne un client InfluxDB configuré.
Le client doit être fermé par l'appelant lorsqu'il n'est plus nécessaire.
"""
client = InfluxDBClient(
url=settings.url,
token=settings.token,
org=settings.org,
)
return client
def test_basic_query(client: InfluxDBClient, bucket: str) -> list[Any]:
"""
Exécute une requête Flux très simple sur le bucket donné pour vérifier
que la communication fonctionne et que le bucket est accessible.
Retourne la liste brute de tables renvoyées par InfluxDB.
Lève une exception en cas de problème réseau, d'authentification, etc.
"""
query_api = client.query_api()
flux_query = f"""
from(bucket: "{bucket}")
|> range(start: -1h)
|> limit(n: 5)
"""
tables = query_api.query(flux_query)
return tables