158 lines
4.8 KiB
JavaScript
158 lines
4.8 KiB
JavaScript
const { InfluxDB } = require("@influxdata/influxdb-client");
|
|
const { DateTime } = require("luxon");
|
|
const { buildTimeWindow } = require("../time");
|
|
const { hasValue } = require("../constants");
|
|
|
|
/**
 * Escapes a value so it can be embedded inside a double-quoted Flux string
 * literal.
 *
 * Backslashes and double quotes are escaped so the value cannot terminate the
 * literal early, and `${` is escaped so Flux does not treat it as the start of
 * a string-interpolation expression.
 *
 * @param {*} value - Value to embed; coerced with `String()`.
 * @returns {string} The escaped text, without the surrounding quotes.
 */
function escapeValue(value) {
  // One pass over the input, so a backslash added for one character is never
  // escaped a second time.
  return String(value).replace(/\\|"|\$\{/g, (match) => `\\${match}`);
}
|
|
|
|
/**
 * Builds the Flux query that reads one sensor's points inside a time window.
 *
 * The query selects from `bucket`, restricts the range to
 * `[window.start, window.end]` (both converted to UTC ISO strings), optionally
 * filters on the sensor's measurement, field and tags, and keeps only the
 * `_time` and `_value` columns.
 *
 * @param {string} bucket - Bucket name.
 * @param {object} sensor - Sensor description; `measurement`, `field` and
 *   `tags` (a key/value object) are each optional.
 * @param {object} window - Object whose `start` and `end` expose
 *   `toUTC().toISO()` (presumably Luxon DateTimes - confirm against
 *   `buildTimeWindow`).
 * @returns {string} The Flux query text.
 */
function buildFluxQuery(bucket, sensor, window) {
  const startIso = window.start.toUTC().toISO();
  const stopIso = window.end.toUTC().toISO();

  const filters = [];

  if (sensor.measurement) {
    filters.push(`r._measurement == "${escapeValue(sensor.measurement)}"`);
  }

  if (sensor.field) {
    filters.push(`r._field == "${escapeValue(sensor.field)}"`);
  }

  if (sensor.tags) {
    for (const [key, value] of Object.entries(sensor.tags)) {
      // Bracket access with an escaped string key: tag keys containing dashes,
      // spaces or quotes stay valid Flux and cannot inject query text, which
      // the bare `r.<key>` form allowed.
      filters.push(`r["${escapeValue(key)}"] == "${escapeValue(value)}"`);
    }
  }

  const stages = [
    `from(bucket: "${escapeValue(bucket)}")`,
    ` |> range(start: time(v: "${startIso}"), stop: time(v: "${stopIso}"))`,
  ];

  if (filters.length > 0) {
    stages.push(` |> filter(fn: (r) => ${filters.join(" and ")})`);
  }

  stages.push(' |> keep(columns: ["_time", "_value"])');

  return stages.join("\n");
}
|
|
|
|
/**
 * Finds the row whose `_time` is closest to `target`.
 *
 * Rows whose `_time` fails `hasValue` or does not parse as an ISO timestamp
 * are skipped. When two rows are equally close, the earlier one in `rows`
 * wins.
 *
 * @param {Array<object>} rows - Rows carrying `_time` (ISO string) and `_value`.
 * @param {*} target - Reference instant. Anything whose `valueOf()` yields
 *   epoch milliseconds works, i.e. a Luxon DateTime or a JS Date.
 * @returns {{time: object, value: *}|null} The parsed time and raw value of the
 *   nearest row, or null when no row is usable or the target is missing or
 *   invalid.
 */
function pickNearestRow(rows, target) {
  // Compare epoch milliseconds instead of calling DateTime#diff(target):
  // diff() yields an invalid Duration (NaN) when target is a plain Date, which
  // made every row lose the comparison below.
  const targetMs = target == null ? Number.NaN : target.valueOf();

  let closest = null;
  let smallestDiff = Number.POSITIVE_INFINITY;

  for (const row of rows) {
    if (!hasValue(row._time)) continue;

    const rowTime = DateTime.fromISO(row._time);
    if (!rowTime.isValid) continue;

    const diff = Math.abs(rowTime.toMillis() - targetMs);

    // A NaN diff (invalid target) never compares as smaller, so the result
    // stays null in that case.
    if (diff < smallestDiff) {
      smallestDiff = diff;
      closest = { time: rowTime, value: row._value };
    }
  }

  return closest;
}
|
|
|
|
/**
 * Converts a raw sensor reading into the value stored in the weather record.
 *
 * Steps, in order: coerce to a number, apply the sensor's `multiplier` and
 * `offset`, multiply `wind_speed` readings whose unit is `"mps"` by 3.6 (the
 * m/s to km/h factor), and for `precipitations` turn the number into a boolean
 * by comparing it with a threshold.
 *
 * @param {string} key - Weather field name (e.g. `"wind_speed"`,
 *   `"precipitations"`).
 * @param {*} rawValue - Reading as returned by the query.
 * @param {object} sensor - Sensor settings; reads `multiplier`, `offset`,
 *   `unit` and `threshold`.
 * @param {number} precipitationThreshold - Fallback threshold used when
 *   `sensor.threshold` is not a finite number.
 * @returns {number|boolean|null} The normalized number, a boolean for
 *   `precipitations`, or null when the reading is missing, blank, not a finite
 *   number, or no finite precipitation threshold is available.
 */
function normalizeValue(key, rawValue, sensor, precipitationThreshold) {
  if (!hasValue(rawValue)) return null;

  // Number("") and Number("   ") are 0, which would fabricate a reading out of
  // an empty cell; treat blank text as missing instead.
  if (typeof rawValue === "string" && rawValue.trim() === "") return null;

  let value = typeof rawValue === "number" ? rawValue : Number(rawValue);

  // Rejects NaN as before and also +/-Infinity, which is not a usable reading.
  if (!Number.isFinite(value)) return null;

  const { multiplier, offset, unit, threshold: sensorThreshold } = sensor;

  // A multiplier of 0 is deliberately ignored (it is treated as "not set"),
  // matching the previous truthiness check.
  if (Number.isFinite(multiplier) && multiplier !== 0) {
    value *= multiplier;
  }

  if (Number.isFinite(offset)) {
    value += offset;
  }

  if (key === "wind_speed" && unit === "mps") {
    value *= 3.6;
  }

  if (key === "precipitations") {
    const threshold = Number.isFinite(sensorThreshold) ? sensorThreshold : precipitationThreshold;

    if (!Number.isFinite(threshold)) return null;

    return value > threshold;
  }

  return value;
}
|
|
|
|
/**
 * Creates a weather provider that reads sensor values from InfluxDB.
 *
 * @param {object} [config] - Provider settings: `url`, `token`, `org`,
 *   `bucket` and a non-empty `sensors` map (weather key -> sensor description)
 *   are required; `timezone`, `windowMinutes` and `precipitationThreshold` are
 *   optional.
 * @param {object} [globalConfig] - Fallback source for `timezone`,
 *   `windowMinutes` and `precipitationThreshold`.
 * @returns {{name: string, fetch: Function}|null} The provider, or null when a
 *   required setting is missing.
 */
function createInfluxProvider(config = {}, globalConfig = {}) {
  // Default parameters only cover `undefined`; tolerate an explicit null too.
  const settings = config ?? {};
  const globals = globalConfig ?? {};
  const { url, token, org, bucket, sensors } = settings;

  if (!url || !token || !org || !bucket || !sensors || Object.keys(sensors).length === 0) {
    return null;
  }

  const providerName = "influxdb";
  const timezone = settings.timezone || globals.timezone || "UTC";
  const queryApi = new InfluxDB({ url, token }).getQueryApi(org);
  const windowMinutes = settings.windowMinutes ?? globals.windowMinutes ?? 60;
  const precipitationThreshold =
    settings.precipitationThreshold ?? globals.precipitationThreshold ?? 0.1;

  /**
   * Reads every configured sensor around `target` and assembles a weather
   * record.
   *
   * @param {{target: *}} request - `target` is a Luxon DateTime or a JS Date.
   * @returns {Promise<object|null>} Record keyed by sensor name plus a `source`
   *   array, or null when no sensor produced a usable value.
   */
  async function fetch({ target }) {
    const targetInZone = DateTime.isDateTime(target)
      ? target.setZone(timezone)
      : DateTime.fromJSDate(target, { zone: timezone });
    const window = buildTimeWindow(targetInZone, windowMinutes);
    const weather = {};

    for (const [key, sensor] of Object.entries(sensors)) {
      const query = buildFluxQuery(bucket, sensor, window);

      try {
        const rows = await queryApi.collectRows(query);

        // Use the zoned DateTime, not the raw `target`: the raw value may be a
        // JS Date, which the nearest-row comparison could not handle, so every
        // lookup came back empty.
        const nearest = pickNearestRow(rows, targetInZone);
        if (!nearest) continue;

        const value = normalizeValue(key, nearest.value, sensor, precipitationThreshold);
        if (!hasValue(value)) continue;

        weather[key] = value;
      } catch (error) {
        // Best effort: one failing sensor must not prevent the others from
        // being read.
        console.error(`InfluxDB error for ${key}: ${error?.message ?? error}`);
      }
    }

    // The fallbacks below only apply when at least one real measurement exists.
    if (Object.keys(weather).length === 0) return null;

    if (!hasValue(weather.illuminance) && sensors.illuminance) {
      // With no illuminance reading, hours before 06:00 and from 18:00 on
      // (in the configured zone) are recorded as 0.
      const { hour } = targetInZone;
      const isNight = hour < 6 || hour >= 18;

      if (isNight) {
        weather.illuminance = 0;
      }
    }

    if (!hasValue(weather.precipitations) && sensors.precipitations) {
      // A configured precipitation sensor with no reading is recorded as false.
      weather.precipitations = false;
    }

    weather.source = [providerName];

    return weather;
  }

  return {
    name: providerName,
    fetch,
  };
}
|
|
|
|
// Public API of this module: the provider factory is the only export.
module.exports = { createInfluxProvider };
|