Skip to content
Snippets Groups Projects
Verified Commit 52eb6caa authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: new Webconnect and Message

parent 92a4d55c
No related branches found
No related tags found
No related merge requests found
/**
* Copyright schukai GmbH and contributors 2022. All Rights Reserved.
* Node module: @schukai/monster
* This file is licensed under the AGPLv3 License.
* License text available at https://www.gnu.org/licenses/agpl-3.0.en.html
*/
import {instanceSymbol} from "../constants.mjs";
import {isInteger, isString, isObject} from "../types/is.mjs";
import {BaseWithOptions} from "../types/basewithoptions.mjs";
import {ObservableQueue} from "../types/observablequeue.mjs";
import {Message} from "./webconnect/message.mjs";
export {WebConnect}
/**
* @private
* @type {Symbol}
*/
const receiveQueueSymbol = Symbol("receiveQueue");
/**
* @private
* @type {Symbol}
*/
const sendQueueSymbol = Symbol("sendQueue");
/**
* @private
* @type {Symbol}
*
* hint: this name is used in the tests. if you want to change it, please change it in the tests as well.
*/
const connectionSymbol = Symbol("connection");
/**
* @private
* @type {symbol}
*/
const manualCloseSymbol = Symbol("manualClose");
/**
* @private
* @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1
* @type {{"1000": string, "1011": string, "1010": string, "1008": string, "1007": string, "1006": string, "1005": string, "1004": string, "1015": string, "1003": string, "1002": string, "1001": string, "1009": string}}
*/
const connectionStatusCode = {
1000: "Normal closure",
1001: "Going away",
1002: "Protocol error",
1003: "Unsupported data",
1004: "Reserved",
1005: "No status code",
1006: "Connection closed abnormally",
1007: "Invalid frame payload data",
1008: "Policy violation",
1009: "Message too big",
1010: "Mandatory extension",
1011: "Internal server error",
1015: "TLS handshake"
};
/**
* @private
* @this {WebConnect}
* @throws {Error} No url defined for websocket datasource.
*/
function connectServer(resolve, reject) {
const self = this;
const url = self.getOption('url');
if (!url) {
reject('No url defined for webconnect.');
return;
}
let promiseAllredyResolved = false;
let connectionTimeout = self.getOption('connection.timeout');
if (!isInteger(connectionTimeout) || connectionTimeout < 100) {
connectionTimeout = 5000;
}
setTimeout(() => {
if (promiseAllredyResolved) {
return;
}
reject(new Error("Connection timeout"));
}, connectionTimeout);
let reconnectTimeout = self.getOption('connection.reconnect.timeout');
if (!isInteger(reconnectTimeout) || reconnectTimeout < 1000) reconnectTimeout = 1000;
let reconnectAttempts = self.getOption('connection.reconnect.attempts');
if (!isInteger(reconnectAttempts) || reconnectAttempts < 1) reconnectAttempts = 1;
let reconnectEnabled = self.getOption('connection.reconnect.enabled');
if (reconnectEnabled !== true) reconnectEnabled = false;
self[manualCloseSymbol] = false;
self[connectionSymbol].reconnectCounter++;
if (self[connectionSymbol].socket && self[connectionSymbol].socket.readyState < 2) {
self[connectionSymbol].socket.close();
}
self[connectionSymbol].socket = null;
self[connectionSymbol].socket = new WebSocket(url);
self[connectionSymbol].socket.onmessage = function (event) {
if (event.data instanceof Blob) {
const reader = new FileReader();
reader.addEventListener("loadend", function () {
self[receiveQueueSymbol].add(new Message(reader.result))
});
reader.readAsText(new Message(event.data));
} else {
self[receiveQueueSymbol].add(Message.fromJSON(event.data));
}
};
self[connectionSymbol].socket.onopen = function () {
self[connectionSymbol].reconnectCounter = 0;
if (typeof resolve === 'function' && !promiseAllredyResolved) {
promiseAllredyResolved = true;
resolve();
}
};
self[connectionSymbol].socket.close = function (event) {
if (self[manualCloseSymbol]) {
self[manualCloseSymbol] = false;
return;
}
if (reconnectEnabled && this[connectionSymbol].reconnectCounter < reconnectAttempts) {
setTimeout(() => {
self.connect();
}, reconnectTimeout * this[connectionSymbol].reconnectCounter);
}
};
self[connectionSymbol].socket.onerror = (error) => {
if (reconnectEnabled && self[connectionSymbol].reconnectCounter < reconnectAttempts) {
setTimeout(() => {
self.connect();
}, reconnectTimeout * this[connectionSymbol].reconnectCounter);
} else {
if (typeof reject === 'function' && !promiseAllredyResolved) {
promiseAllredyResolved = true;
reject(error);
}
}
};
}
/**
* The RestAPI is a class that enables a REST API server.
*
* @externalExample ../../../example/data/storage/restapi.mjs
* @license AGPLv3
* @since 3.1.0
* @copyright schukai GmbH
* @memberOf Monster.Data.Datasource
* @summary The LocalStorage class encapsulates the access to data objects.
*/
class WebConnect extends BaseWithOptions {
/**
*
* @param {Object} [options] options contains definitions for the datasource.
*/
constructor(options) {
if (isString(options)) {
options = {url: options};
}
super(options);
this[receiveQueueSymbol] = new ObservableQueue();
this[sendQueueSymbol] = new ObservableQueue();
this[connectionSymbol] = {};
this[connectionSymbol].socket = null;
this[connectionSymbol].reconnectCounter = 0;
this[manualCloseSymbol] = false;
}
/**
*
* @returns {Promise}
*/
connect() {
const self = this;
return new Promise((resolve, reject) => {
connectServer.call(this, resolve, reject);
});
}
/**
* @returns {boolean}
*/
isConnected() {
return this[connectionSymbol]?.socket?.readyState === 1;
}
/**
* This method is called by the `instanceof` operator.
* @returns {symbol}
*/
static get [instanceSymbol]() {
return Symbol.for("@schukai/monster/net/webconnect");
}
/**
* @property {string} url=undefined Defines the resource that you wish to fetch.
* @property {Object} connection
* @property {Object} connection.timeout=5000 Defines the timeout for the connection.
* @property {Number} connection.reconnect.timeout The timeout in milliseconds for the reconnect.
* @property {Number} connection.reconnect.attempts The maximum number of reconnects.
* @property {Bool} connection.reconnect.enabled If the reconnect is enabled.
*/
get defaults() {
return Object.assign({}, super.defaults, {
url: undefined,
connection: {
timeout: 5000,
reconnect: {
timeout: 1000,
attempts: 1,
enabled: false,
}
}
});
}
/**
* This method closes the connection.
*
* @param {Number} [code=1000] The close code.
* @param {String} [reason=""] The close reason.
* @returns {Promise}
* @see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1
*/
close(statusCode, reason ) {
if (!isInteger(statusCode) || statusCode < 1000 || statusCode > 4999) {
statusCode = 1000;
}
if (!isString(reason)) {
reason = '';
}
return new Promise((resolve, reject) => {
try {
this[manualCloseSymbol] = true;
if (this[connectionSymbol].socket) {
this[connectionSymbol].socket.close(statusCode, reason);
}
} catch (error) {
reject(error);
}
resolve();
});
}
/**
* Polls the receive queue for new messages.
*
* @returns {Message}
*/
poll() {
return this[receiveQueueSymbol].poll();
}
/**
* Are there any messages in the receive queue?
*
* @returns {boolean}
*/
dataReceived() {
return !this[receiveQueueSymbol].isEmpty();
}
/**
* Get Message from the receive queue, but do not remove it.
*
* @returns {Object}
*/
peek() {
return this[receiveQueueSymbol].peek();
}
/**
* Attach a new observer
*
* @param {Observer} observer
* @returns {ProxyObserver}
*/
attachObserver(observer) {
this[receiveQueueSymbol].attachObserver(observer);
return this;
}
/**
* Detach a observer
*
* @param {Observer} observer
* @returns {ProxyObserver}
*/
detachObserver(observer) {
this[receiveQueueSymbol].detachObserver(observer);
return this;
}
/**
* @param {Observer} observer
* @returns {boolean}
*/
containsObserver(observer) {
return this[receiveQueueSymbol].containsObserver(observer);
}
/**
* @param {Message|Object} message
* @return {Promise}
*/
send(message) {
const self = this;
return new Promise((resolve, reject) => {
if (self[connectionSymbol].socket.readyState !== 1) {
reject('the socket is not ready');
}
self[connectionSymbol].socket.send(JSON.stringify(message))
resolve();
});
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment