Test version - public key exchnage as part of stats submission

This commit is contained in:
Herbert Wolverson 2023-08-28 14:51:19 +00:00
parent 89d2538490
commit 7004ecc83f
4 changed files with 75 additions and 31 deletions

View File

@ -1,9 +1,10 @@
use dryoc::{dryocbox::{Nonce, DryocBox}, types::{NewByteArray, ByteArray}};
use lqos_config::EtcLqos;
use thiserror::Error;
use crate::{transport_data::{LtsCommand, NodeIdAndLicense, HelloVersion2}, submission_queue::queue::QueueError};
use super::keys::{SERVER_PUBLIC_KEY, KEYPAIR};
/*pub(crate) async fn encode_submission_hello(license_key: &str, node_id: &str, node_name: &str) -> Result<Vec<u8>, QueueError> {
pub(crate) async fn encode_submission_hello(license_key: &str, node_id: &str, node_name: &str) -> Result<Vec<u8>, QueueError> {
let mut result = Vec::new();
// Build the body
@ -11,13 +12,56 @@ use super::keys::{SERVER_PUBLIC_KEY, KEYPAIR};
license_key: license_key.to_string(),
node_id: node_id.to_string(),
node_name: node_name.to_string(),
client_public_key: KEYPAIR.read().await.public_key.clone(),
client_public_key: KEYPAIR.read().await.public_key.clone().to_vec(),
};
// Add the version
result.extend(2u16.to_be_bytes());
// Pad to 32-bit boundary
result.extend(3u16.to_be_bytes());
// Serialize the body
let hello_bytes = serde_cbor::to_vec(&hello_message).map_err(|_| QueueError::SendFail)?;
// Add the length
result.extend((hello_bytes.len() as u64).to_be_bytes());
// Add the body
result.extend(hello_bytes);
Ok(result)
}*/
}
#[allow(dead_code)]
#[derive(Debug, Error)]
pub enum SubmissionDecodeError {
#[error("Invalid version")]
InvalidVersion,
#[error("Invalid padding")]
InvalidPadding,
#[error("Failed to deserialize")]
Deserialize,
}
#[allow(dead_code)]
pub(crate) fn decode_submission_hello(bytes: &[u8]) -> Result<HelloVersion2, SubmissionDecodeError> {
let version = u16::from_be_bytes([bytes[0], bytes[1]]);
if version != 2 {
log::error!("Received an invalid version from the server: {}", version);
return Err(SubmissionDecodeError::InvalidVersion);
}
let padding = u16::from_be_bytes([bytes[2], bytes[3]]);
if padding != 3 {
log::error!("Received an invalid padding from the server: {}", padding);
return Err(SubmissionDecodeError::InvalidPadding);
}
let size = u64::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7], bytes[8], bytes[9], bytes[10], bytes[11]]);
let hello_bytes = &bytes[12..12 + size as usize];
let hello: HelloVersion2 = serde_cbor::from_slice(hello_bytes).map_err(|_| SubmissionDecodeError::Deserialize)?;
Ok(hello)
}
pub(crate) async fn encode_submission(submission: &LtsCommand) -> Result<Vec<u8>, QueueError> {
let nonce = Nonce::gen();
@ -73,4 +117,19 @@ fn get_license_key_and_node_id(nonce: &Nonce) -> Result<NodeIdAndLicense, QueueE
}
}
Err(QueueError::SendFail)
}
#[cfg(test)]
mod test {
#[tokio::test]
async fn hello_submission_roundtrip() {
let license_key = "1234567890";
let node_id = "node_id";
let node_name = "node_name";
let hello = super::encode_submission_hello(license_key, node_id, node_name).await.unwrap();
let hello = super::decode_submission_hello(&hello).unwrap();
assert_eq!(hello.license_key, license_key);
assert_eq!(hello.node_id, node_id);
assert_eq!(hello.node_name, node_name);
}
}

View File

