Merge pull request #84 from khandieyea/dev

minor tidy, but chunk `devices` and send to influx in groups. 20k Poi…
This commit is contained in:
Robert Chacón 2022-09-12 01:14:31 -06:00 committed by GitHub
commit f24267fac6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,64 +20,61 @@ def getInterfaceStats(interface):
return jsonDict
def chunk_list(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
def getDeviceStats(devices):
interfaces = [interfaceA, interfaceB]
for interface in interfaces:
tcShowResults = getInterfaceStats(interface)
if interface == interfaceA:
interfaceAjson = tcShowResults
else:
interfaceBjson = tcShowResults
ifaceStats = list(map(getInterfaceStats, interfaces))
for device in devices:
if 'timeQueried' in device:
device['priorQueryTime'] = device['timeQueried']
for interface in interfaces:
if interface == interfaceA:
jsonVersion = interfaceAjson
else:
jsonVersion = interfaceBjson
for (interface, stats, dirSuffix) in zip(interfaces, ifaceStats, ['Download', 'Upload']):
element = jsonVersion[device['qdisc']] if device['qdisc'] in jsonVersion else False
element = stats[device['qdisc']] if device['qdisc'] in stats else False
if element:
bytesSent = int(element['bytes'])
drops = int(element['drops'])
packets = int(element['packets'])
bytesSent = int(element['bytes'])
if interface == interfaceA:
if 'bytesSentDownload' in device:
device['priorQueryBytesDownload'] = device['bytesSentDownload']
device['bytesSentDownload'] = bytesSent
else:
if 'bytesSentUpload' in device:
device['priorQueryBytesUpload'] = device['bytesSentUpload']
device['bytesSentUpload'] = bytesSent
if 'bytesSent' + dirSuffix in device:
device['priorQueryBytes' + dirSuffix] = device['bytesSent' + dirSuffix]
device['bytesSent' + dirSuffix] = bytesSent
if 'dropsSent' + dirSuffix in device:
device['priorDropsSent' + dirSuffix] = device['dropsSent' + dirSuffix]
device['dropsSent' + dirSuffix] = drops
if 'packetsSent' + dirSuffix in device:
device['priorPacketsSent' + dirSuffix] = device['packetsSent' + dirSuffix]
device['packetsSent' + dirSuffix] = packets
device['timeQueried'] = datetime.now().isoformat()
for device in devices:
device['bitsDownloadSinceLastQuery'] = device['bitsUploadSinceLastQuery'] = 0
if 'priorQueryTime' in device:
try:
bytesDLSinceLastQuery = device['bytesSentDownload'] - device['priorQueryBytesDownload']
bytesULSinceLastQuery = device['bytesSentUpload'] - device['priorQueryBytesUpload']
except:
bytesDLSinceLastQuery = 0
bytesULSinceLastQuery = 0
bytesDLSinceLastQuery = bytesULSinceLastQuery = 0
currentQueryTime = datetime.fromisoformat(device['timeQueried'])
priorQueryTime = datetime.fromisoformat(device['priorQueryTime'])
delta = currentQueryTime - priorQueryTime
deltaSeconds = delta.total_seconds()
if deltaSeconds > 0:
bitsDownload = round((((bytesDLSinceLastQuery*8))/deltaSeconds))
bitsUpload = round((((bytesULSinceLastQuery*8))/deltaSeconds))
else:
bitsDownload = 0
bitsUpload = 0
device['bitsDownloadSinceLastQuery'] = bitsDownload
device['bitsUploadSinceLastQuery'] = bitsUpload
else:
device['bitsDownloadSinceLastQuery'] = 0
device['bitsUploadSinceLastQuery'] = 0
return (devices)
deltaSeconds = (currentQueryTime - priorQueryTime).total_seconds()
device['bitsDownloadSinceLastQuery'] = round(
((bytesDLSinceLastQuery * 8) / deltaSeconds)) if deltaSeconds > 0 else 0
device['bitsUploadSinceLastQuery'] = round(
((bytesULSinceLastQuery * 8) / deltaSeconds)) if deltaSeconds > 0 else 0
return devices
def getParentNodeStats(parentNodes, devices):
for parentNode in parentNodes:
@ -92,6 +89,7 @@ def getParentNodeStats(parentNodes, devices):
parentNode['bitsUploadSinceLastQuery'] = thisNodeBitsUpload
return parentNodes
def getParentNodeDict(data, depth, parentNodeNameDict):
if parentNodeNameDict == None:
parentNodeNameDict = {}
@ -104,6 +102,7 @@ def getParentNodeDict(data, depth, parentNodeNameDict):
parentNodeNameDict = dict(parentNodeNameDict, **tempDict)
return parentNodeNameDict
def parentNodeNameDictPull():
# Load network heirarchy
with open('network.json', 'r') as j:
@ -126,34 +125,43 @@ def refreshBandwidthGraphs():
print("Computing parent node statistics")
parentNodes = getParentNodeStats(parentNodes, devices)
print("Writing data to InfluxDB")
bucket = influxDBBucket
org = influxDBOrg
token = influxDBtoken
url=influxDBurl
client = InfluxDBClient(
url=url,
token=token,
org=org
url=influxDBurl,
token=influxDBtoken,
org=influxDBOrg
)
write_api = client.write_api(write_options=SYNCHRONOUS)
chunkedDevices = list(chunk_list(devices, 200))
queriesToSendCount = 0
for chunk in chunkedDevices:
queriesToSend = []
for device in devices:
for device in chunk:
bitsDownload = int(device['bitsDownloadSinceLastQuery'])
bitsUpload = int(device['bitsUploadSinceLastQuery'])
if (bitsDownload > 0) and (bitsUpload > 0):
percentUtilizationDownload = round((bitsDownload / round(device['downloadMax'] * 1000000)), 4)
percentUtilizationUpload = round((bitsUpload / round(device['uploadMax'] * 1000000)), 4)
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", bitsDownload)
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag(
"Type", "Device").field("Download", bitsDownload)
queriesToSend.append(p)
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", bitsUpload)
p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag(
"Type", "Device").field("Upload", bitsUpload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", percentUtilizationDownload)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag(
"Type", "Device").field("Download", percentUtilizationDownload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", percentUtilizationUpload)
p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag(
"Type", "Device").field("Upload", percentUtilizationUpload)
queriesToSend.append(p)
write_api.write(bucket=influxDBBucket, record=queriesToSend)
# print("Added " + str(len(queriesToSend)) + " points to InfluxDB.")
queriesToSendCount += len(queriesToSend)
queriesToSend = []
for parentNode in parentNodes:
bitsDownload = int(parentNode['bitsDownloadSinceLastQuery'])
bitsUpload = int(parentNode['bitsUploadSinceLastQuery'])
@ -161,24 +169,37 @@ def refreshBandwidthGraphs():
percentUtilizationDownload = round((bitsDownload / round(parentNode['downloadMax'] * 1000000)), 4)
percentUtilizationUpload = round((bitsUpload / round(parentNode['uploadMax'] * 1000000)), 4)
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", bitsDownload)
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode",
parentNode['parentNodeName']).tag(
"Type", "Parent Node").field("Download", bitsDownload)
queriesToSend.append(p)
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", bitsUpload)
p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode",
parentNode['parentNodeName']).tag(
"Type", "Parent Node").field("Upload", bitsUpload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", percentUtilizationDownload)
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode",
parentNode['parentNodeName']).tag(
"Type", "Parent Node").field("Download", percentUtilizationDownload)
queriesToSend.append(p)
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", percentUtilizationUpload)
p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode",
parentNode['parentNodeName']).tag(
"Type", "Parent Node").field("Upload", percentUtilizationUpload)
queriesToSend.append(p)
write_api.write(bucket=bucket, record=queriesToSend)
print("Added " + str(len(queriesToSend)) + " points to InfluxDB.")
write_api.write(bucket=influxDBBucket, record=queriesToSend)
# print("Added " + str(len(queriesToSend)) + " points to InfluxDB.")
queriesToSendCount += len(queriesToSend)
print("Added " + str(queriesToSendCount) + " points to InfluxDB.")
client.close()
with open('statsByParentNode.json', 'w') as infile:
json.dump(parentNodes, infile)
with open('statsByDevice.json', 'w') as infile:
json.dump(devices, infile)
endTime = datetime.now()
durationSeconds = round((endTime - startTime).total_seconds(), 2)
print("Graphs updated within " + str(durationSeconds) + " seconds.")