This commit is contained in:
Herbert Wolverson 2023-01-23 22:01:05 +00:00
commit c9810f93b6
25 changed files with 823 additions and 701 deletions

View File

@ -90,3 +90,4 @@ So for example, an ISP delivering 1Gbps service plans with 10Gbps aggregate thro
## Versions
### IPv4 + IPv6
- [v1.3](https://github.com/LibreQoE/LibreQoS/tree/main/v1.3) [Setup Guide](https://github.com/LibreQoE/LibreQoS/wiki/LibreQoS-v1.3-Installation-&-Usage-Guide-Physical-Server-and-Ubuntu-22.04)
- [v1.4-ALPHA](https://github.com/LibreQoE/LibreQoS/tree/main) [Setup Guide](https://github.com/LibreQoE/LibreQoS/blob/main/src/TESTING-1.4.md)

BIN
docs/jk.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 163 KiB

View File

@ -61,7 +61,7 @@ This will take a while the first time, but it puts everything in the right place
Copy the daemon configuration file to `/etc`:
```
sudo cp lqos.example /etc/lqos
sudo cp lqos.example /etc/lqos.conf
```
Now edit the file to match your setup:
@ -93,16 +93,27 @@ Change `enp1s0f1` and `enp1s0f2` to match your network interfaces. It doesn't ma
Follow the regular instructions to set your interfaces in `ispConfig.py` and your `network.json` and `ShapedDevices.csv` files.
## Run the program
## Configure autostart services (lqosd, lqos_node_manager)
```
cp /opt/libreqos/src/bin/lqos_node_manager.service.example /etc/systemd/system/lqos_node_manager.service
cp /opt/libreqos/src/bin/lqosd.service.example /etc/systemd/system/lqosd.service
systemctl daemon-reload
systemctl enable lqosd
systemctl enable lqos_node_manager
```
## Run the program (debug mode)
You can setup `lqosd` and `lqos_node_manager` as daemons to keep running (there are example `systemd` files in the `src/bin` folder). Since v1.4 is under such heavy development, I recommend using `screen` to run detached instances - and make finding issues easier.
1. `screen`
2. `cd /wherever_you_put_libreqos/src/bin`
3. `sudo ./lqosd`
4. Create a new `screen` window with `Ctrl-A, C`.
5. Run the webserver with `./lqos_node_manager`
6. If you didn't see errors, detach with `Ctrl-A, D`
1. Stop services: systemctl stop lqosd lqos_node_manager
2. `screen`
3. `cd /wherever_you_put_libreqos/src/bin`
4. `sudo ./lqosd`
5. Create a new `screen` window with `Ctrl-A, C`.
6. Run the webserver with `./lqos_node_manager`
7. If you didn't see errors, detach with `Ctrl-A, D`
You can now point a web browser at `http://a.b.c.d:9123` (replace `a.b.c.d` with the management IP address of your shaping server) and enjoy a real-time view of your network.
@ -110,11 +121,15 @@ In the web browser, click `Reload LibreQoS` to setup your shaping rules.
# Updating 1.4 Once You Have It
* Note: On January 22nd 2023 /etc/lqos was changed to /etc/lqos.conf to remedy Issue #205. If upgrading, be sure to move /etc/lqos to /etc/lqos.conf
<img src="https://raw.githubusercontent.com/LibreQoE/LibreQoS/main/docs/jk.jpg" width=200px></a>
1. Resume screen with `screen -r`
2. Go to console 0 (`Ctrl-A, 0`) and stop `lqosd` with `ctrl+c`.
3. Go to console 1 (`Ctrl-A, 1`) and stop `lqos_node_manager` with `ctrl+c`.
4. Detach from `screen` with `Ctrl-A, D`.
5. Change to your `LibreQoS` directory (e.g. `cd /opt/LibreQoS`)
5. Change to your `LibreQoS` directory (e.g. `cd /opt/libreqos`)
6. Update from Git: `git pull`
7. Recompile: `./build-rust.sh`
8. Resume screen with `screen -r`.

View File

@ -6,7 +6,7 @@
# You still need to setup services to run `lqosd` and `lqos_node_manager`
# automatically.
#
# Don't forget to setup `/etc/lqos`
# Don't forget to setup `/etc/lqos.conf`
PROGS="lqosd lqtop xdp_iphash_to_cpu_cmdline xdp_pping lqos_node_manager webusers"
mkdir -p bin/static
pushd rust
@ -37,7 +37,7 @@ popd
cp rust/target/release/liblqos_python.so .
echo "-----------------------------------------------------------------"
echo "Don't forget to setup /etc/lqos!"
echo "Don't forget to setup /etc/lqos.conf!"
echo "Template .service files can be found in bin/"
echo ""
echo "Run rust/remove_pinned_maps.sh before you restart lqosd"

1
src/rust/Cargo.lock generated
View File

@ -1459,6 +1459,7 @@ dependencies = [
"anyhow",
"crossterm 0.19.0",
"lqos_bus",
"lqos_utils",
"tokio",
"tui",
]

View File

@ -7,101 +7,109 @@ use serde::{Deserialize, Serialize};
/// or data.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum BusRequest {
/// A generic "is it alive?" test. Returns an `Ack`.
Ping,
/// A generic "is it alive?" test. Returns an `Ack`.
Ping,
/// Request total current throughput. Returns a
/// `BusResponse::CurrentThroughput` value.
GetCurrentThroughput,
/// Request total current throughput. Returns a
/// `BusResponse::CurrentThroughput` value.
GetCurrentThroughput,
/// Retrieve the top N downloads by bandwidth use.
GetTopNDownloaders{
/// First row to retrieve (usually 0 unless you are paging)
start: u32,
/// Last row to retrieve (10 for top-10 starting at 0)
end: u32
},
/// Retrieve the top N downloads by bandwidth use.
GetTopNDownloaders {
/// First row to retrieve (usually 0 unless you are paging)
start: u32,
/// Last row to retrieve (10 for top-10 starting at 0)
end: u32,
},
/// Retrieves the TopN hosts with the worst RTT, sorted by RTT descending.
GetWorstRtt{
/// First row to retrieve (usually 0 unless you are paging)
start: u32,
/// Last row to retrieve (10 for top-10 starting at 0)
end: u32
},
/// Retrieves the TopN hosts with the worst RTT, sorted by RTT descending.
GetWorstRtt {
/// First row to retrieve (usually 0 unless you are paging)
start: u32,
/// Last row to retrieve (10 for top-10 starting at 0)
end: u32,
},
/// Retrieves current byte counters for all hosts.
GetHostCounter,
/// Retrieves the TopN hosts with the best RTT, sorted by RTT descending.
GetBestRtt {
/// First row to retrieve (usually 0 unless you are paging)
start: u32,
/// Last row to retrieve (10 for top-10 starting at 0)
end: u32,
},
/// Requests that the XDP back-end associate an IP address with a
/// TC (traffic control) handle, and CPU. The "upload" flag indicates
/// that this is a second channel applied to the SAME network interface,
/// used for "on-a-stick" mode upload channels.
MapIpToFlow {
/// The IP address to map, as a string. It can be IPv4 or IPv6,
/// and supports CIDR notation for subnets. "192.168.1.1",
/// "192.168.1.0/24", are both valid.
ip_address: String,
/// Retrieves current byte counters for all hosts.
GetHostCounter,
/// The TC Handle to which the IP address should be mapped.
tc_handle: TcHandle,
/// Requests that the XDP back-end associate an IP address with a
/// TC (traffic control) handle, and CPU. The "upload" flag indicates
/// that this is a second channel applied to the SAME network interface,
/// used for "on-a-stick" mode upload channels.
MapIpToFlow {
/// The IP address to map, as a string. It can be IPv4 or IPv6,
/// and supports CIDR notation for subnets. "192.168.1.1",
/// "192.168.1.0/24", are both valid.
ip_address: String,
/// The CPU on which the TC handle should be shaped.
cpu: u32,
/// The TC Handle to which the IP address should be mapped.
tc_handle: TcHandle,
/// If true, this is a *second* flow for the same IP range on
/// the same NIC. Used for handling "on a stick" configurations.
upload: bool,
},
/// The CPU on which the TC handle should be shaped.
cpu: u32,
/// Requests that the XDP program unmap an IP address/subnet from
/// the traffic management system.
DelIpFlow {
/// The IP address to unmap. It can be an IPv4, IPv6 or CIDR
/// subnet.
ip_address: String,
/// If true, this is a *second* flow for the same IP range on
/// the same NIC. Used for handling "on a stick" configurations.
upload: bool,
},
/// Should we delete a secondary mapping (for upload)?
upload: bool,
},
/// Requests that the XDP program unmap an IP address/subnet from
/// the traffic management system.
DelIpFlow {
/// The IP address to unmap. It can be an IPv4, IPv6 or CIDR
/// subnet.
ip_address: String,
/// Clear all XDP IP/TC/CPU mappings.
ClearIpFlow,
/// Should we delete a secondary mapping (for upload)?
upload: bool,
},
/// Retrieve list of all current IP/TC/CPU mappings.
ListIpFlow,
/// Clear all XDP IP/TC/CPU mappings.
ClearIpFlow,
/// Simulate the previous version's `xdp_pping` command, returning
/// RTT data for all mapped flows by TC handle.
XdpPping,
/// Retrieve list of all current IP/TC/CPU mappings.
ListIpFlow,
/// Divide current RTT data into histograms and return the data for
/// rendering.
RttHistogram,
/// Simulate the previous version's `xdp_pping` command, returning
/// RTT data for all mapped flows by TC handle.
XdpPping,
/// Count the number of mapped and unmapped hosts detected by the
/// system.
HostCounts,
/// Divide current RTT data into histograms and return the data for
/// rendering.
RttHistogram,
/// Retrieve a list of all unmapped IPs that have been detected
/// carrying traffic.
AllUnknownIps,
/// Count the number of mapped and unmapped hosts detected by the
/// system.
HostCounts,
/// Reload the `LibreQoS.py` program and return details of the
/// reload run.
ReloadLibreQoS,
/// Retrieve a list of all unmapped IPs that have been detected
/// carrying traffic.
AllUnknownIps,
/// Retrieve raw queue data for a given circuit ID.
GetRawQueueData(String), // The string is the circuit ID
/// Reload the `LibreQoS.py` program and return details of the
/// reload run.
ReloadLibreQoS,
/// Requests a real-time adjustment of the `lqosd` tuning settings
UpdateLqosDTuning(u64, Tunables),
/// Retrieve raw queue data for a given circuit ID.
GetRawQueueData(String), // The string is the circuit ID
/// Request that we start watching a circuit's queue
WatchQueue(String),
/// Requests a real-time adjustment of the `lqosd` tuning settings
UpdateLqosDTuning(u64, Tunables),
/// If running on Equinix (the `equinix_test` feature is enabled),
/// display a "run bandwidth test" link.
#[cfg(feature = "equinix_tests")]
RequestLqosEquinixTest,
/// Request that we start watching a circuit's queue
WatchQueue(String),
/// If running on Equinix (the `equinix_test` feature is enabled),
/// display a "run bandwidth test" link.
#[cfg(feature = "equinix_tests")]
RequestLqosEquinixTest,
}

View File

@ -7,55 +7,58 @@ use std::net::IpAddr;
/// inside a `BusReply`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum BusResponse {
/// Yes, we're alive
Ack,
/// Yes, we're alive
Ack,
/// An operation failed, with the enclosed error message.
Fail(String),
/// An operation failed, with the enclosed error message.
Fail(String),
/// Current throughput for the overall system.
CurrentThroughput {
/// In bps
bits_per_second: (u64, u64),
/// Current throughput for the overall system.
CurrentThroughput {
/// In bps
bits_per_second: (u64, u64),
/// In pps
packets_per_second: (u64, u64),
/// In pps
packets_per_second: (u64, u64),
/// How much of the response has been subject to the shaper?
shaped_bits_per_second: (u64, u64),
},
/// How much of the response has been subject to the shaper?
shaped_bits_per_second: (u64, u64),
},
/// Provides a list of ALL mapped hosts traffic counters,
/// listing the IP Address and upload/download in a tuple.
HostCounters(Vec<(IpAddr, u64, u64)>),
/// Provides a list of ALL mapped hosts traffic counters,
/// listing the IP Address and upload/download in a tuple.
HostCounters(Vec<(IpAddr, u64, u64)>),
/// Provides the Top N downloaders IP stats.
TopDownloaders(Vec<IpStats>),
/// Provides the Top N downloaders IP stats.
TopDownloaders(Vec<IpStats>),
/// Provides the worst N RTT scores, sorted in descending order.
WorstRtt(Vec<IpStats>),
/// Provides the worst N RTT scores, sorted in descending order.
WorstRtt(Vec<IpStats>),
/// List all IP/TC mappings.
MappedIps(Vec<IpMapping>),
/// Provides the best N RTT scores, sorted in descending order.
BestRtt(Vec<IpStats>),
/// Return the data required for compatibility with the `xdp_pping`
/// program.
XdpPping(Vec<XdpPpingResult>),
/// List all IP/TC mappings.
MappedIps(Vec<IpMapping>),
/// Return the data required to render the RTT histogram on the
/// local web GUI.
RttHistogram(Vec<u32>),
/// Return the data required for compatibility with the `xdp_pping`
/// program.
XdpPping(Vec<XdpPpingResult>),
/// A tuple of (mapped)(unknown) host counts.
HostCounts((u32, u32)),
/// Return the data required to render the RTT histogram on the
/// local web GUI.
RttHistogram(Vec<u32>),
/// A list of all unmapped IP addresses that have been detected.
AllUnknownIps(Vec<IpStats>),
/// A tuple of (mapped)(unknown) host counts.
HostCounts((u32, u32)),
/// The results of reloading LibreQoS.
ReloadLibreQoS(String),
/// A list of all unmapped IP addresses that have been detected.
AllUnknownIps(Vec<IpStats>),
/// A string containing a JSON dump of queue stats. Analogous to
/// the response from `tc show qdisc`.
RawQueueData(String),
/// The results of reloading LibreQoS.
ReloadLibreQoS(String),
/// A string containing a JSON dump of queue stats. Analogous to
/// the response from `tc show qdisc`.
RawQueueData(String),
}

