Something went wrong on our end
Select Git revision
websocket.mjs
-
Volker Schukai authoredVolker Schukai authored
websocket.mjs 10.48 KiB
/**
* Copyright schukai GmbH and contributors 2022. All Rights Reserved.
* Node module: @schukai/monster
* This file is licensed under the AGPLv3 License.
* License text available at https://www.gnu.org/licenses/agpl-3.0.en.html
*/
import {internalSymbol, instanceSymbol} from "../../constants.mjs";
import {isInteger, isString, isObject} from "../../types/is.mjs";
import {Queue} from "../../types/queue.mjs";
import {Datasource} from "../datasource.mjs";
import {Pathfinder} from "../pathfinder.mjs";
import {Pipe} from "../pipe.mjs";
export {WebSocketDatasource}
/**
 * Key under which a datasource instance keeps the queue of received message events.
 *
 * @private
 * @type {symbol}
 */
const receiveQueueSymbol = Symbol("queue");

/**
 * Key under which a datasource instance keeps its connection state.
 *
 * hint: this name is used in the tests. if you want to change it, please change it in the tests as well.
 *
 * @private
 * @type {symbol}
 */
const connectionSymbol = Symbol("connection");

/**
 * Key of the flag that marks a close as requested by the caller.
 *
 * @private
 * @type {symbol}
 */
const manualCloseSymbol = Symbol("manualClose");

/**
 * Human-readable labels for the WebSocket close status codes.
 *
 * @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1
 * @type {Object<string, string>}
 */
const connectionStatusCode = {
    "1000": "Normal closure",
    "1001": "Going away",
    "1002": "Protocol error",
    "1003": "Unsupported data",
    "1004": "Reserved",
    "1005": "No status code",
    "1006": "Connection closed abnormally",
    "1007": "Invalid frame payload data",
    "1008": "Policy violation",
    "1009": "Message too big",
    "1010": "Mandatory extension",
    "1011": "Internal server error",
    "1015": "TLS handshake",
};
/**
 * The WebSocketDatasource is a datasource that exchanges JSON documents with a
 * server over a WebSocket connection.
 *
 * Incoming messages are queued and applied to the datasource via `read()`;
 * `write()` serialises the current data with `JSON.stringify` and sends it.
 *
 * @externalExample ../../../example/data/storage/restapi.mjs
 * @license AGPLv3
 * @since 3.1.0
 * @copyright schukai GmbH
 * @memberOf Monster.Data.Datasource
 * @summary The WebSocketDatasource class encapsulates the access to data objects via a WebSocket.
 */
class WebSocketDatasource extends Datasource {

    /**
     * @param {Object|string} [options] options contains definitions for the datasource;
     *        a string is treated as the url, anything that is not an object is replaced by `{}`.
     */
    constructor(options) {
        super();

        if (isString(options)) {
            options = {url: options};
        }

        if (!isObject(options)) options = {};
        this.setOptions(options);
        this[receiveQueueSymbol] = new Queue();

        this[connectionSymbol] = {};
        this[connectionSymbol].socket = null;
        this[connectionSymbol].reconnectCounter = 0;
        // handle of a pending reconnect timer, so that close()/connect() can cancel it
        this[connectionSymbol].reconnectTimer = null;
        this[manualCloseSymbol] = false;
    }

