Skip to content
Snippets Groups Projects
Select Git revision
  • 27d74911f60a11b7e8e693c0268bf1fd884a5b97
  • master default protected
  • 1.31
  • 4.34.1
  • 4.34.0
  • 4.33.1
  • 4.33.0
  • 4.32.2
  • 4.32.1
  • 4.32.0
  • 4.31.0
  • 4.30.1
  • 4.30.0
  • 4.29.1
  • 4.29.0
  • 4.28.0
  • 4.27.0
  • 4.26.0
  • 4.25.5
  • 4.25.4
  • 4.25.3
  • 4.25.2
  • 4.25.1
23 results

websocket.mjs

Blame
  • websocket.mjs 10.48 KiB
    /**
     * Copyright schukai GmbH and contributors 2022. All Rights Reserved.
     * Node module: @schukai/monster
     * This file is licensed under the AGPLv3 License.
     * License text available at https://www.gnu.org/licenses/agpl-3.0.en.html
     */
    
    import {internalSymbol, instanceSymbol} from "../../constants.mjs";
    import {isInteger, isString, isObject} from "../../types/is.mjs";
    import {Queue} from "../../types/queue.mjs";
    import {Datasource} from "../datasource.mjs";
    import {Pathfinder} from "../pathfinder.mjs";
    import {Pipe} from "../pipe.mjs";
    
    export {WebSocketDatasource}
    
    /**
     * Key under which every instance keeps the queue of received message events.
     * The `onmessage` handler adds events to this queue and `read()` takes them out again.
     *
     * @private
     * @type {symbol}
     */
    const receiveQueueSymbol = Symbol("queue");
    
    
    /**
     * Key under which every instance keeps its connection state: an object with the
     * properties `socket` (the current WebSocket or null) and `reconnectCounter`.
     *
     * @private
     * @type {symbol}
     * 
     * hint: this name is used in the tests. if you want to change it, please change it in the tests as well.
     */
    const connectionSymbol = Symbol("connection");
    
    /**
     * Key of the per-instance flag that `close()` sets to true. The close handler
     * installed by `connect()` checks the flag, resets it and skips the reconnect.
     *
     * @private
     * @type {symbol}
     */
    const manualCloseSymbol = Symbol("manualClose");
    
    /**
     * Human-readable descriptions of the WebSocket close status codes.
     * The table is frozen because it is a pure lookup constant.
     *
     * @private
     * @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1
     * @type {Object<number, string>}
     */
    const connectionStatusCode = Object.freeze({
        1000: "Normal closure",
        1001: "Going away",
        1002: "Protocol error",
        1003: "Unsupported data",
        1004: "Reserved",
        1005: "No status code",
        1006: "Connection closed abnormally",
        1007: "Invalid frame payload data",
        1008: "Policy violation",
        1009: "Message too big",
        1010: "Mandatory extension",
        1011: "Internal server error",
        1015: "TLS handshake"
    });
    
    /**
     * The WebSocketDatasource exchanges the data of a datasource with a server
     * over a WebSocket connection.
     *
     * Every received message is parsed as JSON, optionally transformed
     * (`read.mapping`) and stored with `set()`. `write()` sends the current value
     * as JSON. If `reconnect.enabled` is true, a lost connection is re-established
     * after `reconnect.timeout * reconnectCounter` milliseconds.
     *
     * @externalExample ../../../example/data/storage/restapi.mjs
     * @license AGPLv3
     * @since 3.1.0
     * @copyright schukai GmbH
     * @memberOf Monster.Data.Datasource
     * @summary The WebSocketDatasource class encapsulates the access to data objects via a WebSocket.
     */
    class WebSocketDatasource extends Datasource {

        /**
         *
         * @param {Object|string} [options] options contains definitions for the datasource; a string is used as the `url`.
         */
        constructor(options) {
            super();

            if (isString(options)) {
                options = {url: options};
            }

            if (!isObject(options)) options = {};
            this.setOptions(options);
            this[receiveQueueSymbol] = new Queue();

            this[connectionSymbol] = {};
            this[connectionSymbol].socket = null;
            this[connectionSymbol].reconnectCounter = 0;
            this[manualCloseSymbol] = false;
        }

        /**
         * Opens a new WebSocket to the configured `url`. A socket from an earlier
         * call is detached from this datasource and, if still connecting or open, closed.
         *
         * @returns {WebSocketDatasource}
         * @throws {Error} No url defined for websocket datasource.
         */
        connect() {
            const self = this;
            const connection = self[connectionSymbol];

            let reconnectTimeout = self.getOption('reconnect.timeout');
            if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000;
            let reconnectAttempts = self.getOption('reconnect.attempts');
            if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1;
            const reconnectEnabled = self.getOption('reconnect.enabled') === true;

            self[manualCloseSymbol] = false;
            connection.reconnectCounter++;

            const previous = connection.socket;
            if (previous) {
                // Detach the handlers first: events of a replaced socket must neither
                // feed the queue nor trigger a reconnect of their own.
                previous.onopen = null;
                previous.onmessage = null;
                previous.onclose = null;
                previous.onerror = null;
                if (previous.readyState < 2) {
                    previous.close();
                }
            }
            connection.socket = null;

            const url = self.getOption('url');
            if (!url) throw new Error('No url defined for websocket datasource.');

            const socket = new WebSocket(url);
            connection.socket = socket;

            // A failing connection usually reports an error event followed by a close
            // event; the flag makes sure that only one reconnect is scheduled per socket.
            let reconnectPending = false;
            const scheduleReconnect = () => {
                if (reconnectPending || !reconnectEnabled) return;
                if (connection.reconnectCounter >= reconnectAttempts) return;

                reconnectPending = true;
                setTimeout(() => {
                    // Skip the reconnect if close() was called or another connect()
                    // already replaced this socket while the timer was waiting.
                    if (self[manualCloseSymbol] || connection.socket !== socket) return;
                    self.connect();
                }, reconnectTimeout * connection.reconnectCounter);
            };

            socket.onmessage = (event) => {
                self[receiveQueueSymbol].add(event);
                // process the message asynchronously, outside of the event handler
                setTimeout(() => {
                    self.read();
                }, 0);
            };

            socket.onopen = () => {
                connection.reconnectCounter = 0;
            };

            // The handler must be registered as `onclose`; assigning it to `close`
            // would replace the method that actually closes the socket.
            socket.onclose = () => {
                if (self[manualCloseSymbol]) {
                    self[manualCloseSymbol] = false;
                    return;
                }

                scheduleReconnect();
            };

            socket.onerror = () => {
                scheduleReconnect();
            };

            return self;
        }

        /**
         * @returns {boolean} true if a socket exists and its readyState is 1 (open).
         */
        isConnected() {
            return this[connectionSymbol].socket?.readyState === 1;
        }

        /**
         * This method is called by the `instanceof` operator.
         * @returns {symbol}
         */
        static get [instanceSymbol]() {
            return Symbol.for("@schukai/monster/data/datasource/websocket");
        }

        /**
         * @property {string} url=undefined Defines the url of the websocket server.
         * @property {Number} reconnect.timeout The timeout in milliseconds for the reconnect; values below 1000 are raised to 1000.
         * @property {Number} reconnect.attempts The maximum number of reconnects.
         * @property {boolean} reconnect.enabled If the reconnect is enabled.
         * @property {Object} write={} Options
         * @property {Object} write.mapping the mapping is applied before writing.
         * @property {String} write.mapping.transformer Transformer to select the appropriate entries
         * @property {Monster.Data.Datasource~exampleCallback[]} write.mapping.callbacks with the help of the callbacks, the structures can be adjusted before writing.
         * @property {Object} write.report
         * @property {String} write.report.path Path to validations
         * @property {Object} write.sheathing
         * @property {Object} write.sheathing.object Object to be wrapped
         * @property {string} write.sheathing.path Path to the data
         * @property {Object} read={} Options
         * @property {Object} read.mapping the mapping is applied after reading.
         * @property {String} read.mapping.transformer Transformer to select the appropriate entries
         * @property {Monster.Data.Datasource~exampleCallback[]} read.mapping.callbacks with the help of the callbacks, the structures can be adjusted after reading.
         */
        get defaults() {
            return Object.assign({}, super.defaults, {
                url: undefined,
                write: {
                    mapping: {
                        transformer: undefined,
                        callbacks: []
                    },
                    report: {
                        path: undefined
                    },
                    sheathing: {
                        object: undefined,
                        path: undefined,
                    },
                },
                read: {
                    mapping: {
                        transformer: undefined,
                        callbacks: []
                    },
                },
                reconnect: {
                    timeout: 1000,
                    attempts: 10,
                    enabled: true
                }
            });
        }

        /**
         * This method closes the connection and marks the close as intended,
         * so that no reconnect is started for it.
         *
         * @returns {WebSocketDatasource}
         */
        close() {
            this[manualCloseSymbol] = true;
            if (this[connectionSymbol].socket) {
                this[connectionSymbol].socket.close();
            }
            return this;
        }

        /**
         * Processes all queued messages in the order they were received: each body is
         * parsed as JSON, run through the `read.mapping` pipe if a transformer is
         * configured, and stored with `set()`. Events without a body are skipped.
         *
         * The promise is resolved once the queue is empty. If a body is not valid
         * JSON, the promise is rejected with a message string and the remaining
         * events stay in the queue.
         *
         * @return {Promise}
         */
        read() {
            const self = this;

            return new Promise((resolve, reject) => {
                const queue = self[receiveQueueSymbol];

                while (!queue.isEmpty()) {

                    const event = queue.poll();
                    const body = event?.data;
                    if (!body) continue;

                    let obj;
                    try {
                        obj = JSON.parse(body);
                    } catch (e) {

                        let msg = 'the response does not contain a valid json (actual: ';

                        if (body.length > 100) {
                            msg += body.substring(0, 97) + '...';
                        } else {
                            msg += body;
                        }

                        msg += "; " + e.message + ')';

                        // stop here, otherwise an undefined value would be stored
                        reject(msg);
                        return;
                    }

                    const transformation = self.getOption('read.mapping.transformer');
                    if (transformation !== undefined) {
                        const pipe = new Pipe(transformation);

                        for (const callback of self.getOption('read.mapping.callbacks')) {
                            pipe.setCallback(callback.constructor.name, callback);
                        }

                        obj = pipe.run(obj);
                    }

                    self.set(obj);
                }

                resolve();
            });
        }

        /**
         * Sends the current value of the datasource as JSON. The value is first run
         * through the `write.mapping` pipe (if a transformer is configured) and
         * placed into the sheathing object (if `write.sheathing` is configured).
         *
         * The promise is rejected with the string 'the socket is not ready' if there
         * is no open socket; in that case nothing is sent.
         *
         * @return {Promise}
         */
        write() {
            const self = this;

            let obj = self.get();
            const transformation = self.getOption('write.mapping.transformer');
            if (transformation !== undefined) {
                const pipe = new Pipe(transformation);

                for (const callback of self.getOption('write.mapping.callbacks')) {
                    pipe.setCallback(callback.constructor.name, callback);
                }

                obj = pipe.run(obj);
            }

            const sheathingObject = self.getOption('write.sheathing.object');
            const sheathingPath = self.getOption('write.sheathing.path');

            if (sheathingObject && sheathingPath) {
                const sub = obj;
                // NOTE(review): presumably setVia writes into the configured object itself - confirm
                obj = sheathingObject;
                (new Pathfinder(obj)).setVia(sheathingPath, sub);
            }

            return new Promise((resolve, reject) => {
                const socket = self[connectionSymbol].socket;

                if (!socket || socket.readyState !== 1) {
                    reject('the socket is not ready');
                    return;
                }

                socket.send(JSON.stringify(obj));
                resolve();
            });
        }


        /**
         * Creates a new, unconnected datasource from the options of this instance.
         *
         * @return {WebSocketDatasource}
         */
        getClone() {
            const self = this;
            return new WebSocketDatasource(self[internalSymbol].getRealSubject()['options']);
        }

    }