View File

@ -2,9 +2,9 @@
`lqos_config` is designed to manage configuration of LibreQoS.
Since all of the parts of the system need to know where to find LibreQoS, it first looks for a file named `/etc/lqos` and uses that to locate the LibreQoS installation.
Since all of the parts of the system need to know where to find LibreQoS, it first looks for a file named `/etc/lqos.conf` and uses that to locate the LibreQoS installation.
`/etc/lqos` looks like this:
`/etc/lqos.conf` looks like this:
```toml
lqos_directory = '/opt/libreqos'

View File

@ -1,10 +1,10 @@
//! Manages the `/etc/lqos` file.
//! Manages the `/etc/lqos.conf` file.
use anyhow::{Error, Result};
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Represents the top-level of the `/etc/lqos` file. Serialization
/// Represents the top-level of the `/etc/lqos.conf` file. Serialization
/// structure.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct EtcLqos {
@ -105,12 +105,12 @@ pub struct BridgeVlan {
}
impl EtcLqos {
/// Loads `/etc/lqos`.
/// Loads `/etc/lqos.conf`.
pub fn load() -> Result<Self> {
if !Path::new("/etc/lqos").exists() {
return Err(Error::msg("You must setup /etc/lqos"));
if !Path::new("/etc/lqos.conf").exists() {
return Err(Error::msg("You must setup /etc/lqos.conf"));
}
let raw = std::fs::read_to_string("/etc/lqos")?;
let raw = std::fs::read_to_string("/etc/lqos.conf")?;
let config: Self = toml::from_str(&raw)?;
//println!("{:?}", config);
Ok(config)

View File

@ -1,7 +1,7 @@
//! The `lqos_config` crate stores and handles LibreQoS configuration.
//! Configuration is drawn from:
//! * The `ispConfig.py` file.
//! * The `/etc/lqos` file.
//! * The `/etc/lqos.conf` file.
//! * `ShapedDevices.csv` files.
//! * `network.json` files.

View File

@ -20,7 +20,7 @@ pub struct ConfigShapedDevices {
impl ConfigShapedDevices {
/// The path to the current `ShapedDevices.csv` file, determined
/// by acquiring the prefix from the `/etc/lqos` configuration
/// by acquiring the prefix from the `/etc/lqos.conf` configuration
/// file.
pub fn path() -> Result<PathBuf> {
let cfg = etc::EtcLqos::load()?;

View File

@ -39,6 +39,9 @@
</div>
<ul class="navbar-nav ms-auto">
<li class="nav-item">
<a class="nav-link" href="#" id="startTest"><i class="fa fa-flag-checkered"></i> Run Bandwidth Test</a>
</li>
<li class="nav-item ms-auto">
<a class="nav-link" href="/config"><i class="fa fa-gear"></i> Configuration</a>
</li>

View File

@ -209,7 +209,7 @@
<select id="sqmMode">
<option value="fq_codel">FQ_Codel</option>
<option value="cake diffserv4">Cake + Diffserv4</option>
<option value="cake diffserv4 ackfilter">Cake + Diffserv4 + ACK Filter</option>
<option value="cake diffserv4 ack-filter">Cake + Diffserv4 + ACK Filter</option>
</select>
</td>
</tr>

View File

@ -39,6 +39,9 @@
</div>
<ul class="navbar-nav ms-auto">
<li class="nav-item">
<a class="nav-link" href="#" id="startTest"><i class="fa fa-flag-checkered"></i> Run Bandwidth Test</a>
</li>
<li class="nav-item ms-auto">
<a class="nav-link" href="/config"><i class="fa fa-gear"></i> Configuration</a>
</li>

View File

@ -8,7 +8,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use lqos_queue_tracker::*;
use std::process::{id, Command};
const EXAMPLE_JSON: &str = include_str!("./example_json.txt");
const EXAMPLE_JSON: &str = include_str!("./example.json");
const TC: &str = "/sbin/tc";
const SUDO: &str = "/bin/sudo";
const IP: &str = "ip";

View File

@ -1,2 +1,3 @@
mod commands;
pub mod packet_scale;
mod string_table_enum;
mod commands;

View File

@ -0,0 +1,23 @@
/// Formats a raw packets-per-second counter as a human-readable string,
/// scaling to kpps/mpps/gpps with two decimal places. Values of 1,000
/// or below are printed unscaled as "pps".
pub fn scale_packets(n: u64) -> String {
    const GIGA: u64 = 1_000_000_000;
    const MEGA: u64 = 1_000_000;
    const KILO: u64 = 1_000;
    match n {
        x if x > GIGA => format!("{:.2} gpps", x as f32 / GIGA as f32),
        x if x > MEGA => format!("{:.2} mpps", x as f32 / MEGA as f32),
        x if x > KILO => format!("{:.2} kpps", x as f32 / KILO as f32),
        x => format!("{x} pps"),
    }
}
/// Formats a raw bits-per-second counter as a human-readable string,
/// scaling to kbit/s, mbit/s or gbit/s with two decimal places. Values
/// of 1,000 or below are printed unscaled as "bit/s".
pub fn scale_bits(n: u64) -> String {
    const GIGA: u64 = 1_000_000_000;
    const MEGA: u64 = 1_000_000;
    const KILO: u64 = 1_000;
    match n {
        x if x > GIGA => format!("{:.2} gbit/s", x as f32 / GIGA as f32),
        x if x > MEGA => format!("{:.2} mbit/s", x as f32 / MEGA as f32),
        x if x > KILO => format!("{:.2} kbit/s", x as f32 / KILO as f32),
        x => format!("{x} bit/s"),
    }
}

View File

@ -11,15 +11,15 @@
## Required Configuration
You *must* have a file present called `/etc/lqos`. At a minimum, it must tell `lqosd` where to find the LibreQoS configuration. For example:
You *must* have a file present called `/etc/lqos.conf`. At a minimum, it must tell `lqosd` where to find the LibreQoS configuration. For example:
```toml
lqos_directory = '/opt/libreqos/v1.3'
lqos_directory = '/opt/libreqos'
```
## Offload Tuning
`lqosd` can set kernel tunables for you on start-up. These are specified in `/etc/lqos` also, in the `[tuning]` section:
`lqosd` can set kernel tunables for you on start-up. These are specified in `/etc/lqos.conf` also, in the `[tuning]` section:
```toml
[tuning]
@ -37,7 +37,7 @@ disable_offload = [ "gso", "tso", "lro", "sg", "gro" ]
## Bifrost - eBPF Kernel Bridge
To enable the kernel-side eBPF bridge, edit `/etc/lqos`:
To enable the kernel-side eBPF bridge, edit `/etc/lqos.conf`:
```toml
[bridge]
@ -63,4 +63,4 @@ vlan_mapping = [
]
```
Reciprocal mappings are NOT created automatically; you have to specify each mapping. When you are using "on a stick" mode, you need to redirect to the same interface.
Reciprocal mappings are NOT created automatically; you have to specify each mapping. When you are using "on a stick" mode, you need to redirect to the same interface.

