Licensing server can deny or accept a key of 'test'. Truly minimal, designed to get things moving.

This commit is contained in:
Herbert Wolverson 2023-04-07 15:29:27 +00:00
parent a64e4d4f9c
commit 2cb68ce06b
15 changed files with 729 additions and 496 deletions

View File

@ -172,7 +172,7 @@ def validateNetworkAndDevices():
for ipEntry in ipv4_list:
if ipEntry in seenTheseIPsAlready:
warnings.warn("Provided IPv4 '" + ipEntry + "' in ShapedDevices.csv at row " + str(rowNum) + " is duplicate.", stacklevel=2)
devicesValidatedOrNot = False
#devicesValidatedOrNot = False
seenTheseIPsAlready.append(ipEntry)
else:
if (type(ipaddress.ip_network(ipEntry)) is ipaddress.IPv4Network) or (type(ipaddress.ip_address(ipEntry)) is ipaddress.IPv4Address):

1
src/rust/Cargo.lock generated
View File

@ -1585,6 +1585,7 @@ dependencies = [
"serde_json",
"signal-hook",
"sysinfo",
"thiserror",
"tokio",
]

View File

@ -1,76 +1,78 @@
use std::net::SocketAddr;
use lqos_bus::long_term_stats::{LicenseCheck, LicenseReply};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::TcpListener, spawn};
use std::net::SocketAddr;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
spawn,
};
pub async fn start() -> anyhow::Result<()> {
let listener = TcpListener::bind(":::9126").await?;
log::info!("Listening on :::9126");
let listener = TcpListener::bind(":::9126").await?;
log::info!("Listening on :::9126");
loop {
let (mut socket, address) = listener.accept().await?;
log::info!("Connection from {address:?}");
spawn(async move {
let mut buf = Vec::with_capacity(10240);
if let Ok(bytes) = socket.read_to_end(&mut buf).await {
log::info!("Received {bytes} bytes from {address:?}");
match decode(&buf, address).await {
Err(e) => log::error!("{e:?}"),
Ok(reply) => {
let bytes = build_reply(&reply);
match bytes {
Ok(bytes) => {
if let Err(e) = socket.write_all(&bytes).await {
log::error!("Write error: {e:?}");
loop {
let (mut socket, address) = listener.accept().await?;
log::info!("Connection from {address:?}");
spawn(async move {
let mut buf = vec![0u8; 10240];
if let Ok(bytes) = socket.read(&mut buf).await {
log::info!("Received {bytes} bytes from {address:?}");
match decode(&buf, address).await {
Err(e) => log::error!("{e:?}"),
Ok(reply) => {
let bytes = build_reply(&reply);
match bytes {
Ok(bytes) => {
if let Err(e) = socket.write_all(&bytes).await {
log::error!("Write error: {e:?}");
}
}
Err(e) => {
log::error!("{e:?}");
}
}
}
Err(e) => {
log::error!("{e:?}");
}
}
}
}
}
});
}
});
}
}
async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<LicenseReply> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
log::info!("Received a version {version} payload of serialized size {size} from {address:?}");
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
log::info!("Received a version {version} payload of serialized size {size} from {address:?}");
match version {
1 => {
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: Result<LicenseCheck, _> =
serde_cbor::from_slice(&buf[start..end]);
match payload {
Ok(payload) => check_license(&payload, address).await,
Err(e) => {
log::error!(
"Unable to deserialize request sent from {address:?}"
);
log::error!("{e:?}");
Err(anyhow::Error::msg("Deserialize error"))
match version {
1 => {
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: LicenseCheck = serde_cbor::from_slice(&buf[start..end])?;
let license = check_license(&payload, address).await?;
Ok(license)
}
_ => {
log::error!("Unknown version of statistics: {version}, dumped {size} bytes");
Err(anyhow::Error::msg("Version error"))
}
}
}
_ => {
log::error!(
"Unknown version of statistics: {version}, dumped {size} bytes"
);
Err(anyhow::Error::msg("Version error"))
}
}
}
async fn check_license(request: &LicenseCheck, address: SocketAddr) -> anyhow::Result<LicenseReply> {
Ok(LicenseReply::Denied)
async fn check_license(
request: &LicenseCheck,
address: SocketAddr,
) -> anyhow::Result<LicenseReply> {
log::info!("Checking license from {address:?}, key: {}", request.key);
if request.key == "test" {
log::info!("License is valid");
Ok(LicenseReply::Valid)
} else {
log::info!("License is denied");
Ok(LicenseReply::Denied)
}
}
fn build_reply(reply: &LicenseReply) -> anyhow::Result<Vec<u8>> {
@ -84,11 +86,11 @@ fn build_reply(reply: &LicenseReply) -> anyhow::Result<Vec<u8>> {
let payload = payload.unwrap();
// Store the version as network order
result.extend( 1u16.to_be_bytes() );
result.extend(1u16.to_be_bytes());
// Store the payload size as network order
result.extend( (payload.len() as u64).to_be_bytes() );
result.extend((payload.len() as u64).to_be_bytes());
// Store the payload itself
result.extend(payload);
Ok(result)
}
}

View File

@ -1,12 +1,12 @@
use super::PREALLOCATE_CLIENT_BUFFER_BYTES;
use crate::{
bus::BusClientError, decode_response, encode_request, BusRequest,
BusResponse, BusSession, BUS_SOCKET_PATH,
bus::BusClientError, decode_response, encode_request, BusRequest, BusResponse, BusSession,
BUS_SOCKET_PATH,
};
use log::error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::UnixStream,
io::{AsyncReadExt, AsyncWriteExt},
net::UnixStream,
};
/// Convenient wrapper for accessing the bus
@ -16,42 +16,43 @@ use tokio::{
/// * `requests` a vector of `BusRequest` requests to make.
///
/// **Returns** Either an error, or a vector of `BusResponse` replies
pub async fn bus_request(
requests: Vec<BusRequest>,
) -> Result<Vec<BusResponse>, BusClientError> {
let stream = UnixStream::connect(BUS_SOCKET_PATH).await;
if let Err(e) = &stream {
if e.kind() == std::io::ErrorKind::NotFound {
error!("Unable to access {BUS_SOCKET_PATH}. Check that lqosd is running and you have appropriate permissions.");
return Err(BusClientError::SocketNotFound);
pub async fn bus_request(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>, BusClientError> {
let stream = UnixStream::connect(BUS_SOCKET_PATH).await;
if let Err(e) = &stream {
if e.kind() == std::io::ErrorKind::NotFound {
error!("Unable to access {BUS_SOCKET_PATH}. Check that lqosd is running and you have appropriate permissions.");
return Err(BusClientError::SocketNotFound);
}
}
}
let mut stream = stream.unwrap(); // This unwrap is safe, we checked that it exists previously
let test = BusSession { persist: false, requests };
let msg = encode_request(&test);
if msg.is_err() {
error!("Unable to encode request {:?}", test);
return Err(BusClientError::EncodingError);
}
let msg = msg.unwrap();
let ret = stream.write(&msg).await;
if ret.is_err() {
error!("Unable to write to {BUS_SOCKET_PATH} stream.");
error!("{:?}", ret);
return Err(BusClientError::StreamWriteError);
}
let mut buf = Vec::with_capacity(PREALLOCATE_CLIENT_BUFFER_BYTES);
let ret = stream.read_to_end(&mut buf).await;
if ret.is_err() {
error!("Unable to read from {BUS_SOCKET_PATH} stream.");
error!("{:?}", ret);
return Err(BusClientError::StreamReadError);
}
let reply = decode_response(&buf);
if reply.is_err() {
error!("Unable to decode response from socket.");
return Err(BusClientError::DecodingError);
}
let reply = reply.unwrap();
Ok(reply.responses)
let mut stream = stream.unwrap(); // This unwrap is safe, we checked that it exists previously
let test = BusSession {
persist: false,
requests,
};
let msg = encode_request(&test);
if msg.is_err() {
error!("Unable to encode request {:?}", test);
return Err(BusClientError::EncodingError);
}
let msg = msg.unwrap();
let ret = stream.write(&msg).await;
if ret.is_err() {
error!("Unable to write to {BUS_SOCKET_PATH} stream.");
error!("{:?}", ret);
return Err(BusClientError::StreamWriteError);
}
let mut buf = Vec::with_capacity(PREALLOCATE_CLIENT_BUFFER_BYTES);
let ret = stream.read_to_end(&mut buf).await;
if ret.is_err() {
error!("Unable to read from {BUS_SOCKET_PATH} stream.");
error!("{:?}", ret);
return Err(BusClientError::StreamReadError);
}
let reply = decode_response(&buf);
if reply.is_err() {
error!("Unable to decode response from socket.");
return Err(BusClientError::DecodingError);
}
let reply = reply.unwrap();
Ok(reply.responses)
}

View File

@ -157,7 +157,10 @@ pub enum BusRequest {
/// Specific requests from the long-term stats system
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum StatsRequest {
/// Retrieve the current totals for all hosts
CurrentTotals,
/// Retrieve the values for all hosts
AllHosts,
/// Get the network tree
Tree,
}

View File

@ -1,49 +1,190 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
/// Type that provides a minimum, maximum and average value
/// for a given statistic within the associated time period.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StatsSummary {
pub min: (u64, u64),
pub max: (u64, u64),
pub avg: (u64, u64),
/// Minimum value
pub min: (u64, u64),
/// Maximum value
pub max: (u64, u64),
/// Average value
pub avg: (u64, u64),
}
/// Type that provides a minimum, maximum and average value
/// for a given RTT value within the associated time period.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StatsRttSummary {
pub min: u32,
pub max: u32,
pub avg: u32,
/// Minimum value
pub min: u32,
/// Maximum value
pub max: u32,
/// Average value
pub avg: u32,
}
/// Type that holds total traffic statistics for a given time period
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StatsTotals {
pub packets: StatsSummary,
pub bits: StatsSummary,
pub shaped_bits: StatsSummary,
/// Total number of packets
pub packets: StatsSummary,
/// Total number of bits
pub bits: StatsSummary,
/// Total number of shaped bits
pub shaped_bits: StatsSummary,
}
/// Type that holds per-host statistics for a given stats collation
/// period.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StatsHost {
pub circuit_id: String,
pub ip_address: String,
pub bits: StatsSummary,
pub rtt: StatsRttSummary,
pub tree_indices: Vec<usize>,
/// Host circuit_id as it appears in ShapedDevices.csv
pub circuit_id: String,
/// Host's IP address
pub ip_address: String,
/// Host's traffic statistics
pub bits: StatsSummary,
/// Host's RTT statistics
pub rtt: StatsRttSummary,
/// Positional arguments indicating which tree entries apply
pub tree_indices: Vec<usize>,
}
/// Node inside a traffic summary tree
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StatsTreeNode {
pub name: String,
pub max_throughput: (u32, u32),
pub parents: Vec<usize>,
pub immediate_parent: Option<usize>,
/// Name (from network.json)
pub name: String,
/// Maximum allowed throughput (from network.json)
pub max_throughput: (u32, u32),
/// Indices of parents in the tree
pub parents: Vec<usize>,
/// Index of immediate parent in the tree
pub immediate_parent: Option<usize>,
}
/// Network-transmitted query to ask the status of a license
/// key.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct LicenseCheck {
key: String,
/// The key to check
pub key: String,
}
/// License server responses for a key
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum LicenseReply {
Denied
}
/// The license is denied
Denied,
/// The license is valid
Valid,
}
/// Errors that can occur when checking licenses
#[derive(Debug, Error)]
pub enum LicenseCheckError {
/// Serialization error
#[error("Unable to serialize license check")]
SerializeFail,
/// Network error
#[error("Unable to send license check")]
SendFail,
/// Network error
#[error("Unable to receive license result")]
ReceiveFail,
/// Deserialization error
#[error("Unable to deserialize license result")]
DeserializeFail,
}
fn build_license_request(key: String) -> Result<Vec<u8>, LicenseCheckError> {
let mut result = Vec::new();
let payload = serde_cbor::to_vec(&LicenseCheck { key });
if let Err(e) = payload {
log::warn!("Unable to serialize statistics. Not sending them.");
log::warn!("{e:?}");
return Err(LicenseCheckError::SerializeFail);
}
let payload = payload.unwrap();
// Store the version as network order
result.extend(1u16.to_be_bytes());
// Store the payload size as network order
result.extend((payload.len() as u64).to_be_bytes());
// Store the payload itself
result.extend(payload);
Ok(result)
}
const LICENSE_SERVER: &str = "127.0.0.1:9126";
/// Ask the license server if the license is valid
///
/// # Arguments
///
/// * `key` - The license key to check
pub async fn ask_license_server(key: String) -> Result<LicenseReply, LicenseCheckError> {
if let Ok(buffer) = build_license_request(key) {
let stream = TcpStream::connect(LICENSE_SERVER).await;
if let Err(e) = &stream {
if e.kind() == std::io::ErrorKind::NotFound {
log::error!("Unable to access {LICENSE_SERVER}. Check that lqosd is running and you have appropriate permissions.");
return Err(LicenseCheckError::SendFail);
}
}
let mut stream = stream.unwrap(); // This unwrap is safe, we checked that it exists previously
let ret = stream.write(&buffer).await;
if ret.is_err() {
log::error!("Unable to write to {LICENSE_SERVER} stream.");
log::error!("{:?}", ret);
return Err(LicenseCheckError::SendFail);
}
let mut buf = Vec::with_capacity(10240);
let ret = stream.read_to_end(&mut buf).await;
if ret.is_err() {
log::error!("Unable to read from {LICENSE_SERVER} stream.");
log::error!("{:?}", ret);
return Err(LicenseCheckError::SendFail);
}
decode_response(&buf)
} else {
Err(LicenseCheckError::SerializeFail)
}
}
fn decode_response(buf: &[u8]) -> Result<LicenseReply, LicenseCheckError> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2]
.try_into()
.map_err(|_| LicenseCheckError::DeserializeFail)?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE]
.try_into()
.map_err(|_| LicenseCheckError::DeserializeFail)?;
let size = u64::from_be_bytes(*size_buf);
if version != 1 {
log::error!("License server returned an unknown version: {}", version);
return Err(LicenseCheckError::DeserializeFail);
}
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: Result<LicenseReply, _> = serde_cbor::from_slice(&buf[start..end]);
match payload {
Ok(payload) => Ok(payload),
Err(e) => {
log::error!("Unable to deserialize license result");
log::error!("{e:?}");
Err(LicenseCheckError::DeserializeFail)
}
}
}

View File

@ -27,6 +27,7 @@ nix = "0"
sysinfo = "0"
dashmap = "5"
num-traits = "0.2"
thiserror = "1"
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]

View File

@ -70,9 +70,9 @@ impl From<SubmissionHost> for lqos_bus::long_term_stats::StatsHost {
///
/// (n) is defined in /etc/lqos.conf in the `collation_period_seconds`
/// field of the `[long_term_stats]` section.
pub(crate) fn collate_stats() {
pub(crate) async fn collate_stats() {
// Obtain exclusive access to the session
let mut writer = SESSION_BUFFER.lock().unwrap();
let mut writer = SESSION_BUFFER.lock().await;
if writer.is_empty() {
// Nothing to do - so exit
return;
@ -155,5 +155,5 @@ pub(crate) fn collate_stats() {
std::mem::drop(writer);
// Submit
new_submission(submission);
new_submission(submission).await;
}

View File

@ -1,8 +1,9 @@
use crate::throughput_tracker::THROUGHPUT_TRACKER;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use std::{
net::IpAddr,
sync::{atomic::AtomicU64, Mutex},
sync::atomic::AtomicU64,
};
static SUBMISSION_COUNTER: AtomicU64 = AtomicU64::new(0);
@ -25,7 +26,7 @@ pub(crate) struct SessionHost {
pub(crate) static SESSION_BUFFER: Lazy<Mutex<Vec<StatsSession>>> =
Lazy::new(|| Mutex::new(Vec::new()));
pub(crate) fn gather_throughput_stats() {
pub(crate) async fn gather_throughput_stats() {
let count =
SUBMISSION_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if count < 5 {
@ -70,5 +71,5 @@ pub(crate) fn gather_throughput_stats() {
});
});
SESSION_BUFFER.lock().unwrap().push(session);
SESSION_BUFFER.lock().await.push(session);
}

View File

@ -0,0 +1,68 @@
use lqos_bus::long_term_stats::{ask_license_server, LicenseReply};
use lqos_config::EtcLqos;
use lqos_utils::unix_time::unix_now;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
#[derive(Default, Clone)]
struct LicenseStatus {
key: String,
state: LicenseState,
last_check: u64,
}
#[derive(Default, Clone, Copy, PartialEq, Debug)]
pub(crate) enum LicenseState {
#[default]
Unknown,
Denied,
Valid,
}
static LICENSE_STATUS: Lazy<RwLock<LicenseStatus>> =
Lazy::new(|| RwLock::new(LicenseStatus::default()));
pub(crate) async fn get_license_status() -> LicenseState {
if let Ok(unix_time) = unix_now() {
let license_status = {
LICENSE_STATUS.read().await.clone()
};
if license_status.state == LicenseState::Unknown || license_status.last_check < unix_time - (60 * 60) {
return check_license(unix_time).await;
}
return license_status.state;
}
LicenseState::Unknown
}
async fn check_license(unix_time: u64) -> LicenseState {
if let Ok(cfg) = EtcLqos::load() {
if let Some(cfg) = cfg.long_term_stats {
if let Some(key) = cfg.license_key {
let mut lock = LICENSE_STATUS.write().await;
lock.last_check = unix_time;
lock.key = key.clone();
match ask_license_server(key.clone()).await {
Ok(state) => {
match state {
LicenseReply::Denied => {
log::warn!("License is in state: DENIED.");
lock.state = LicenseState::Denied;
}
LicenseReply::Valid => {
log::info!("License is in state: VALID.");
lock.state = LicenseState::Valid;
}
}
return lock.state;
}
Err(e) => {
log::error!("Error checking licensing server");
log::error!("{e:?}");
}
}
}
}
}
LicenseState::Unknown
}

View File

@ -7,70 +7,71 @@ mod collation_utils;
mod collator;
mod submission;
mod tree;
mod licensing;
use std::time::Duration;
use log::{info, warn};
use lqos_config::EtcLqos;
use lqos_utils::fdtimer::periodic;
use std::{
sync::mpsc::{self, Receiver, Sender},
thread,
};
pub(crate) use submission::{get_stats_totals, get_stats_host, get_stats_tree};
use tokio::{sync::mpsc::{Sender, Receiver, self}, time::Instant};
#[derive(Debug)]
/// Messages to/from the stats collection thread
pub enum StatsMessage {
/// Fresh throughput stats have been collected
ThroughputReady,
/// Request that the stats thread terminate
Quit,
}
/// Launch the statistics system
pub fn start_long_term_stats() -> Option<Sender<StatsMessage>> {
pub async fn start_long_term_stats() -> Option<Sender<StatsMessage>> {
if let Ok(cfg) = EtcLqos::load() {
if let Some(cfg) = cfg.long_term_stats {
if cfg.gather_stats {
start_collating_stats(cfg.collation_period_seconds);
return Some(start_collecting_stats());
start_collating_stats(cfg.collation_period_seconds).await;
return Some(start_collecting_stats().await);
} else {
log::warn!("Long-term stats 'gather_stats' set to false");
}
}
}
log::warn!("Not gathering long-term stats. Check the [long_term_stats] section of /etc/lqos.conf.");
None
}
fn start_collecting_stats() -> Sender<StatsMessage> {
async fn start_collecting_stats() -> Sender<StatsMessage> {
// Spawn the manager thread, which will wait for message to maintain
// sync with the generation of stats.
let (tx, rx): (Sender<StatsMessage>, Receiver<StatsMessage>) =
mpsc::channel();
thread::spawn(move || {
info!("Long-term stats gathering thread started");
loop {
let msg = rx.recv();
match msg {
Ok(StatsMessage::Quit) => {
info!("Exiting the long-term stats thread");
break;
}
Ok(StatsMessage::ThroughputReady) => {
data_collector::gather_throughput_stats();
}
Err(e) => {
warn!("Error in the long-term stats thread message receiver");
warn!("{e:?}");
}
}
}
});
let (tx, rx): (Sender<StatsMessage>, Receiver<StatsMessage>) = mpsc::channel(10);
tokio::spawn(long_term_stats_collector(rx));
tx
}
fn start_collating_stats(seconds: u32) {
let interval_ms = (seconds * 1000).into();
thread::spawn(move || {
periodic(
interval_ms,
"Long-Term Stats Collation",
&mut collator::collate_stats
);
});
async fn long_term_stats_collector(mut rx: Receiver<StatsMessage>) {
info!("Long-term stats gathering thread started");
loop {
let msg = rx.recv().await;
match msg {
Some(StatsMessage::ThroughputReady) => {
data_collector::gather_throughput_stats().await;
}
None => {
warn!("Long-term stats thread received a None message");
}
}
}
}
async fn start_collating_stats(seconds: u32) {
tokio::spawn(collation_task(seconds));
}
async fn collation_task(interval_seconds: u32) {
loop {
let now = Instant::now();
collator::collate_stats().await;
let elapsed = now.elapsed();
let sleep_time = Duration::from_secs(interval_seconds.into()) - elapsed;
if sleep_time.as_secs() > 0 {
tokio::time::sleep(sleep_time).await;
}
}
}

View File

@ -1,19 +1,23 @@
use std::sync::RwLock;
use lqos_bus::{BusResponse, long_term_stats::StatsHost};
use lqos_config::EtcLqos;
use once_cell::sync::Lazy;
use super::collator::StatsSubmission;
use super::{collator::StatsSubmission, licensing::{get_license_status, LicenseState}};
pub(crate) static CURRENT_STATS: Lazy<RwLock<Option<StatsSubmission>>> = Lazy::new(|| RwLock::new(None));
pub(crate) fn new_submission(data: StatsSubmission) {
pub(crate) async fn new_submission(data: StatsSubmission) {
*CURRENT_STATS.write().unwrap() = Some(data);
if let Ok(cfg) = EtcLqos::load() {
if let Some(cfg) = &cfg.long_term_stats {
if let Some(license) = &cfg.license_key {
println!("We've got a license key");
}
let license = get_license_status().await;
match license {
LicenseState::Unknown => {
log::info!("Temporary error finding license status. Will retry.");
}
LicenseState::Denied => {
log::error!("Your license is invalid. Please contact support.");
}
LicenseState::Valid => {
// TODO: Send to server
}
}
}

View File

@ -72,15 +72,15 @@ async fn main() -> Result<()> {
};
// Spawn tracking sub-systems
let long_term_stats_tx = long_term_stats::start_long_term_stats();
let long_term_stats_tx = long_term_stats::start_long_term_stats().await;
join!(
start_heimdall(),
spawn_queue_structure_monitor(),
shaped_devices_tracker::shaped_devices_watcher(),
shaped_devices_tracker::network_json_watcher(),
anonymous_usage::start_anonymous_usage(),
throughput_tracker::spawn_throughput_monitor(long_term_stats_tx.clone()),
);
throughput_tracker::spawn_throughput_monitor(long_term_stats_tx.clone());
spawn_queue_monitor();
// Handle signals
@ -96,11 +96,11 @@ async fn main() -> Result<()> {
warn!("This should never happen - terminating on unknown signal")
}
}
if let Some(tx) = long_term_stats_tx {
/*if let Some(tx) = long_term_stats_tx {
// Deliberately ignoring the error because we're trying to
// exit ASAP and don't really care!
let _ = tx.send(long_term_stats::StatsMessage::Quit);
}
let _ = tx.send(long_term_stats::StatsMessage::Quit).await;
}*/
std::mem::drop(kernels);
UnixSocketServer::signal_cleanup();
std::mem::drop(file_lock);

View File

@ -2,23 +2,22 @@ mod heimdall_data;
mod throughput_entry;
mod tracking_data;
use crate::{
long_term_stats::StatsMessage, shaped_devices_tracker::NETWORK_JSON,
stats::TIME_TO_POLL_HOSTS,
throughput_tracker::tracking_data::ThroughputTracker,
long_term_stats::StatsMessage, shaped_devices_tracker::NETWORK_JSON, stats::TIME_TO_POLL_HOSTS,
throughput_tracker::tracking_data::ThroughputTracker,
};
pub use heimdall_data::get_flow_stats;
use log::{info, warn};
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
use lqos_utils::{
fdtimer::periodic, unix_time::time_since_boot, XdpIpAddress,
};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::{sync::mpsc::Sender, time::Duration};
use tokio::{
sync::mpsc::Sender,
time::{Duration, Instant},
};
const RETIRE_AFTER_SECONDS: u64 = 30;
pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> =
Lazy::new(ThroughputTracker::new);
pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> = Lazy::new(ThroughputTracker::new);
/// Create the throughput monitor thread, and begin polling for
/// throughput data every second.
@ -27,347 +26,363 @@ pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> =
///
/// * `long_term_stats_tx` - an optional MPSC sender to notify the
/// collection thread that there is fresh data.
pub fn spawn_throughput_monitor(
long_term_stats_tx: Option<Sender<StatsMessage>>,
) {
info!("Starting the bandwidth monitor thread.");
let interval_ms = 1000; // 1 second
info!("Bandwidth check period set to {interval_ms} ms.");
pub async fn spawn_throughput_monitor(long_term_stats_tx: Option<Sender<StatsMessage>>) {
info!("Starting the bandwidth monitor thread.");
let interval_ms = 1000; // 1 second
info!("Bandwidth check period set to {interval_ms} ms.");
tokio::spawn(throughput_task(interval_ms, long_term_stats_tx));
}
std::thread::spawn(move || {
periodic(interval_ms, "Throughput Monitor", &mut || {
let start = std::time::Instant::now();
{
let net_json = NETWORK_JSON.read().unwrap();
net_json.zero_throughput_and_rtt();
} // Scope to end the lock
THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
THROUGHPUT_TRACKER.apply_new_throughput_counters();
THROUGHPUT_TRACKER.apply_rtt_data();
THROUGHPUT_TRACKER.update_totals();
THROUGHPUT_TRACKER.next_cycle();
let duration_ms = start.elapsed().as_micros();
TIME_TO_POLL_HOSTS
.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed);
if let Some(tx) = &long_term_stats_tx {
let result = tx.send(StatsMessage::ThroughputReady);
if let Err(e) = result {
warn!("Error sending message to stats collection system. {e:?}");
async fn throughput_task(interval_ms: u64, long_term_stats_tx: Option<Sender<StatsMessage>>) {
loop {
let start = Instant::now();
{
let net_json = NETWORK_JSON.read().unwrap();
net_json.zero_throughput_and_rtt();
} // Scope to end the lock
THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
THROUGHPUT_TRACKER.apply_new_throughput_counters();
THROUGHPUT_TRACKER.apply_rtt_data();
THROUGHPUT_TRACKER.update_totals();
THROUGHPUT_TRACKER.next_cycle();
let duration_ms = start.elapsed().as_micros();
TIME_TO_POLL_HOSTS.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed);
if let Some(tx) = &long_term_stats_tx {
let result = tx.send(StatsMessage::ThroughputReady).await;
if let Err(e) = result {
warn!("Error sending message to stats collection system. {e:?}");
}
}
}
});
});
let sleep_duration = Duration::from_millis(interval_ms) - start.elapsed();
if sleep_duration.as_millis() > 0 {
tokio::time::sleep(sleep_duration).await;
}
}
}
pub fn current_throughput() -> BusResponse {
let (bits_per_second, packets_per_second, shaped_bits_per_second) = {
(
THROUGHPUT_TRACKER.bits_per_second(),
THROUGHPUT_TRACKER.packets_per_second(),
THROUGHPUT_TRACKER.shaped_bits_per_second(),
)
};
BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
}
let (bits_per_second, packets_per_second, shaped_bits_per_second) = {
(
THROUGHPUT_TRACKER.bits_per_second(),
THROUGHPUT_TRACKER.packets_per_second(),
THROUGHPUT_TRACKER.shaped_bits_per_second(),
)
};
BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
}
}
pub fn host_counters() -> BusResponse {
let mut result = Vec::new();
THROUGHPUT_TRACKER.raw_data.iter().for_each(|v| {
let ip = v.key().as_ip();
let (down, up) = v.bytes_per_second;
result.push((ip, down, up));
});
BusResponse::HostCounters(result)
let mut result = Vec::new();
THROUGHPUT_TRACKER.raw_data.iter().for_each(|v| {
let ip = v.key().as_ip();
let (down, up) = v.bytes_per_second;
result.push((ip, down, up));
});
BusResponse::HostCounters(result)
}
#[inline(always)]
fn retire_check(cycle: u64, recent_cycle: u64) -> bool {
cycle < recent_cycle + RETIRE_AFTER_SECONDS
cycle < recent_cycle + RETIRE_AFTER_SECONDS
}
type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String);
pub fn top_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp_cycle =
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.map(|te| {
(
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
let mut full_list: Vec<TopList> = {
let tp_cycle = THROUGHPUT_TRACKER
.cycle
.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.map(|te| {
(
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
)
})
.collect()
};
full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
circuit_id,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
})
.collect()
};
full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
circuit_id,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::TopDownloaders(result)
.collect();
BusResponse::TopDownloaders(result)
}
pub fn worst_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp_cycle =
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
let mut full_list: Vec<TopList> = {
let tp_cycle = THROUGHPUT_TRACKER
.cycle
.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
)
})
.collect()
};
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
circuit_id,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
})
.collect()
};
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
circuit_id,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::WorstRtt(result)
.collect();
BusResponse::WorstRtt(result)
}
pub fn best_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp_cycle =
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
let mut full_list: Vec<TopList> = {
let tp_cycle = THROUGHPUT_TRACKER
.cycle
.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
)
})
.collect()
};
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
full_list.reverse();
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
circuit_id,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
})
.collect()
};
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
full_list.reverse();
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
circuit_id,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::BestRtt(result)
.collect();
BusResponse::BestRtt(result)
}
pub fn xdp_pping_compat() -> BusResponse {
let raw_cycle =
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
let result = THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|d| retire_check(raw_cycle, d.most_recent_cycle))
.filter_map(|data| {
if data.tc_handle.as_u32() > 0 {
let mut valid_samples: Vec<u32> =
data.recent_rtt_data.iter().filter(|d| **d > 0).copied().collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
valid_samples.sort_by(|a, b| (*a).cmp(b));
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let max = *(valid_samples.iter().max().unwrap()) as f32 / 100.0;
let min = *(valid_samples.iter().min().unwrap()) as f32 / 100.0;
let sum = valid_samples.iter().sum::<u32>() as f32 / 100.0;
let avg = sum / samples as f32;
let raw_cycle = THROUGHPUT_TRACKER
.cycle
.load(std::sync::atomic::Ordering::Relaxed);
let result = THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|d| retire_check(raw_cycle, d.most_recent_cycle))
.filter_map(|data| {
if data.tc_handle.as_u32() > 0 {
let mut valid_samples: Vec<u32> = data
.recent_rtt_data
.iter()
.filter(|d| **d > 0)
.copied()
.collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
valid_samples.sort_by(|a, b| (*a).cmp(b));
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let max = *(valid_samples.iter().max().unwrap()) as f32 / 100.0;
let min = *(valid_samples.iter().min().unwrap()) as f32 / 100.0;
let sum = valid_samples.iter().sum::<u32>() as f32 / 100.0;
let avg = sum / samples as f32;
Some(XdpPpingResult {
tc: data.tc_handle.to_string(),
median,
avg,
max,
min,
samples,
})
} else {
None
}
} else {
None
}
})
.collect();
BusResponse::XdpPping(result)
Some(XdpPpingResult {
tc: data.tc_handle.to_string(),
median,
avg,
max,
min,
samples,
})
} else {
None
}
} else {
None
}
})
.collect();
BusResponse::XdpPping(result)
}
pub fn rtt_histogram() -> BusResponse {
let mut result = vec![0; 20];
let reader_cycle =
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
for data in THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|d| retire_check(reader_cycle, d.most_recent_cycle))
{
let valid_samples: Vec<u32> =
data.recent_rtt_data.iter().filter(|d| **d > 0).copied().collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let median = f32::min(200.0, median);
let column = (median / 10.0) as usize;
result[usize::min(column, 19)] += 1;
let mut result = vec![0; 20];
let reader_cycle = THROUGHPUT_TRACKER
.cycle
.load(std::sync::atomic::Ordering::Relaxed);
for data in THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|d| retire_check(reader_cycle, d.most_recent_cycle))
{
let valid_samples: Vec<u32> = data
.recent_rtt_data
.iter()
.filter(|d| **d > 0)
.copied()
.collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let median = f32::min(200.0, median);
let column = (median / 10.0) as usize;
result[usize::min(column, 19)] += 1;
}
}
}
BusResponse::RttHistogram(result)
BusResponse::RttHistogram(result)
}
pub fn host_counts() -> BusResponse {
let mut total = 0;
let mut shaped = 0;
let tp_cycle =
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.for_each(|d| {
total += 1;
if d.tc_handle.as_u32() != 0 {
shaped += 1;
}
});
BusResponse::HostCounts((total, shaped))
let mut total = 0;
let mut shaped = 0;
let tp_cycle = THROUGHPUT_TRACKER
.cycle
.load(std::sync::atomic::Ordering::Relaxed);
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.for_each(|d| {
total += 1;
if d.tc_handle.as_u32() != 0 {
shaped += 1;
}
});
BusResponse::HostCounts((total, shaped))
}
type FullList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, u64);
pub fn all_unknown_ips() -> BusResponse {
let boot_time = time_since_boot();
if boot_time.is_err() {
warn!("The Linux system clock isn't available to provide time since boot, yet.");
warn!("This only happens immediately after a reboot.");
return BusResponse::NotReadyYet;
}
let boot_time = boot_time.unwrap();
let time_since_boot = Duration::from(boot_time);
let five_minutes_ago =
time_since_boot.saturating_sub(Duration::from_secs(300));
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos();
let boot_time = time_since_boot();
if boot_time.is_err() {
warn!("The Linux system clock isn't available to provide time since boot, yet.");
warn!("This only happens immediately after a reboot.");
return BusResponse::NotReadyYet;
}
let boot_time = boot_time.unwrap();
let time_since_boot = Duration::from(boot_time);
let five_minutes_ago = time_since_boot.saturating_sub(Duration::from_secs(300));
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos();
let mut full_list: Vec<FullList> = {
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| d.tc_handle.as_u32() == 0)
.filter(|d| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.map(|te| {
(
*te.key(),
te.bytes,
te.packets,
te.median_latency(),
te.tc_handle,
te.most_recent_cycle,
let mut full_list: Vec<FullList> = {
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| d.tc_handle.as_u32() == 0)
.filter(|d| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.map(|te| {
(
*te.key(),
te.bytes,
te.packets,
te.median_latency(),
te.tc_handle,
te.most_recent_cycle,
)
})
.collect()
};
full_list.sort_by(|a, b| b.5.partial_cmp(&a.5).unwrap());
let result = full_list
.iter()
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
_last_seen,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: String::new(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
})
.collect()
};
full_list.sort_by(|a, b| b.5.partial_cmp(&a.5).unwrap());
let result = full_list
.iter()
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
_last_seen,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: String::new(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::AllUnknownIps(result)
.collect();
BusResponse::AllUnknownIps(result)
}

View File

@ -1,5 +0,0 @@
max_width = 79
tab_spaces = 2
use_small_heuristics = "Max"
array_width = 77