From ec7cde161dfc5185de198ce45ee198b10adc6625 Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Fri, 6 Jan 2023 21:43:07 +0100 Subject: [PATCH] feat: connect return now a promise --- .../source/data/datasource/websocket.mjs | 197 +++++++++++------- 1 file changed, 120 insertions(+), 77 deletions(-) diff --git a/application/source/data/datasource/websocket.mjs b/application/source/data/datasource/websocket.mjs index daa771d15..9047ef768 100644 --- a/application/source/data/datasource/websocket.mjs +++ b/application/source/data/datasource/websocket.mjs @@ -24,7 +24,7 @@ const receiveQueueSymbol = Symbol("queue"); /** * @private * @type {Symbol} - * + * * hint: this name is used in the tests. if you want to change it, please change it in the tests as well. */ const connectionSymbol = Symbol("connection"); @@ -55,6 +55,96 @@ const connectionStatusCode = { 1015: "TLS handshake" }; +/** + * @private + * @this {WebSocketDatasource} + * @throws {Error} No url defined for websocket datasource. + */ +function connectServer(resolve, reject) { + const self = this; + + let promiseAllredyResolved = false; + let connectionTimeout = self.getOption('connection.timeout'); + if (!isInteger(connectionTimeout) || connectionTimeout < 100) { + connectionTimeout = 5000; + } + + setTimeout(() => { + if (promiseAllredyResolved) { + return; + } + reject(new Error("Connection timeout")); + }, connectionTimeout); + + let reconnectTimeout = self.getOption('connection.reconnect.timeout'); + if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000; + let reconnectAttempts = self.getOption('connection.reconnect.attempts'); + if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1; + let reconnectEnabled = self.getOption('connection.reconnect.enabled'); + if (reconnectEnabled !== true) reconnectEnabled = false; + + self[manualCloseSymbol] = false; + self[connectionSymbol].reconnectCounter++; + + if (self[connectionSymbol].socket && self[connectionSymbol].socket.readyState < 2) { + self[connectionSymbol].socket.close(); + } + self[connectionSymbol].socket = null; + + const url = self.getOption('url'); + if (!url) { + reject('No url defined for websocket datasource.'); + return; + } + + self[connectionSymbol].socket = new WebSocket(url); + + self[connectionSymbol].socket.onmessage = function (event) { + self[receiveQueueSymbol].add(event); + setTimeout(function () { + self.read(); + }, 1); + }; + + self[connectionSymbol].socket.onopen = function () { + self[connectionSymbol].reconnectCounter = 0; + if (typeof resolve === 'function' && !promiseAllredyResolved) { + promiseAllredyResolved = true; + resolve(); + } + }; + + self[connectionSymbol].socket.close = function (event) { + + if (self[manualCloseSymbol]) { + self[manualCloseSymbol] = false; + return; + } + + if (reconnectEnabled && this[connectionSymbol].reconnectCounter < reconnectAttempts) { + setTimeout(() => { + self.connect(); + }, reconnectTimeout * this[connectionSymbol].reconnectCounter); + } + + }; + + self[connectionSymbol].socket.onerror = (error) => { + + if (reconnectEnabled && self[connectionSymbol].reconnectCounter < reconnectAttempts) { + setTimeout(() => { + self.connect(); + }, reconnectTimeout * this[connectionSymbol].reconnectCounter); + } else { + if (typeof reject === 'function' && !promiseAllredyResolved) { + promiseAllredyResolved = true; + reject(error); + } + } + + }; +} + /** * The RestAPI is a class that enables a REST API server. * @@ -85,78 +175,26 @@ class WebSocketDatasource extends Datasource { this[connectionSymbol] = {}; this[connectionSymbol].socket = null; this[connectionSymbol].reconnectCounter = 0; - this[manualCloseSymbol]=false; + this[manualCloseSymbol] = false; } /** * - * @returns {Websocketdatasource} - * @throws {Error} No url defined for websocket datasource. + * @returns {Promise} */ connect() { const self = this; - let connected = false; - let reconnectTimeout = self.getOption('reconnect.timeout'); - if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000; - let reconnectAttempts = self.getOption('reconnect.attempts'); - if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1; - let reconnectEnabled = self.getOption('reconnect.enabled'); - if (reconnectEnabled !== true) reconnectEnabled = false; - - self[manualCloseSymbol] = false; - self[connectionSymbol].reconnectCounter++; - - if (self[connectionSymbol].socket && self[connectionSymbol].socket.readyState < 2) { - self[connectionSymbol].socket.close(); - } - self[connectionSymbol].socket = null; - - const url = self.getOption('url'); - if (!url) throw new Error('No url defined for websocket datasource.'); - - self[connectionSymbol].socket = new WebSocket(url); - - self[connectionSymbol].socket.onmessage = function (event) { - self[receiveQueueSymbol].add(event); - setTimeout(function () { - self.read(); - }, 0); - }; - - self[connectionSymbol].socket.onopen = function () { - connected = true; - self[connectionSymbol].reconnectCounter = 0; - }; - - self[connectionSymbol].socket.close = function (event) { - - if (self[manualCloseSymbol]) { - self[manualCloseSymbol] = false; - return; - } - - if (reconnectEnabled && this[connectionSymbol].reconnectCounter < reconnectAttempts) { - setTimeout(() => { - self.connect(); - }, reconnectTimeout * this[connectionSymbol].reconnectCounter); - } - - }; - - self[connectionSymbol].socket.onerror = (error) => { - - if (reconnectEnabled && self[connectionSymbol].reconnectCounter < reconnectAttempts) { - setTimeout(() => { - self.connect(); - }, reconnectTimeout * this[connectionSymbol].reconnectCounter); - } - - }; + return new Promise((resolve, reject) => { + connectServer.call(this, resolve, reject); + }); } + /** + * @returns {boolean} + */ isConnected() { - return this[connectionSymbol].socket && this[connectionSymbol].socket.readyState === 1; + return this[connectionSymbol]?.socket?.readyState === 1; } /** @@ -169,9 +207,11 @@ class WebSocketDatasource extends Datasource { /** * @property {string} url=undefined Defines the resource that you wish to fetch. - * @property {Number} reconnect.timeout The timeout in milliseconds for the reconnect. - * @property {Number} reconnect.attempts The maximum number of reconnects. - * @property {Bool} reconnect.enabled If the reconnect is enabled. + * @property {Object} connection + * @property {Object} connection.timeout=5000 Defines the timeout for the connection. + * @property {Number} connection.reconnect.timeout The timeout in milliseconds for the reconnect. + * @property {Number} connection.reconnect.attempts The maximum number of reconnects. + * @property {Bool} connection.reconnect.enabled If the reconnect is enabled. * @property {Object} write={} Options * @property {Object} write.mapping the mapping is applied before writing. * @property {String} write.mapping.transformer Transformer to select the appropriate entries @@ -208,21 +248,24 @@ class WebSocketDatasource extends Datasource { callbacks: [] }, }, - reconnect: { - timeout: 1000, - attempts: 10, - enabled: true + connection: { + timeout: 5000, + reconnect: { + timeout: 1000, + attempts: 1, + enabled: false, + } } }); } /** * This method closes the connection. - * + * * @returns {Promise} */ close() { - this[manualCloseSymbol]=true; + this[manualCloseSymbol] = true; if (this[connectionSymbol].socket) { this[connectionSymbol].socket.close(); } @@ -236,9 +279,9 @@ class WebSocketDatasource extends Datasource { const self = this; let response; - if (self[connectionSymbol]?.socket?.readyState!==1) { + if (self[connectionSymbol]?.socket?.readyState !== 1) { return Promise.reject('The connection is not established.'); - } + } return new Promise((resolve, reject) => { if (self[receiveQueueSymbol].isEmpty()) { @@ -246,7 +289,7 @@ class WebSocketDatasource extends Datasource { } while (!self[receiveQueueSymbol].isEmpty()) { - + const event = self[receiveQueueSymbol].poll(); const body = event?.data; if (!body) continue; @@ -291,8 +334,8 @@ class WebSocketDatasource extends Datasource { */ write() { const self = this; - - if (self[connectionSymbol]?.socket?.readyState!==1) { + + if (self[connectionSymbol]?.socket?.readyState !== 1) { return Promise.reject('The connection is not established.'); } @@ -319,7 +362,7 @@ class WebSocketDatasource extends Datasource { } return new Promise((resolve, reject) => { - + if (self[connectionSymbol].socket.readyState !== 1) { reject('the socket is not ready'); } -- GitLab