Merge pull request #83 from khandieyea/bwgraph-perf

move tc data to dicts, stop large amount of iterations at scale
This commit is contained in:
Robert Chacón 2022-09-11 19:17:00 -06:00 committed by GitHub
commit 14ff572235
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,178 +1,187 @@
import os
import subprocess import subprocess
from subprocess import PIPE
import io
import decimal
import json import json
from ispConfig import fqOrCAKE, interfaceA, interfaceB, influxDBBucket, influxDBOrg, influxDBtoken, influxDBurl import subprocess
from datetime import date, datetime, timedelta from datetime import datetime
import decimal
from influxdb_client import InfluxDBClient, Point, Dialect from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
import dateutil.parser
from ispConfig import interfaceA, interfaceB, influxDBBucket, influxDBOrg, influxDBtoken, influxDBurl
def getInterfaceStats(interface):
command = 'tc -j -s qdisc show dev ' + interface
jsonAr = json.loads(subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8'))
jsonDict = {}
for element in filter(lambda e: 'parent' in e, jsonAr):
flowID = ':'.join(map(lambda p: f'0x{p}', element['parent'].split(':')[0:2]))
jsonDict[flowID] = element
del jsonAr
return jsonDict
def getDeviceStats(devices): def getDeviceStats(devices):
interfaces = [interfaceA, interfaceB] interfaces = [interfaceA, interfaceB]
for interface in interfaces: for interface in interfaces:
command = 'tc -j -s qdisc show dev ' + interface tcShowResults = getInterfaceStats(interface)
commands = command.split(' ') if interface == interfaceA:
tcShowResults = subprocess.run(commands, stdout=subprocess.PIPE).stdout.decode('utf-8') interfaceAjson = tcShowResults
if interface == interfaceA: else:
interfaceAjson = json.loads(tcShowResults) interfaceBjson = tcShowResults
else:
interfaceBjson = json.loads(tcShowResults) for device in devices:
for device in devices: if 'timeQueried' in device:
if 'timeQueried' in device: device['priorQueryTime'] = device['timeQueried']
device['priorQueryTime'] = device['timeQueried'] for interface in interfaces:
for interface in interfaces: if interface == interfaceA:
if interface == interfaceA: jsonVersion = interfaceAjson
jsonVersion = interfaceAjson else:
else: jsonVersion = interfaceBjson
jsonVersion = interfaceBjson
for element in jsonVersion: element = jsonVersion[device['qdisc']] if device['qdisc'] in jsonVersion else False
if "parent" in element:
parentFixed = '0x' + element['parent'].split(':')[0] + ':' + '0x' + element['parent'].split(':')[1] if element:
if parentFixed == device['qdisc']: drops = int(element['drops'])
drops = int(element['drops']) packets = int(element['packets'])
packets = int(element['packets']) bytesSent = int(element['bytes'])
bytesSent = int(element['bytes']) if interface == interfaceA:
if interface == interfaceA: if 'bytesSentDownload' in device:
if 'bytesSentDownload' in device: device['priorQueryBytesDownload'] = device['bytesSentDownload']
device['priorQueryBytesDownload'] = device['bytesSentDownload'] device['bytesSentDownload'] = bytesSent
device['bytesSentDownload'] = bytesSent else:
else: if 'bytesSentUpload' in device:
if 'bytesSentUpload' in device: device['priorQueryBytesUpload'] = device['bytesSentUpload']
device['priorQueryBytesUpload'] = device['bytesSentUpload'] device['bytesSentUpload'] = bytesSent
device['bytesSentUpload'] = bytesSent
device['timeQueried'] = datetime.now().isoformat() device['timeQueried'] = datetime.now().isoformat()
for device in devices: for device in devices:
if 'priorQueryTime' in device: if 'priorQueryTime' in device:
try: try:
bytesDLSinceLastQuery = device['bytesSentDownload'] - device['priorQueryBytesDownload'] bytesDLSinceLastQuery = device['bytesSentDownload'] - device['priorQueryBytesDownload']
bytesULSinceLastQuery = device['bytesSentUpload'] - device['priorQueryBytesUpload'] bytesULSinceLastQuery = device['bytesSentUpload'] - device['priorQueryBytesUpload']
except: except:
bytesDLSinceLastQuery = 0 bytesDLSinceLastQuery = 0
bytesULSinceLastQuery = 0 bytesULSinceLastQuery = 0
currentQueryTime = datetime.fromisoformat(device['timeQueried']) currentQueryTime = datetime.fromisoformat(device['timeQueried'])
priorQueryTime = datetime.fromisoformat(device['priorQueryTime']) priorQueryTime = datetime.fromisoformat(device['priorQueryTime'])
delta = currentQueryTime - priorQueryTime delta = currentQueryTime - priorQueryTime
deltaSeconds = delta.total_seconds() deltaSeconds = delta.total_seconds()
if deltaSeconds > 0: if deltaSeconds > 0:
bitsDownload = round((((bytesDLSinceLastQuery*8))/deltaSeconds)) bitsDownload = round((((bytesDLSinceLastQuery*8))/deltaSeconds))
bitsUpload = round((((bytesULSinceLastQuery*8))/deltaSeconds)) bitsUpload = round((((bytesULSinceLastQuery*8))/deltaSeconds))
else: else:
bitsDownload = 0 bitsDownload = 0
bitsUpload = 0 bitsUpload = 0
device['bitsDownloadSinceLastQuery'] = bitsDownload device['bitsDownloadSinceLastQuery'] = bitsDownload
device['bitsUploadSinceLastQuery'] = bitsUpload device['bitsUploadSinceLastQuery'] = bitsUpload
else: else:
device['bitsDownloadSinceLastQuery'] = 0 device['bitsDownloadSinceLastQuery'] = 0
device['bitsUploadSinceLastQuery'] = 0 device['bitsUploadSinceLastQuery'] = 0
return (devices) return (devices)
def getParentNodeStats(parentNodes, devices): def getParentNodeStats(parentNodes, devices):
for parentNode in parentNodes: for parentNode in parentNodes:
thisNodeBitsDownload = 0 thisNodeBitsDownload = 0
thisNodeBitsUpload = 0 thisNodeBitsUpload = 0
for device in devices: for device in devices:
if device['ParentNode'] == parentNode['parentNodeName']: if device['ParentNode'] == parentNode['parentNodeName']:
thisNodeBitsDownload += device['bitsDownloadSinceLastQuery'] thisNodeBitsDownload += device['bitsDownloadSinceLastQuery']
thisNodeBitsUpload += device['bitsUploadSinceLastQuery'] thisNodeBitsUpload += device['bitsUploadSinceLastQuery']
parentNode['bitsDownloadSinceLastQuery'] = thisNodeBitsDownload parentNode['bitsDownloadSinceLastQuery'] = thisNodeBitsDownload
parentNode['bitsUploadSinceLastQuery'] = thisNodeBitsUpload parentNode['bitsUploadSinceLastQuery'] = thisNodeBitsUpload
return parentNodes return parentNodes
def getParentNodeDict(data, depth, parentNodeNameDict): def getParentNodeDict(data, depth, parentNodeNameDict):
if parentNodeNameDict == None: if parentNodeNameDict == None:
parentNodeNameDict = {} parentNodeNameDict = {}
for elem in data: for elem in data:
if 'children' in data[elem]: if 'children' in data[elem]:
for child in data[elem]['children']: for child in data[elem]['children']:
parentNodeNameDict[child] = elem parentNodeNameDict[child] = elem
tempDict = getParentNodeDict(data[elem]['children'], depth+1, parentNodeNameDict) tempDict = getParentNodeDict(data[elem]['children'], depth+1, parentNodeNameDict)
parentNodeNameDict = dict(parentNodeNameDict, **tempDict) parentNodeNameDict = dict(parentNodeNameDict, **tempDict)
return parentNodeNameDict return parentNodeNameDict
def parentNodeNameDictPull(): def parentNodeNameDictPull():
#Load network heirarchy #Load network heirarchy
with open('network.json', 'r') as j: with open('network.json', 'r') as j:
network = json.loads(j.read()) network = json.loads(j.read())
parentNodeNameDict = getParentNodeDict(network, 0, None) parentNodeNameDict = getParentNodeDict(network, 0, None)
return parentNodeNameDict return parentNodeNameDict
def refreshBandwidthGraphs(): def refreshBandwidthGraphs():
startTime = datetime.now() startTime = datetime.now()
with open('statsByParentNode.json', 'r') as j: with open('statsByParentNode.json', 'r') as j:
parentNodes = json.loads(j.read()) parentNodes = json.loads(j.read())
with open('statsByDevice.json', 'r') as j:
devices = json.loads(j.read())
parentNodeNameDict = parentNodeNameDictPull()
print("Retrieving device statistics")
devices = getDeviceStats(devices)
print("Computing parent node statistics")
parentNodes = getParentNodeStats(parentNodes, devices)
print("Writing data to InfluxDB")
bucket = influxDBBucket
org = influxDBOrg
token = influxDBtoken
url=influxDBurl
client = InfluxDBClient(
url=url,
token=token,
org=org
)
write_api = client.write_api(write_options=SYNCHRONOUS)
queriesToSend = []
for device in devices:
bitsDownload = int(device['bitsDownloadSinceLastQuery'])
bitsUpload = int(device['bitsUploadSinceLastQuery'])
if (bitsDownload > 0) and (bitsUpload > 0):
percentUtilizationDownload = round((bitsDownload / round(device['downloadMax']*1000000)),4)
percentUtilizationUpload = round((bitsUpload / round(device['uploadMax']*1000000)),4)
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", bitsDownload) with open('statsByDevice.json', 'r') as j:
queriesToSend.append(p) devices = json.loads(j.read())
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", bitsUpload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", percentUtilizationDownload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", percentUtilizationUpload)
queriesToSend.append(p)
for parentNode in parentNodes: parentNodeNameDict = parentNodeNameDictPull()
bitsDownload = int(parentNode['bitsDownloadSinceLastQuery'])
bitsUpload = int(parentNode['bitsUploadSinceLastQuery']) print("Retrieving device statistics")
if (bitsDownload > 0) and (bitsUpload > 0): devices = getDeviceStats(devices)
percentUtilizationDownload = round((bitsDownload / round(parentNode['downloadMax']*1000000)),4) print("Computing parent node statistics")
percentUtilizationUpload = round((bitsUpload / round(parentNode['uploadMax']*1000000)),4) parentNodes = getParentNodeStats(parentNodes, devices)
print("Writing data to InfluxDB")
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", bitsDownload) bucket = influxDBBucket
queriesToSend.append(p) org = influxDBOrg
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", bitsUpload) token = influxDBtoken
queriesToSend.append(p) url=influxDBurl
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", percentUtilizationDownload) client = InfluxDBClient(
queriesToSend.append(p) url=url,
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", percentUtilizationUpload) token=token,
queriesToSend.append(p) org=org
)
write_api = client.write_api(write_options=SYNCHRONOUS)
queriesToSend = []
for device in devices:
bitsDownload = int(device['bitsDownloadSinceLastQuery'])
bitsUpload = int(device['bitsUploadSinceLastQuery'])
if (bitsDownload > 0) and (bitsUpload > 0):
percentUtilizationDownload = round((bitsDownload / round(device['downloadMax']*1000000)),4)
percentUtilizationUpload = round((bitsUpload / round(device['uploadMax']*1000000)),4)
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", bitsDownload)
queriesToSend.append(p)
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", bitsUpload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", percentUtilizationDownload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", percentUtilizationUpload)
queriesToSend.append(p)
for parentNode in parentNodes:
bitsDownload = int(parentNode['bitsDownloadSinceLastQuery'])
bitsUpload = int(parentNode['bitsUploadSinceLastQuery'])
if (bitsDownload > 0) and (bitsUpload > 0):
percentUtilizationDownload = round((bitsDownload / round(parentNode['downloadMax']*1000000)),4)
percentUtilizationUpload = round((bitsUpload / round(parentNode['uploadMax']*1000000)),4)
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", bitsDownload)
queriesToSend.append(p)
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", bitsUpload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", percentUtilizationDownload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", percentUtilizationUpload)
queriesToSend.append(p)
write_api.write(bucket=bucket, record=queriesToSend)
print("Added " + str(len(queriesToSend)) + " points to InfluxDB.")
client.close()
with open('statsByParentNode.json', 'w') as infile:
json.dump(parentNodes, infile)
with open('statsByDevice.json', 'w') as infile:
json.dump(devices, infile)
endTime = datetime.now()
durationSeconds = round((endTime - startTime).total_seconds(),2)
print("Graphs updated within " + str(durationSeconds) + " seconds.")
write_api.write(bucket=bucket, record=queriesToSend)
print("Added " + str(len(queriesToSend)) + " points to InfluxDB.")
client.close()
with open('statsByParentNode.json', 'w') as infile:
json.dump(parentNodes, infile)
with open('statsByDevice.json', 'w') as infile:
json.dump(devices, infile)
endTime = datetime.now()
durationSeconds = round((endTime - startTime).total_seconds(),2)
print("Graphs updated within " + str(durationSeconds) + " seconds.")
if __name__ == '__main__': if __name__ == '__main__':
refreshBandwidthGraphs() refreshBandwidthGraphs()