# tests/test_influx_connection.py from __future__ import annotations from contextlib import closing from influxdb_client.client.exceptions import InfluxDBError from meteo.config import InfluxSettings from meteo.influx_client import create_influx_client, test_basic_query def main() -> None: """ Teste la communication avec le serveur InfluxDB : 1. Chargement de la configuration depuis l'environnement. 2. Ping du serveur. 3. Exécution d'une requête simple sur le bucket configuré. """ settings = InfluxSettings.from_env() print("Configuration InfluxDB chargée :") print(f" URL : {settings.url}") print(f" Org : {settings.org}") print(f" Bucket : {settings.bucket}") print() # Utilisation de `closing` pour garantir la fermeture du client. with closing(create_influx_client(settings)) as client: print("→ Ping du serveur InfluxDB…") if not client.ping(): raise SystemExit("Échec du ping InfluxDB. Vérifiez l'URL et l'état du serveur.") print("✔ Ping OK") print("→ Requête de test sur le bucket…") tables = test_basic_query(client, settings.bucket) # On fait un retour synthétique nb_tables = len(tables) nb_records = sum(len(table.records) for table in tables) print(f"✔ Requête de test réussie : {nb_tables} table(s), {nb_records} enregistrement(s) trouvés.") if nb_records == 0: print("⚠ Le bucket est accessible, mais aucune donnée sur la dernière heure.") else: # Affiche un aperçu de la première table / premier record first_table = tables[0] first_record = first_table.records[0] print("Exemple de point :") print(f" time : {first_record.get_time()}") print(f" measurement : {first_record.get_measurement()}") print(f" field : {first_record.get_field()}") print(f" value : {first_record.get_value()}") if __name__ == "__main__": main()