Implement an actually sane queue system for WSS messaging, with re-connect baked in.

This commit is contained in:
Herbert Wolverson
2023-07-12 20:22:05 +00:00
parent 5afbd1bff5
commit 9d5f9a3e73
12 changed files with 311 additions and 260 deletions

View File

@@ -10,4 +10,4 @@ cp ../../site_build/output/* .
cp ../../site_build/src/main.html .
cp ../../site_build/wasm/wasm_pipe_bg.wasm .
popd
RUST_LOG=info RUST_BACKTRACE=1 cargo run --release
RUST_LOG=info RUST_BACKTRACE=1 cargo run

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -3,7 +3,6 @@ import 'bootstrap/dist/js/bootstrap.js';
import { SiteRouter } from './router';
import { Bus, onAuthFail, onAuthOk, onMessage } from './bus';
import { Auth } from './auth';
import init from '../wasm/wasm_pipe.js';
await init();
@@ -38,6 +37,7 @@ window.changeGraphPeriod = (period: string) => changeGraphPeriod(period);
// 10 Second interval for refreshing the page
window.setInterval(() => {
window.bus.updateConnected();
window.router.ontick();
let btn = document.getElementById("graphPeriodBtn") as HTMLButtonElement;
btn.innerText = window.graphPeriod;
@@ -46,6 +46,7 @@ window.setInterval(() => {
// Faster interval for tracking the WSS connection
window.setInterval(() => {
window.bus.updateConnected();
window.bus.sendQueue();
}, 500);
function changeGraphPeriod(period: string) {

View File

@@ -1,4 +1,4 @@
import { connect_wasm_pipe, is_wasm_connected } from "../wasm/wasm_pipe";
import { connect_wasm_pipe, is_wasm_connected, send_wss_queue } from "../wasm/wasm_pipe";
import { Auth } from "./auth";
import { SiteRouter } from "./router";
@@ -23,6 +23,10 @@ export class Bus {
}
}
sendQueue() {
send_wss_queue();
}
connect() {
const currentUrlWithoutAnchors = window.location.href.split('#')[0].replace("https://", "").replace("http://", "");
const url = "ws://" + currentUrlWithoutAnchors + "ws";
@@ -126,7 +130,7 @@ export class Bus {
function retryConnect() {
if (!window.bus.connected) {
window.bus.connect();
//window.bus.connect();
}
}

View File

@@ -9,6 +9,9 @@ export function connect_wasm_pipe(url: string): void;
*/
export function is_wasm_connected(): boolean;
/**
*/
export function send_wss_queue(): void;
/**
* @param {string} token
*/
export function send_token(token: string): void;
@@ -139,6 +142,7 @@ export interface InitOutput {
readonly memory: WebAssembly.Memory;
readonly connect_wasm_pipe: (a: number, b: number) => void;
readonly is_wasm_connected: () => number;
readonly send_wss_queue: () => void;
readonly send_token: (a: number, b: number) => void;
readonly send_login: (a: number, b: number, c: number, d: number, e: number, f: number) => void;
readonly request_node_status: () => void;
@@ -169,7 +173,8 @@ export interface InitOutput {
readonly __wbindgen_malloc: (a: number) => number;
readonly __wbindgen_realloc: (a: number, b: number, c: number) => number;
readonly __wbindgen_export_2: WebAssembly.Table;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h66d1b1d022c5cd85: (a: number, b: number, c: number) => void;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__hb4b341652e081e3f: (a: number, b: number, c: number) => void;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__ha318d2d73313995c: (a: number, b: number, c: number) => void;
readonly __wbindgen_free: (a: number, b: number) => void;
readonly __wbindgen_exn_store: (a: number) => void;
}

View File

@@ -1,11 +1,52 @@
let wasm;
const cachedTextDecoder = (typeof TextDecoder !== 'undefined' ? new TextDecoder('utf-8', { ignoreBOM: true, fatal: true }) : { decode: () => { throw Error('TextDecoder not available') } } );
if (typeof TextDecoder !== 'undefined') { cachedTextDecoder.decode(); };
let cachedUint8Memory0 = null;
function getUint8Memory0() {
if (cachedUint8Memory0 === null || cachedUint8Memory0.byteLength === 0) {
cachedUint8Memory0 = new Uint8Array(wasm.memory.buffer);
}
return cachedUint8Memory0;
}
function getStringFromWasm0(ptr, len) {
ptr = ptr >>> 0;
return cachedTextDecoder.decode(getUint8Memory0().subarray(ptr, ptr + len));
}
const heap = new Array(128).fill(undefined);
heap.push(undefined, null, true, false);
let heap_next = heap.length;
function addHeapObject(obj) {
if (heap_next === heap.length) heap.push(heap.length + 1);
const idx = heap_next;
heap_next = heap[idx];
heap[idx] = obj;
return idx;
}
function getObject(idx) { return heap[idx]; }
function dropObject(idx) {
if (idx < 132) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
function debugString(val) {
// primitive types
const type = typeof val;
@@ -73,15 +114,6 @@ function debugString(val) {
let WASM_VECTOR_LEN = 0;
let cachedUint8Memory0 = null;
function getUint8Memory0() {
if (cachedUint8Memory0 === null || cachedUint8Memory0.byteLength === 0) {
cachedUint8Memory0 = new Uint8Array(wasm.memory.buffer);
}
return cachedUint8Memory0;
}
const cachedTextEncoder = (typeof TextEncoder !== 'undefined' ? new TextEncoder('utf-8') : { encode: () => { throw Error('TextEncoder not available') } } );
const encodeString = (typeof cachedTextEncoder.encodeInto === 'function'
@@ -144,38 +176,6 @@ function getInt32Memory0() {
return cachedInt32Memory0;
}
const cachedTextDecoder = (typeof TextDecoder !== 'undefined' ? new TextDecoder('utf-8', { ignoreBOM: true, fatal: true }) : { decode: () => { throw Error('TextDecoder not available') } } );
if (typeof TextDecoder !== 'undefined') { cachedTextDecoder.decode(); };
function getStringFromWasm0(ptr, len) {
ptr = ptr >>> 0;
return cachedTextDecoder.decode(getUint8Memory0().subarray(ptr, ptr + len));
}
let heap_next = heap.length;
function dropObject(idx) {
if (idx < 132) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
function addHeapObject(obj) {
if (heap_next === heap.length) heap.push(heap.length + 1);
const idx = heap_next;
heap_next = heap[idx];
heap[idx] = obj;
return idx;
}
function makeMutClosure(arg0, arg1, dtor, f) {
const state = { a: arg0, b: arg1, cnt: 1, dtor };
const real = (...args) => {
@@ -201,7 +201,11 @@ function makeMutClosure(arg0, arg1, dtor, f) {
return real;
}
function __wbg_adapter_12(arg0, arg1, arg2) {
wasm._dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h66d1b1d022c5cd85(arg0, arg1, addHeapObject(arg2));
wasm._dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__hb4b341652e081e3f(arg0, arg1, addHeapObject(arg2));
}
function __wbg_adapter_15(arg0, arg1, arg2) {
wasm._dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__ha318d2d73313995c(arg0, arg1, addHeapObject(arg2));
}
function notDefined(what) { return () => { throw new Error(`${what} is not defined`); }; }
@@ -222,6 +226,12 @@ export function is_wasm_connected() {
return ret !== 0;
}
/**
*/
export function send_wss_queue() {
wasm.send_wss_queue();
}
/**
* @param {string} token
*/
@@ -560,22 +570,9 @@ async function __wbg_load(module, imports) {
function __wbg_get_imports() {
const imports = {};
imports.wbg = {};
imports.wbg.__wbindgen_debug_string = function(arg0, arg1) {
const ret = debugString(getObject(arg1));
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len1 = WASM_VECTOR_LEN;
getInt32Memory0()[arg0 / 4 + 1] = len1;
getInt32Memory0()[arg0 / 4 + 0] = ptr1;
};
imports.wbg.__wbindgen_throw = function(arg0, arg1) {
throw new Error(getStringFromWasm0(arg0, arg1));
};
imports.wbg.__wbg_log_cd48b3599daf93ee = function(arg0, arg1) {
console.log(getStringFromWasm0(arg0, arg1));
};
imports.wbg.__wbindgen_object_drop_ref = function(arg0) {
takeObject(arg0);
};
imports.wbg.__wbg_windowbusgetToken_eab6ac8f06d69af2 = function(arg0) {
const ret = window.bus.getToken();
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
@@ -583,9 +580,65 @@ function __wbg_get_imports() {
getInt32Memory0()[arg0 / 4 + 1] = len1;
getInt32Memory0()[arg0 / 4 + 0] = ptr1;
};
imports.wbg.__wbg_windowonAuthOk_9cd9fb8f74884ca4 = function(arg0, arg1, arg2, arg3, arg4, arg5) {
let deferred0_0;
let deferred0_1;
let deferred1_0;
let deferred1_1;
let deferred2_0;
let deferred2_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
deferred1_0 = arg2;
deferred1_1 = arg3;
deferred2_0 = arg4;
deferred2_1 = arg5;
window.onAuthOk(getStringFromWasm0(arg0, arg1), getStringFromWasm0(arg2, arg3), getStringFromWasm0(arg4, arg5));
} finally {
wasm.__wbindgen_free(deferred0_0, deferred0_1);
wasm.__wbindgen_free(deferred1_0, deferred1_1);
wasm.__wbindgen_free(deferred2_0, deferred2_1);
}
};
imports.wbg.__wbg_windowonAuthFail_ddfdfcd594ff15b8 = typeof window.onAuthFail == 'function' ? window.onAuthFail : notDefined('window.onAuthFail');
imports.wbg.__wbg_windowonMessage_5c5b80d5376153dc = function(arg0, arg1) {
let deferred0_0;
let deferred0_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
window.onMessage(getStringFromWasm0(arg0, arg1));
} finally {
wasm.__wbindgen_free(deferred0_0, deferred0_1);
}
};
imports.wbg.__wbg_setonopen_6fd8b28538150568 = function(arg0, arg1) {
getObject(arg0).onopen = getObject(arg1);
};
imports.wbg.__wbg_setonerror_9f7532626d7a9ce2 = function(arg0, arg1) {
getObject(arg0).onerror = getObject(arg1);
};
imports.wbg.__wbg_setonclose_6b22bc5d93628786 = function(arg0, arg1) {
getObject(arg0).onclose = getObject(arg1);
};
imports.wbg.__wbg_setonmessage_493b82147081ec7e = function(arg0, arg1) {
getObject(arg0).onmessage = getObject(arg1);
};
imports.wbg.__wbg_setbinaryType_2e2320b177c86b17 = function(arg0, arg1) {
getObject(arg0).binaryType = takeObject(arg1);
};
imports.wbg.__wbg_new_39e958ac9d5cae7d = function() { return handleError(function (arg0, arg1) {
const ret = new WebSocket(getStringFromWasm0(arg0, arg1));
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbg_send_737fddb36434277e = function() { return handleError(function (arg0, arg1, arg2) {
getObject(arg0).send(getArrayU8FromWasm0(arg1, arg2));
}, arguments) };
imports.wbg.__wbindgen_string_new = function(arg0, arg1) {
const ret = getStringFromWasm0(arg0, arg1);
return addHeapObject(ret);
};
imports.wbg.__wbg_data_ef47af9c565d228b = function(arg0) {
const ret = getObject(arg0).data;
return addHeapObject(ret);
@@ -608,79 +661,36 @@ function __wbg_get_imports() {
const ret = getObject(arg0).length;
return ret;
};
imports.wbg.__wbindgen_memory = function() {
const ret = wasm.memory;
return addHeapObject(ret);
imports.wbg.__wbg_set_4b3aa8445ac1e91c = function(arg0, arg1, arg2) {
getObject(arg0).set(getObject(arg1), arg2 >>> 0);
};
imports.wbg.__wbindgen_object_drop_ref = function(arg0) {
takeObject(arg0);
};
imports.wbg.__wbg_buffer_fcbfb6d88b2732e9 = function(arg0) {
const ret = getObject(arg0).buffer;
return addHeapObject(ret);
};
imports.wbg.__wbg_set_4b3aa8445ac1e91c = function(arg0, arg1, arg2) {
getObject(arg0).set(getObject(arg1), arg2 >>> 0);
imports.wbg.__wbindgen_debug_string = function(arg0, arg1) {
const ret = debugString(getObject(arg1));
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len1 = WASM_VECTOR_LEN;
getInt32Memory0()[arg0 / 4 + 1] = len1;
getInt32Memory0()[arg0 / 4 + 0] = ptr1;
};
imports.wbg.__wbg_windowonMessage_5c5b80d5376153dc = function(arg0, arg1) {
let deferred0_0;
let deferred0_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
window.onMessage(getStringFromWasm0(arg0, arg1));
} finally {
wasm.__wbindgen_free(deferred0_0, deferred0_1);
}
imports.wbg.__wbindgen_throw = function(arg0, arg1) {
throw new Error(getStringFromWasm0(arg0, arg1));
};
imports.wbg.__wbg_windowonAuthFail_ddfdfcd594ff15b8 = typeof window.onAuthFail == 'function' ? window.onAuthFail : notDefined('window.onAuthFail');
imports.wbg.__wbg_windowonAuthOk_9cd9fb8f74884ca4 = function(arg0, arg1, arg2, arg3, arg4, arg5) {
let deferred0_0;
let deferred0_1;
let deferred1_0;
let deferred1_1;
let deferred2_0;
let deferred2_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
deferred1_0 = arg2;
deferred1_1 = arg3;
deferred2_0 = arg4;
deferred2_1 = arg5;
window.onAuthOk(getStringFromWasm0(arg0, arg1), getStringFromWasm0(arg2, arg3), getStringFromWasm0(arg4, arg5));
} finally {
wasm.__wbindgen_free(deferred0_0, deferred0_1);
wasm.__wbindgen_free(deferred1_0, deferred1_1);
wasm.__wbindgen_free(deferred2_0, deferred2_1);
}
};
imports.wbg.__wbg_new_39e958ac9d5cae7d = function() { return handleError(function (arg0, arg1) {
const ret = new WebSocket(getStringFromWasm0(arg0, arg1));
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbindgen_string_new = function(arg0, arg1) {
const ret = getStringFromWasm0(arg0, arg1);
imports.wbg.__wbindgen_memory = function() {
const ret = wasm.memory;
return addHeapObject(ret);
};
imports.wbg.__wbg_setbinaryType_2e2320b177c86b17 = function(arg0, arg1) {
getObject(arg0).binaryType = takeObject(arg1);
};
imports.wbg.__wbg_setonmessage_493b82147081ec7e = function(arg0, arg1) {
getObject(arg0).onmessage = getObject(arg1);
};
imports.wbg.__wbg_setonclose_6b22bc5d93628786 = function(arg0, arg1) {
getObject(arg0).onclose = getObject(arg1);
};
imports.wbg.__wbg_setonerror_9f7532626d7a9ce2 = function(arg0, arg1) {
getObject(arg0).onerror = getObject(arg1);
};
imports.wbg.__wbg_setonopen_6fd8b28538150568 = function(arg0, arg1) {
getObject(arg0).onopen = getObject(arg1);
};
imports.wbg.__wbindgen_closure_wrapper1901 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 70, __wbg_adapter_12);
imports.wbg.__wbindgen_closure_wrapper440 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 89, __wbg_adapter_12);
return addHeapObject(ret);
};
imports.wbg.__wbindgen_closure_wrapper1902 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 70, __wbg_adapter_12);
imports.wbg.__wbindgen_closure_wrapper442 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 87, __wbg_adapter_15);
return addHeapObject(ret);
};

View File

@@ -3,6 +3,7 @@
export const memory: WebAssembly.Memory;
export function connect_wasm_pipe(a: number, b: number): void;
export function is_wasm_connected(): number;
export function send_wss_queue(): void;
export function send_token(a: number, b: number): void;
export function send_login(a: number, b: number, c: number, d: number, e: number, f: number): void;
export function request_node_status(): void;
@@ -33,6 +34,7 @@ export function request_ext_capacity_graph(a: number, b: number, c: number, d: n
export function __wbindgen_malloc(a: number): number;
export function __wbindgen_realloc(a: number, b: number, c: number): number;
export const __wbindgen_export_2: WebAssembly.Table;
export function _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h66d1b1d022c5cd85(a: number, b: number, c: number): void;
export function _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__hb4b341652e081e3f(a: number, b: number, c: number): void;
export function _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__ha318d2d73313995c(a: number, b: number, c: number): void;
export function __wbindgen_free(a: number, b: number): void;
export function __wbindgen_exn_store(a: number): void;

View File

@@ -1,5 +1,5 @@
#!/bin/bash
cargo build --target wasm32-unknown-unknown --release
wasm-bindgen --target web --out-dir staging/ ../../target/wasm32-unknown-unknown/release/wasm_pipe.wasm
cargo build --target wasm32-unknown-unknown
wasm-bindgen --target web --out-dir staging/ ../../target/wasm32-unknown-unknown/debug/wasm_pipe.wasm
cp staging/* ../site_build/wasm
cp staging/wasm_pipe_bg.wasm ../lts_node/web

View File

@@ -1,3 +1,5 @@
use std::collections::VecDeque;
use wasm_bindgen::prelude::*;
use wasm_pipe_types::{WasmRequest, WasmResponse};
use web_sys::{BinaryType, ErrorEvent, MessageEvent, WebSocket};
@@ -22,6 +24,8 @@ extern "C" {
static mut CONNECTED: bool = false;
static mut WS: Option<WebSocket> = None;
static mut QUEUE: VecDeque<Vec<u8>> = VecDeque::new();
static mut URL: String = String::new();
#[wasm_bindgen]
pub fn connect_wasm_pipe(url: String) {
@@ -30,6 +34,9 @@ pub fn connect_wasm_pipe(url: String) {
log("Already connected");
return;
}
if !url.is_empty() {
URL = url.clone();
}
WS = Some(WebSocket::new(&url).unwrap());
if let Some(ws) = &mut WS {
ws.set_binary_type(BinaryType::Arraybuffer);
@@ -91,6 +98,30 @@ pub fn is_wasm_connected() -> bool {
unsafe { CONNECTED && WS.is_some() }
}
#[wasm_bindgen]
pub fn send_wss_queue() {
//log("Call to send queue");
unsafe {
// Bail out if there's nothing to do
if QUEUE.is_empty() {
//log("Queue is empty");
return;
}
// Send queued messages
if let Some(ws) = &mut WS {
while let Some(msg) = QUEUE.pop_front() {
log(&format!("Sending message: {msg:?}"));
ws.send_with_u8_array(&msg).unwrap();
}
} else {
log("No WebSocket connection");
CONNECTED = false;
connect_wasm_pipe(String::new());
}
}
}
fn build_message(msg: WasmRequest) -> Vec<u8> {
let cbor = serde_cbor::to_vec(&msg).unwrap();
miniz_oxide::deflate::compress_to_vec(&cbor, 8)
@@ -100,9 +131,7 @@ fn send_message(msg: WasmRequest) {
log(&format!("Sending message: {msg:?}"));
let msg = build_message(msg);
unsafe {
if let Some(ws) = &mut WS {
ws.send_with_u8_array(&msg).unwrap();
}
QUEUE.push_back(msg);
}
}