WIP - submissions transition to a processing queue. Beginnings of some early Influx code.

This commit is contained in:
Herbert Wolverson 2023-04-17 13:10:54 +00:00
parent 3e57a5098a
commit 6337bdb3d0
18 changed files with 981 additions and 228 deletions

1
.gitignore vendored
View File

@ -55,6 +55,7 @@ src/lqusers.toml
src/dist
src/rust/lqos_anonymous_stats_server/anonymous.sqlite
src/rust/long_term_stats/license_server/lqkeys.bin
src/rust/long_term_stats/lts_node/lqkeys.bin
# Ignore Rust build artifacts
src/rust/target

803
src/rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -12,3 +12,11 @@ stop you.
* Set up the database schema (TBD).
* Put the connection string for your database in `/etc/lqdb` on each host.
## For each stats node in the cluster
* Install InfluxDB.
* Install lts_node.
* Set up `/etc/lqdb`.
* Copy `lqkeys.bin` from the license server to the `lts_node` directory.
* Run the process.
* Add the node to the license server.

View File

@ -15,4 +15,6 @@ lqos_bus = { path = "../../lqos_bus" }
serde_json = "1"
pgdb = { path = "../pgdb" }
dryoc = { version = "0.5", features = ["serde"] }
once_cell = "1"
once_cell = "1"
influxdb2 = "0"
futures = "0"

View File

