First steps at making a licensing database that uses PostgreSQL for the cluster manager.

This commit is contained in:
Herbert Wolverson 2023-04-12 19:29:46 +00:00
parent 400c4fbee1
commit c0845b1e15
12 changed files with 1042 additions and 405 deletions

1302
src/rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -29,4 +29,5 @@ members = [
"lqstats", # A CLI utility for retrieving long-term statistics
"long_term_stats/license_server", # Licensing Server for LibreQoS Long-term stats
"long_term_stats/lts_node", # Long-term stats cluster node
"long_term_stats/pgdb", # PostgreSQL interface for the LTS system
]

View File

@ -1,3 +1,14 @@
# Long Term Stats
Details go here.
We'd really rather you let us host your long-term statistics. It's a lot
of work, and gives us a revenue stream to keep building LibreQoS.
If you really want to self-host, setup is a bit convoluted - but we won't
stop you.
## PostgreSQL
* Install PostgreSQL somewhere on your network. You only want one PostgreSQL host per long-term node stats cluster.
* Setup the database schema (TBD).
* Put the connection string for your database in `/etc/lqdb` on each host.

View File

@ -11,3 +11,4 @@ log = "0"
serde_cbor = "0" # For RFC8949/7409 format C binary objects
serde = { version = "1.0", features = ["derive"] }
lqos_bus = { path = "../../lqos_bus" }
pgdb = { path = "../pgdb" }

View File

@ -1,4 +1,5 @@
use lqos_bus::long_term_stats::{LicenseCheck, LicenseReply};
use pgdb::sqlx::{Pool, Postgres};
use std::net::SocketAddr;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
@ -10,14 +11,23 @@ pub async fn start() -> anyhow::Result<()> {
let listener = TcpListener::bind(":::9126").await?;
log::info!("Listening on :::9126");
let pool = pgdb::get_connection_pool(5).await;
if pool.is_err() {
log::error!("Unable to connect to the database");
log::error!("{pool:?}");
return Err(anyhow::Error::msg("Unable to connect to the database"));
}
let pool = pool.unwrap();
loop {
let (mut socket, address) = listener.accept().await?;
log::info!("Connection from {address:?}");
let pool = pool.clone();
spawn(async move {
let mut buf = vec![0u8; 10240];
if let Ok(bytes) = socket.read(&mut buf).await {
log::info!("Received {bytes} bytes from {address:?}");
match decode(&buf, address).await {
match decode(&buf, address, pool).await {
Err(e) => log::error!("{e:?}"),
Ok(reply) => {
let bytes = build_reply(&reply);
@ -38,7 +48,7 @@ pub async fn start() -> anyhow::Result<()> {
}
}
async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<LicenseReply> {
async fn decode(buf: &[u8], address: SocketAddr, pool: Pool<Postgres>) -> anyhow::Result<LicenseReply> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
@ -51,7 +61,7 @@ async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<LicenseReply>
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: LicenseCheck = serde_cbor::from_slice(&buf[start..end])?;
let license = check_license(&payload, address).await?;
let license = check_license(&payload, address, pool).await?;
Ok(license)
}
_ => {
@ -64,6 +74,7 @@ async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<LicenseReply>
async fn check_license(
request: &LicenseCheck,
address: SocketAddr,
pool: Pool<Postgres>,
) -> anyhow::Result<LicenseReply> {
log::info!("Checking license from {address:?}, key: {}", request.key);
if request.key == "test" {
@ -73,6 +84,19 @@ async fn check_license(
stats_host: "127.0.0.1:9127".to_string(), // Also temporary
})
} else {
match pgdb::get_stats_host_for_key(pool, &request.key).await {
Ok(host) => {
log::info!("License is valid");
return Ok(LicenseReply::Valid {
expiry: 0, // Temporary value
stats_host: host,
});
}
Err(e) => {
log::warn!("Unable to get stats host for key: {e:?}");
}
}
log::info!("License is denied");
Ok(LicenseReply::Denied)
}

View File

@ -0,0 +1,13 @@
[package]
name = "pgdb"
version = "0.1.0"
edition = "2021"
[dependencies]
once_cell = "1"
thiserror = "1"
env_logger = "0"
log = "0"
lqos_bus = { path = "../../lqos_bus" }
sqlx = { version = "0.7.0-alpha.2", default_features = false, features = [ "runtime-tokio", "tls-rustls", "postgres" ] }
futures = "0"

View File

@ -0,0 +1,37 @@
//! Manages access to the safely stored connection string, in `/etc/lqdb`.
//! Failure to obtain a database connection is a fatal error.
//! The connection string is read once, on the first call to `get_connection_string()`.
//! Please be careful to never include `/etc/lqdb` in any git commits.
use std::path::Path;
use std::fs::File;
use std::io::Read;
use once_cell::sync::Lazy;
pub static CONNECTION_STRING: Lazy<String> = Lazy::new(read_connection_string);
/// Read the connection string from /etc/lqdb
/// Called by the `Lazy` on CONNECTION_STRING
fn read_connection_string() -> String {
let path = Path::new("/etc/lqdb");
if !path.exists() {
log::error!("{} does not exist", path.display());
panic!("{} does not exist", path.display());
}
match File::open(path) {
Ok(mut file) => {
let mut buf = String::new();
if let Ok(_size) = file.read_to_string(&mut buf) {
buf
} else {
log::error!("Could not read {}", path.display());
panic!("Could not read {}", path.display());
}
}
Err(e) => {
log::error!("Could not open {}: {e:?}", path.display());
panic!("Could not open {}: {e:?}", path.display());
}
}
}

View File

@ -0,0 +1,4 @@
mod connection_string;
mod pool;
pub use pool::get_connection_pool;

View File

@ -0,0 +1,13 @@
use sqlx::{postgres::PgPoolOptions, Postgres, Pool};
use super::connection_string::CONNECTION_STRING;
/// Obtain a connection pool to the database.
///
/// # Arguments
/// * `max_connections` - The maximum number of connections to the database.
pub async fn get_connection_pool(max_connections: u32) -> Result<Pool<Postgres>, sqlx::Error> {
PgPoolOptions::new()
.max_connections(max_connections)
.connect(&CONNECTION_STRING)
.await
}

View File

@ -0,0 +1,9 @@
mod connection;
mod license;
pub mod sqlx {
pub use sqlx::*;
}
pub use connection::get_connection_pool;
pub use license::get_stats_host_for_key;

View File

@ -0,0 +1,22 @@
//! Handles license checks from the `license_server`.
use sqlx::{Pool, Postgres, Row};
use thiserror::Error;
pub async fn get_stats_host_for_key(cnn: Pool<Postgres>, key: &str) -> Result<String, StatsHostError> {
let row = sqlx::query("SELECT ip_address FROM licenses INNER JOIN stats_hosts ON stats_hosts.id = licenses.stats_host WHERE key=$1")
.bind(key)
.fetch_one(&cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let ip_address: &str = row.try_get("ip_address").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
log::info!("Found stats host for key: {}", ip_address);
Ok(ip_address.to_string())
}
#[derive(Debug, Error)]
pub enum StatsHostError {
#[error("Database error occurred")]
DatabaseError(String),
}

View File

@ -12,5 +12,5 @@ log = "0"
lqos_bus = { path = "../lqos_bus" }
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0" # For RFC8949/7409 format C binary objects
sqlite = "0"
sqlite = "0.30.4"
axum = "0.6"