diff --git a/application/source/data/datasource/websocket.mjs b/application/source/data/datasource/websocket.mjs index 9047ef768d11a8793cb8115df669e5c6da362a58..2413070321c63f4add868f4f727d54daeda088fc 100644 --- a/application/source/data/datasource/websocket.mjs +++ b/application/source/data/datasource/websocket.mjs @@ -6,20 +6,15 @@ */ import {internalSymbol, instanceSymbol} from "../../constants.mjs"; -import {isInteger, isString, isObject} from "../../types/is.mjs"; -import {Queue} from "../../types/queue.mjs"; +import {isString, isObject} from "../../types/is.mjs"; +import {WebConnect} from "../../net/webconnect.mjs"; +import {Message} from "../../net/webconnect/message.mjs"; import {Datasource} from "../datasource.mjs"; import {Pathfinder} from "../pathfinder.mjs"; import {Pipe} from "../pipe.mjs"; export {WebSocketDatasource} -/** - * @private - * @type {Symbol} - */ -const receiveQueueSymbol = Symbol("queue"); - /** * @private @@ -27,122 +22,33 @@ const receiveQueueSymbol = Symbol("queue"); * * hint: this name is used in the tests. if you want to change it, please change it in the tests as well. */ -const connectionSymbol = Symbol("connection"); - -/** - * @private - * @type {symbol} - */ -const manualCloseSymbol = Symbol("manualClose"); - -/** - * @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1 - * @type {{"1000": string, "1011": string, "1010": string, "1008": string, "1007": string, "1006": string, "1005": string, "1004": string, "1015": string, "1003": string, "1002": string, "1001": string, "1009": string}} - */ -const connectionStatusCode = { - 1000: "Normal closure", - 1001: "Going away", - 1002: "Protocol error", - 1003: "Unsupported data", - 1004: "Reserved", - 1005: "No status code", - 1006: "Connection closed abnormally", - 1007: "Invalid frame payload data", - 1008: "Policy violation", - 1009: "Message too big", - 1010: "Mandatory extension", - 1011: "Internal server error", - 1015: "TLS handshake" -}; +const webConnectSymbol = Symbol("connection"); /** - * @private - * @this {WebSocketDatasource} - * @throws {Error} No url defined for websocket datasource. + * + * @param self + * @param obj + * @returns {*} */ -function connectServer(resolve, reject) { +function doTransform(type, obj) { const self = this; - - let promiseAllredyResolved = false; - let connectionTimeout = self.getOption('connection.timeout'); - if (!isInteger(connectionTimeout) || connectionTimeout < 100) { - connectionTimeout = 5000; - } - - setTimeout(() => { - if (promiseAllredyResolved) { - return; + let transformation = self.getOption(type + '.mapping.transformer'); + if (transformation !== undefined) { + const pipe = new Pipe(transformation); + const callbacks = self.getOption(type + '.mapping.callbacks') + + if (isObject(callbacks)) { + for (const key in callbacks) { + if (callbacks.hasOwnProperty(key) && typeof callbacks[key] === 'function') { + pipe.setCallback(key, callbacks[key]); + } + } } - reject(new Error("Connection timeout")); - }, connectionTimeout); - - let reconnectTimeout = self.getOption('connection.reconnect.timeout'); - if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000; - let reconnectAttempts = self.getOption('connection.reconnect.attempts'); - if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1; - let reconnectEnabled = self.getOption('connection.reconnect.enabled'); - if (reconnectEnabled !== true) reconnectEnabled = false; - - self[manualCloseSymbol] = false; - self[connectionSymbol].reconnectCounter++; - - if (self[connectionSymbol].socket && self[connectionSymbol].socket.readyState < 2) { - self[connectionSymbol].socket.close(); - } - self[connectionSymbol].socket = null; - const url = self.getOption('url'); - if (!url) { - reject('No url defined for websocket datasource.'); - return; + obj = pipe.run(obj); } - self[connectionSymbol].socket = new WebSocket(url); - - self[connectionSymbol].socket.onmessage = function (event) { - self[receiveQueueSymbol].add(event); - setTimeout(function () { - self.read(); - }, 1); - }; - - self[connectionSymbol].socket.onopen = function () { - self[connectionSymbol].reconnectCounter = 0; - if (typeof resolve === 'function' && !promiseAllredyResolved) { - promiseAllredyResolved = true; - resolve(); - } - }; - - self[connectionSymbol].socket.close = function (event) { - - if (self[manualCloseSymbol]) { - self[manualCloseSymbol] = false; - return; - } - - if (reconnectEnabled && this[connectionSymbol].reconnectCounter < reconnectAttempts) { - setTimeout(() => { - self.connect(); - }, reconnectTimeout * this[connectionSymbol].reconnectCounter); - } - - }; - - self[connectionSymbol].socket.onerror = (error) => { - - if (reconnectEnabled && self[connectionSymbol].reconnectCounter < reconnectAttempts) { - setTimeout(() => { - self.connect(); - }, reconnectTimeout * this[connectionSymbol].reconnectCounter); - } else { - if (typeof reject === 'function' && !promiseAllredyResolved) { - promiseAllredyResolved = true; - reject(error); - } - } - - }; + return obj; } /** @@ -163,6 +69,8 @@ class WebSocketDatasource extends Datasource { */ constructor(options) { super(); + + const self = this; if (isString(options)) { options = {url: options}; @@ -170,12 +78,17 @@ class WebSocketDatasource extends Datasource { if (!isObject(options)) options = {}; this.setOptions(options); - this[receiveQueueSymbol] = new Queue(); - - this[connectionSymbol] = {}; - this[connectionSymbol].socket = null; - this[connectionSymbol].reconnectCounter = 0; - this[manualCloseSymbol] = false; + this[webConnectSymbol] = new WebConnect({ + url: self.getOption('url'), + connection: { + timeout: self.getOption('connection.timeout'), + reconnect: { + timeout: self.getOption('connection.reconnect.timeout'), + attempts: self.getOption('connection.reconnect.attempts'), + enabled: self.getOption('connection.reconnect.enabled') + } + } + }); } /** @@ -183,18 +96,14 @@ class WebSocketDatasource extends Datasource { * @returns {Promise} */ connect() { - const self = this; - - return new Promise((resolve, reject) => { - connectServer.call(this, resolve, reject); - }); + return this[webConnectSymbol].connect(); } /** * @returns {boolean} */ isConnected() { - return this[connectionSymbol]?.socket?.readyState === 1; + return this[webConnectSymbol].isConnected(); } /** @@ -216,12 +125,11 @@ class WebSocketDatasource extends Datasource { * @property {Object} write.mapping the mapping is applied before writing. * @property {String} write.mapping.transformer Transformer to select the appropriate entries * @property {Monster.Data.Datasource~exampleCallback[]} write.mapping.callback with the help of the callback, the structures can be adjusted before writing. - * @property {Object} write.report - * @property {String} write.report.path Path to validations * @property {Object} write.sheathing * @property {Object} write.sheathing.object Object to be wrapped * @property {string} write.sheathing.path Path to the data * @property {Object} read={} Options + * @property {String} read.path Path to data * @property {Object} read.mapping the mapping is applied after reading. * @property {String} read.mapping.transformer Transformer to select the appropriate entries * @property {Monster.Data.Datasource~exampleCallback[]} read.mapping.callback with the help of the callback, the structures can be adjusted after reading. @@ -232,10 +140,7 @@ class WebSocketDatasource extends Datasource { write: { mapping: { transformer: undefined, - callbacks: [] - }, - report: { - path: undefined + callbacks: {} }, sheathing: { object: undefined, @@ -245,8 +150,9 @@ class WebSocketDatasource extends Datasource { read: { mapping: { transformer: undefined, - callbacks: [] + callbacks: {} }, + path: undefined, }, connection: { timeout: 5000, @@ -265,11 +171,7 @@ class WebSocketDatasource extends Datasource { * @returns {Promise} */ close() { - this[manualCloseSymbol] = true; - if (this[connectionSymbol].socket) { - this[connectionSymbol].socket.close(); - } - return this; + return this[webConnectSymbol].close(); } /** @@ -277,99 +179,130 @@ class WebSocketDatasource extends Datasource { */ read() { const self = this; - let response; - - if (self[connectionSymbol]?.socket?.readyState !== 1) { - return Promise.reject('The connection is not established.'); - } return new Promise((resolve, reject) => { - if (self[receiveQueueSymbol].isEmpty()) { - resolve(); - } - while (!self[receiveQueueSymbol].isEmpty()) { - - const event = self[receiveQueueSymbol].poll(); - const body = event?.data; - if (!body) continue; - - let obj; - try { - obj = JSON.parse(body); - } catch (e) { - - let msg = 'the response does not contain a valid json (actual: '; - - if (body.length > 100) { - msg += body.substring(0, 97) + '...'; - } else { - msg += body; - } - - msg += "; " + e.message + ')'; + while (this[webConnectSymbol].dataReceived() === true) { + let obj = this[webConnectSymbol].poll(); + if (!isObject(obj)) { + reject(new Error('The received data is not an object.')); + return; + } - reject(msg); + if (!(obj instanceof Message)) { + reject(new Error('The received data is not a Message.')); + return; } - let transformation = self.getOption('read.mapping.transformer'); - if (transformation !== undefined) { - const pipe = new Pipe(transformation); + obj = obj.getData(); - for (const callback of self.getOption('read.mapping.callbacks')) { - pipe.setCallback(callback.constructor.name, callback); - } + obj = self.transformServerPayload.call(self, obj); + self.set( obj); + } - obj = pipe.run(obj); - } + resolve(self.get()); - self.set(obj); - return response; - } }) - } + + }; + + // const self = this; + // let response; + // + // if (self[webConnectSymbol]?.socket?.readyState !== 1) { + // return Promise.reject('The connection is not established.'); + // } + // + // return new Promise((resolve, reject) => { + // if (self[receiveQueueSymbol].isEmpty()) { + // resolve(); + // } + // + // while (!self[receiveQueueSymbol].isEmpty()) { + // + // const event = self[receiveQueueSymbol].poll(); + // const body = event?.data; + // if (!body) continue; + // + // let obj; + // try { + // obj = JSON.parse(body); + // } catch (e) { + // + // let msg = 'the response does not contain a valid json (actual: '; + // + // if (body.length > 100) { + // msg += body.substring(0, 97) + '...'; + // } else { + // msg += body; + // } + // + // msg += "; " + e.message + ')'; + // + // reject(msg); + // return; + // } + // + // obj = self.transformServerPayload.call(self, obj); + // + // + // self.set(obj); + // return response; + // } + // }) +//} /** - * @return {Promise} + * This prepares the data that comes from the server. + * Should not be called directly. + * + * @private + * @param {Object} payload + * @returns {Object} */ - write() { + transformServerPayload(payload) { const self = this; + payload = doTransform.call(self, 'read', payload); - if (self[connectionSymbol]?.socket?.readyState !== 1) { - return Promise.reject('The connection is not established.'); + const dataPath = self.getOption('read.path'); + if (dataPath) { + payload = (new Pathfinder(payload)).getVia(dataPath); } - let obj = self.get(); - let transformation = self.getOption('write.mapping.transformer'); - if (transformation !== undefined) { - const pipe = new Pipe(transformation); + return payload; + } - for (const callback of self.getOption('write.mapping.callbacks')) { - pipe.setCallback(callback.constructor.name, callback); - } + /** + * This prepares the data for writing and should not be called directly. + * + * @private + * @param {Object} payload + * @returns {Object} + */ + prepareServerPayload(payload) { + const self = this; - obj = pipe.run(obj); - } + payload = doTransform.call(self, 'write', payload); let sheathingObject = self.getOption('write.sheathing.object'); let sheathingPath = self.getOption('write.sheathing.path'); - let reportPath = self.getOption('write.report.path'); if (sheathingObject && sheathingPath) { - const sub = obj; - obj = sheathingObject; - (new Pathfinder(obj)).setVia(sheathingPath, sub); + const sub = payload; + payload = sheathingObject; + (new Pathfinder(payload)).setVia(sheathingPath, sub); } - return new Promise((resolve, reject) => { - - if (self[connectionSymbol].socket.readyState !== 1) { - reject('the socket is not ready'); - } + return payload; + } - self[connectionSymbol].socket.send(JSON.stringify(obj)) - resolve(); - }); + /** + * @return {Promise} + */ + write() { + const self = this; + let obj = self.prepareServerPayload(self.get()); + return self[webConnectSymbol].send(obj) } diff --git a/development/test/cases/data/datasource/websocket.mjs b/development/test/cases/data/datasource/websocket.mjs index 75f7a0495cfc545047d4f98fefa10885fa37fdce..b8a9174c37cedab543c3644fa6d5b6b4cebc972f 100644 --- a/development/test/cases/data/datasource/websocket.mjs +++ b/development/test/cases/data/datasource/websocket.mjs @@ -24,16 +24,22 @@ describe('Websocket', function () { if (ds) { ds.close() } - - // without this, the node test will hang + + // workaround: without this, the node test will hang for (const sym of Object.getOwnPropertySymbols(ds)) { - if (sym.toString() ==='Symbol(connection)') { - if(ds[sym]?.socket?.['terminate']) { - ds[sym]?.socket?.['terminate']() + if (sym.toString() === 'Symbol(connection)') { + const connection = ds[sym] + for (const sym2 of Object.getOwnPropertySymbols(connection)) { + if (sym2.toString() === 'Symbol(connection)') { + const socket = connection[sym2]?.socket; + if (socket) { + socket.terminate() + } + } } } } - + done() }); @@ -41,12 +47,90 @@ describe('Websocket', function () { ds = new WebSocketDatasource(testUrl) const clone = ds.getClone() - + expect(clone).to.be.an.instanceof(WebSocketDatasource) - + + }) + + it('should transform data', function (done) { + + let writeCallbackCalled = false + let readCallbackCalled = false + + ds = new WebSocketDatasource({ + url: testUrl, + write: { + mapping: { + transformer: "call:onWrite", + callbacks: { + onWrite: (data) => { + writeCallbackCalled = true + return data + } + } + }, + sheathing: { + object: { + demo: 1, + data: { + xyz: undefined + } + }, + path: "data.xyz", + }, + }, + read: { + mapping: { + transformer: "call:onRead", + callbacks: { + onRead: (data) => { + readCallbackCalled = true + return data + } + } + }, + path: 'data.xyz', + } + }) + + ds.connect().then(() => { + ds.set({ + envelop: { + message: "Hello World" + } + }) + + ds.write().then(() => { + + ds.set({}) + expect(ds.get()).to.be.deep.equal({}); + + setTimeout(() => { + + + ds.read().then(() => { + expect(ds.get()).to.be.deep.equal({envelop:{message: "Hello World"}}); + expect(writeCallbackCalled).to.be.true + expect(readCallbackCalled).to.be.true + done() + }).catch((e) => { + done(e) + }) + }, 200) + + }).catch((err) => { + done(new Error(err)); + }) + + + }).catch((e) => { + done(e) + }) + + }) - + it('should connect', function (done) { ds = new WebSocketDatasource({ url: testUrl, @@ -54,11 +138,11 @@ describe('Websocket', function () { enabled: false } }); - ds.connect() - setTimeout(() => { - expect(ds.isConnected()).to.be.true; - done(); - }, 500); + ds.connect().then(() => { + done() + }).catch((e) => { + done(e) + }) }) @@ -70,36 +154,37 @@ describe('Websocket', function () { enabled: false } }); - ds.connect() - - ds.set({ - data: { - message: "Hello World" - } - }) - - setTimeout(() => { + ds.connect().then(() => { + ds.set({ + envelop: { + message: "Hello World" + } + }) ds.write().then(() => { ds.set({}) expect(ds.get()).to.be.deep.equal({}); - setTimeout(() => { - expect(ds.get()).to.be.deep.equal({ - data: { - message: "Hello World" - } - }); - done(); - }, 1000); + ds.read().then(() => { + expect(ds.get()).to.be.deep.equal({envelop:{message: "Hello World"}}); + done() + }).catch((e) => { + done(e) + }) + },500) + + }).catch((err) => { done(new Error(err)); }) - }, - 500) + + + }).catch((e) => { + done(e) + }) }).timeout(10000); diff --git a/development/test/cases/net/webconnect.mjs b/development/test/cases/net/webconnect.mjs new file mode 100644 index 0000000000000000000000000000000000000000..e960c205946475746c94dc01d6a303262fca2bde --- /dev/null +++ b/development/test/cases/net/webconnect.mjs @@ -0,0 +1,116 @@ +import {expect} from "chai" +import {WebConnect} from "../../../../application/source/net/webconnect.mjs"; +import {Message} from "../../../../application/source/net/webconnect/message.mjs"; +import {Observer} from "../../../../application/source/types/observer.mjs"; +import {initWebSocket} from "../../util/websocket.mjs"; + +const testUrl = "wss://ws.postman-echo.com/raw" + +describe('Websocket', function () { + + let ds = undefined + + before(function (done) { + initWebSocket().then(() => { + done() + }).catch((e) => { + done(e) + }) + }); + + afterEach(function (done) { + if (ds) { + ds.close() + } + + // without this, the node test will hang + for (const sym of Object.getOwnPropertySymbols(ds)) { + if (sym.toString() === 'Symbol(connection)') { + if (ds[sym]?.socket?.['terminate']) { + ds[sym]?.socket?.['terminate']() + } + } + } + + done() + }); + + + it('should transform data', function (done) { + + ds = new WebConnect( { + url: testUrl, + }) + + ds.connect().then(() => { + + ds.attachObserver(new Observer(()=> { + done() + })) + + ds.send({ + data: { + message: "Hello World" + } + }) + + }).catch((e) => { + done(e) + }) + + + }) + + it('should connect', function (done) { + ds = new WebConnect({ + url: testUrl, + reconnect: { + enabled: false + } + }); + ds.connect().then(() => { + done() + }).catch((e) => { + done(e) + }) + + + }) + + it('should send message', function (done) { + ds = new WebConnect({ + url: testUrl, + reconnect: { + enabled: false + } + }); + ds.connect().then(() => { + + ds.attachObserver(new Observer(()=> { + + expect(ds.dataReceived()).to.be.true + + try { + const msg = ds.poll() + expect(msg).to.be.instanceOf(Message) + const data = msg.getData() + expect(data).to.be.deep.equal({message: "Hello World"}) + } catch (e) { + done(e) + return + } + done() + })) + + ds.send({ + message: "Hello World" + }) + + }).catch((e) => { + done(e) + }) + + + }).timeout(10000); + +}); diff --git a/development/test/cases/net/webconnect/message.mjs b/development/test/cases/net/webconnect/message.mjs new file mode 100644 index 0000000000000000000000000000000000000000..68ae0a14500dc0c9dd1a5dd31b17abb09e788f07 --- /dev/null +++ b/development/test/cases/net/webconnect/message.mjs @@ -0,0 +1,50 @@ +import {expect} from "chai" +import {Message} from "../../../../../application/source/net/webconnect/message.mjs"; + +describe('Message', function () { + + it('construct withouth parameters should throw', function (done) { + + try { + new Message(); + done(new Error('should throw')); + } catch (e) { + done(); + return; + } + + }) + + it('from json should ' , function (done) { + const json = { + "id": "123", + "type": "test", + "data": { + "test": "test" + } + } + const message = Message.fromJSON(JSON.stringify(json)); + const data = message.getData(); + expect(data.id).to.equal(json.id); + expect(data.type).to.equal(json.type); + expect(data.data).to.deep.equal(json.data); + done(); + }) + + it ("to json should", function (done) { + const obj = { + "id": "123", + "type": "test", + "data": { + "test": "test" + } + } + const message = new Message(obj); + const data = JSON.stringify(message); + expect(data).to.equal('{"id":"123","type":"test","data":{"test":"test"}}'); + done(); + }) + + + +}); diff --git a/development/test/cases/types/observablequeue.mjs b/development/test/cases/types/observablequeue.mjs new file mode 100644 index 0000000000000000000000000000000000000000..be1486b7982a6dd782194d741797eb7b12c3a61f --- /dev/null +++ b/development/test/cases/types/observablequeue.mjs @@ -0,0 +1,17 @@ +import {expect} from "chai" +import {ObservableQueue} from "../../../../application/source/types/observablequeue.mjs"; +import {Observer} from "../../../../application/source/types/observer.mjs"; + +describe('ObservableQueue', function () { + describe('Observer', function () { + + it('should notify', function (done) { + let queue = new ObservableQueue; + let o = new Observer((q) => { + done() + }); + queue.attachObserver(o); + expect(queue.add('a')).to.be.instanceOf(ObservableQueue); + }); + }); +}) \ No newline at end of file diff --git a/development/test/cases/types/queue.mjs b/development/test/cases/types/queue.mjs index 3d25da2e42e651fc3362a8fe11433f1eb75d730c..5ac5d7ce09907cb47d9b0f71129a8225c3645666 100644 --- a/development/test/cases/types/queue.mjs +++ b/development/test/cases/types/queue.mjs @@ -30,7 +30,7 @@ describe('Queue', function () { }); }) - + describe('add and clear', function () { it('should empty', function () { @@ -42,4 +42,7 @@ describe('Queue', function () { }); }) + + + }) \ No newline at end of file