@ -2,7 +2,6 @@ mod submissions;
mod web;
mod pki;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Start the logger
@ -10,6 +9,21 @@ async fn main() -> anyhow::Result<()> {
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "warn"),
);
// Get the database connection pool
let pool = pgdb::get_connection_pool(5).await;
if pool.is_err() {
log::error!("Unable to connect to the database");
log::error!("{pool:?}");
return Err(anyhow::Error::msg("Unable to connect to the database"));
}
let pool = pool.unwrap();
// Start the submission queue
let submission_sender = {
log::info!("Starting the submission queue");
submissions::submissions_queue(pool.clone()).await?
};
// Start the webserver
{
log::info!("Starting the webserver");
@ -18,7 +32,7 @@ async fn main() -> anyhow::Result<()> {
// Start the submissions serer
log::info!("Starting the submissions server");
if let Err(e) = tokio::spawn(submissions::submissions_server()).await {
if let Err(e) = tokio::spawn(submissions::submissions_server(pool.clone(), submission_sender)).await {
log::error!("Server exited with error: {}", e);
}

View File

@ -1,92 +1,4 @@
use std::net::SocketAddr;
use dryoc::dryocbox::*;
use lqos_bus::long_term_stats::{NodeIdAndLicense, StatsSubmission};
use pgdb::sqlx::{Pool, Postgres};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
spawn,
};
use crate::pki::LIBREQOS_KEYPAIR;
/// Runs the stats submission listener on `:::9128`.
///
/// A database connection pool is created up front; every accepted
/// connection is then handled in its own spawned task, which reads the
/// socket to end-of-stream, decodes the submission and prints it.
///
/// # Errors
///
/// Returns an error if the listener cannot be bound, the database pool
/// cannot be created, or accepting a connection fails.
pub async fn submissions_server() -> anyhow::Result<()> {
    let listener = TcpListener::bind(":::9128").await?;
    log::info!("Listening for stats submissions on :::9128");

    let pool = match pgdb::get_connection_pool(5).await {
        Ok(pool) => pool,
        // Bind the whole failed `Result` so the log output keeps its `Err(..)` form.
        pool => {
            log::error!("Unable to connect to the database");
            log::error!("{pool:?}");
            return Err(anyhow::Error::msg("Unable to connect to the database"));
        }
    };

    loop {
        let (mut socket, address) = listener.accept().await?;
        log::info!("Connection from {address:?}");

        // Each connection task owns its own handle to the pool.
        let task_pool = pool.clone();
        spawn(async move {
            let mut received = Vec::new();
            let bytes = match socket.read_to_end(&mut received).await {
                Ok(bytes) => bytes,
                // Read failures are dropped without logging.
                Err(_) => return,
            };
            log::info!("Received {bytes} bytes from {address:?}");

            match decode(&received, address, task_pool).await {
                Ok(stats) => println!("{stats:?}"),
                Err(e) => log::error!("{e:?}"),
            }
        });
    }
}
async fn decode(
buf: &[u8],
address: SocketAddr,
pool: Pool<Postgres>,
) -> anyhow::Result<StatsSubmission> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
// Check the version
log::info!("Received a version {version} header of serialized size {size} from {address:?}");
if version != 1 {
log::warn!("Received a version {version} header from {address:?}");
return Err(anyhow::Error::msg("Received an unknown version header"));
}
// Read the header
let start = 2 + U64SIZE;
let end = start + size as usize;
let header: NodeIdAndLicense = lqos_bus::cbor::from_slice(&buf[start..end])?;
// Check the header against the database and retrieve the current
// public key
let public_key = pgdb::fetch_public_key(pool, &header.license_key, &header.node_id).await?;
let public_key: PublicKey = lqos_bus::cbor::from_slice(&public_key)?;
let private_key = LIBREQOS_KEYPAIR.read().unwrap().secret_key.clone();
// Retrieve the payload size
let size_buf = &buf[end .. end + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
let payload_encrypted = &buf[end + U64SIZE .. end + U64SIZE + size as usize];
// Decrypt
let dryocbox = DryocBox::from_bytes(&payload_encrypted).expect("failed to read box");
let decrypted = dryocbox
.decrypt_to_vec(
&header.nonce.into(),
&public_key,
&private_key,
)
.expect("unable to decrypt");
// Try to deserialize
let payload: StatsSubmission = lqos_bus::cbor::from_slice(&decrypted)?;
Ok(payload)
}
mod submission_server;
mod submission_queue;
pub use submission_server::submissions_server;
pub use submission_queue::submissions_queue;

View File

@ -0,0 +1,53 @@
use lqos_bus::long_term_stats::StatsTotals;
use pgdb::OrganizationDetails;
use futures::prelude::*;
use influxdb2::models::DataPoint;
use influxdb2::Client;
/// Writes a node's total traffic statistics to the organization's InfluxDB.
///
/// When `totals` is `Some`, six measurements are written to
/// `org.influx_bucket`: `packets_down`/`packets_up`, `bits_down`/`bits_up`
/// and `shaped_bits_down`/`shaped_bits_up`. Each has `min`, `max` and `avg`
/// fields and a `node` tag set to `node_id`. The first element of each
/// totals tuple feeds the `_down` measurement, the second the `_up` one.
///
/// Every point carries `timestamp`, written with second precision, so all
/// six series of one submission line up on the same instant.
///
/// When `totals` is `None` nothing is written and `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error if a data point cannot be built or the InfluxDB write
/// fails.
pub async fn collect_host_totals(org: &OrganizationDetails, node_id: &str, timestamp: i64, totals: Option<StatsTotals>) -> anyhow::Result<()> {
    let Some(totals) = totals else {
        return Ok(());
    };

    // One builder for all measurements, so no point can be emitted without
    // the submission timestamp or the node tag.
    let point = |measurement: &str, min: i64, max: i64, avg: i64| {
        DataPoint::builder(measurement)
            .timestamp(timestamp)
            .tag("node", node_id.to_string())
            .field("min", min)
            .field("max", max)
            .field("avg", avg)
            .build()
    };

    let points = vec![
        point(
            "packets_down",
            totals.packets.min.0 as i64,
            totals.packets.max.0 as i64,
            totals.packets.avg.0 as i64,
        )?,
        point(
            "packets_up",
            totals.packets.min.1 as i64,
            totals.packets.max.1 as i64,
            totals.packets.avg.1 as i64,
        )?,
        point(
            "bits_down",
            totals.bits.min.0 as i64,
            totals.bits.max.0 as i64,
            totals.bits.avg.0 as i64,
        )?,
        point(
            "bits_up",
            totals.bits.min.1 as i64,
            totals.bits.max.1 as i64,
            totals.bits.avg.1 as i64,
        )?,
        point(
            "shaped_bits_down",
            totals.shaped_bits.min.0 as i64,
            totals.shaped_bits.max.0 as i64,
            totals.shaped_bits.avg.0 as i64,
        )?,
        point(
            "shaped_bits_up",
            totals.shaped_bits.min.1 as i64,
            totals.shaped_bits.max.1 as i64,
            totals.shaped_bits.avg.1 as i64,
        )?,
    ];

    let client = Client::new(&org.influx_host, &org.influx_org, &org.influx_token);
    client
        .write_with_precision(
            &org.influx_bucket,
            stream::iter(points),
            influxdb2::api::write::TimestampPrecision::Seconds,
        )
        .await?;
    Ok(())
}

View File

@ -0,0 +1,4 @@
mod queue;
mod host_totals;
mod organization_cache;
pub use queue::{submissions_queue, SubmissionType};

View File

@ -0,0 +1,25 @@
use std::collections::HashMap;
use once_cell::sync::Lazy;
use pgdb::{OrganizationDetails, sqlx::{Pool, Postgres}};
use tokio::sync::RwLock;
/// Process-wide cache of organization details, keyed by the lookup key
/// passed to `get_org_details`. It starts as an empty map, created lazily
/// on first access, and is guarded by an async read/write lock. Nothing in
/// this file removes or refreshes an entry once it has been inserted.
static ORG_CACHE: Lazy<RwLock<HashMap<String, OrganizationDetails>>> = Lazy::new(|| {
RwLock::new(HashMap::new())
});
/// Returns the organization details for `key`, using the in-memory cache
/// and falling back to the database on a miss.
///
/// A successful database lookup is stored in the cache before returning.
/// Returns `None` when the database lookup fails; failures are not cached,
/// so the next call queries the database again.
pub async fn get_org_details(cnn: Pool<Postgres>, key: &str) -> Option<OrganizationDetails> {
    // Fast path: shared read lock only. The guard is a temporary and is
    // released at the end of this statement.
    let cached = ORG_CACHE.read().await.get(key).cloned();
    if cached.is_some() {
        return cached;
    }

    // Slow path: take the write lock, then look again. Another task may
    // have filled the entry while we waited for the lock, in which case
    // the database query is unnecessary.
    let mut cache = ORG_CACHE.write().await;
    if let Some(org) = cache.get(key) {
        return Some(org.clone());
    }

    match pgdb::get_organization(cnn, key).await {
        Ok(org) => {
            cache.insert(key.to_string(), org.clone());
            Some(org)
        }
        Err(_) => None,
    }
}

View File

@ -0,0 +1,45 @@
//! Provides a queue of submissions to be processed by the long-term storage.
//! This is a "fan in" pattern: multiple producers send data into a
//! multi-producer, single-consumer channel, which is drained by a single
//! consumer task. The consumer task spawns tokio tasks to actually
//! perform the processing.
use lqos_bus::long_term_stats::{NodeIdAndLicense, StatsSubmission};
use pgdb::sqlx::{Pool, Postgres};
use tokio::{sync::mpsc::{Sender, Receiver}, join};
use crate::submissions::submission_queue::{host_totals::collect_host_totals, organization_cache::get_org_details};
const SUBMISSION_QUEUE_SIZE: usize = 100;
pub type SubmissionType = (NodeIdAndLicense, StatsSubmission);
pub async fn submissions_queue(cnn: Pool<Postgres>) -> anyhow::Result<Sender<SubmissionType>> {
// Create a channel to send data to the consumer thread
let (tx, rx) = tokio::sync::mpsc::channel::<SubmissionType>(SUBMISSION_QUEUE_SIZE);
tokio::spawn(run_queue(cnn, rx)); // Note that'we *moving* rx into the spawned task
Ok(tx)
}
/// Consumer loop for the submission queue.
///
/// Waits for submissions on `rx` and spawns one `ingest_stats` task per
/// message, so a slow ingestion does not hold up the queue. Returns
/// `Ok(())` once the channel yields `None`.
async fn run_queue(cnn: Pool<Postgres>, mut rx: Receiver<SubmissionType>) -> anyhow::Result<()> {
    loop {
        match rx.recv().await {
            Some((node_id, stats)) => {
                log::info!("Received a message from the submission queue");
                // Fire and forget: the task handle is not awaited.
                tokio::spawn(ingest_stats(cnn.clone(), node_id, stats));
            }
            None => break,
        }
    }
    Ok(())
}
async fn ingest_stats(cnn: Pool<Postgres>, node_id: NodeIdAndLicense, stats: StatsSubmission) -> anyhow::Result<()> {
log::info!("Ingesting stats for node {node_id:?}");
if let Some(org) = get_org_details(cnn, &node_id.license_key).await {
let ts = stats.timestamp as i64;
// TODO: Error handling
let _ = join!(
collect_host_totals(&org, &node_id.node_id, ts, stats.totals)
);
} else {
log::warn!("Unable to find organization for license {}", node_id.license_key);
}
Ok(())
}

View File

@ -0,0 +1,96 @@
//! Provides a TCP handler server, listening on port 9128. Connections
//! are expected in the encrypted LTS format (see the `lqos_bus` crate).
//! If everything checks out, they are sent to the submission queue
//! for storage.
use std::net::SocketAddr;
use dryoc::dryocbox::*;
use lqos_bus::long_term_stats::{NodeIdAndLicense, StatsSubmission};
use pgdb::sqlx::{Pool, Postgres};
use tokio::{
io::AsyncReadExt,
net::TcpListener,
spawn, sync::mpsc::Sender,
};
use crate::pki::LIBREQOS_KEYPAIR;
use super::submission_queue::SubmissionType;
/// Starts the submission server, listening on port 9128.
/// The server runs in the background.
/// Runs the stats submission listener on `:::9128`.
///
/// Each accepted connection is handled in its own spawned task: the socket
/// is read to end-of-stream, the bytes are decoded (using `cnn` for the
/// database lookup), and the decoded submission is sent to `sender`.
/// Decode and queue-send failures are logged, and the connection's task
/// then ends.
///
/// # Errors
///
/// Returns an error if the listener cannot be bound or accepting a
/// connection fails.
pub async fn submissions_server(cnn: Pool<Postgres>, sender: Sender<SubmissionType>) -> anyhow::Result<()> {
    let listener = TcpListener::bind(":::9128").await?;
    log::info!("Listening for stats submissions on :::9128");

    loop {
        let (mut socket, address) = listener.accept().await?;
        log::info!("Connection from {address:?}");

        // Each connection task owns its own pool handle and queue sender.
        let task_pool = cnn.clone();
        let task_sender = sender.clone();
        spawn(async move {
            let mut received = Vec::new();
            let bytes = match socket.read_to_end(&mut received).await {
                Ok(bytes) => bytes,
                // Read failures are dropped without logging.
                Err(_) => return,
            };
            log::info!("Received {bytes} bytes from {address:?}");

            let stats = match decode(&received, address, task_pool).await {
                Ok(stats) => stats,
                // Bind the whole failed `Result` so the log output keeps its `Err(..)` form.
                decode_result => {
                    log::error!("{decode_result:?}");
                    return;
                }
            };

            if let Err(e) = task_sender.send(stats).await {
                log::error!("Unable to send stats to the queue: {}", e);
            }
        });
    }
}
async fn decode(
buf: &[u8],
address: SocketAddr,
pool: Pool<Postgres>,
) -> anyhow::Result<SubmissionType> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
// Check the version
log::info!("Received a version {version} header of serialized size {size} from {address:?}");
if version != 1 {
log::warn!("Received a version {version} header from {address:?}");
return Err(anyhow::Error::msg("Received an unknown version header"));
}
// Read the header
let start = 2 + U64SIZE;
let end = start + size as usize;
let header: NodeIdAndLicense = lqos_bus::cbor::from_slice(&buf[start..end])?;
// Check the header against the database and retrieve the current
// public key
let public_key = pgdb::fetch_public_key(pool, &header.license_key, &header.node_id).await?;
let public_key: PublicKey = lqos_bus::cbor::from_slice(&public_key)?;
let private_key = LIBREQOS_KEYPAIR.read().unwrap().secret_key.clone();
// Retrieve the payload size
let size_buf = &buf[end .. end + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
let payload_encrypted = &buf[end + U64SIZE .. end + U64SIZE + size as usize];
// Decrypt
let dryocbox = DryocBox::from_bytes(payload_encrypted).expect("failed to read box");
let decrypted = dryocbox
.decrypt_to_vec(
&header.nonce.into(),
&public_key,
&private_key,
)
.expect("unable to decrypt");
// Try to deserialize
let payload: StatsSubmission = lqos_bus::cbor::from_slice(&decrypted)?;
Ok((header, payload))
}

View File

@ -9,5 +9,5 @@ thiserror = "1"
env_logger = "0"
log = "0"
lqos_bus = { path = "../../lqos_bus" }
sqlx = { version = "0.7.0-alpha.2", default_features = false, features = [ "runtime-tokio", "tls-rustls", "postgres" ] }
sqlx = { version = "0.6.3", features = [ "runtime-tokio-rustls", "postgres" ] }
futures = "0"

View File

@ -1,9 +1,11 @@
mod connection;
mod license;
mod organization;
/// Re-exports everything from the `sqlx` crate under `pgdb::sqlx`, so that
/// users of this crate can name its types through `pgdb`
/// (e.g. `pgdb::sqlx::Pool`).
pub mod sqlx {
pub use sqlx::*;
}
pub use connection::get_connection_pool;
pub use license::{get_stats_host_for_key, insert_or_update_node_public_key, fetch_public_key};
pub use license::{get_stats_host_for_key, insert_or_update_node_public_key, fetch_public_key};
pub use organization::{OrganizationDetails, get_organization};

View File

@ -0,0 +1,21 @@
use sqlx::{Pool, Postgres};
use crate::license::StatsHostError;
/// One row of the `organizations` table, as loaded by `get_organization`:
/// the organization's identity plus its InfluxDB connection settings.
#[derive(Clone, sqlx::FromRow, Debug)]
pub struct OrganizationDetails {
/// Lookup key for the row (the `key` column matched by `get_organization`).
pub key: String,
/// Organization name (presumably for display - TODO confirm).
pub name: String,
/// InfluxDB host to connect to.
pub influx_host: String,
/// InfluxDB organization name.
pub influx_org: String,
/// InfluxDB authentication token.
pub influx_token: String,
/// InfluxDB bucket that statistics are written to.
pub influx_bucket: String,
}
/// Loads the organization whose `key` column equals `key`.
///
/// # Errors
///
/// Returns `StatsHostError::DatabaseError` carrying the database error's
/// message if the query fails, including when no matching row exists.
pub async fn get_organization(cnn: Pool<Postgres>, key: &str) -> Result<OrganizationDetails, StatsHostError> {
    const QUERY: &str = "SELECT * FROM organizations WHERE key=$1";

    sqlx::query_as::<_, OrganizationDetails>(QUERY)
        .bind(key)
        .fetch_one(&cnn)
        .await
        .map_err(|db_error| StatsHostError::DatabaseError(db_error.to_string()))
}

View File

@ -73,18 +73,14 @@ pub struct StatsTreeNode {
/// Collation of all stats for a given time period
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StatsSubmission {
/// License Key
pub key: String,
/// Node ID
pub node_id: String,
/// Timestamp of the collation (UNIX time)
pub timestamp: u64,
/// Total traffic statistics
pub totals: StatsTotals,
pub totals: Option<StatsTotals>,
/// Per-host statistics
pub hosts: Vec<StatsHost>,
pub hosts: Option<Vec<StatsHost>>,
/// Tree of traffic summaries
pub tree: Vec<StatsTreeNode>,
pub tree: Option<Vec<StatsTreeNode>>,
}
/// Network-transmitted query to ask the status of a license
@ -144,7 +140,7 @@ pub enum LicenseCheckError {
}
/// Stores a license id and node id for transport
#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeIdAndLicense {
/// The node id
pub node_id: String,

View File

@ -1,4 +1,3 @@
use lqos_config::EtcLqos;
use lqos_utils::unix_time::unix_now;
use super::{
@ -30,17 +29,11 @@ pub(crate) struct SubmissionHost {
impl From<StatsSubmission> for lqos_bus::long_term_stats::StatsSubmission {
fn from(value: StatsSubmission) -> Self {
let cfg = EtcLqos::load().unwrap();
let lts_cfg = cfg.long_term_stats.unwrap();
let key = lts_cfg.license_key.unwrap_or("".to_string());
let node_id = cfg.node_id.unwrap_or("".to_string());
Self {
key,
node_id,
timestamp: value.timestamp,
totals: value.clone().into(),
hosts: value.hosts.into_iter().map(Into::into).collect(),
tree: value.tree.into_iter().map(Into::into).collect(),
totals: Some(value.clone().into()),
hosts: Some(value.hosts.into_iter().map(Into::into).collect()),
tree: Some(value.tree.into_iter().map(Into::into).collect()),
}
}
}

View File

@ -24,7 +24,7 @@ impl Queue {
}
}
pub async fn push(&self, data: lqos_bus::long_term_stats::StatsSubmission, host: &str) {
pub async fn push(&self, data: lqos_bus::long_term_stats::StatsSubmission, host: String) {
{
let mut lock = self.queue.lock().await;
lock.push(QueueSubmission {

View File

@ -17,7 +17,7 @@ pub(crate) async fn new_submission(data: StatsSubmission) {
log::error!("Your license is invalid. Please contact support.");
}
LicenseState::Valid{ stats_host, .. } => {
QUEUE.push(data.into(), &stats_host).await;
tokio::spawn(QUEUE.push(data.into(), stats_host));
}
}
}