View File

@ -120,6 +120,7 @@ fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec<BusResponse>
BusRequest::GetHostCounter => throughput_tracker::host_counters(),
BusRequest::GetTopNDownloaders{start, end} => throughput_tracker::top_n(*start, *end),
BusRequest::GetWorstRtt{start, end} => throughput_tracker::worst_n(*start, *end),
BusRequest::GetBestRtt{start, end} => throughput_tracker::best_n(*start, *end),
BusRequest::MapIpToFlow {
ip_address,
tc_handle,
@ -152,4 +153,4 @@ fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec<BusResponse>
}
});
}
}
}

View File

@ -11,271 +11,347 @@ use tokio::{task, time};
const RETIRE_AFTER_SECONDS: u64 = 30;
lazy_static! {
static ref THROUGHPUT_TRACKER: RwLock<ThroughputTracker> =
RwLock::new(ThroughputTracker::new());
static ref THROUGHPUT_TRACKER: RwLock<ThroughputTracker> =
RwLock::new(ThroughputTracker::new());
}
pub async fn spawn_throughput_monitor() {
let _ = task::spawn(async {
let mut interval = time::interval(Duration::from_secs(1));
let _ = task::spawn(async {
let mut interval = time::interval(Duration::from_secs(1));
loop {
let now = Instant::now();
let _ = task::spawn_blocking(move || {
let mut throughput = THROUGHPUT_TRACKER.write();
throughput.copy_previous_and_reset_rtt();
throughput.apply_new_throughput_counters();
throughput.apply_rtt_data();
throughput.update_totals();
throughput.next_cycle();
})
.await;
let elapsed = now.elapsed();
//println!("Tick consumed {:.2} seconds.", elapsed.as_secs_f32());
if elapsed.as_secs_f32() < 1.0 {
let duration = Duration::from_secs(1) - elapsed;
//println!("Sleeping for {:.2} seconds", duration.as_secs_f32());
tokio::time::sleep(duration).await;
} else {
interval.tick().await;
}
}
});
loop {
let now = Instant::now();
let _ = task::spawn_blocking(move || {
let mut throughput = THROUGHPUT_TRACKER.write();
throughput.copy_previous_and_reset_rtt();
throughput.apply_new_throughput_counters();
throughput.apply_rtt_data();
throughput.update_totals();
throughput.next_cycle();
})
.await;
let elapsed = now.elapsed();
//println!("Tick consumed {:.2} seconds.", elapsed.as_secs_f32());
if elapsed.as_secs_f32() < 1.0 {
let duration = Duration::from_secs(1) - elapsed;
//println!("Sleeping for {:.2} seconds", duration.as_secs_f32());
tokio::time::sleep(duration).await;
} else {
interval.tick().await;
}
}
});
}
pub fn current_throughput() -> BusResponse {
let (bits_per_second, packets_per_second, shaped_bits_per_second) = {
let tp = THROUGHPUT_TRACKER.read();
(
tp.bits_per_second(),
tp.packets_per_second(),
tp.shaped_bits_per_second(),
)
};
BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
}
let (bits_per_second, packets_per_second, shaped_bits_per_second) = {
let tp = THROUGHPUT_TRACKER.read();
(
tp.bits_per_second(),
tp.packets_per_second(),
tp.shaped_bits_per_second(),
)
};
BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
}
}
pub fn host_counters() -> BusResponse {
let mut result = Vec::new();
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data.iter().for_each(|(k, v)| {
let ip = k.as_ip();
let (down, up) = v.bytes_per_second;
result.push((ip, down, up));
});
BusResponse::HostCounters(result)
let mut result = Vec::new();
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data.iter().for_each(|(k, v)| {
let ip = k.as_ip();
let (down, up) = v.bytes_per_second;
result.push((ip, down, up));
});
BusResponse::HostCounters(result)
}
#[inline(always)]
fn retire_check(cycle: u64, recent_cycle: u64) -> bool {
cycle < recent_cycle + RETIRE_AFTER_SECONDS
cycle < recent_cycle + RETIRE_AFTER_SECONDS
}
pub fn top_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<(XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.map(|(ip, te)| {
(
*ip,
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
)
})
.collect()
};
full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(ip, (bytes_dn, bytes_up), (packets_dn, packets_up), median_rtt, tc_handle)| IpStats {
ip_address: ip.as_ip().to_string(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
let mut full_list: Vec<(
XdpIpAddress,
(u64, u64),
(u64, u64),
f32,
TcHandle,
)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.map(|(ip, te)| {
(
*ip,
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
)
.collect();
BusResponse::TopDownloaders(result)
})
.collect()
};
full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
)| IpStats {
ip_address: ip.as_ip().to_string(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::TopDownloaders(result)
}
pub fn worst_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<(XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.map(|(ip, te)| {
(
*ip,
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
)
})
.collect()
};
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(ip, (bytes_dn, bytes_up), (packets_dn, packets_up), median_rtt, tc_handle)| IpStats {
ip_address: ip.as_ip().to_string(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
let mut full_list: Vec<(
XdpIpAddress,
(u64, u64),
(u64, u64),
f32,
TcHandle,
)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.map(|(ip, te)| {
(
*ip,
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
)
.collect();
BusResponse::WorstRtt(result)
})
.collect()
};
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
)| IpStats {
ip_address: ip.as_ip().to_string(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::WorstRtt(result)
}
pub fn best_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<(
XdpIpAddress,
(u64, u64),
(u64, u64),
f32,
TcHandle,
)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.map(|(ip, te)| {
(
*ip,
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
te.tc_handle,
)
})
.collect()
};
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
full_list.reverse();
let result = full_list
.iter()
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
)| IpStats {
ip_address: ip.as_ip().to_string(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::BestRtt(result)
}
pub fn xdp_pping_compat() -> BusResponse {
let raw = THROUGHPUT_TRACKER.read();
let result = raw
.raw_data
.iter()
.filter(|(_, d)| retire_check(raw.cycle, d.most_recent_cycle))
.filter_map(|(_ip, data)| {
if data.tc_handle.as_u32() > 0 {
let mut valid_samples: Vec<u32> = data
.recent_rtt_data
.iter()
.filter(|d| **d > 0)
.map(|d| *d)
.collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
valid_samples.sort_by(|a, b| (*a).cmp(&b));
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let max = *(valid_samples.iter().max().unwrap()) as f32 / 100.0;
let min = *(valid_samples.iter().min().unwrap()) as f32 / 100.0;
let sum = valid_samples.iter().sum::<u32>() as f32 / 100.0;
let avg = sum / samples as f32;
let raw = THROUGHPUT_TRACKER.read();
let result = raw
.raw_data
.iter()
.filter(|(_, d)| retire_check(raw.cycle, d.most_recent_cycle))
.filter_map(|(_ip, data)| {
if data.tc_handle.as_u32() > 0 {
let mut valid_samples: Vec<u32> = data
.recent_rtt_data
.iter()
.filter(|d| **d > 0)
.map(|d| *d)
.collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
valid_samples.sort_by(|a, b| (*a).cmp(&b));
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let max = *(valid_samples.iter().max().unwrap()) as f32 / 100.0;
let min = *(valid_samples.iter().min().unwrap()) as f32 / 100.0;
let sum = valid_samples.iter().sum::<u32>() as f32 / 100.0;
let avg = sum / samples as f32;
Some(XdpPpingResult {
tc: format!("{}", data.tc_handle.to_string()),
median,
avg,
max,
min,
samples,
})
} else {
None
}
} else {
None
}
})
.collect();
BusResponse::XdpPping(result)
Some(XdpPpingResult {
tc: format!("{}", data.tc_handle.to_string()),
median,
avg,
max,
min,
samples,
})
} else {
None
}
} else {
None
}
})
.collect();
BusResponse::XdpPping(result)
}
pub fn rtt_histogram() -> BusResponse {
let mut result = vec![0; 20];
let reader = THROUGHPUT_TRACKER.read();
for (_, data) in reader
.raw_data
.iter()
.filter(|(_, d)| retire_check(reader.cycle, d.most_recent_cycle))
{
let valid_samples: Vec<u32> = data
.recent_rtt_data
.iter()
.filter(|d| **d > 0)
.map(|d| *d)
.collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let median = f32::min(200.0, median);
let column = (median / 10.0) as usize;
result[usize::min(column, 19)] += 1;
}
let mut result = vec![0; 20];
let reader = THROUGHPUT_TRACKER.read();
for (_, data) in reader
.raw_data
.iter()
.filter(|(_, d)| retire_check(reader.cycle, d.most_recent_cycle))
{
let valid_samples: Vec<u32> =
data.recent_rtt_data.iter().filter(|d| **d > 0).map(|d| *d).collect();
let samples = valid_samples.len() as u32;
if samples > 0 {
let median = valid_samples[valid_samples.len() / 2] as f32 / 100.0;
let median = f32::min(200.0, median);
let column = (median / 10.0) as usize;
result[usize::min(column, 19)] += 1;
}
}
BusResponse::RttHistogram(result)
BusResponse::RttHistogram(result)
}
pub fn host_counts() -> BusResponse {
let mut total = 0;
let mut shaped = 0;
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.for_each(|(_, d)| {
total += 1;
if d.tc_handle.as_u32() != 0 {
shaped += 1;
}
});
BusResponse::HostCounts((total, shaped))
let mut total = 0;
let mut shaped = 0;
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.for_each(|(_, d)| {
total += 1;
if d.tc_handle.as_u32() != 0 {
shaped += 1;
}
});
BusResponse::HostCounts((total, shaped))
}
pub fn all_unknown_ips() -> BusResponse {
let boot_time = nix::time::clock_gettime(nix::time::ClockId::CLOCK_BOOTTIME)
.expect("Unable to obtain kernel time.");
let time_since_boot = Duration::from(boot_time);
let five_minutes_ago = time_since_boot - Duration::from_secs(300);
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos();
let boot_time =
nix::time::clock_gettime(nix::time::ClockId::CLOCK_BOOTTIME)
.expect("Unable to obtain kernel time.");
let time_since_boot = Duration::from(boot_time);
let five_minutes_ago = time_since_boot - Duration::from_secs(300);
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos();
let mut full_list: Vec<(XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, u64)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| d.tc_handle.as_u32() == 0)
.filter(|(_, d)| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.map(|(ip, te)| {
(
*ip,
te.bytes,
te.packets,
te.median_latency(),
te.tc_handle,
te.most_recent_cycle,
)
})
.collect()
};
full_list.sort_by(|a, b| b.5.partial_cmp(&a.5).unwrap());
let result = full_list
.iter()
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
_last_seen,
)| IpStats {
ip_address: ip.as_ip().to_string(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
let mut full_list: Vec<(
XdpIpAddress,
(u64, u64),
(u64, u64),
f32,
TcHandle,
u64,
)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| d.tc_handle.as_u32() == 0)
.filter(|(_, d)| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.map(|(ip, te)| {
(
*ip,
te.bytes,
te.packets,
te.median_latency(),
te.tc_handle,
te.most_recent_cycle,
)
.collect();
BusResponse::AllUnknownIps(result)
})
.collect()
};
full_list.sort_by(|a, b| b.5.partial_cmp(&a.5).unwrap());
let result = full_list
.iter()
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
median_rtt,
tc_handle,
_last_seen,
)| IpStats {
ip_address: ip.as_ip().to_string(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
},
)
.collect();
BusResponse::AllUnknownIps(result)
}