@ -1,7 +1,8 @@
use std::time::Duration;
use lqos_config::EtcLqos;
use tokio::{sync::mpsc::Receiver, time::sleep, net::TcpStream, io::{AsyncWriteExt, AsyncReadExt}};
use crate::{submission_queue::comm_channel::keys::store_server_public_key, transport_data::HelloVersion2};
use crate::submission_queue::comm_channel::keys::store_server_public_key;
use self::encode::encode_submission_hello;
use super::queue::{send_queue, QueueError};
mod keys;
pub(crate) use keys::key_exchange;
@ -19,6 +20,7 @@ pub(crate) async fn start_communication_channel(mut rx: Receiver<SenderChannelMe
loop {
match rx.try_recv() {
Ok(SenderChannelMessage::QueueReady) => {
log::info!("Trying to connect to stats.libreqos.io");
let mut stream = connect_if_permitted().await;
// If we're still not connected, skip - otherwise, send the
@ -29,6 +31,8 @@ pub(crate) async fn start_communication_channel(mut rx: Receiver<SenderChannelMe
if all_good.is_err() {
log::error!("Stream fail during send. Will re-send");
}
} else {
log::error!("Unable to connect to stats.libreqos.io: {stream:?}");
}
}
Ok(SenderChannelMessage::Quit) => {
@ -42,6 +46,7 @@ pub(crate) async fn start_communication_channel(mut rx: Receiver<SenderChannelMe
}
async fn connect_if_permitted() -> Result<TcpStream, QueueError> {
log::info!("Connecting to stats.libreqos.io");
// Check that we have a local license key and are enabled
let cfg = EtcLqos::load().map_err(|_| {
log::error!("Unable to load config file.");
@ -73,33 +78,10 @@ async fn connect_if_permitted() -> Result<TcpStream, QueueError> {
})?;
// Send Hello
let pk = crate::submission_queue::comm_channel::keys::KEYPAIR.read().await.public_key.clone();
let hellov2 = HelloVersion2 {
node_id: node_id.clone(),
license_key: license_key.clone(),
node_name: node_name.clone(),
client_public_key: serde_cbor::to_vec(&pk).map_err(|_| QueueError::SendFail)?,
};
let bytes = serde_cbor::to_vec(&hellov2)
.map_err(|_| QueueError::SendFail)?;
stream.write_u16(2).await
.map_err(|e| {
log::error!("Unable to write version to {host}, {e:?}");
QueueError::SendFail
})?;
stream.write_u16(2).await
.map_err(|e| {
log::error!("Unable to write padding to {host}, {e:?}");
QueueError::SendFail
})?;
stream.write_u64(bytes.len() as u64).await
.map_err(|e| {
log::error!("Unable to write size to {host}, {e:?}");
QueueError::SendFail
})?;
let bytes = encode_submission_hello(&license_key, &node_id, &node_name).await?;
stream.write_all(&bytes).await
.map_err(|e| {
log::error!("Unable to write to {host}, {e:?}");
log::error!("Unable to write to {host}: {e:?}");
QueueError::SendFail
})?;

View File

@ -15,8 +15,11 @@ pub(crate) async fn enqueue_if_allowed(data: StatsSubmission, comm_tx: Sender<Se
log::error!("Your license is invalid. Please contact support.");
}
LicenseState::Valid{ .. } => {
log::info!("Sending data to the queue.");
QUEUE.push(LtsCommand::Submit(Box::new(data))).await;
let _ = comm_tx.send(SenderChannelMessage::QueueReady).await;
if let Err(e) = comm_tx.send(SenderChannelMessage::QueueReady).await {
log::error!("Unable to send queue ready message: {}", e);
}
}
}
}

View File

@ -74,7 +74,7 @@ pub struct NodeIdAndLicense {
}
/// For the new V2 hello license system, encodes a greeting
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct HelloVersion2 {
/// The node id
pub node_id: String,