mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2024-11-29 11:23:49 -06:00
138 lines
4.4 KiB
Python
138 lines
4.4 KiB
Python
import os
|
|
import subprocess
|
|
from subprocess import PIPE
|
|
import io
|
|
import decimal
|
|
import json
|
|
from ispConfig import fqOrCAKE, interfaceA, interfaceB, influxDBBucket, influxDBOrg, influxDBtoken, influxDBurl, ppingLocation
|
|
from datetime import date, datetime, timedelta
|
|
import decimal
|
|
from influxdb_client import InfluxDBClient, Point, Dialect
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|
import dateutil.parser
|
|
|
|
def getLatencies(subscriberCircuits, secondsToRun):
    """Measure per-device TCP latency with pping and annotate each device.

    Runs ``./pping -i <interfaceA> -s <secondsToRun> -m`` inside
    ``ppingLocation``, parses its output line by line, and keeps for every
    known device IP the lowest round-trip time seen together with the jitter
    of that same sample.

    Args:
        subscriberCircuits: list of circuit dicts; each has a ``'devices'``
            list whose entries carry an ``'ipv4'`` key.
        secondsToRun: capture duration handed to pping's ``-s`` option.

    Returns:
        The same ``subscriberCircuits`` list, modified in place: every device
        gains ``'tcpLatency'`` and ``'tcpJitter'`` (``None`` when no sample
        matched the device's IP).
    """
    import shlex

    # Kept as a float so capped and uncapped samples share one numeric type;
    # mixing int and float values in the same InfluxDB field is rejected.
    maxLatencyRecordable = 200.0

    # A set gives O(1) membership tests for every pping output line.
    matchableIPs = {
        device['ipv4']
        for circuit in subscriberCircuits
        for device in circuit['devices']
    }

    rttDict = {}
    jitterDict = {}

    # The interface name comes from configuration; quote it so the shell
    # treats it as a single literal argument.
    command = (
        "./pping -i " + shlex.quote(str(interfaceA))
        + " -s " + shlex.quote(str(secondsToRun)) + " -m"
    )
    ppingOutput = subprocess.run(
        command,
        shell=True,
        cwd=ppingLocation,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    ).stdout.decode('utf-8').splitlines()

    for line in ppingOutput:
        # Shorter lines cannot hold the fixed-width columns read below.
        if len(line) <= 59:
            continue
        try:
            # Fixed-width columns scaled by 1000 (presumably seconds -> ms;
            # presumably the sample RTT and the flow's minimum RTT -- confirm
            # against pping's -m output format).
            rtt1 = float(line[18:27]) * 1000
            rtt2 = float(line[27:36]) * 1000
            # Flow field looks like "srcIP:port+dstIP:port".
            toAndFrom = line[38:].split(' ')[3]
            endpoints = toAndFrom.split('+')
            fromIP = endpoints[0].split(':')[0]
            toIP = endpoints[1].split(':')[0]
        except (ValueError, IndexError):
            # A single malformed line should not discard the whole capture.
            continue

        if fromIP in matchableIPs:
            matchedIP = fromIP
        elif toIP in matchableIPs:
            matchedIP = toIP
        else:
            # Neither endpoint belongs to a known device; nothing to record.
            continue

        # Jitter is taken from the raw value, before the cap is applied.
        jitter = rtt1 - rtt2
        rtt1 = min(rtt1, maxLatencyRecordable)

        # Keep only the lowest RTT per IP, with the jitter of that sample.
        if matchedIP not in rttDict or rtt1 < rttDict[matchedIP]:
            rttDict[matchedIP] = rtt1
            jitterDict[matchedIP] = jitter

    for circuit in subscriberCircuits:
        for device in circuit['devices']:
            device['tcpLatency'] = rttDict.get(device['ipv4'])
            device['tcpJitter'] = jitterDict.get(device['ipv4'])
    return subscriberCircuits
|
|
|
|
def getParentNodeStats(parentNodes, subscriberCircuits):
    """Attach the mean device TCP latency to each parent node.

    For every entry of ``parentNodes``, the ``'tcpLatency'`` values of all
    devices whose ``'ParentNode'`` equals the node's ``'parentNodeName'`` are
    averaged, ignoring devices whose latency is ``None``. The mean is stored
    under the node's ``'tcpLatency'`` key, or ``None`` when no device
    contributed a value.

    Returns:
        The same ``parentNodes`` list, updated in place.
    """
    for node in parentNodes:
        samples = [
            dev['tcpLatency']
            for circuit in subscriberCircuits
            for dev in circuit['devices']
            if dev['ParentNode'] == node['parentNodeName']
            and dev['tcpLatency'] is not None
        ]
        node['tcpLatency'] = sum(samples) / len(samples) if samples else None
    return parentNodes
|
|
|
|
def refreshLatencyGraphs(secondsToRun):
    """Collect TCP latency for every device and push it to InfluxDB.

    Reads ``statsByParentNode.json`` and ``statsByCircuit.json`` from the
    current working directory, measures latency via ``getLatencies``,
    averages it per parent node via ``getParentNodeStats``, and writes one
    ``Latency`` point for each device and each parent node that has a
    measurement. The JSON files are only read; nothing is written back.

    Args:
        secondsToRun: capture duration passed through to ``getLatencies``.
    """
    startTime = datetime.now()

    with open('statsByParentNode.json', 'r') as j:
        parentNodes = json.load(j)

    with open('statsByCircuit.json', 'r') as j:
        subscriberCircuits = json.load(j)

    print("Retrieving circuit statistics")
    subscriberCircuits = getLatencies(subscriberCircuits, secondsToRun)

    print("Computing parent node statistics")
    # getParentNodeStats walks circuit['devices'], so it needs the circuits
    # list produced above.
    parentNodes = getParentNodeStats(parentNodes, subscriberCircuits)

    print("Writing data to InfluxDB")
    client = InfluxDBClient(
        url=influxDBurl,
        token=influxDBtoken,
        org=influxDBOrg
    )
    try:
        write_api = client.write_api(write_options=SYNCHRONOUS)

        queriesToSend = []

        # One point per device that produced a latency sample.
        for circuit in subscriberCircuits:
            for device in circuit['devices']:
                if device['tcpLatency'] is not None:
                    p = (
                        Point('Latency')
                        .tag("Device", device['deviceName'])
                        .tag("ParentNode", device['ParentNode'])
                        .tag("Type", "Device")
                        .field("TCP Latency", device['tcpLatency'])
                    )
                    queriesToSend.append(p)

        # One point per parent node; the node name fills both tags.
        for parentNode in parentNodes:
            if parentNode['tcpLatency'] is not None:
                p = (
                    Point('Latency')
                    .tag("Device", parentNode['parentNodeName'])
                    .tag("ParentNode", parentNode['parentNodeName'])
                    .tag("Type", "Parent Node")
                    .field("TCP Latency", parentNode['tcpLatency'])
                )
                queriesToSend.append(p)

        write_api.write(bucket=influxDBBucket, record=queriesToSend)
        print("Added " + str(len(queriesToSend)) + " points to InfluxDB.")
    finally:
        # Release the connection even when building or writing points fails.
        client.close()

    endTime = datetime.now()
    durationSeconds = round((endTime - startTime).total_seconds())
    print("Graphs updated within " + str(durationSeconds) + " seconds.")
|
|
|
|
# Script entry point: run a single latency collection pass of 10 seconds.
if __name__ == "__main__":
    refreshLatencyGraphs(secondsToRun=10)
|