/** * Copyright schukai GmbH and contributors 2022. All Rights Reserved. * Node module: @schukai/monster * This file is licensed under the AGPLv3 License. * License text available at https://www.gnu.org/licenses/agpl-3.0.en.html */ import {internalSymbol, instanceSymbol} from "../../constants.mjs"; import {isInteger, isString, isObject} from "../../types/is.mjs"; import {Queue} from "../../types/queue.mjs"; import {Datasource} from "../datasource.mjs"; import {Pathfinder} from "../pathfinder.mjs"; import {Pipe} from "../pipe.mjs"; export {WebSocketDatasource} /** * @private * @type {Symbol} */ const receiveQueueSymbol = Symbol("queue"); /** * @private * @type {Symbol} * * hint: this name is used in the tests. if you want to change it, please change it in the tests as well. */ const connectionSymbol = Symbol("connection"); /** * @private * @type {symbol} */ const manualCloseSymbol = Symbol("manualClose"); /** * @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1 * @type {{"1000": string, "1011": string, "1010": string, "1008": string, "1007": string, "1006": string, "1005": string, "1004": string, "1015": string, "1003": string, "1002": string, "1001": string, "1009": string}} */ const connectionStatusCode = { 1000: "Normal closure", 1001: "Going away", 1002: "Protocol error", 1003: "Unsupported data", 1004: "Reserved", 1005: "No status code", 1006: "Connection closed abnormally", 1007: "Invalid frame payload data", 1008: "Policy violation", 1009: "Message too big", 1010: "Mandatory extension", 1011: "Internal server error", 1015: "TLS handshake" }; /** * The RestAPI is a class that enables a REST API server. * * @externalExample ../../../example/data/storage/restapi.mjs * @license AGPLv3 * @since 3.1.0 * @copyright schukai GmbH * @memberOf Monster.Data.Datasource * @summary The LocalStorage class encapsulates the access to data objects. */ class WebSocketDatasource extends Datasource { /** * * @param {Object} [options] options contains definitions for the datasource. */ constructor(options) { super(); if (isString(options)) { options = {url: options}; } if (!isObject(options)) options = {}; this.setOptions(options); this[receiveQueueSymbol] = new Queue(); this[connectionSymbol] = {}; this[connectionSymbol].socket = null; this[connectionSymbol].reconnectCounter = 0; this[manualCloseSymbol]=false; } /** * * @returns {Websocketdatasource} * @throws {Error} No url defined for websocket datasource. */ connect() { const self = this; let connected = false; let reconnectTimeout = self.getOption('reconnect.timeout'); if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000; let reconnectAttempts = self.getOption('reconnect.attempts'); if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1; let reconnectEnabled = self.getOption('reconnect.enabled'); if (reconnectEnabled !== true) reconnectEnabled = false; self[manualCloseSymbol] = false; self[connectionSymbol].reconnectCounter++; if (self[connectionSymbol].socket && self[connectionSymbol].socket.readyState < 2) { self[connectionSymbol].socket.close(); } self[connectionSymbol].socket = null; const url = self.getOption('url'); if (!url) throw new Error('No url defined for websocket datasource.'); self[connectionSymbol].socket = new WebSocket(url); self[connectionSymbol].socket.onmessage = function (event) { self[receiveQueueSymbol].add(event); setTimeout(function () { self.read(); }, 0); }; self[connectionSymbol].socket.onopen = function () { connected = true; self[connectionSymbol].reconnectCounter = 0; }; self[connectionSymbol].socket.close = function (event) { if (self[manualCloseSymbol]) { self[manualCloseSymbol] = false; return; } if (reconnectEnabled && this[connectionSymbol].reconnectCounter < reconnectAttempts) { setTimeout(() => { self.connect(); }, reconnectTimeout * this[connectionSymbol].reconnectCounter); } }; self[connectionSymbol].socket.onerror = (error) => { if (reconnectEnabled && self[connectionSymbol].reconnectCounter < reconnectAttempts) { setTimeout(() => { self.connect(); }, reconnectTimeout * this[connectionSymbol].reconnectCounter); } }; } isConnected() { return this[connectionSymbol].socket && this[connectionSymbol].socket.readyState === 1; } /** * This method is called by the `instanceof` operator. * @returns {symbol} */ static get [instanceSymbol]() { return Symbol.for("@schukai/monster/data/datasource/websocket"); } /** * @property {string} url=undefined Defines the resource that you wish to fetch. * @property {Number} reconnect.timeout The timeout in milliseconds for the reconnect. * @property {Number} reconnect.attempts The maximum number of reconnects. * @property {Bool} reconnect.enabled If the reconnect is enabled. * @property {Object} write={} Options * @property {Object} write.mapping the mapping is applied before writing. * @property {String} write.mapping.transformer Transformer to select the appropriate entries * @property {Monster.Data.Datasource~exampleCallback[]} write.mapping.callback with the help of the callback, the structures can be adjusted before writing. * @property {Object} write.report * @property {String} write.report.path Path to validations * @property {Object} write.sheathing * @property {Object} write.sheathing.object Object to be wrapped * @property {string} write.sheathing.path Path to the data * @property {Object} read={} Options * @property {Object} read.mapping the mapping is applied after reading. * @property {String} read.mapping.transformer Transformer to select the appropriate entries * @property {Monster.Data.Datasource~exampleCallback[]} read.mapping.callback with the help of the callback, the structures can be adjusted after reading. */ get defaults() { return Object.assign({}, super.defaults, { url: undefined, write: { mapping: { transformer: undefined, callbacks: [] }, report: { path: undefined }, sheathing: { object: undefined, path: undefined, }, }, read: { mapping: { transformer: undefined, callbacks: [] }, }, reconnect: { timeout: 1000, attempts: 10, enabled: true } }); } /** * This method closes the connection. * * @returns {Promise} */ close() { this[manualCloseSymbol]=true; if (this[connectionSymbol].socket) { this[connectionSymbol].socket.close(); } return this; } /** * @return {Promise} * @throws {Error} the options does not contain a valid json definition * @throws {Error} the data cannot be read * @throws {TypeError} value is not an object */ read() { const self = this; let response; return new Promise((resolve, reject) => { if (self[receiveQueueSymbol].isEmpty()) { resolve(); } while (!self[receiveQueueSymbol].isEmpty()) { const event = self[receiveQueueSymbol].poll(); const body = event?.data; if (!body) continue; let obj; try { obj = JSON.parse(body); } catch (e) { let msg = 'the response does not contain a valid json (actual: '; if (body.length > 100) { msg += body.substring(0, 97) + '...'; } else { msg += body; } msg += "; " + e.message + ')'; reject(msg); } let transformation = self.getOption('read.mapping.transformer'); if (transformation !== undefined) { const pipe = new Pipe(transformation); for (const callback of self.getOption('read.mapping.callbacks')) { pipe.setCallback(callback.constructor.name, callback); } obj = pipe.run(obj); } self.set(obj); return response; } }) } /** * @return {Promise} */ write() { const self = this; let obj = self.get(); let transformation = self.getOption('write.mapping.transformer'); if (transformation !== undefined) { const pipe = new Pipe(transformation); for (const callback of self.getOption('write.mapping.callbacks')) { pipe.setCallback(callback.constructor.name, callback); } obj = pipe.run(obj); } let sheathingObject = self.getOption('write.sheathing.object'); let sheathingPath = self.getOption('write.sheathing.path'); let reportPath = self.getOption('write.report.path'); if (sheathingObject && sheathingPath) { const sub = obj; obj = sheathingObject; (new Pathfinder(obj)).setVia(sheathingPath, sub); } return new Promise((resolve, reject) => { if (self[connectionSymbol].socket.readyState !== 1) { reject('the socket is not ready'); } self[connectionSymbol].socket.send(JSON.stringify(obj)) resolve(); }); } /** * @return {RestAPI} */ getClone() { const self = this; return new WebSocketDatasource(self[internalSymbol].getRealSubject()['options']); } }