View File

@ -2,32 +2,32 @@ use lqos_bus::TcHandle;
#[derive(Debug)]
pub(crate) struct ThroughputEntry {
pub(crate) first_cycle: u64,
pub(crate) most_recent_cycle: u64,
pub(crate) bytes: (u64, u64),
pub(crate) packets: (u64, u64),
pub(crate) prev_bytes: (u64, u64),
pub(crate) prev_packets: (u64, u64),
pub(crate) bytes_per_second: (u64, u64),
pub(crate) packets_per_second: (u64, u64),
pub(crate) tc_handle: TcHandle,
pub(crate) recent_rtt_data: [u32; 60],
pub(crate) last_fresh_rtt_data_cycle: u64,
pub(crate) last_seen: u64, // Last seen in kernel time since boot
pub(crate) first_cycle: u64,
pub(crate) most_recent_cycle: u64,
pub(crate) bytes: (u64, u64),
pub(crate) packets: (u64, u64),
pub(crate) prev_bytes: (u64, u64),
pub(crate) prev_packets: (u64, u64),
pub(crate) bytes_per_second: (u64, u64),
pub(crate) packets_per_second: (u64, u64),
pub(crate) tc_handle: TcHandle,
pub(crate) recent_rtt_data: [u32; 60],
pub(crate) last_fresh_rtt_data_cycle: u64,
pub(crate) last_seen: u64, // Last seen in kernel time since boot
}
impl ThroughputEntry {
pub(crate) fn median_latency(&self) -> f32 {
let mut shifted: Vec<f32> = self
.recent_rtt_data
.iter()
.filter(|n| **n != 0)
.map(|n| *n as f32 / 100.0)
.collect();
if shifted.is_empty() {
return 0.0;
}
shifted.sort_by(|a, b| a.partial_cmp(&b).unwrap());
shifted[shifted.len() / 2]
pub(crate) fn median_latency(&self) -> f32 {
let mut shifted: Vec<f32> = self
.recent_rtt_data
.iter()
.filter(|n| **n != 0)
.map(|n| *n as f32 / 100.0)
.collect();
if shifted.is_empty() {
return 0.0;
}
shifted.sort_by(|a, b| a.partial_cmp(&b).unwrap());
shifted[shifted.len() / 2]
}
}

View File

@ -5,281 +5,283 @@ use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator};
use std::collections::HashMap;
pub struct ThroughputTracker {
pub(crate) cycle: u64,
pub(crate) raw_data: HashMap<XdpIpAddress, ThroughputEntry>,
pub(crate) bytes_per_second: (u64, u64),
pub(crate) packets_per_second: (u64, u64),
pub(crate) shaped_bytes_per_second: (u64, u64),
pub(crate) cycle: u64,
pub(crate) raw_data: HashMap<XdpIpAddress, ThroughputEntry>,
pub(crate) bytes_per_second: (u64, u64),
pub(crate) packets_per_second: (u64, u64),
pub(crate) shaped_bytes_per_second: (u64, u64),
}
impl ThroughputTracker {
pub(crate) fn new() -> Self {
// The capacity should match that found in
// maximums.h (MAX_TRACKED_IPS), so we grab it
// from there via the C API.
Self {
cycle: RETIRE_AFTER_SECONDS,
raw_data: HashMap::with_capacity(lqos_sys::max_tracked_ips()),
bytes_per_second: (0, 0),
packets_per_second: (0, 0),
shaped_bytes_per_second: (0, 0),
pub(crate) fn new() -> Self {
// The capacity should match that found in
// maximums.h (MAX_TRACKED_IPS), so we grab it
// from there via the C API.
Self {
cycle: RETIRE_AFTER_SECONDS,
raw_data: HashMap::with_capacity(lqos_sys::max_tracked_ips()),
bytes_per_second: (0, 0),
packets_per_second: (0, 0),
shaped_bytes_per_second: (0, 0),
}
}
pub(crate) fn copy_previous_and_reset_rtt(&mut self) {
// Copy previous byte/packet numbers and reset RTT data
// We're using Rayon's "par_iter_mut" to spread the operation across
// all CPU cores.
self.raw_data.par_iter_mut().for_each(|(_k, v)| {
if v.first_cycle < self.cycle {
v.bytes_per_second.0 =
u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
v.bytes_per_second.1 =
u64::checked_sub(v.bytes.1, v.prev_bytes.1).unwrap_or(0);
v.packets_per_second.0 =
u64::checked_sub(v.packets.0, v.prev_packets.0).unwrap_or(0);
v.packets_per_second.1 =
u64::checked_sub(v.packets.1, v.prev_packets.1).unwrap_or(0);
v.prev_bytes = v.bytes;
v.prev_packets = v.packets;
}
// Roll out stale RTT data
if self.cycle > RETIRE_AFTER_SECONDS
&& v.last_fresh_rtt_data_cycle < self.cycle - RETIRE_AFTER_SECONDS
{
v.recent_rtt_data = [0; 60];
}
});
}
pub(crate) fn apply_new_throughput_counters(&mut self) {
let cycle = self.cycle;
let raw_data = &mut self.raw_data;
throughput_for_each(&mut |xdp_ip, counts| {
if let Some(entry) = raw_data.get_mut(xdp_ip) {
entry.bytes = (0, 0);
entry.packets = (0, 0);
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
}
if c.last_seen != 0 {
entry.last_seen = c.last_seen;
}
}
}
if entry.packets != entry.prev_packets {
entry.most_recent_cycle = cycle;
}
} else {
let mut entry = ThroughputEntry {
first_cycle: self.cycle,
most_recent_cycle: 0,
bytes: (0, 0),
packets: (0, 0),
prev_bytes: (0, 0),
prev_packets: (0, 0),
bytes_per_second: (0, 0),
packets_per_second: (0, 0),
tc_handle: TcHandle::zero(),
recent_rtt_data: [0; 60],
last_fresh_rtt_data_cycle: 0,
last_seen: 0,
};
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
}
}
raw_data.insert(*xdp_ip, entry);
}
});
}
pub(crate) fn copy_previous_and_reset_rtt(&mut self) {
// Copy previous byte/packet numbers and reset RTT data
// We're using Rayon's "par_iter_mut" to spread the operation across
// all CPU cores.
self.raw_data.par_iter_mut().for_each(|(_k, v)| {
if v.first_cycle < self.cycle {
v.bytes_per_second.0 = u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
v.bytes_per_second.1 = u64::checked_sub(v.bytes.1, v.prev_bytes.1).unwrap_or(0);
v.packets_per_second.0 =
u64::checked_sub(v.packets.0, v.prev_packets.0).unwrap_or(0);
v.packets_per_second.1 =
u64::checked_sub(v.packets.1, v.prev_packets.1).unwrap_or(0);
v.prev_bytes = v.bytes;
v.prev_packets = v.packets;
}
// Roll out stale RTT data
if self.cycle > RETIRE_AFTER_SECONDS
&& v.last_fresh_rtt_data_cycle < self.cycle - RETIRE_AFTER_SECONDS
{
v.recent_rtt_data = [0; 60];
}
});
}
pub(crate) fn apply_rtt_data(&mut self) {
rtt_for_each(&mut |raw_ip, rtt| {
if rtt.has_fresh_data != 0 {
let ip = XdpIpAddress(*raw_ip);
if let Some(tracker) = self.raw_data.get_mut(&ip) {
tracker.recent_rtt_data = rtt.rtt;
tracker.last_fresh_rtt_data_cycle = self.cycle;
}
}
});
}
pub(crate) fn apply_new_throughput_counters(&mut self) {
let cycle = self.cycle;
let raw_data = &mut self.raw_data;
throughput_for_each(&mut |xdp_ip, counts| {
if let Some(entry) = raw_data.get_mut(xdp_ip) {
entry.bytes = (0, 0);
entry.packets = (0, 0);
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
}
if c.last_seen != 0 {
entry.last_seen = c.last_seen;
}
}
if entry.packets != entry.prev_packets {
entry.most_recent_cycle = cycle;
}
} else {
let mut entry = ThroughputEntry {
first_cycle: self.cycle,
most_recent_cycle: 0,
bytes: (0, 0),
packets: (0, 0),
prev_bytes: (0, 0),
prev_packets: (0, 0),
bytes_per_second: (0, 0),
packets_per_second: (0, 0),
tc_handle: TcHandle::zero(),
recent_rtt_data: [0; 60],
last_fresh_rtt_data_cycle: 0,
last_seen: 0,
};
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
}
}
raw_data.insert(*xdp_ip, entry);
}
});
}
pub(crate) fn apply_rtt_data(&mut self) {
rtt_for_each(&mut |raw_ip, rtt| {
if rtt.has_fresh_data != 0 {
let ip = XdpIpAddress(*raw_ip);
if let Some(tracker) = self.raw_data.get_mut(&ip) {
tracker.recent_rtt_data = rtt.rtt;
tracker.last_fresh_rtt_data_cycle = self.cycle;
}
}
});
}
pub(crate) fn update_totals(&mut self) {
self.bytes_per_second = (0, 0);
self.packets_per_second = (0, 0);
self.shaped_bytes_per_second = (0, 0);
self.raw_data
.iter()
.map(|(_k, v)| {
(
v.bytes.0 - v.prev_bytes.0,
v.bytes.1 - v.prev_bytes.1,
v.packets.0 - v.prev_packets.0,
v.packets.1 - v.prev_packets.1,
v.tc_handle.as_u32() > 0,
)
})
.for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
self.bytes_per_second.0 += bytes_down;
self.bytes_per_second.1 += bytes_up;
self.packets_per_second.0 += packets_down;
self.packets_per_second.1 += packets_up;
if shaped {
self.shaped_bytes_per_second.0 += bytes_down;
self.shaped_bytes_per_second.1 += bytes_up;
}
});
}
pub(crate) fn next_cycle(&mut self) {
self.cycle += 1;
}
// pub(crate) fn tick(
// &mut self,
// value_dump: &[(XdpIpAddress, Vec<HostCounter>)],
// rtt: Result<Vec<([u8; 16], RttTrackingEntry)>>,
// ) -> Result<()> {
// // Copy previous byte/packet numbers and reset RTT data
// self.raw_data.iter_mut().for_each(|(_k, v)| {
// if v.first_cycle < self.cycle {
// v.bytes_per_second.0 = u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
// v.bytes_per_second.1 = u64::checked_sub(v.bytes.1, v.prev_bytes.1).unwrap_or(0);
// v.packets_per_second.0 =
// u64::checked_sub(v.packets.0, v.prev_packets.0).unwrap_or(0);
// v.packets_per_second.1 =
// u64::checked_sub(v.packets.1, v.prev_packets.1).unwrap_or(0);
// v.prev_bytes = v.bytes;
// v.prev_packets = v.packets;
// }
// // Roll out stale RTT data
// if self.cycle > RETIRE_AFTER_SECONDS
// && v.last_fresh_rtt_data_cycle < self.cycle - RETIRE_AFTER_SECONDS
// {
// v.recent_rtt_data = [0; 60];
// }
// });
// value_dump.iter().for_each(|(xdp_ip, counts)| {
// if let Some(entry) = self.raw_data.get_mut(xdp_ip) {
// entry.bytes = (0, 0);
// entry.packets = (0, 0);
// for c in counts {
// entry.bytes.0 += c.download_bytes;
// entry.bytes.1 += c.upload_bytes;
// entry.packets.0 += c.download_packets;
// entry.packets.1 += c.upload_packets;
// if c.tc_handle != 0 {
// entry.tc_handle = TcHandle::from_u32(c.tc_handle);
// }
// if c.last_seen != 0 {
// entry.last_seen = c.last_seen;
// }
// }
// if entry.packets != entry.prev_packets {
// entry.most_recent_cycle = self.cycle;
// }
// } else {
// let mut entry = ThroughputEntry {
// first_cycle: self.cycle,
// most_recent_cycle: 0,
// bytes: (0, 0),
// packets: (0, 0),
// prev_bytes: (0, 0),
// prev_packets: (0, 0),
// bytes_per_second: (0, 0),
// packets_per_second: (0, 0),
// tc_handle: TcHandle::zero(),
// recent_rtt_data: [0; 60],
// last_fresh_rtt_data_cycle: 0,
// last_seen: 0,
// };
// for c in counts {
// entry.bytes.0 += c.download_bytes;
// entry.bytes.1 += c.upload_bytes;
// entry.packets.0 += c.download_packets;
// entry.packets.1 += c.upload_packets;
// if c.tc_handle != 0 {
// entry.tc_handle = TcHandle::from_u32(c.tc_handle);
// }
// }
// self.raw_data.insert(*xdp_ip, entry);
// }
// });
// // Apply RTT data
// if let Ok(rtt_dump) = rtt {
// for (raw_ip, rtt) in rtt_dump {
// if rtt.has_fresh_data != 0 {
// let ip = XdpIpAddress(raw_ip);
// if let Some(tracker) = self.raw_data.get_mut(&ip) {
// tracker.recent_rtt_data = rtt.rtt;
// tracker.last_fresh_rtt_data_cycle = self.cycle;
// }
// }
// }
// }
// // Update totals
// self.bytes_per_second = (0, 0);
// self.packets_per_second = (0, 0);
// self.shaped_bytes_per_second = (0, 0);
// self.raw_data
// .iter()
// .map(|(_k, v)| {
// (
// v.bytes.0 - v.prev_bytes.0,
// v.bytes.1 - v.prev_bytes.1,
// v.packets.0 - v.prev_packets.0,
// v.packets.1 - v.prev_packets.1,
// v.tc_handle.as_u32() > 0,
// )
// })
// .for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
// self.bytes_per_second.0 += bytes_down;
// self.bytes_per_second.1 += bytes_up;
// self.packets_per_second.0 += packets_down;
// self.packets_per_second.1 += packets_up;
// if shaped {
// self.shaped_bytes_per_second.0 += bytes_down;
// self.shaped_bytes_per_second.1 += bytes_up;
// }
// });
// // Onto the next cycle
// self.cycle += 1;
// Ok(())
// }
pub(crate) fn bits_per_second(&self) -> (u64, u64) {
(self.bytes_per_second.0 * 8, self.bytes_per_second.1 * 8)
}
pub(crate) fn shaped_bits_per_second(&self) -> (u64, u64) {
pub(crate) fn update_totals(&mut self) {
self.bytes_per_second = (0, 0);
self.packets_per_second = (0, 0);
self.shaped_bytes_per_second = (0, 0);
self
.raw_data
.iter()
.map(|(_k, v)| {
(
self.shaped_bytes_per_second.0 * 8,
self.shaped_bytes_per_second.1 * 8,
v.bytes.0 - v.prev_bytes.0,
v.bytes.1 - v.prev_bytes.1,
v.packets.0 - v.prev_packets.0,
v.packets.1 - v.prev_packets.1,
v.tc_handle.as_u32() > 0,
)
}
})
.for_each(
|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
self.bytes_per_second.0 += bytes_down;
self.bytes_per_second.1 += bytes_up;
self.packets_per_second.0 += packets_down;
self.packets_per_second.1 += packets_up;
if shaped {
self.shaped_bytes_per_second.0 += bytes_down;
self.shaped_bytes_per_second.1 += bytes_up;
}
},
);
}
pub(crate) fn packets_per_second(&self) -> (u64, u64) {
self.packets_per_second
}
pub(crate) fn next_cycle(&mut self) {
self.cycle += 1;
}
#[allow(dead_code)]
pub(crate) fn dump(&self) {
for (k, v) in self.raw_data.iter() {
let ip = k.as_ip();
log::info!("{:<34}{:?}", ip, v.tc_handle);
}
// pub(crate) fn tick(
// &mut self,
// value_dump: &[(XdpIpAddress, Vec<HostCounter>)],
// rtt: Result<Vec<([u8; 16], RttTrackingEntry)>>,
// ) -> Result<()> {
// // Copy previous byte/packet numbers and reset RTT data
// self.raw_data.iter_mut().for_each(|(_k, v)| {
// if v.first_cycle < self.cycle {
// v.bytes_per_second.0 = u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
// v.bytes_per_second.1 = u64::checked_sub(v.bytes.1, v.prev_bytes.1).unwrap_or(0);
// v.packets_per_second.0 =
// u64::checked_sub(v.packets.0, v.prev_packets.0).unwrap_or(0);
// v.packets_per_second.1 =
// u64::checked_sub(v.packets.1, v.prev_packets.1).unwrap_or(0);
// v.prev_bytes = v.bytes;
// v.prev_packets = v.packets;
// }
// // Roll out stale RTT data
// if self.cycle > RETIRE_AFTER_SECONDS
// && v.last_fresh_rtt_data_cycle < self.cycle - RETIRE_AFTER_SECONDS
// {
// v.recent_rtt_data = [0; 60];
// }
// });
// value_dump.iter().for_each(|(xdp_ip, counts)| {
// if let Some(entry) = self.raw_data.get_mut(xdp_ip) {
// entry.bytes = (0, 0);
// entry.packets = (0, 0);
// for c in counts {
// entry.bytes.0 += c.download_bytes;
// entry.bytes.1 += c.upload_bytes;
// entry.packets.0 += c.download_packets;
// entry.packets.1 += c.upload_packets;
// if c.tc_handle != 0 {
// entry.tc_handle = TcHandle::from_u32(c.tc_handle);
// }
// if c.last_seen != 0 {
// entry.last_seen = c.last_seen;
// }
// }
// if entry.packets != entry.prev_packets {
// entry.most_recent_cycle = self.cycle;
// }
// } else {
// let mut entry = ThroughputEntry {
// first_cycle: self.cycle,
// most_recent_cycle: 0,
// bytes: (0, 0),
// packets: (0, 0),
// prev_bytes: (0, 0),
// prev_packets: (0, 0),
// bytes_per_second: (0, 0),
// packets_per_second: (0, 0),
// tc_handle: TcHandle::zero(),
// recent_rtt_data: [0; 60],
// last_fresh_rtt_data_cycle: 0,
// last_seen: 0,
// };
// for c in counts {
// entry.bytes.0 += c.download_bytes;
// entry.bytes.1 += c.upload_bytes;
// entry.packets.0 += c.download_packets;
// entry.packets.1 += c.upload_packets;
// if c.tc_handle != 0 {
// entry.tc_handle = TcHandle::from_u32(c.tc_handle);
// }
// }
// self.raw_data.insert(*xdp_ip, entry);
// }
// });
// // Apply RTT data
// if let Ok(rtt_dump) = rtt {
// for (raw_ip, rtt) in rtt_dump {
// if rtt.has_fresh_data != 0 {
// let ip = XdpIpAddress(raw_ip);
// if let Some(tracker) = self.raw_data.get_mut(&ip) {
// tracker.recent_rtt_data = rtt.rtt;
// tracker.last_fresh_rtt_data_cycle = self.cycle;
// }
// }
// }
// }
// // Update totals
// self.bytes_per_second = (0, 0);
// self.packets_per_second = (0, 0);
// self.shaped_bytes_per_second = (0, 0);
// self.raw_data
// .iter()
// .map(|(_k, v)| {
// (
// v.bytes.0 - v.prev_bytes.0,
// v.bytes.1 - v.prev_bytes.1,
// v.packets.0 - v.prev_packets.0,
// v.packets.1 - v.prev_packets.1,
// v.tc_handle.as_u32() > 0,
// )
// })
// .for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
// self.bytes_per_second.0 += bytes_down;
// self.bytes_per_second.1 += bytes_up;
// self.packets_per_second.0 += packets_down;
// self.packets_per_second.1 += packets_up;
// if shaped {
// self.shaped_bytes_per_second.0 += bytes_down;
// self.shaped_bytes_per_second.1 += bytes_up;
// }
// });
// // Onto the next cycle
// self.cycle += 1;
// Ok(())
// }
pub(crate) fn bits_per_second(&self) -> (u64, u64) {
(self.bytes_per_second.0 * 8, self.bytes_per_second.1 * 8)
}
pub(crate) fn shaped_bits_per_second(&self) -> (u64, u64) {
(self.shaped_bytes_per_second.0 * 8, self.shaped_bytes_per_second.1 * 8)
}
pub(crate) fn packets_per_second(&self) -> (u64, u64) {
self.packets_per_second
}
#[allow(dead_code)]
pub(crate) fn dump(&self) {
for (k, v) in self.raw_data.iter() {
let ip = k.as_ip();
log::info!("{:<34}{:?}", ip, v.tc_handle);
}
}
}

