mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
Minimally working functionality
This commit is contained in:
@@ -133,6 +133,9 @@ pub enum BusRequest {
|
||||
/// Obtain the lqosd statistics
|
||||
GetLqosStats,
|
||||
|
||||
/// Tell me flow stats for a given IP address
|
||||
GetFlowStats(String),
|
||||
|
||||
/// If running on Equinix (the `equinix_test` feature is enabled),
|
||||
/// display a "run bandwidht test" link.
|
||||
#[cfg(feature = "equinix_tests")]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{IpMapping, IpStats, XdpPpingResult};
|
||||
use crate::{IpMapping, IpStats, XdpPpingResult, FlowTransport};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::IpAddr;
|
||||
|
||||
@@ -85,5 +85,8 @@ pub enum BusResponse {
|
||||
high_watermark: (u64, u64),
|
||||
/// Number of flows tracked
|
||||
tracked_flows: u64,
|
||||
}
|
||||
},
|
||||
|
||||
/// Flow Data
|
||||
FlowData(Vec<FlowTransport>),
|
||||
}
|
||||
|
||||
@@ -66,3 +66,20 @@ pub struct XdpPpingResult {
|
||||
/// derived. If 0, the other values are invalid.
|
||||
pub samples: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub enum FlowProto {
|
||||
TCP, UDP, ICMP
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct FlowTransport {
|
||||
pub src: String,
|
||||
pub dst: String,
|
||||
pub proto: FlowProto,
|
||||
pub src_port: u16,
|
||||
pub dst_port: u16,
|
||||
pub bytes: u64,
|
||||
pub packets: u64,
|
||||
pub tos: u8,
|
||||
}
|
||||
@@ -12,7 +12,7 @@
|
||||
#![warn(missing_docs)]
|
||||
mod bus;
|
||||
mod ip_stats;
|
||||
pub use ip_stats::{IpMapping, IpStats, XdpPpingResult};
|
||||
pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport};
|
||||
mod tc_handle;
|
||||
pub use bus::{
|
||||
bus_request, decode_request, decode_response, encode_request,
|
||||
|
||||
@@ -67,6 +67,7 @@ fn rocket() -> _ {
|
||||
queue_info::circuit_info,
|
||||
queue_info::current_circuit_throughput,
|
||||
queue_info::watch_circuit,
|
||||
queue_info::flow_stats,
|
||||
config_control::get_nic_list,
|
||||
config_control::get_current_python_config,
|
||||
config_control::get_current_lqosd_config,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::auth_guard::AuthGuard;
|
||||
use crate::cache_control::NoCache;
|
||||
use crate::tracker::SHAPED_DEVICES;
|
||||
use lqos_bus::{bus_request, BusRequest, BusResponse};
|
||||
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport};
|
||||
use rocket::response::content::RawJson;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::serde::Serialize;
|
||||
@@ -99,6 +99,22 @@ pub async fn raw_queue_by_circuit(
|
||||
NoCache::new(RawJson(result))
|
||||
}
|
||||
|
||||
#[get("/api/flows/<ip_list>")]
|
||||
pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<Json<Vec<FlowTransport>>> {
|
||||
let mut result = Vec::new();
|
||||
let request: Vec<BusRequest> = ip_list.split(",").map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect();
|
||||
let responses = bus_request(request).await.unwrap();
|
||||
for r in responses.iter() {
|
||||
if let BusResponse::FlowData(flow) = r {
|
||||
result.extend_from_slice(flow);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
NoCache::new(Json(result))
|
||||
}
|
||||
|
||||
#[cfg(feature = "equinix_tests")]
|
||||
#[get("/api/run_btest")]
|
||||
pub async fn run_btest() -> NoCache<RawJson<String>> {
|
||||
|
||||
@@ -68,6 +68,9 @@
|
||||
<li class="nav-item" role="presentation">
|
||||
<button class="nav-link" id="pills-funnel-tab" data-bs-toggle="pill" data-bs-target="#pills-funnel" type="button" role="tab" aria-controls="pills-funnel" aria-selected="false">Queue Funnel</button>
|
||||
</li>
|
||||
<li class="nav-item" role="presentation">
|
||||
<button class="nav-link" id="pills-flows-tab" data-bs-toggle="pill" data-bs-target="#pills-flows" type="button" role="tab" aria-controls="pills-flows" aria-selected="false">Flows</button>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
@@ -184,6 +187,19 @@
|
||||
|
||||
<div class="tab-pane fade" id="pills-funnel" role="tabpanel" aria-labelledby="pills-funnel-tab" tabindex="2">
|
||||
</div>
|
||||
|
||||
<div class="tab-pane fade" id="pills-flows" role="tabpanel" aria-labelledby="pills-flows-tab" tabindex="3">
|
||||
<div class="row">
|
||||
<div class="col-sm12">
|
||||
<div class="card bg-light">
|
||||
<div class="card-body">
|
||||
<h5 class="card-title"><i class="fa fa-bar-chart"></i> Flows (Last 30 Seconds)</h5>
|
||||
<div id="flowList"></div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -364,6 +380,8 @@
|
||||
setTimeout(pollQueue, 1000);
|
||||
}
|
||||
|
||||
let ips = [];
|
||||
|
||||
function getThroughput() {
|
||||
const params = new Proxy(new URLSearchParams(window.location.search), {
|
||||
get: (searchParams, prop) => searchParams.get(prop),
|
||||
@@ -376,8 +394,10 @@
|
||||
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
|
||||
];
|
||||
|
||||
ips = [];
|
||||
for (let i=0; i<data.length; i++) {
|
||||
let ip = data[i][0];
|
||||
ips.push(ip);
|
||||
let down = data[i][1];
|
||||
let up = data[i][2];
|
||||
|
||||
@@ -510,6 +530,30 @@
|
||||
setTimeout(plotFunnels, 1000);
|
||||
}
|
||||
|
||||
function getFlows() {
|
||||
let ip_list = "";
|
||||
for (let i=0; i<ips.length; ++i) {
|
||||
ip_list += ips[i] + ",";
|
||||
}
|
||||
ip_list = ip_list.substring(0, ip_list.length-1);
|
||||
$.get("/api/flows/" + ip_list, (data) => {
|
||||
let html = "<table class='table table-striped'>";
|
||||
html += "<thead><th>Protocol</th><th>Flow</th><th>Packets</th><th>Bytes</th></thead><tbody>";
|
||||
for (let i=0; i<data.length; i++) {
|
||||
html += "<tr>";
|
||||
html += "<td>" + data[i].proto + "</td>";
|
||||
html += "<td>" + data[i].src + ":" + data[i].src_port + "▶️" + data[i].dst + ":" + data[i].dst_port + "</td>";
|
||||
html += "<td>" + data[i].packets + "</td>";
|
||||
html += "<td>" + scaleNumber(data[i].bytes) + "</td>";
|
||||
html += "<td>" + data[i].tos + "</td>";
|
||||
html += "</tr>";
|
||||
}
|
||||
html += "</tbody></table>";
|
||||
$("#flowList").html(html);
|
||||
})
|
||||
setTimeout(getFlows, 1000);
|
||||
}
|
||||
|
||||
function start() {
|
||||
colorReloadButton();
|
||||
updateHostCounts();
|
||||
@@ -520,6 +564,7 @@
|
||||
pollQueue();
|
||||
getThroughput();
|
||||
getFunnel(params.id);
|
||||
getFlows();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -276,7 +276,9 @@ static __always_inline struct tcphdr *get_tcp_header(struct dissector_t *dissect
|
||||
if (dissector->eth_type == ETH_P_IP)
|
||||
{
|
||||
return (struct tcphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
|
||||
} else if (dissector->eth_type == ETH_P_IPV6) {
|
||||
}
|
||||
else if (dissector->eth_type == ETH_P_IPV6)
|
||||
{
|
||||
return (struct tcphdr *)(dissector->ip_header.ip6h + 1);
|
||||
}
|
||||
return NULL;
|
||||
@@ -287,17 +289,22 @@ static __always_inline struct udphdr *get_udp_header(struct dissector_t *dissect
|
||||
if (dissector->eth_type == ETH_P_IP)
|
||||
{
|
||||
return (struct udphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
|
||||
} else if (dissector->eth_type == ETH_P_IPV6) {
|
||||
}
|
||||
else if (dissector->eth_type == ETH_P_IPV6)
|
||||
{
|
||||
return (struct udphdr *)(dissector->ip_header.ip6h + 1);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static __always_inline struct icmphdr * get_icmp_header(struct dissector_t * dissector) {
|
||||
static __always_inline struct icmphdr *get_icmp_header(struct dissector_t *dissector)
|
||||
{
|
||||
if (dissector->eth_type == ETH_P_IP)
|
||||
{
|
||||
return (struct icmphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
|
||||
} else if (dissector->eth_type == ETH_P_IPV6) {
|
||||
}
|
||||
else if (dissector->eth_type == ETH_P_IPV6)
|
||||
{
|
||||
return (struct icmphdr *)(dissector->ip_header.ip6h + 1);
|
||||
}
|
||||
return NULL;
|
||||
@@ -312,7 +319,7 @@ static __always_inline void snoop(struct dissector_t *dissector)
|
||||
struct tcphdr *hdr = get_tcp_header(dissector);
|
||||
if (hdr != NULL)
|
||||
{
|
||||
if (hdr + sizeof(struct tcphdr) > dissector->end)
|
||||
if (hdr + 1 > dissector->end)
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -326,7 +333,7 @@ static __always_inline void snoop(struct dissector_t *dissector)
|
||||
struct udphdr *hdr = get_udp_header(dissector);
|
||||
if (hdr != NULL)
|
||||
{
|
||||
if (hdr + sizeof(struct udphdr) > dissector->end)
|
||||
if (hdr + 1 > dissector->end)
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -339,14 +346,15 @@ static __always_inline void snoop(struct dissector_t *dissector)
|
||||
struct icmphdr *hdr = get_icmp_header(dissector);
|
||||
if (hdr != NULL)
|
||||
{
|
||||
if (hdr + sizeof(struct icmphdr) > dissector->end)
|
||||
if (hdr + 1 > dissector->end)
|
||||
{
|
||||
return;
|
||||
}
|
||||
dissector->src_port = hdr->type;
|
||||
dissector->dst_port = hdr->code;
|
||||
}
|
||||
} break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,19 +32,22 @@ struct
|
||||
__uint(pinning, LIBBPF_PIN_BY_NAME);
|
||||
} palantir SEC(".maps");
|
||||
|
||||
static __always_inline void update_palantir(struct dissector_t * dissector, __u32 size) {
|
||||
static __always_inline void update_palantir(struct dissector_t * dissector, __u32 size, int dir) {
|
||||
if (dissector->src_port == 0 || dissector->dst_port == 0) return;
|
||||
struct palantir_key key = {0};
|
||||
key.src = dissector->src_ip;
|
||||
key.dst = dissector->dst_ip;
|
||||
key.ip_protocol = dissector->ip_protocol;
|
||||
key.src_port = dissector->src_port;
|
||||
key.dst_port = dissector->dst_port;
|
||||
key.src_port = bpf_ntohs(dissector->src_port);
|
||||
key.dst_port = bpf_ntohs(dissector->dst_port);
|
||||
struct palantir_data * counter = (struct palantir_data *)bpf_map_lookup_elem(&palantir, &key);
|
||||
if (counter) {
|
||||
counter->last_seen = bpf_ktime_get_boot_ns();
|
||||
counter->packets += 1;
|
||||
counter->bytes += size;
|
||||
counter->tos = dissector->tos;
|
||||
if (dissector->tos != 0) {
|
||||
counter->tos = dissector->tos;
|
||||
}
|
||||
} else {
|
||||
struct palantir_data counter = {0};
|
||||
counter.last_seen = bpf_ktime_get_boot_ns();
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
#include "common/bifrost.h"
|
||||
#include "common/palantir.h"
|
||||
|
||||
//#define VERBOSE 1
|
||||
|
||||
/* Theory of operation:
|
||||
1. (Packet arrives at interface)
|
||||
2. XDP (ingress) starts
|
||||
@@ -73,7 +75,7 @@ int xdp_prog(struct xdp_md *ctx)
|
||||
&bifrost_interface_map,
|
||||
&my_interface
|
||||
);
|
||||
if (redirect_info) {
|
||||
if (redirect_info && redirect_info->scan_vlans) {
|
||||
// If we have a redirect, mark it - the dissector will
|
||||
// apply it
|
||||
vlan_redirect = true;
|
||||
@@ -128,10 +130,16 @@ int xdp_prog(struct xdp_md *ctx)
|
||||
ctx->data_end - ctx->data, // end - data = length
|
||||
tc_handle
|
||||
);
|
||||
update_palantir(&dissector, ctx->data_end - ctx->data);
|
||||
|
||||
|
||||
// Send on its way
|
||||
if (tc_handle != 0) {
|
||||
// Send data to Palantir
|
||||
#ifdef VERBOSE
|
||||
bpf_debug("(XDP) Storing Palantir Data");
|
||||
#endif
|
||||
update_palantir(&dissector, ctx->data_end - ctx->data, effective_direction);
|
||||
|
||||
// Handle CPU redirection if there is one specified
|
||||
__u32 *cpu_lookup;
|
||||
cpu_lookup = bpf_map_lookup_elem(&cpus_available, &cpu);
|
||||
|
||||
@@ -26,6 +26,7 @@ use signal_hook::{
|
||||
iterator::Signals,
|
||||
};
|
||||
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED};
|
||||
use throughput_tracker::get_flow_stats;
|
||||
use tokio::join;
|
||||
mod stats;
|
||||
|
||||
@@ -189,6 +190,9 @@ fn handle_bus_requests(
|
||||
tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
BusRequest::GetFlowStats(ip) => {
|
||||
get_flow_stats(ip)
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
mod throughput_entry;
|
||||
mod tracking_data;
|
||||
mod palantir_data;
|
||||
pub use palantir_data::get_flow_stats;
|
||||
use crate::{
|
||||
shaped_devices_tracker::NETWORK_JSON,
|
||||
throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS,
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use std::time::Duration;
|
||||
use std::{time::Duration, net::IpAddr};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use lqos_sys::{PalantirData, PalantirKey};
|
||||
use lqos_bus::{BusResponse, FlowTransport};
|
||||
use lqos_sys::{PalantirData, PalantirKey, XdpIpAddress};
|
||||
use lqos_utils::unix_time::time_since_boot;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -19,6 +20,7 @@ pub(crate) struct FlowData {
|
||||
last_seen: u64,
|
||||
bytes: u64,
|
||||
packets: u64,
|
||||
tos: u8,
|
||||
}
|
||||
|
||||
impl PalantirMonitor {
|
||||
@@ -32,6 +34,7 @@ impl PalantirMonitor {
|
||||
values.iter().for_each(|v| {
|
||||
result.bytes += v.bytes;
|
||||
result.packets += v.packets;
|
||||
result.tos += v.tos;
|
||||
if v.last_seen > ls {
|
||||
ls = v.last_seen;
|
||||
}
|
||||
@@ -41,14 +44,19 @@ impl PalantirMonitor {
|
||||
}
|
||||
|
||||
pub(crate) fn ingest(&self, key: &PalantirKey, values: &[PalantirData]) {
|
||||
if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() {
|
||||
//println!("{key:?}");
|
||||
//println!("{values:?}");
|
||||
if let Some(expire_ns) = Self::get_expire_time() {
|
||||
let combined = Self::combine_flows(values);
|
||||
if combined.last_seen > five_minutes_ago_nanoseconds {
|
||||
if combined.last_seen > expire_ns {
|
||||
if let Some(mut flow) = self.data.get_mut(key) {
|
||||
// Update
|
||||
flow.bytes += combined.bytes;
|
||||
flow.packets += combined.packets;
|
||||
flow.bytes = combined.bytes;
|
||||
flow.packets = combined.packets;
|
||||
flow.last_seen = combined.last_seen;
|
||||
if combined.tos != 0 {
|
||||
flow.tos = combined.tos;
|
||||
}
|
||||
} else {
|
||||
// Insert
|
||||
self.data.insert(key.clone(), combined);
|
||||
@@ -62,18 +70,49 @@ impl PalantirMonitor {
|
||||
if let Ok(boot_time) = boot_time {
|
||||
let time_since_boot = Duration::from(boot_time);
|
||||
let five_minutes_ago =
|
||||
time_since_boot.saturating_sub(Duration::from_secs(300));
|
||||
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos() as u64;
|
||||
Some(five_minutes_ago_nanoseconds)
|
||||
time_since_boot.saturating_sub(Duration::from_secs(30));
|
||||
let expire_ns = five_minutes_ago.as_nanos() as u64;
|
||||
Some(expire_ns)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn expire(&self) {
|
||||
if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() {
|
||||
self.data.retain(|_k, v| v.last_seen > five_minutes_ago_nanoseconds);
|
||||
if let Some(expire_ns) = Self::get_expire_time() {
|
||||
self.data.retain(|_k, v| v.last_seen > expire_ns);
|
||||
}
|
||||
FLOWS_TRACKED.store(self.data.len() as u64, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_flow_stats(ip: &str) -> BusResponse {
|
||||
let ip = ip.parse::<IpAddr>();
|
||||
if let Ok(ip) = ip {
|
||||
let ip = XdpIpAddress::from_ip(ip);
|
||||
let mut result = Vec::new();
|
||||
|
||||
for value in PALANTIR.data.iter() {
|
||||
let key = value.key();
|
||||
if key.src_ip == ip || key.dst_ip == ip {
|
||||
result.push(FlowTransport{
|
||||
src: key.src_ip.as_ip().to_string(),
|
||||
dst: key.dst_ip.as_ip().to_string(),
|
||||
src_port: key.src_port,
|
||||
dst_port: key.dst_port,
|
||||
proto: match key.ip_protocol {
|
||||
6 => lqos_bus::FlowProto::TCP,
|
||||
17 => lqos_bus::FlowProto::UDP,
|
||||
_ => lqos_bus::FlowProto::ICMP,
|
||||
},
|
||||
bytes: value.bytes,
|
||||
packets: value.packets,
|
||||
tos: value.tos,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return BusResponse::FlowData(result);
|
||||
}
|
||||
BusResponse::Fail("No Stats or bad IP".to_string())
|
||||
}
|
||||
Reference in New Issue
Block a user