1
2025-11-19 17:01:45 +01:00

55 lines
1.5 KiB
Python

# tests/test_influx_schema.py
from __future__ import annotations
from pathlib import Path
import sys
from contextlib import closing
# Put the project root at the front of sys.path (once) before the `meteo`
# imports that follow, presumably so they resolve when this file is run
# directly as a script.
# NOTE(review): the header comment names this file tests/test_influx_schema.py;
# from that location parents[3] would resolve two directories above the
# directory containing tests/ -- confirm the depth matches where the file
# actually lives.
PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from meteo.config import InfluxSettings
from meteo.influx_client import create_influx_client
from meteo.schema import list_measurements, list_measurement_fields
def main() -> None:
    """Print the schema of the configured InfluxDB bucket.

    Loads the settings from the environment, then prints:

    - the measurements returned by ``list_measurements`` for the bucket;
    - for each measurement, the name and type of every field returned by
      ``list_measurement_fields``.

    A warning is printed and the function returns early when no measurement
    is found.
    """
    settings = InfluxSettings.from_env()
    print(f"Bucket InfluxDB : {settings.bucket}")
    print()

    # closing() calls the client's close() when the block exits, including on
    # the early return below.
    with closing(create_influx_client(settings)) as influx:
        available = list_measurements(influx, settings.bucket)

        if not available:
            print("⚠ Aucun measurement trouvé dans ce bucket.")
            return

        # First pass: overview of every measurement name.
        print("Measurements disponibles :")
        for name in available:
            print(f" - {name}")
        print()

        # Second pass: field details, one section per measurement.
        for name in available:
            print(f"Champs pour measurement « {name} » :")
            described = list_measurement_fields(influx, settings.bucket, name)
            if described:
                for field in described:
                    print(f" - {field.name} (type: {field.type})")
            else:
                print(" (aucun champ trouvé)")
            # Blank line between measurement sections.
            print()
# Run the schema exploration only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()