# meteo/influx_client.py from __future__ import annotations from typing import Any from influxdb_client import InfluxDBClient from .config import InfluxSettings def create_influx_client(settings: InfluxSettings) -> InfluxDBClient: """ Crée et retourne un client InfluxDB configuré. Le client doit être fermé par l'appelant lorsqu'il n'est plus nécessaire. """ client = InfluxDBClient( url=settings.url, token=settings.token, org=settings.org, ) return client def test_basic_query(client: InfluxDBClient, bucket: str) -> list[Any]: """ Exécute une requête Flux très simple sur le bucket donné pour vérifier que la communication fonctionne et que le bucket est accessible. Retourne la liste brute de tables renvoyées par InfluxDB. Lève une exception en cas de problème réseau, d'authentification, etc. """ query_api = client.query_api() flux_query = f""" from(bucket: "{bucket}") |> range(start: -1h) |> limit(n: 5) """ tables = query_api.query(flux_query) return tables