const { InfluxDB } = require("@influxdata/influxdb-client"); const { DateTime } = require("luxon"); const { buildTimeWindow } = require("../time"); const { hasValue } = require("../constants"); function escapeValue(value) { return String(value).replace(/\\/g, "\\\\").replace(/"/g, '\\"'); } function buildFluxQuery(bucket, sensor, window) { const startIso = window.start.toUTC().toISO(); const stopIso = window.end.toUTC().toISO(); const filters = []; if (sensor.measurement) filters.push(`r._measurement == "${escapeValue(sensor.measurement)}"`); if (sensor.field) filters.push(`r._field == "${escapeValue(sensor.field)}"`); if (sensor.tags) { for (const [key, value] of Object.entries(sensor.tags)) { filters.push(`r.${key} == "${escapeValue(value)}"`); } } let query = `from(bucket: "${escapeValue(bucket)}")\n |> range(start: time(v: "${startIso}"), stop: time(v: "${stopIso}"))`; if (filters.length > 0) { query += `\n |> filter(fn: (r) => ${filters.join(" and ")})`; } query += "\n |> keep(columns: [\"_time\", \"_value\"])"; return query; } function pickNearestRow(rows, target) { let closest = null; let smallestDiff = Number.POSITIVE_INFINITY; for (const row of rows) { if (!hasValue(row._time)) continue; const rowTime = DateTime.fromISO(row._time); if (!rowTime.isValid) continue; const diff = Math.abs(rowTime.diff(target).as("milliseconds")); if (diff < smallestDiff) { smallestDiff = diff; closest = { time: rowTime, value: row._value }; } } return closest; } function normalizeValue(key, rawValue, sensor, precipitationThreshold) { if (!hasValue(rawValue)) return null; let value = typeof rawValue === "number" ? rawValue : Number(rawValue); if (Number.isNaN(value)) return null; if (sensor.multiplier && Number.isFinite(sensor.multiplier)) { value *= sensor.multiplier; } if (sensor.offset && Number.isFinite(sensor.offset)) { value += sensor.offset; } if (key === "wind_speed" && sensor.unit === "mps") { value *= 3.6; } if (key === "precipitations") { const threshold = Number.isFinite(sensor.threshold) ? sensor.threshold : precipitationThreshold; if (!Number.isFinite(threshold)) return null; return value > threshold; } return value; } function createInfluxProvider(config = {}, globalConfig = {}) { const { url, token, org, bucket, sensors } = config; if (!url || !token || !org || !bucket || !sensors || Object.keys(sensors).length === 0) { return null; } const queryApi = new InfluxDB({ url, token }).getQueryApi(org); const windowMinutes = config.windowMinutes ?? globalConfig.windowMinutes ?? 60; const precipitationThreshold = config.precipitationThreshold ?? globalConfig.precipitationThreshold ?? 0.1; async function fetch({ target }) { const window = buildTimeWindow(target, windowMinutes); const weather = {}; const contributed = new Set(); for (const [key, sensor] of Object.entries(sensors)) { const query = buildFluxQuery(bucket, sensor, window); try { const rows = await queryApi.collectRows(query); const nearest = pickNearestRow(rows, target); if (!nearest) continue; const value = normalizeValue(key, nearest.value, sensor, precipitationThreshold); if (!hasValue(value)) continue; weather[key] = value; contributed.add("influxdb"); } catch (error) { console.error(`InfluxDB error for ${key}: ${error.message}`); } } if (Object.keys(weather).length === 0) return null; weather.source = Array.from(contributed); return weather; } return { name: "influxdb", fetch, }; } module.exports = { createInfluxProvider, };