Add time and expiration of old messages to avoid a flood on reconnect.

This commit is contained in:
Herbert Wolverson
2023-07-28 15:23:15 +00:00
parent 0590d3055f
commit 19555bd6e4
10 changed files with 253 additions and 184 deletions

12
src/rust/Cargo.lock generated
View File

@@ -4574,6 +4574,7 @@ dependencies = [
"wasm-bindgen",
"wasm_pipe_types",
"web-sys",
"web-time",
]
[[package]]
@@ -4594,6 +4595,17 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19353897b48e2c4d849a2d73cb0aeb16dc2be4e00c565abfc11eb65a806e47de"
dependencies = [
"js-sys",
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"

View File

@@ -10,4 +10,4 @@ cp ../../site_build/output/* .
cp ../../site_build/src/main.html .
cp ../../site_build/wasm/wasm_pipe_bg.wasm .
popd
RUST_LOG=info RUST_BACKTRACE=1 cargo run --release
RUST_LOG=info RUST_BACKTRACE=1 cargo run

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,5 +1,25 @@
let wasm;
const heap = new Array(128).fill(undefined);
heap.push(undefined, null, true, false);
function getObject(idx) { return heap[idx]; }
let heap_next = heap.length;
function dropObject(idx) {
if (idx < 132) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
const cachedTextDecoder = (typeof TextDecoder !== 'undefined' ? new TextDecoder('utf-8', { ignoreBOM: true, fatal: true }) : { decode: () => { throw Error('TextDecoder not available') } } );
if (typeof TextDecoder !== 'undefined') { cachedTextDecoder.decode(); };
@@ -18,12 +38,6 @@ function getStringFromWasm0(ptr, len) {
return cachedTextDecoder.decode(getUint8Memory0().subarray(ptr, ptr + len));
}
const heap = new Array(128).fill(undefined);
heap.push(undefined, null, true, false);
let heap_next = heap.length;
function addHeapObject(obj) {
if (heap_next === heap.length) heap.push(heap.length + 1);
const idx = heap_next;
@@ -33,20 +47,6 @@ function addHeapObject(obj) {
return idx;
}
function getObject(idx) { return heap[idx]; }
function dropObject(idx) {
if (idx < 132) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
function debugString(val) {
// primitive types
const type = typeof val;
@@ -200,7 +200,7 @@ function makeMutClosure(arg0, arg1, dtor, f) {
return real;
}
function __wbg_adapter_12(arg0, arg1, arg2) {
function __wbg_adapter_16(arg0, arg1, arg2) {
wasm.__wbindgen_export_3(arg0, arg1, addHeapObject(arg2));
}
@@ -575,6 +575,9 @@ async function __wbg_load(module, imports) {
function __wbg_get_imports() {
const imports = {};
imports.wbg = {};
imports.wbg.__wbindgen_object_drop_ref = function(arg0) {
takeObject(arg0);
};
imports.wbg.__wbg_new_39e958ac9d5cae7d = function() { return handleError(function (arg0, arg1) {
const ret = new WebSocket(getStringFromWasm0(arg0, arg1));
return addHeapObject(ret);
@@ -677,9 +680,34 @@ function __wbg_get_imports() {
imports.wbg.__wbg_send_737fddb36434277e = function() { return handleError(function (arg0, arg1, arg2) {
getObject(arg0).send(getArrayU8FromWasm0(arg1, arg2));
}, arguments) };
imports.wbg.__wbindgen_object_drop_ref = function(arg0) {
takeObject(arg0);
imports.wbg.__wbg_self_b9aad7f1c618bfaf = function() { return handleError(function () {
const ret = self.self;
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbg_window_55e469842c98b086 = function() { return handleError(function () {
const ret = window.window;
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbg_globalThis_d0957e302752547e = function() { return handleError(function () {
const ret = globalThis.globalThis;
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbg_global_ae2f87312b8987fb = function() { return handleError(function () {
const ret = global.global;
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbindgen_is_undefined = function(arg0) {
const ret = getObject(arg0) === undefined;
return ret;
};
imports.wbg.__wbg_newnoargs_e643855c6572a4a8 = function(arg0, arg1) {
const ret = new Function(getStringFromWasm0(arg0, arg1));
return addHeapObject(ret);
};
imports.wbg.__wbg_call_f96b398515635514 = function() { return handleError(function (arg0, arg1) {
const ret = getObject(arg0).call(getObject(arg1));
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbindgen_debug_string = function(arg0, arg1) {
const ret = debugString(getObject(arg1));
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
@@ -690,12 +718,24 @@ function __wbg_get_imports() {
imports.wbg.__wbindgen_throw = function(arg0, arg1) {
throw new Error(getStringFromWasm0(arg0, arg1));
};
imports.wbg.__wbindgen_closure_wrapper1471 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_12);
imports.wbg.__wbindgen_object_clone_ref = function(arg0) {
const ret = getObject(arg0);
return addHeapObject(ret);
};
imports.wbg.__wbindgen_closure_wrapper1473 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_12);
imports.wbg.__wbg_performance_e5eb61626698b7a4 = function(arg0) {
const ret = getObject(arg0).performance;
return addHeapObject(ret);
};
imports.wbg.__wbg_now_928014da6271b17c = function(arg0) {
const ret = getObject(arg0).now();
return ret;
};
imports.wbg.__wbindgen_closure_wrapper1481 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_16);
return addHeapObject(ret);
};
imports.wbg.__wbindgen_closure_wrapper1483 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_16);
return addHeapObject(ret);
};

View File

@@ -14,6 +14,7 @@ serde_cbor = "0" # For RFC8949/7409 format C binary objects
wasm_pipe_types = { path = "../wasm_pipe_types" }
serde_json = "1.0.96"
thiserror = "1.0.44"
web-time = "0.2"
[dependencies.web-sys]
version = "0.3.22"

View File

@@ -83,7 +83,7 @@ struct Conduit {
status: ConnectionStatus,
socket: Option<WebSocket>,
url: String,
message_queue: VecDeque<Vec<u8>>,
message_queue: VecDeque<WsRequestMessage>,
}
impl Conduit {
@@ -169,11 +169,7 @@ impl Conduit {
}
fn enqueue_message(&mut self, message: WsRequestMessage) {
let serialized = message.serialize();
match serialized {
Ok(msg) => self.message_queue.push_back(msg),
Err(e) => log(&format!("Error enqueing message: {:?}", e)),
}
self.message_queue.push_back(message);
}
fn send_queue(&mut self) {
@@ -182,16 +178,29 @@ impl Conduit {
return;
}
// Kill old messages, to avoid a flood on reconnect
self.message_queue.retain(|msg| msg.submitted.elapsed().as_secs_f32() < 10.0);
log(&format!("{} Enqueued Messages", self.message_queue.len()));
// Send queued messages
if let Some(ws) = &mut self.socket {
while let Some(msg) = self.message_queue.pop_front() {
log(&format!("Sending message: {msg:?}"));
let msg = msg.serialize();
log("Message Serialized");
match msg {
Ok(msg) => {
if let Err(e) = ws.send_with_u8_array(&msg) {
log(&format!("Error sending message: {e:?}"));
self.status = ConnectionStatus::New;
break;
}
}
Err(e) => {
log(&format!("Serialization error: {e:?}"));
}
}
}
} else {
log("No WebSocket connection");
let _ = self.connect();

View File

@@ -1,5 +1,6 @@
use thiserror::Error;
use wasm_pipe_types::{WasmRequest, WasmResponse};
use web_time::Instant;
#[derive(Error, Debug)]
pub enum MessageError {
@@ -26,15 +27,21 @@ impl WsResponseMessage {
}
}
pub struct WsRequestMessage(pub WasmRequest);
pub struct WsRequestMessage {
pub message: WasmRequest,
pub submitted: Instant,
}
impl WsRequestMessage {
pub fn new(msg: WasmRequest) -> Self {
Self(msg)
Self {
message: msg,
submitted: Instant::now(),
}
}
pub fn serialize(&self) -> Result<Vec<u8>, MessageError> {
let cbor = serde_cbor::to_vec(&self.0).map_err(|_| MessageError::Serialize)?;
let cbor = serde_cbor::to_vec(&self.message).map_err(|_| MessageError::Serialize)?;
Ok(miniz_oxide::deflate::compress_to_vec(&cbor, 8))
}
}