From f2efc18dc73d4c02b8af069a860eaa1e595f17fe Mon Sep 17 00:00:00 2001
From: Volker Schukai <volker.schukai@schukai.com>
Date: Fri, 6 Jan 2023 13:19:23 +0100
Subject: [PATCH] feat: new websocket datasource

---
 .../source/data/datasource/websocket.mjs      | 337 ++++++++++++++++++
 .../test/cases/data/datasource/websocket.mjs  |  96 +++++
 development/test/util/websocket.mjs           |  22 ++
 3 files changed, 455 insertions(+)
 create mode 100644 application/source/data/datasource/websocket.mjs
 create mode 100644 development/test/cases/data/datasource/websocket.mjs
 create mode 100644 development/test/util/websocket.mjs

diff --git a/application/source/data/datasource/websocket.mjs b/application/source/data/datasource/websocket.mjs
new file mode 100644
index 000000000..28d60db79
--- /dev/null
+++ b/application/source/data/datasource/websocket.mjs
@@ -0,0 +1,337 @@
+/**
+ * Copyright schukai GmbH and contributors 2022. All Rights Reserved.
+ * Node module: @schukai/monster
+ * This file is licensed under the AGPLv3 License.
+ * License text available at https://www.gnu.org/licenses/agpl-3.0.en.html
+ */
+
+import {internalSymbol, instanceSymbol} from "../../constants.mjs";
+import {isInteger, isString, isObject} from "../../types/is.mjs";
+import {Queue} from "../../types/queue.mjs";
+import {Datasource} from "../datasource.mjs";
+import {Pathfinder} from "../pathfinder.mjs";
+import {Pipe} from "../pipe.mjs";
+
+export {WebSocketDatasource}
+
+/**
+ * @private
+ * @type {symbol}
+ */
+const receiveQueueSymbol = Symbol("queue");
+
+
+/**
+ * @private
+ * @type {symbol}
+ * 
+ * hint: this name is used in the tests. if you want to change it, please change it in the tests as well.
+ */
+const connectionSymbol = Symbol("connection");
+
+/**
+ * @private
+ * @type {symbol}
+ */
+const manualCloseSymbol = Symbol("manualClose");
+
+/**
+ * @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1
+ * @type {{"1000": string, "1011": string, "1010": string, "1008": string, "1007": string, "1006": string, "1005": string, "1004": string, "1015": string, "1003": string, "1002": string, "1001": string, "1009": string}}
+ */
+const connectionStatusCode = {
+    1000: "Normal closure",
+    1001: "Going away",
+    1002: "Protocol error",
+    1003: "Unsupported data",
+    1004: "Reserved",
+    1005: "No status code",
+    1006: "Connection closed abnormally",
+    1007: "Invalid frame payload data",
+    1008: "Policy violation",
+    1009: "Message too big",
+    1010: "Mandatory extension",
+    1011: "Internal server error",
+    1015: "TLS handshake"
+};
+
+/**
+ * The RestAPI is a class that enables a REST API server.
+ *
+ * @externalExample ../../../example/data/storage/restapi.mjs
+ * @license AGPLv3
+ * @since 3.1.0
+ * @copyright schukai GmbH
+ * @memberOf Monster.Data.Datasource
+ * @summary The LocalStorage class encapsulates the access to data objects.
+ */
+class WebSocketDatasource extends Datasource {
+
+    /**
+     *
+     * @param {Object} [options] options contains definitions for the datasource.
+     */
+    constructor(options) {
+        super();
+
+        if (isString(options)) {
+            options = {url: options};
+        }
+
+        if (!isObject(options)) options = {};
+        this.setOptions(options);
+        this[receiveQueueSymbol] = new Queue();
+
+        this[connectionSymbol] = {};
+        this[connectionSymbol].socket = null;
+        this[connectionSymbol].reconnectCounter = 0;
+        this[manualCloseSymbol]=false;
+    }
+
+    /**
+     *
+     * @returns {Websocketdatasource}
+     * @throws {Error} No url defined for websocket datasource.
+     */
+    connect() {
+        const self = this;
+
+        let connected = false;
+        let reconnectTimeout = self.getOption('reconnect.timeout');
+        if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000;
+        let reconnectAttempts = self.getOption('reconnect.attempts');
+        if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1;
+        let reconnectEnabled = self.getOption('reconnect.enabled');
+        if (reconnectEnabled !== true) reconnectEnabled = false;
+
+        self[manualCloseSymbol] = false;
+        self[connectionSymbol].reconnectCounter++;
+
+        if (self[connectionSymbol].socket && self[connectionSymbol].socket.readyState < 2) {
+            self[connectionSymbol].socket.close();
+        }
+        self[connectionSymbol].socket = null;
+
+        const url = self.getOption('url');
+        if (!url) throw new Error('No url defined for websocket datasource.');
+
+        self[connectionSymbol].socket = new WebSocket(url);
+
+        self[connectionSymbol].socket.onmessage = function (event) {
+            self[receiveQueueSymbol].add(event);
+            setTimeout(function () {
+                self.read();
+            }, 0);
+        };
+
+        self[connectionSymbol].socket.onopen = function () {
+            connected = true;
+            self[connectionSymbol].reconnectCounter = 0;
+        };
+
+        self[connectionSymbol].socket.close = function (event) {
+            
+            if (self[manualCloseSymbol]) {
+                self[manualCloseSymbol] = false;
+                return;
+            }
+
+            if (reconnectEnabled && this[connectionSymbol].reconnectCounter < reconnectAttempts) {
+                setTimeout(() => {
+                    self.connect();
+                }, reconnectTimeout * this[connectionSymbol].reconnectCounter);
+            }
+
+        };
+
+        self[connectionSymbol].socket.onerror = (error) => {
+
+            if (reconnectEnabled && self[connectionSymbol].reconnectCounter < reconnectAttempts) {
+                setTimeout(() => {
+                    self.connect();
+                }, reconnectTimeout * this[connectionSymbol].reconnectCounter);
+            }
+
+        };
+    }
+
+    isConnected() {
+        return this[connectionSymbol].socket && this[connectionSymbol].socket.readyState === 1;
+    }
+
+    /**
+     * This method is called by the `instanceof` operator.
+     * @returns {symbol}
+     */
+    static get [instanceSymbol]() {
+        return Symbol.for("@schukai/monster/data/datasource/websocket");
+    }
+
+    /**
+     * @property {string} url=undefined Defines the resource that you wish to fetch.
+     * @property {Number} reconnect.timeout The timeout in milliseconds for the reconnect.
+     * @property {Number} reconnect.attempts The maximum number of reconnects.
+     * @property {Bool} reconnect.enabled If the reconnect is enabled.
+     * @property {Object} write={} Options
+     * @property {Object} write.mapping the mapping is applied before writing.
+     * @property {String} write.mapping.transformer Transformer to select the appropriate entries
+     * @property {Monster.Data.Datasource~exampleCallback[]} write.mapping.callback with the help of the callback, the structures can be adjusted before writing.
+     * @property {Object} write.report
+     * @property {String} write.report.path Path to validations
+     * @property {Object} write.sheathing
+     * @property {Object} write.sheathing.object Object to be wrapped
+     * @property {string} write.sheathing.path Path to the data
+     * @property {Object} read={} Options
+     * @property {Object} read.mapping the mapping is applied after reading.
+     * @property {String} read.mapping.transformer Transformer to select the appropriate entries
+     * @property {Monster.Data.Datasource~exampleCallback[]} read.mapping.callback with the help of the callback, the structures can be adjusted after reading.
+     */
+    get defaults() {
+        return Object.assign({}, super.defaults, {
+            url: undefined,
+            write: {
+                mapping: {
+                    transformer: undefined,
+                    callbacks: []
+                },
+                report: {
+                    path: undefined
+                },
+                sheathing: {
+                    object: undefined,
+                    path: undefined,
+                },
+            },
+            read: {
+                mapping: {
+                    transformer: undefined,
+                    callbacks: []
+                },
+            },
+            reconnect: {
+                timeout: 1000,
+                attempts: 10,
+                enabled: true
+            }
+        });
+    }
+
+    /**
+     * This method closes the connection.
+     * 
+     * @returns {Promise}
+     */
+    close() {
+        this[manualCloseSymbol]=true;
+        if (this[connectionSymbol].socket) {
+            this[connectionSymbol].socket.close();
+        }
+        return this;
+    }
+
+    /**
+     * @return {Promise}
+     * @throws {Error} the options does not contain a valid json definition
+     * @throws {Error} the data cannot be read
+     * @throws {TypeError} value is not an object
+     */
+    read() {
+        const self = this;
+        let response;
+
+        return new Promise((resolve, reject) => {
+            if (self[receiveQueueSymbol].isEmpty()) {
+                resolve();
+            }
+
+            while (!self[receiveQueueSymbol].isEmpty()) {
+                
+                const event = self[receiveQueueSymbol].poll();
+                const body = event?.data;
+                if (!body) continue;
+
+                let obj;
+                try {
+                    obj = JSON.parse(body);
+                } catch (e) {
+
+                    let msg = 'the response does not contain a valid json (actual: ';
+
+                    if (body.length > 100) {
+                        msg += body.substring(0, 97) + '...';
+                    } else {
+                        msg += body;
+                    }
+
+                    msg += "; " + e.message + ')';
+
+                    reject(msg);
+                }
+
+                let transformation = self.getOption('read.mapping.transformer');
+                if (transformation !== undefined) {
+                    const pipe = new Pipe(transformation);
+
+                    for (const callback of self.getOption('read.mapping.callbacks')) {
+                        pipe.setCallback(callback.constructor.name, callback);
+                    }
+
+                    obj = pipe.run(obj);
+                }
+
+                self.set(obj);
+                return response;
+            }
+        })
+    }
+
+    /**
+     * @return {Promise}
+     */
+    write() {
+        const self = this;
+
+        let obj = self.get();
+        let transformation = self.getOption('write.mapping.transformer');
+        if (transformation !== undefined) {
+            const pipe = new Pipe(transformation);
+
+            for (const callback of self.getOption('write.mapping.callbacks')) {
+                pipe.setCallback(callback.constructor.name, callback);
+            }
+
+            obj = pipe.run(obj);
+        }
+
+        let sheathingObject = self.getOption('write.sheathing.object');
+        let sheathingPath = self.getOption('write.sheathing.path');
+        let reportPath = self.getOption('write.report.path');
+
+        if (sheathingObject && sheathingPath) {
+            const sub = obj;
+            obj = sheathingObject;
+            (new Pathfinder(obj)).setVia(sheathingPath, sub);
+        }
+
+        return new Promise((resolve, reject) => {
+            
+            if (self[connectionSymbol].socket.readyState !== 1) {
+                reject('the socket is not ready');
+            }
+
+            self[connectionSymbol].socket.send(JSON.stringify(obj))
+            resolve();
+        });
+    }
+
+
+    /**
+     * @return {RestAPI}
+     */
+    getClone() {
+        const self = this;
+        return new Websocketdatasource(self[internalSymbol].getRealSubject()['options']);
+    }
+
+}
+
diff --git a/development/test/cases/data/datasource/websocket.mjs b/development/test/cases/data/datasource/websocket.mjs
new file mode 100644
index 000000000..95a748286
--- /dev/null
+++ b/development/test/cases/data/datasource/websocket.mjs
@@ -0,0 +1,96 @@
+import {expect} from "chai"
+import {WebSocketDatasource} from "../../../../../application/source/data/datasource/websocket.mjs";
+import {initWebSocket} from "../../../util/websocket.mjs";
+
+const testUrl = "wss://ws.postman-echo.com/raw"
+
+// const g = getGlobal();
+// g['WebSocket'] = WS;
+
+
+describe('Websocket', function () {
+
+    let ds = undefined
+
+    before(function (done) {
+        initWebSocket().then(() => {
+            done()
+        }).catch((e) => {
+            done(e)
+        })
+    });
+
+    afterEach(function (done) {
+        if (ds) {
+            ds.close()
+        }
+        
+        // without this, the node test will hang 
+        for (const sym of Object.getOwnPropertySymbols(ds)) {
+            if (sym.toString() ==='Symbol(connection)') {
+                if(ds[sym]?.socket?.['terminate']) {
+                    ds[sym]?.socket?.['terminate']()
+                }
+            }
+        }
+        
+        done()
+    });
+
+    it('should connect', function (done) {
+        ds = new WebSocketDatasource({
+            url: testUrl,
+            reconnect: {
+                enabled: false
+            }
+        });
+        ds.connect()
+        setTimeout(() => {
+            expect(ds.isConnected()).to.be.true;
+            done();
+        }, 500);
+
+
+    })
+
+    it('should send message', function (done) {
+        ds = new WebSocketDatasource({
+            url: testUrl,
+            reconnect: {
+                enabled: false
+            }
+        });
+        ds.connect()
+
+        ds.set({
+            data: {
+                message: "Hello World"
+            }
+        })
+
+        setTimeout(() => {
+
+            ds.write().then(() => {
+
+                ds.set({})
+                expect(ds.get()).to.be.deep.equal({});
+
+
+                setTimeout(() => {
+
+                    expect(ds.get()).to.be.deep.equal({
+                        data: {
+                            message: "Hello World"
+                        }
+                    });
+                    done();
+                }, 1000);
+            }).catch((err) => {
+                done(new Error(err));
+            })
+        }, 1000)
+
+
+    }).timeout(10000);
+
+});
diff --git a/development/test/util/websocket.mjs b/development/test/util/websocket.mjs
new file mode 100644
index 000000000..b7ac1ce19
--- /dev/null
+++ b/development/test/util/websocket.mjs
@@ -0,0 +1,22 @@
+import {getGlobal} from "../../../application/source/types/global.mjs";
+
+function initWebSocket() {
+    if (typeof window === "object" && window['WebSocket']) return Promise.resolve();
+
+    return import("ws").then((ws) => {
+        getGlobal().WebSocket = class extends ws['WebSocket'] {
+            constructor(url, protocols) {
+                super(url, protocols, {
+                    handshakeTimeout: 1000,
+                    maxPayload: 1024 * 1024 * 1024,
+                });
+                
+            }
+        };
+        
+    });
+
+
+}
+
+export {initWebSocket}
\ No newline at end of file
-- 
GitLab