    /**
     * Opens a new WebSocket to the configured url. An existing socket is
     * detached and closed first.
     *
     * If `reconnect.enabled` is `true`, an error or an unrequested close
     * schedules a new `connect()` after `reconnect.timeout * attempt`
     * milliseconds, as long as fewer than `reconnect.attempts` attempts were made.
     * Timeouts below 1000 and attempts below 1 are raised to 1000 and 1.
     *
     * @returns {WebSocketDatasource}
     * @throws {Error} No url defined for websocket datasource.
     */
    connect() {
        const self = this;
        const connection = self[connectionSymbol];

        let reconnectTimeout = self.getOption('reconnect.timeout');
        if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000;

        let reconnectAttempts = self.getOption('reconnect.attempts');
        if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1;

        const reconnectEnabled = self.getOption('reconnect.enabled') === true;

        self[manualCloseSymbol] = false;
        connection.reconnectCounter++;

        // this call supersedes any reconnect that is still waiting
        if (connection.reconnectTimer !== null && connection.reconnectTimer !== undefined) {
            clearTimeout(connection.reconnectTimer);
        }
        connection.reconnectTimer = null;

        const previous = connection.socket;
        if (previous) {
            // detach the handlers first: the close event of the replaced socket
            // must not be mistaken for a connection loss and trigger a reconnect
            previous.onopen = null;
            previous.onmessage = null;
            previous.onclose = null;
            previous.onerror = null;
            if (previous.readyState < 2) {
                previous.close();
            }
        }
        connection.socket = null;

        const url = self.getOption('url');
        if (!url) throw new Error('No url defined for websocket datasource.');

        const socket = new WebSocket(url);
        connection.socket = socket;

        // an error is followed by a close event; reconnect only once per socket
        let reconnectScheduled = false;
        const scheduleReconnect = () => {
            if (reconnectScheduled) return;
            if (!reconnectEnabled || connection.reconnectCounter >= reconnectAttempts) return;

            reconnectScheduled = true;
            connection.reconnectTimer = setTimeout(() => {
                connection.reconnectTimer = null;
                self.connect();
            }, reconnectTimeout * connection.reconnectCounter);
        };

        socket.onmessage = (event) => {
            self[receiveQueueSymbol].add(event);
            // NOTE(review): read() rejects on invalid JSON and that rejection is
            // not handled here; decide how such messages should be reported.
            setTimeout(() => {
                self.read();
            }, 0);
        };

        socket.onopen = () => {
            connection.reconnectCounter = 0;
        };

        socket.onclose = () => {
            if (self[manualCloseSymbol]) {
                // the close was requested via close(); consume the flag, no reconnect
                self[manualCloseSymbol] = false;
                return;
            }
            scheduleReconnect();
        };

        socket.onerror = () => {
            scheduleReconnect();
        };

        return self;
    }

    /**
     * @returns {boolean} true if a socket exists and its readyState is 1 (open)
     */
    isConnected() {
        return this[connectionSymbol].socket?.readyState === 1;
    }

    /**
     * This method is called by the `instanceof` operator.
     * @returns {symbol}
     */
    static get [instanceSymbol]() {
        return Symbol.for("@schukai/monster/data/datasource/websocket");
    }

    /**
     * @property {string} url=undefined Defines the url of the WebSocket server.
     * @property {Object} reconnect Options
     * @property {Number} reconnect.timeout=1000 The timeout in milliseconds for the reconnect.
     * @property {Number} reconnect.attempts=10 The maximum number of reconnects.
     * @property {Boolean} reconnect.enabled=true If the reconnect is enabled.
     * @property {Object} write={} Options
     * @property {Object} write.mapping the mapping is applied before writing.
     * @property {String} write.mapping.transformer Transformer to select the appropriate entries
     * @property {Monster.Data.Datasource~exampleCallback[]} write.mapping.callbacks with the help of the callbacks, the structures can be adjusted before writing.
     * @property {Object} write.report
     * @property {String} write.report.path Path to validations
     * @property {Object} write.sheathing
     * @property {Object} write.sheathing.object Object to be wrapped
     * @property {string} write.sheathing.path Path to the data
     * @property {Object} read={} Options
     * @property {Object} read.mapping the mapping is applied after reading.
     * @property {String} read.mapping.transformer Transformer to select the appropriate entries
     * @property {Monster.Data.Datasource~exampleCallback[]} read.mapping.callbacks with the help of the callbacks, the structures can be adjusted after reading.
     */
    get defaults() {
        return Object.assign({}, super.defaults, {
            url: undefined,
            write: {
                mapping: {
                    transformer: undefined,
                    callbacks: []
                },
                report: {
                    path: undefined
                },
                sheathing: {
                    object: undefined,
                    path: undefined,
                },
            },
            read: {
                mapping: {
                    transformer: undefined,
                    callbacks: []
                },
            },
            reconnect: {
                timeout: 1000,
                attempts: 10,
                enabled: true
            }
        });
    }

