diff --git a/application/source/net/webconnect.mjs b/application/source/net/webconnect.mjs new file mode 100644 index 0000000000000000000000000000000000000000..48535e2b1b816a6827aa752b93f00b210e81cadd --- /dev/null +++ b/application/source/net/webconnect.mjs @@ -0,0 +1,346 @@ +/** + * Copyright schukai GmbH and contributors 2022. All Rights Reserved. + * Node module: @schukai/monster + * This file is licensed under the AGPLv3 License. + * License text available at https://www.gnu.org/licenses/agpl-3.0.en.html + */ + +import {instanceSymbol} from "../constants.mjs"; +import {isInteger, isString, isObject} from "../types/is.mjs"; +import {BaseWithOptions} from "../types/basewithoptions.mjs"; +import {ObservableQueue} from "../types/observablequeue.mjs"; +import {Message} from "./webconnect/message.mjs"; + + +export {WebConnect} + +/** + * @private + * @type {Symbol} + */ +const receiveQueueSymbol = Symbol("receiveQueue"); +/** + * @private + * @type {Symbol} + */ +const sendQueueSymbol = Symbol("sendQueue"); + +/** + * @private + * @type {Symbol} + * + * hint: this name is used in the tests. if you want to change it, please change it in the tests as well. + */ +const connectionSymbol = Symbol("connection"); + +/** + * @private + * @type {symbol} + */ +const manualCloseSymbol = Symbol("manualClose"); + +/** + * @private + * @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1 + * @type {{"1000": string, "1011": string, "1010": string, "1008": string, "1007": string, "1006": string, "1005": string, "1004": string, "1015": string, "1003": string, "1002": string, "1001": string, "1009": string}} + */ +const connectionStatusCode = { + 1000: "Normal closure", + 1001: "Going away", + 1002: "Protocol error", + 1003: "Unsupported data", + 1004: "Reserved", + 1005: "No status code", + 1006: "Connection closed abnormally", + 1007: "Invalid frame payload data", + 1008: "Policy violation", + 1009: "Message too big", + 1010: "Mandatory extension", + 1011: "Internal server error", + 1015: "TLS handshake" +}; + +/** + * @private + * @this {WebConnect} + * @throws {Error} No url defined for websocket datasource. + */ +function connectServer(resolve, reject) { + const self = this; + + const url = self.getOption('url'); + if (!url) { + reject('No url defined for webconnect.'); + return; + } + + let promiseAllredyResolved = false; + + let connectionTimeout = self.getOption('connection.timeout'); + if (!isInteger(connectionTimeout) || connectionTimeout < 100) { + connectionTimeout = 5000; + } + + setTimeout(() => { + if (promiseAllredyResolved) { + return; + } + reject(new Error("Connection timeout")); + }, connectionTimeout); + + let reconnectTimeout = self.getOption('connection.reconnect.timeout'); + if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000; + let reconnectAttempts = self.getOption('connection.reconnect.attempts'); + if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1; + let reconnectEnabled = self.getOption('connection.reconnect.enabled'); + if (reconnectEnabled !== true) reconnectEnabled = false; + + self[manualCloseSymbol] = false; + self[connectionSymbol].reconnectCounter++; + + if (self[connectionSymbol].socket && self[connectionSymbol].socket.readyState < 2) { + self[connectionSymbol].socket.close(); + } + self[connectionSymbol].socket = null; + self[connectionSymbol].socket = new WebSocket(url); + + self[connectionSymbol].socket.onmessage = function (event) { + if (event.data instanceof Blob) { + const reader = new FileReader(); + reader.addEventListener("loadend", function () { + self[receiveQueueSymbol].add(new Message(reader.result)) + }); + reader.readAsText(new Message(event.data)); + } else { + self[receiveQueueSymbol].add(Message.fromJSON(event.data)); + } + }; + + self[connectionSymbol].socket.onopen = function () { + self[connectionSymbol].reconnectCounter = 0; + if (typeof resolve === 'function' && !promiseAllredyResolved) { + promiseAllredyResolved = true; + resolve(); + } + }; + + self[connectionSymbol].socket.close = function (event) { + + if (self[manualCloseSymbol]) { + self[manualCloseSymbol] = false; + return; + } + + if (reconnectEnabled && this[connectionSymbol].reconnectCounter < reconnectAttempts) { + setTimeout(() => { + self.connect(); + }, reconnectTimeout * this[connectionSymbol].reconnectCounter); + } + + }; + + self[connectionSymbol].socket.onerror = (error) => { + + if (reconnectEnabled && self[connectionSymbol].reconnectCounter < reconnectAttempts) { + setTimeout(() => { + self.connect(); + }, reconnectTimeout * this[connectionSymbol].reconnectCounter); + } else { + if (typeof reject === 'function' && !promiseAllredyResolved) { + promiseAllredyResolved = true; + reject(error); + } + } + + }; +} + +/** + * The RestAPI is a class that enables a REST API server. + * + * @externalExample ../../../example/data/storage/restapi.mjs + * @license AGPLv3 + * @since 3.1.0 + * @copyright schukai GmbH + * @memberOf Monster.Data.Datasource + * @summary The LocalStorage class encapsulates the access to data objects. + */ +class WebConnect extends BaseWithOptions { + + /** + * + * @param {Object} [options] options contains definitions for the datasource. + */ + constructor(options) { + + if (isString(options)) { + options = {url: options}; + } + + super(options); + + this[receiveQueueSymbol] = new ObservableQueue(); + this[sendQueueSymbol] = new ObservableQueue(); + + this[connectionSymbol] = {}; + this[connectionSymbol].socket = null; + this[connectionSymbol].reconnectCounter = 0; + this[manualCloseSymbol] = false; + } + + /** + * + * @returns {Promise} + */ + connect() { + const self = this; + + return new Promise((resolve, reject) => { + connectServer.call(this, resolve, reject); + }); + } + + /** + * @returns {boolean} + */ + isConnected() { + return this[connectionSymbol]?.socket?.readyState === 1; + } + + /** + * This method is called by the `instanceof` operator. + * @returns {symbol} + */ + static get [instanceSymbol]() { + return Symbol.for("@schukai/monster/net/webconnect"); + } + + /** + * @property {string} url=undefined Defines the resource that you wish to fetch. + * @property {Object} connection + * @property {Object} connection.timeout=5000 Defines the timeout for the connection. + * @property {Number} connection.reconnect.timeout The timeout in milliseconds for the reconnect. + * @property {Number} connection.reconnect.attempts The maximum number of reconnects. + * @property {Bool} connection.reconnect.enabled If the reconnect is enabled. + */ + get defaults() { + return Object.assign({}, super.defaults, { + url: undefined, + connection: { + timeout: 5000, + reconnect: { + timeout: 1000, + attempts: 1, + enabled: false, + } + } + }); + } + + /** + * This method closes the connection. + * + * @param {Number} [code=1000] The close code. + * @param {String} [reason=""] The close reason. + * @returns {Promise} + * @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1 + */ + close(statusCode, reason ) { + if (!isInteger(statusCode) || statusCode < 1000 || statusCode > 4999) { + statusCode = 1000; + } + if (!isString(reason)) { + reason = ''; + } + + return new Promise((resolve, reject) => { + try { + this[manualCloseSymbol] = true; + if (this[connectionSymbol].socket) { + this[connectionSymbol].socket.close(statusCode, reason); + } + } catch (error) { + reject(error); + } + resolve(); + }); + + } + + /** + * Polls the receive queue for new messages. + * + * @returns {Message} + */ + poll() { + return this[receiveQueueSymbol].poll(); + } + + /** + * Are there any messages in the receive queue? + * + * @returns {boolean} + */ + dataReceived() { + return !this[receiveQueueSymbol].isEmpty(); + } + + /** + * Get Message from the receive queue, but do not remove it. + * + * @returns {Object} + */ + peek() { + return this[receiveQueueSymbol].peek(); + } + + /** + * Attach a new observer + * + * @param {Observer} observer + * @returns {ProxyObserver} + */ + attachObserver(observer) { + this[receiveQueueSymbol].attachObserver(observer); + return this; + } + + /** + * Detach a observer + * + * @param {Observer} observer + * @returns {ProxyObserver} + */ + detachObserver(observer) { + this[receiveQueueSymbol].detachObserver(observer); + return this; + } + + /** + * @param {Observer} observer + * @returns {boolean} + */ + containsObserver(observer) { + return this[receiveQueueSymbol].containsObserver(observer); + } + + /** + * @param {Message|Object} message + * @return {Promise} + */ + send(message) { + const self = this; + + return new Promise((resolve, reject) => { + + if (self[connectionSymbol].socket.readyState !== 1) { + reject('the socket is not ready'); + } + + self[connectionSymbol].socket.send(JSON.stringify(message)) + resolve(); + }); + } + +} +