forked from curious.bio/iot-backend
316 lines
11 KiB
JavaScript
316 lines
11 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.Pool = exports.RequestError = exports.ServiceNotAvailableError = void 0;
|
|
const exponential_1 = require("./backoff/exponential");
|
|
const host_1 = require("./host");
|
|
const http = require("http");
|
|
const https = require("https");
|
|
const querystring = require("querystring");
|
|
/**
|
|
* Status codes that will cause a host to be marked as 'failed' if we get
|
|
* them from a request to Influx.
|
|
* @type {Array}
|
|
*/
|
|
const resubmitErrorCodes = [
|
|
"ETIMEDOUT",
|
|
"ESOCKETTIMEDOUT",
|
|
"ECONNRESET",
|
|
"ECONNREFUSED",
|
|
"EHOSTUNREACH",
|
|
];
|
|
/**
|
|
* An ServiceNotAvailableError is returned as an error from requests that
|
|
* result in a > 500 error code.
|
|
*/
|
|
class ServiceNotAvailableError extends Error {
|
|
constructor(message) {
|
|
super();
|
|
this.message = message;
|
|
Object.setPrototypeOf(this, ServiceNotAvailableError.prototype);
|
|
}
|
|
}
|
|
exports.ServiceNotAvailableError = ServiceNotAvailableError;
|
|
/**
|
|
* An RequestError is returned as an error from requests that
|
|
* result in a 300 <= error code <= 500.
|
|
*/
|
|
class RequestError extends Error {
|
|
constructor(req, res, body) {
|
|
super();
|
|
this.req = req;
|
|
this.res = res;
|
|
this.message = `A ${res.statusCode} ${res.statusMessage} error occurred: ${body}`;
|
|
Object.setPrototypeOf(this, RequestError.prototype);
|
|
}
|
|
static Create(req, res, callback) {
|
|
let body = "";
|
|
res.on("data", (str) => {
|
|
body += str.toString();
|
|
});
|
|
res.on("end", () => callback(new RequestError(req, res, body)));
|
|
}
|
|
}
|
|
exports.RequestError = RequestError;
|
|
/**
|
|
* Creates a function generation that returns a wrapper which only allows
|
|
* through the first call of any function that it generated.
|
|
*/
|
|
function doOnce() {
|
|
let handled = false;
|
|
return (fn) => {
|
|
return (arg) => {
|
|
if (handled) {
|
|
return;
|
|
}
|
|
handled = true;
|
|
fn(arg);
|
|
};
|
|
};
|
|
}
|
|
function setToArray(itemSet) {
|
|
const output = [];
|
|
itemSet.forEach((value) => {
|
|
output.push(value);
|
|
});
|
|
return output;
|
|
}
|
|
const request = (options, callback) => {
|
|
if (options.protocol === "https:") {
|
|
return https.request(options, callback);
|
|
}
|
|
return http.request(options, callback);
|
|
};
|
|
/**
|
|
*
|
|
* The Pool maintains a list available Influx hosts and dispatches requests
|
|
* to them. If there are errors connecting to hosts, it will disable that
|
|
* host for a period of time.
|
|
*/
|
|
class Pool {
|
|
/**
|
|
* Creates a new Pool instance.
|
|
* @param {IPoolOptions} options
|
|
*/
|
|
constructor(options) {
|
|
this._options = Object.assign({ backoff: new exponential_1.ExponentialBackoff({
|
|
initial: 300,
|
|
max: 10 * 1000,
|
|
random: 1,
|
|
}), maxRetries: 2, requestTimeout: 30 * 1000 }, options);
|
|
this._index = 0;
|
|
this._hostsAvailable = new Set();
|
|
this._hostsDisabled = new Set();
|
|
this._timeout = this._options.requestTimeout;
|
|
}
|
|
/**
|
|
* Returns a list of currently active hosts.
|
|
* @return {Host[]}
|
|
*/
|
|
getHostsAvailable() {
|
|
return setToArray(this._hostsAvailable);
|
|
}
|
|
/**
|
|
* Returns a list of hosts that are currently disabled due to network
|
|
* errors.
|
|
* @return {Host[]}
|
|
*/
|
|
getHostsDisabled() {
|
|
return setToArray(this._hostsDisabled);
|
|
}
|
|
/**
|
|
* Inserts a new host to the pool.
|
|
*/
|
|
addHost(url, options = {}) {
|
|
const host = new host_1.Host(url, this._options.backoff.reset(), options);
|
|
this._hostsAvailable.add(host);
|
|
return host;
|
|
}
|
|
/**
|
|
* Returns true if there's any host available to by queried.
|
|
* @return {Boolean}
|
|
*/
|
|
hostIsAvailable() {
|
|
return this._hostsAvailable.size > 0;
|
|
}
|
|
/**
|
|
* Makes a request and calls back with the response, parsed as JSON.
|
|
* An error is returned on a non-2xx status code or on a parsing exception.
|
|
*/
|
|
json(options) {
|
|
return this.text(options).then((res) => JSON.parse(res));
|
|
}
|
|
/**
|
|
* Makes a request and resolves with the plain text response,
|
|
* if possible. An error is raised on a non-2xx status code.
|
|
*/
|
|
text(options) {
|
|
return new Promise((resolve, reject) => {
|
|
this.stream(options, (err, res) => {
|
|
if (err) {
|
|
return reject(err);
|
|
}
|
|
let output = "";
|
|
res.on("data", (str) => {
|
|
output += str.toString();
|
|
});
|
|
res.on("end", () => resolve(output));
|
|
});
|
|
});
|
|
}
|
|
/**
|
|
* Makes a request and discards any response body it receives.
|
|
* An error is returned on a non-2xx status code.
|
|
*/
|
|
discard(options) {
|
|
return new Promise((resolve, reject) => {
|
|
this.stream(options, (err, res) => {
|
|
if (err) {
|
|
return reject(err);
|
|
}
|
|
res.on("data", () => {
|
|
/* ignore */
|
|
});
|
|
res.on("end", () => resolve());
|
|
});
|
|
});
|
|
}
|
|
/**
|
|
* Ping sends out a request to all available Influx servers, reporting on
|
|
* their response time and version number.
|
|
*/
|
|
ping(timeout, path = "/ping") {
|
|
const todo = [];
|
|
setToArray(this._hostsAvailable)
|
|
.concat(setToArray(this._hostsDisabled))
|
|
.forEach((host) => {
|
|
const start = Date.now();
|
|
const url = host.url;
|
|
const once = doOnce();
|
|
return todo.push(new Promise((resolve) => {
|
|
const req = request(Object.assign({ hostname: url.hostname, method: "GET", path, port: Number(url.port), protocol: url.protocol, timeout }, host.options), once((res) => {
|
|
resolve({
|
|
url,
|
|
res: res.resume(),
|
|
online: res.statusCode < 300,
|
|
rtt: Date.now() - start,
|
|
version: res.headers["x-influxdb-version"],
|
|
});
|
|
}));
|
|
const fail = once(() => {
|
|
req.abort();
|
|
resolve({
|
|
online: false,
|
|
res: null,
|
|
rtt: Infinity,
|
|
url,
|
|
version: null,
|
|
});
|
|
});
|
|
// Support older Nodes and polyfills which don't allow .timeout() in
|
|
// the request options, wrapped in a conditional for even worse
|
|
// polyfills. See: https://github.com/node-influx/node-influx/issues/221
|
|
if (typeof req.setTimeout === "function") {
|
|
req.setTimeout(timeout, () => {
|
|
fail.call(fail, arguments);
|
|
}); // Tslint:disable-line
|
|
}
|
|
req.on("timeout", fail);
|
|
req.on("error", fail);
|
|
req.end();
|
|
}));
|
|
});
|
|
return Promise.all(todo);
|
|
}
|
|
/**
|
|
* Makes a request and calls back with the IncomingMessage stream,
|
|
* if possible. An error is returned on a non-2xx status code.
|
|
*/
|
|
stream(options, callback) {
|
|
if (!this.hostIsAvailable()) {
|
|
return callback(new ServiceNotAvailableError("No host available"), null);
|
|
}
|
|
const once = doOnce();
|
|
const host = this._getHost();
|
|
let path = host.url.pathname === "/" ? "" : host.url.pathname;
|
|
path += options.path;
|
|
if (options.query) {
|
|
path += "?" + querystring.stringify(options.query);
|
|
}
|
|
const req = request(Object.assign({ headers: {
|
|
"content-length": options.body ? Buffer.from(options.body).length : 0,
|
|
}, hostname: host.url.hostname, method: options.method, path, port: Number(host.url.port), protocol: host.url.protocol, timeout: this._timeout }, host.options), once((res) => {
|
|
res.setEncoding("utf8");
|
|
if (res.statusCode >= 500) {
|
|
return this._handleRequestError(new ServiceNotAvailableError(res.statusMessage), host, options, callback);
|
|
}
|
|
if (res.statusCode >= 300) {
|
|
return RequestError.Create(req, res, (err) => callback(err, res));
|
|
}
|
|
host.success();
|
|
return callback(undefined, res);
|
|
}));
|
|
// Handle network or HTTP parsing errors:
|
|
req.on("error", once((err) => {
|
|
this._handleRequestError(err, host, options, callback);
|
|
}));
|
|
// Handle timeouts:
|
|
req.on("timeout", once(() => {
|
|
req.abort();
|
|
this._handleRequestError(new ServiceNotAvailableError("Request timed out"), host, options, callback);
|
|
}));
|
|
// Support older Nodes and polyfills which don't allow .timeout() in the
|
|
// request options, wrapped in a conditional for even worse polyfills. See:
|
|
// https://github.com/node-influx/node-influx/issues/221
|
|
if (typeof req.setTimeout === "function") {
|
|
req.setTimeout(host.options.timeout || this._timeout); // Tslint:disable-line
|
|
}
|
|
// Write out the body:
|
|
if (options.body) {
|
|
req.write(options.body);
|
|
}
|
|
req.end();
|
|
}
|
|
/**
|
|
* Returns the next available host for querying.
|
|
* @return {Host}
|
|
*/
|
|
_getHost() {
|
|
const available = setToArray(this._hostsAvailable);
|
|
const host = available[this._index];
|
|
this._index = (this._index + 1) % available.length;
|
|
return host;
|
|
}
|
|
/**
|
|
* Re-enables the provided host, returning it to the pool to query.
|
|
* @param {Host} host
|
|
*/
|
|
_enableHost(host) {
|
|
this._hostsDisabled.delete(host);
|
|
this._hostsAvailable.add(host);
|
|
}
|
|
/**
|
|
* Disables the provided host, removing it from the query pool. It will be
|
|
* re-enabled after a backoff interval
|
|
*/
|
|
_disableHost(host) {
|
|
this._hostsAvailable.delete(host);
|
|
this._hostsDisabled.add(host);
|
|
this._index %= Math.max(1, this._hostsAvailable.size);
|
|
setTimeout(() => this._enableHost(host), host.fail());
|
|
}
|
|
_handleRequestError(err, host, options, callback) {
|
|
if (!(err instanceof ServiceNotAvailableError) &&
|
|
!resubmitErrorCodes.includes(err.code)) {
|
|
return callback(err, null);
|
|
}
|
|
this._disableHost(host);
|
|
const retries = options.retries || 0;
|
|
if (retries < this._options.maxRetries && this.hostIsAvailable()) {
|
|
options.retries = retries + 1;
|
|
return this.stream(options, callback);
|
|
}
|
|
callback(err, null);
|
|
}
|
|
}
|
|
exports.Pool = Pool;
|