    /**
     * This method closes the connection and cancels a pending reconnect.
     * A close requested here does not trigger an automatic reconnect.
     *
     * @returns {WebSocketDatasource}
     */
    close() {
        const connection = this[connectionSymbol];

        if (connection.reconnectTimer !== null && connection.reconnectTimer !== undefined) {
            clearTimeout(connection.reconnectTimer);
        }
        connection.reconnectTimer = null;

        this[manualCloseSymbol] = true;
        if (connection.socket) {
            connection.socket.close();
        }

        return this;
    }

    /**
     * Takes message events from the receive queue until one with a payload is
     * found, parses that payload as JSON, applies the optional
     * `read.mapping.transformer` and stores the result with `set()`.
     * Events without payload are dropped. Resolves (without a value) once a
     * message has been applied or the queue is empty.
     *
     * @return {Promise} rejected with a message string if the payload is not valid JSON
     */
    read() {
        const self = this;

        return new Promise((resolve, reject) => {
            const queue = self[receiveQueueSymbol];

            while (!queue.isEmpty()) {
                const event = queue.poll();
                const body = event?.data;
                if (!body) continue;

                let obj;
                try {
                    obj = JSON.parse(body);
                } catch (e) {
                    let msg = 'the response does not contain a valid json (actual: ';
                    if (body.length > 100) {
                        msg += body.substring(0, 97) + '...';
                    } else {
                        msg += body;
                    }
                    msg += "; " + e.message + ')';

                    // stop here, otherwise `undefined` would be stored as data
                    reject(msg);
                    return;
                }

                const transformation = self.getOption('read.mapping.transformer');
                if (transformation !== undefined) {
                    const pipe = new Pipe(transformation);
                    for (const callback of self.getOption('read.mapping.callbacks')) {
                        pipe.setCallback(callback.constructor.name, callback);
                    }
                    obj = pipe.run(obj);
                }

                self.set(obj);
                resolve();
                return;
            }

            // nothing (left) to read
            resolve();
        });
    }

    /**
     * Sends the current data as JSON text over the socket. The optional
     * `write.mapping.transformer` is applied first; if `write.sheathing.object`
     * and `write.sheathing.path` are set, the data is placed into the sheathing
     * object at that path (the configured object itself is modified) and the
     * sheathing object is sent instead.
     *
     * @return {Promise} rejected with 'the socket is not ready' if there is no open socket
     */
    write() {
        const self = this;

        let obj = self.get();

        const transformation = self.getOption('write.mapping.transformer');
        if (transformation !== undefined) {
            const pipe = new Pipe(transformation);
            for (const callback of self.getOption('write.mapping.callbacks')) {
                pipe.setCallback(callback.constructor.name, callback);
            }
            obj = pipe.run(obj);
        }

        const sheathingObject = self.getOption('write.sheathing.object');
        const sheathingPath = self.getOption('write.sheathing.path');

        if (sheathingObject && sheathingPath) {
            const sub = obj;
            obj = sheathingObject;
            (new Pathfinder(obj)).setVia(sheathingPath, sub);
        }

        return new Promise((resolve, reject) => {
            const socket = self[connectionSymbol].socket;
            if (!socket || socket.readyState !== 1) {
                // do not fall through to send() on a socket that is not open
                reject('the socket is not ready');
                return;
            }

            socket.send(JSON.stringify(obj));
            resolve();
        });
    }

    /**
     * Creates a new datasource from the options of this one; the connection
     * and the receive queue are not shared.
     *
     * @return {WebSocketDatasource}
     */
    getClone() {
        const self = this;
        return new WebSocketDatasource(self[internalSymbol].getRealSubject()['options']);
    }

}