smart-energy-monitor/software/flow/node_modules/node-red-contrib-influxdb/influxdb.js

509 lines
18 KiB
JavaScript
Executable file

var _ = require('lodash');
module.exports = function (RED) {
"use strict";
var Influx = require('influx');
var { InfluxDB, Point } = require('@influxdata/influxdb-client');
// Identifiers for the supported InfluxDB flavours. They are compared against the
// config node's `influxdbVersion` setting: 1.x uses the `influx` client, while
// 1.8-flux and 2.0 use the `@influxdata/influxdb-client` client.
const VERSION_1X = '1.x';
const VERSION_18_FLUX = '1.8-flux';
const VERSION_20 = '2.0';
/**
 * Configuration node describing the connection to a single InfluxDB server.
 *
 * Based on `n.influxdbVersion` (treated as 1.x when unset) the node builds
 * either an `influx` client (1.x) or an `@influxdata/influxdb-client` client
 * (1.8-flux / 2.0) and stores it on `this.client`.
 *
 * @param {object} n - node definition handed over by Node-RED
 */
function InfluxConfigNode(n) {
    RED.nodes.createNode(this, n);

    this.hostname = n.hostname;
    this.port = n.port;
    this.database = n.database;
    this.name = n.name;

    // A definition without a version is treated as 1.x; the default is
    // written back onto `n` itself.
    if (!n.influxdbVersion) {
        n.influxdbVersion = VERSION_1X;
    }
    const version = n.influxdbVersion;

    if (version === VERSION_1X) {
        let useTls = n.usetls;
        if (typeof useTls === 'undefined') {
            useTls = false;
        }
        // for backward compatibility with old 'protocol' setting
        if (n.protocol === 'https') {
            useTls = true;
        }
        this.usetls = useTls;

        // Only look up the TLS config node when TLS is on and one is referenced.
        const tlsNode = (this.usetls && n.tls) ? RED.nodes.getNode(n.tls) : null;
        if (tlsNode) {
            this.hostOptions = {};
            tlsNode.addTLSOptions(this.hostOptions);
        }

        const hostEntry = {
            host: this.hostname,
            port: this.port,
            protocol: this.usetls ? "https" : "http",
            options: this.hostOptions
        };
        this.client = new Influx.InfluxDB({
            hosts: [hostEntry],
            database: this.database,
            username: this.credentials.username,
            password: this.credentials.password
        });
    } else if (version === VERSION_18_FLUX || version === VERSION_20) {
        // 1.8-flux builds the token from "username:password"; 2.0 uses the
        // stored token credential.
        let token;
        if (version === VERSION_18_FLUX) {
            token = `${this.credentials.username}:${this.credentials.password}`;
        } else {
            token = this.credentials.token;
        }
        this.client = new InfluxDB({
            url: n.url,
            rejectUnauthorized: n.rejectUnauthorized,
            token
        });
    }

    this.influxdbVersion = version;
}
// Register the config node under the type name "influxdb" together with its
// credential fields: `username` is declared as "text", `password` and `token`
// as "password".
RED.nodes.registerType("influxdb", InfluxConfigNode, {
credentials: {
username: { type: "text" },
password: { type: "password" },
token: { type: "password" }
}
});
/**
 * Tell whether a value is a string holding an integer literal with an 'i'
 * suffix, i.e. an optional minus sign, one or more digits, then 'i'
 * (for example "42i" or "-7i").
 *
 * Non-string values always yield false. Without the type check the regex
 * would test their string coercion, so e.g. ['5i'] would be reported as an
 * integer string and callers would then fail calling `.substring` on it.
 *
 * @param {*} value - value to inspect
 * @returns {boolean} true only for strings of the form described above
 */
function isIntegerString(value) {
    return typeof value === 'string' && /^-?\d+i$/.test(value);
}
/**
 * Replace, in place, every field value recognised by isIntegerString with the
 * number obtained by dropping its last character and parsing the rest.
 * All other values are left untouched.
 *
 * @param {object} fields - field map to update in place
 */
function setFieldIntegers(fields) {
    for (const fieldName in fields) {
        const raw = fields[fieldName];
        if (!isIntegerString(raw)) {
            continue;
        }
        // Strip the trailing 'i' marker before parsing.
        const digits = raw.substring(0, raw.length - 1);
        fields[fieldName] = parseInt(digits);
    }
}
/**
 * Add one named value to a point, choosing the point method by value type.
 *
 * - the name 'time' sets the point's timestamp, whatever the value type
 * - numbers become float fields
 * - strings recognised by isIntegerString become integer fields (trailing
 *   'i' removed), all other strings become string fields
 * - booleans become boolean fields
 * - values of any other type are ignored
 *
 * @param {object} point - point exposing timestamp/floatField/intField/stringField/booleanField
 * @param {string} name - field name, or 'time' for the timestamp
 * @param {*} value - value to store
 */
function addFieldToPoint(point, name, value) {
    if (name === 'time') {
        point.timestamp(value);
        return;
    }
    switch (typeof value) {
        case 'number':
            point.floatField(name, value);
            break;
        case 'string':
            // string values with numbers ending with 'i' are considered integers
            if (isIntegerString(value)) {
                const digits = value.substring(0, value.length - 1);
                point.intField(name, parseInt(digits));
            } else {
                point.stringField(name, value);
            }
            break;
        case 'boolean':
            point.booleanField(name, value);
            break;
        default:
            // unsupported value types are skipped
            break;
    }
}
/**
 * Add every enumerable property of `fields` to the point via addFieldToPoint.
 * A null or undefined `fields` adds nothing.
 *
 * @param {object} point - point to populate
 * @param {object} fields - map of field name to value
 */
function addFieldsToPoint(point, fields) {
    for (const fieldName in fields) {
        addFieldToPoint(point, fieldName, fields[fieldName]);
    }
}
/**
 * Write the payload of a message through the node's influx-client-js write
 * API (`node.client`) and flush it.
 *
 * The measurement comes from `msg.measurement` when the message has that
 * property, otherwise from `node.measurement`. Accepted payload shapes:
 * - array whose first element is a non-empty array: several points, each
 *   element being `[fields, tags]`
 * - any other non-empty array: one point, `payload[0]` = fields, `payload[1]` = tags
 * - plain object: one point with fields only
 * - anything else: one point with the payload stored as field 'value'
 *
 * @param {object} msg - incoming message; `influx_error` is set on failure
 * @param {object} node - node providing `client` and the default `measurement`
 * @param {function} done - completion callback, called with an error on failure
 */
function writePoints(msg, node, done) {
    const measurement = msg.hasOwnProperty('measurement') ? msg.measurement : node.measurement;
    if (!measurement) {
        return done(RED._("influxdb.errors.nomeasurement"));
    }

    // Build one point from a field map and an optional tag map and hand it
    // to the write API right away.
    const writeOne = (fields, tags) => {
        const point = new Point(measurement);
        addFieldsToPoint(point, fields);
        for (const tagName in tags) {
            point.tag(tagName, tags[tagName]);
        }
        node.client.writePoint(point);
    };

    const reportFailure = (error) => {
        msg.influx_error = {
            errorMessage: error
        };
        done(error);
    };

    try {
        const payload = msg.payload;
        if (_.isArray(payload) && payload.length > 0) {
            const first = payload[0];
            if (_.isArray(first) && first.length > 0) {
                // array of arrays: multiple points with fields and tags
                payload.forEach((entry) => {
                    writeOne(entry[0], entry[1]);
                });
            } else {
                // array of non-arrays: one point with both fields and tags
                writeOne(first, payload[1]);
            }
        } else if (_.isPlainObject(payload)) {
            // single object: fields only
            writeOne(payload, undefined);
        } else {
            // just a value
            const point = new Point(measurement);
            addFieldToPoint(point, 'value', payload);
            node.client.writePoint(point);
        }

        node.client.flush(true).then(() => {
            done();
        }).catch(reportFailure);
    } catch (error) {
        reportFailure(error);
    }
}
/**
 * Output node that writes message payloads to a single InfluxDB measurement.
 *
 * For a 1.x configuration the payload is converted to the point format of
 * the `influx` client and sent with `writePoints`. For 1.8-flux / 2.0 a
 * write API is obtained from the config node's client and messages are
 * delegated to the shared `writePoints` helper.
 *
 * @param {object} n - node definition handed over by Node-RED
 */
function InfluxOutNode(n) {
    RED.nodes.createNode(this, n);
    const node = this;

    node.measurement = n.measurement;
    node.influxdb = n.influxdb;
    node.influxdbConfig = RED.nodes.getNode(node.influxdb);
    node.precision = n.precision;
    node.retentionPolicy = n.retentionPolicy;

    // 1.8 and 2.0 only
    node.database = n.database;
    node.precisionV18FluxV20 = n.precisionV18FluxV20;
    node.retentionPolicyV18Flux = n.retentionPolicyV18Flux;
    node.org = n.org;
    node.bucket = n.bucket;

    if (!node.influxdbConfig) {
        node.error(RED._("influxdb.errors.missingconfig"));
        return;
    }

    const version = node.influxdbConfig.influxdbVersion;

    if (version === VERSION_1X) {
        const client = node.influxdbConfig.client;

        // Convert 'NNNi' strings to integers, then move a truthy `time`
        // field out of the fields and into the point's timestamp.
        const finalizePoint = (point) => {
            setFieldIntegers(point.fields);
            if (point.fields.time) {
                point.timestamp = point.fields.time;
                delete point.fields.time;
            }
            return point;
        };

        node.on("input", function (msg, send, done) {
            const measurement = msg.hasOwnProperty('measurement') ? msg.measurement : node.measurement;
            if (!measurement) {
                return done(RED._("influxdb.errors.nomeasurement"));
            }

            const writeOptions = {};
            const precision = msg.hasOwnProperty('precision') ? msg.precision : node.precision;
            const retentionPolicy = msg.hasOwnProperty('retentionPolicy') ? msg.retentionPolicy : node.retentionPolicy;
            if (precision) {
                writeOptions.precision = precision;
            }
            if (retentionPolicy) {
                writeOptions.retentionPolicy = retentionPolicy;
            }

            // format payload to match new writePoints API
            const payload = msg.payload;
            const points = [];
            if (_.isArray(payload) && payload.length > 0) {
                if (_.isArray(payload[0]) && payload[0].length > 0) {
                    // array of arrays: each entry is [fields, tags]
                    payload.forEach(function (entry) {
                        points.push(finalizePoint({
                            measurement: measurement,
                            fields: _.clone(entry[0]),
                            tags: entry[1]
                        }));
                    });
                } else {
                    // array of non-arrays, assume one point with both fields and tags
                    points.push(finalizePoint({
                        measurement: measurement,
                        fields: _.clone(payload[0]),
                        tags: payload[1]
                    }));
                }
            } else if (_.isPlainObject(payload)) {
                // fields only
                points.push(finalizePoint({
                    measurement: measurement,
                    fields: _.clone(payload)
                }));
            } else {
                // just a value
                points.push(finalizePoint({
                    measurement: measurement,
                    fields: { value: payload }
                }));
            }

            client.writePoints(points, writeOptions).then(() => {
                done();
            }).catch(function (err) {
                // Without an HTTP response on the error, report 503.
                msg.influx_error = {
                    statusCode: err.res ? err.res.statusCode : 503
                };
                done(err);
            });
        });
    } else if (version === VERSION_18_FLUX || version === VERSION_20) {
        const isFlux18 = version === VERSION_18_FLUX;

        // 1.8-flux addresses data as "<database>/<retention policy>", with
        // 'autogen' used when no retention policy is configured.
        let bucket = node.bucket;
        if (isFlux18) {
            const retentionPolicy = node.retentionPolicyV18Flux || 'autogen';
            bucket = `${node.database}/${retentionPolicy}`;
        }
        const org = isFlux18 ? '' : node.org;

        node.client = node.influxdbConfig.client.getWriteApi(org, bucket, node.precisionV18FluxV20);
        node.on("input", function (msg, send, done) {
            writePoints(msg, node, done);
        });
    }
}
// Register InfluxOutNode under the node type name "influxdb out".
RED.nodes.registerType("influxdb out", InfluxOutNode);
/**
 * Output node to write to multiple InfluxDb measurements in one message.
 *
 * `msg.payload` is expected to be an array of points:
 * - 1.x: the payload is handed unchanged to the `influx` client's
 *   `writePoints`, with optional `precision` / `retentionPolicy` taken from
 *   the message or, failing that, from the node.
 * - 1.8-flux / 2.0: each element's `measurement`, `fields`, `tags` and
 *   `timestamp` are turned into a point on a write API, which is then flushed.
 *
 * Failures are reported through `done(err)` with `msg.influx_error` set. This
 * includes synchronous failures such as a payload that is not an array, so a
 * malformed message is reported the same way as a failed write instead of
 * escaping the input handler as an uncaught exception.
 *
 * @param {object} n - node definition handed over by Node-RED
 */
function InfluxBatchNode(n) {
    RED.nodes.createNode(this, n);
    this.influxdb = n.influxdb;
    this.influxdbConfig = RED.nodes.getNode(this.influxdb);
    this.precision = n.precision;
    this.retentionPolicy = n.retentionPolicy;

    // 1.8 and 2.0
    this.database = n.database;
    this.precisionV18FluxV20 = n.precisionV18FluxV20;
    this.retentionPolicyV18Flux = n.retentionPolicyV18Flux;
    this.org = n.org;
    this.bucket = n.bucket;

    if (!this.influxdbConfig) {
        this.error(RED._("influxdb.errors.missingconfig"));
        return;
    }

    const version = this.influxdbConfig.influxdbVersion;
    const node = this;

    if (version === VERSION_1X) {
        const client = this.influxdbConfig.client;

        node.on("input", function (msg, send, done) {
            // Same reporting for rejected writes and synchronous failures;
            // without an HTTP response on the error the status is 503.
            const fail = function (err) {
                msg.influx_error = {
                    statusCode: err.res ? err.res.statusCode : 503
                };
                done(err);
            };

            const writeOptions = {};
            const precision = msg.hasOwnProperty('precision') ? msg.precision : node.precision;
            const retentionPolicy = msg.hasOwnProperty('retentionPolicy') ? msg.retentionPolicy : node.retentionPolicy;
            if (precision) {
                writeOptions.precision = precision;
            }
            if (retentionPolicy) {
                writeOptions.retentionPolicy = retentionPolicy;
            }

            try {
                client.writePoints(msg.payload, writeOptions).then(() => {
                    done();
                }).catch(fail);
            } catch (err) {
                // e.g. the payload could not be processed before any request was made
                fail(err);
            }
        });
    } else if (version === VERSION_18_FLUX || version === VERSION_20) {
        // 1.8-flux addresses data as "<database>/<retention policy>", with
        // 'autogen' used when no retention policy is configured.
        let bucket = node.bucket;
        if (version === VERSION_18_FLUX) {
            const retentionPolicy = this.retentionPolicyV18Flux ? this.retentionPolicyV18Flux : 'autogen';
            bucket = `${this.database}/${retentionPolicy}`;
        }
        const org = version === VERSION_18_FLUX ? '' : this.org;
        const client = this.influxdbConfig.client.getWriteApi(org, bucket, this.precisionV18FluxV20);

        node.on("input", function (msg, send, done) {
            const fail = function (error) {
                msg.influx_error = {
                    errorMessage: error
                };
                done(error);
            };

            try {
                msg.payload.forEach(element => {
                    const point = new Point(element.measurement);

                    // time is reserved as a field name still! will be overridden by the timestamp below.
                    addFieldsToPoint(point, element.fields);

                    const tags = element.tags;
                    if (tags) {
                        for (const prop in tags) {
                            point.tag(prop, tags[prop]);
                        }
                    }
                    if (element.timestamp) {
                        point.timestamp(element.timestamp);
                    }
                    client.writePoint(point);
                });

                // ensure we write everything including scheduled retries
                client.flush(true).then(() => {
                    done();
                }).catch(fail);
            } catch (error) {
                // malformed payloads (not an array, null elements, ...) end up here
                fail(error);
            }
        });
    }
}
// Register InfluxBatchNode under the node type name "influxdb batch".
RED.nodes.registerType("influxdb batch", InfluxBatchNode);
/**
 * Input node that runs a query against InfluxDB and emits the result as
 * `msg.payload`.
 *
 * The query comes from `msg.query` when the message has that property,
 * otherwise from the node's configured query. For 1.x the `influx` client's
 * `query` / `queryRaw` is used; for 1.8-flux / 2.0 the rows delivered by the
 * query API are collected into an array of objects.
 *
 * @param {object} n - node definition handed over by Node-RED
 */
function InfluxInNode(n) {
    RED.nodes.createNode(this, n);
    const node = this;

    node.influxdb = n.influxdb;
    node.query = n.query;
    node.precision = n.precision;
    node.retentionPolicy = n.retentionPolicy;
    node.rawOutput = n.rawOutput;
    node.influxdbConfig = RED.nodes.getNode(node.influxdb);
    node.org = n.org;

    if (!node.influxdbConfig) {
        node.error(RED._("influxdb.errors.missingconfig"));
        return;
    }

    const version = node.influxdbConfig.influxdbVersion;

    if (version === VERSION_1X) {
        const client = node.influxdbConfig.client;

        node.on("input", function (msg, send, done) {
            const query = msg.hasOwnProperty('query') ? msg.query : node.query;
            if (!query) {
                return done(RED._("influxdb.errors.noquery"));
            }

            // Message properties take precedence over the node's settings.
            const rawOutput = msg.hasOwnProperty('rawOutput') ? msg.rawOutput : node.rawOutput;
            const precision = msg.hasOwnProperty('precision') ? msg.precision : node.precision;
            const retentionPolicy = msg.hasOwnProperty('retentionPolicy') ? msg.retentionPolicy : node.retentionPolicy;

            const queryOptions = {};
            if (precision) {
                queryOptions.precision = precision;
            }
            if (retentionPolicy) {
                queryOptions.retentionPolicy = retentionPolicy;
            }

            const pending = rawOutput
                ? client.queryRaw(query, queryOptions)
                : client.query(query, queryOptions);

            pending.then(function (results) {
                msg.payload = results;
                send(msg);
                done();
            }).catch(function (err) {
                // Without an HTTP response on the error, report 503.
                msg.influx_error = {
                    statusCode: err.res ? err.res.statusCode : 503
                };
                done(err);
            });
        });
    } else if (version === VERSION_18_FLUX || version === VERSION_20) {
        // Only 2.0 passes the configured organisation; 1.8-flux uses ''.
        const org = version === VERSION_20 ? node.org : '';
        node.client = node.influxdbConfig.client.getQueryApi(org);

        node.on("input", function (msg, send, done) {
            const query = msg.hasOwnProperty('query') ? msg.query : node.query;
            if (!query) {
                return done(RED._("influxdb.errors.noquery"));
            }

            const rows = [];
            node.client.queryRows(query, {
                // called once per result row
                next(row, tableMeta) {
                    rows.push(tableMeta.toObject(row));
                },
                error(error) {
                    msg.influx_error = {
                        errorMessage: error
                    };
                    done(error);
                },
                // all rows delivered: emit them as one array
                complete() {
                    msg.payload = rows;
                    send(msg);
                    done();
                },
            });
        });
    }
}
// Register InfluxInNode under the node type name "influxdb in".
RED.nodes.registerType("influxdb in", InfluxInNode);
}