View File

@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] }
lqos_bus = { path = "../lqos_bus" }
lqos_utils = { path = "../lqos_utils" }
anyhow = "1"
tui = "0.19"
crossterm = { version = "0.19", features = [ "serde" ] }

View File

@ -4,6 +4,7 @@ use crossterm::{
terminal::{enable_raw_mode, size},
};
use lqos_bus::{BusClient, BusRequest, BusResponse, IpStats};
use lqos_utils::packet_scale::{scale_bits,scale_packets};
use std::{io, time::Duration};
use tui::{
backend::CrosstermBackend,
@ -52,12 +53,14 @@ async fn get_data(client: &mut BusClient, n_rows: u16) -> Result<DataResult> {
fn draw_menu<'a>(is_connected: bool) -> Paragraph<'a> {
let mut text = Spans::from(vec![
Span::styled("Q", Style::default().fg(Color::Green)),
Span::styled("Q", Style::default().fg(Color::White)),
Span::from("uit"),
]);
if !is_connected {
text.0.push(Span::styled(" NOT CONNECTED ", Style::default().fg(Color::Red)))
} else {
text.0.push(Span::styled(" CONNECTED ", Style::default().fg(Color::Green)))
}
let para = Paragraph::new(text)
@ -65,51 +68,28 @@ fn draw_menu<'a>(is_connected: bool) -> Paragraph<'a> {
.alignment(Alignment::Center)
.block(
Block::default()
.style(Style::default().fg(Color::White))
.style(Style::default().fg(Color::Green))
.border_type(BorderType::Plain)
.title("LibreQoS Monitor"),
.title("LibreQoS Monitor: "),
);
para
}
fn scale_packets(n: u64) -> String {
if n > 1_000_000_000 {
format!("{:.2} gpps", n as f32 / 1_000_000_000.0)
} else if n > 1_000_000 {
format!("{:.2} mpps", n as f32 / 1_000_000.0)
} else if n > 1_000 {
format!("{:.2} kpps", n as f32 / 1_000.0)
} else {
format!("{n} pps")
}
}
fn scale_bits(n: u64) -> String {
if n > 1_000_000_000 {
format!("{:.2} gbit/s", n as f32 / 1_000_000_000.0)
} else if n > 1_000_000 {
format!("{:.2} mbit/s", n as f32 / 1_000_000.0)
} else if n > 1_000 {
format!("{:.2} kbit/s", n as f32 / 1_000.0)
} else {
format!("{n} bit/s")
}
}
fn draw_pps<'a>(
packets_per_second: (u64, u64),
bits_per_second: (u64, u64),
) -> Spans<'a> {
let text = Spans::from(vec![
Span::from(scale_bits(bits_per_second.0)),
Span::from(" "),
Span::from(scale_bits(bits_per_second.1)),
Span::from(" "),
Span::styled("🠗 ", Style::default().fg(Color::Yellow)),
Span::from(scale_packets(packets_per_second.0)),
Span::from(" "),
Span::from(scale_bits(bits_per_second.0)),
Span::styled(" 🠕 ", Style::default().fg(Color::Yellow)),
Span::from(scale_packets(packets_per_second.1)),
Span::from(" "),
Span::from(scale_bits(bits_per_second.1)),
]);
text
}
@ -131,18 +111,18 @@ fn draw_top_pane<'a>(
};
Row::new(vec![
Cell::from(stats.ip_address.clone()),
Cell::from(format!("🠗 {}", scale_bits(stats.bits_per_second.0))),
Cell::from(format!("🠕 {}", scale_bits(stats.bits_per_second.1))),
Cell::from(format!("🠗 {:>13}", scale_bits(stats.bits_per_second.0))),
Cell::from(format!("🠕 {:>13}", scale_bits(stats.bits_per_second.1))),
Cell::from(format!(
"🠗 {}",
"🠗 {:>13}",
scale_packets(stats.packets_per_second.0)
)),
Cell::from(format!(
"🠕 {}",
"🠕 {:>13}",
scale_packets(stats.packets_per_second.1)
)),
Cell::from(format!("{:.2} ms", stats.median_tcp_rtt)),
Cell::from(stats.tc_handle.to_string()),
Cell::from(format!("{:>7} ms", format!("{:.2}",stats.median_tcp_rtt))),
Cell::from(format!("{:>7}",stats.tc_handle.to_string())),
])
.style(Style::default().fg(color))
})
@ -192,7 +172,7 @@ pub async fn main() -> Result<()> {
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
let t = terminal.size().unwrap();
let mut n_rows = t.height - 3;
let mut n_rows = 33;
loop {
if let Ok(result) = get_data(&mut bus_client, n_rows).await {
@ -206,11 +186,11 @@ pub async fn main() -> Result<()> {
terminal.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(1)
.margin(0)
.constraints(
[
Constraint::Min(3),
Constraint::Percentage(99),
Constraint::Min(1),
Constraint::Percentage(100),
]
.as_ref(),
)
@ -334,6 +314,10 @@ pub async fn main() -> Result<()> {
code: KeyCode::Char('M'),
modifiers: KeyModifiers::NONE,
}) => break, // FIXME filter on My Network
Event::Key(KeyEvent {
code: KeyCode::Char('H'),
modifiers: KeyModifiers::NONE,
}) => break, // FIXME Generate histogram
Event::Key(KeyEvent {
code: KeyCode::Char('T'),
modifiers: KeyModifiers::NONE,