From 91f573ad82d6706d10e22305be84ceca5bc5b639 Mon Sep 17 00:00:00 2001 From: Warren Date: Mon, 12 Sep 2022 17:05:55 +1200 Subject: [PATCH] minor tidy, but chunk `devices` and send to influx in groups. 20k Points in one payload can be :/ --- v1.1/graphBandwidth.py | 163 +++++++++++++++++++++++------------------ 1 file changed, 92 insertions(+), 71 deletions(-) diff --git a/v1.1/graphBandwidth.py b/v1.1/graphBandwidth.py index 80607d70..4c4aaa17 100644 --- a/v1.1/graphBandwidth.py +++ b/v1.1/graphBandwidth.py @@ -20,64 +20,61 @@ def getInterfaceStats(interface): return jsonDict +def chunk_list(l, n): + for i in range(0, len(l), n): + yield l[i:i + n] + + def getDeviceStats(devices): interfaces = [interfaceA, interfaceB] - for interface in interfaces: - tcShowResults = getInterfaceStats(interface) - if interface == interfaceA: - interfaceAjson = tcShowResults - else: - interfaceBjson = tcShowResults + ifaceStats = list(map(getInterfaceStats, interfaces)) for device in devices: if 'timeQueried' in device: device['priorQueryTime'] = device['timeQueried'] - for interface in interfaces: - if interface == interfaceA: - jsonVersion = interfaceAjson - else: - jsonVersion = interfaceBjson + for (interface, stats, dirSuffix) in zip(interfaces, ifaceStats, ['Download', 'Upload']): - element = jsonVersion[device['qdisc']] if device['qdisc'] in jsonVersion else False + element = stats[device['qdisc']] if device['qdisc'] in stats else False if element: + + bytesSent = int(element['bytes']) drops = int(element['drops']) packets = int(element['packets']) - bytesSent = int(element['bytes']) - if interface == interfaceA: - if 'bytesSentDownload' in device: - device['priorQueryBytesDownload'] = device['bytesSentDownload'] - device['bytesSentDownload'] = bytesSent - else: - if 'bytesSentUpload' in device: - device['priorQueryBytesUpload'] = device['bytesSentUpload'] - device['bytesSentUpload'] = bytesSent + + if 'bytesSent' + dirSuffix in device: + device['priorQueryBytes' + dirSuffix] = device['bytesSent' + dirSuffix] + device['bytesSent' + dirSuffix] = bytesSent + + if 'dropsSent' + dirSuffix in device: + device['priorDropsSent' + dirSuffix] = device['dropsSent' + dirSuffix] + device['dropsSent' + dirSuffix] = drops + + if 'packetsSent' + dirSuffix in device: + device['priorPacketsSent' + dirSuffix] = device['packetsSent' + dirSuffix] + device['packetsSent' + dirSuffix] = packets device['timeQueried'] = datetime.now().isoformat() for device in devices: + device['bitsDownloadSinceLastQuery'] = device['bitsUploadSinceLastQuery'] = 0 if 'priorQueryTime' in device: try: bytesDLSinceLastQuery = device['bytesSentDownload'] - device['priorQueryBytesDownload'] bytesULSinceLastQuery = device['bytesSentUpload'] - device['priorQueryBytesUpload'] except: - bytesDLSinceLastQuery = 0 - bytesULSinceLastQuery = 0 + bytesDLSinceLastQuery = bytesULSinceLastQuery = 0 + currentQueryTime = datetime.fromisoformat(device['timeQueried']) priorQueryTime = datetime.fromisoformat(device['priorQueryTime']) - delta = currentQueryTime - priorQueryTime - deltaSeconds = delta.total_seconds() - if deltaSeconds > 0: - bitsDownload = round((((bytesDLSinceLastQuery*8))/deltaSeconds)) - bitsUpload = round((((bytesULSinceLastQuery*8))/deltaSeconds)) - else: - bitsDownload = 0 - bitsUpload = 0 - device['bitsDownloadSinceLastQuery'] = bitsDownload - device['bitsUploadSinceLastQuery'] = bitsUpload - else: - device['bitsDownloadSinceLastQuery'] = 0 - device['bitsUploadSinceLastQuery'] = 0 - return (devices) + deltaSeconds = (currentQueryTime - priorQueryTime).total_seconds() + + device['bitsDownloadSinceLastQuery'] = round( + ((bytesDLSinceLastQuery * 8) / deltaSeconds)) if deltaSeconds > 0 else 0 + device['bitsUploadSinceLastQuery'] = round( + ((bytesULSinceLastQuery * 8) / deltaSeconds)) if deltaSeconds > 0 else 0 + + return devices + def getParentNodeStats(parentNodes, devices): for parentNode in parentNodes: @@ -92,6 +89,7 @@ def getParentNodeStats(parentNodes, devices): parentNode['bitsUploadSinceLastQuery'] = thisNodeBitsUpload return parentNodes + def getParentNodeDict(data, depth, parentNodeNameDict): if parentNodeNameDict == None: parentNodeNameDict = {} @@ -100,12 +98,13 @@ def getParentNodeDict(data, depth, parentNodeNameDict): if 'children' in data[elem]: for child in data[elem]['children']: parentNodeNameDict[child] = elem - tempDict = getParentNodeDict(data[elem]['children'], depth+1, parentNodeNameDict) + tempDict = getParentNodeDict(data[elem]['children'], depth + 1, parentNodeNameDict) parentNodeNameDict = dict(parentNodeNameDict, **tempDict) return parentNodeNameDict + def parentNodeNameDictPull(): - #Load network heirarchy + # Load network heirarchy with open('network.json', 'r') as j: network = json.loads(j.read()) parentNodeNameDict = getParentNodeDict(network, 0, None) @@ -126,61 +125,83 @@ def refreshBandwidthGraphs(): print("Computing parent node statistics") parentNodes = getParentNodeStats(parentNodes, devices) print("Writing data to InfluxDB") - bucket = influxDBBucket - org = influxDBOrg - token = influxDBtoken - url=influxDBurl client = InfluxDBClient( - url=url, - token=token, - org=org + url=influxDBurl, + token=influxDBtoken, + org=influxDBOrg ) write_api = client.write_api(write_options=SYNCHRONOUS) + chunkedDevices = list(chunk_list(devices, 200)) + + queriesToSendCount = 0 + for chunk in chunkedDevices: + queriesToSend = [] + for device in chunk: + bitsDownload = int(device['bitsDownloadSinceLastQuery']) + bitsUpload = int(device['bitsUploadSinceLastQuery']) + if (bitsDownload > 0) and (bitsUpload > 0): + percentUtilizationDownload = round((bitsDownload / round(device['downloadMax'] * 1000000)), 4) + percentUtilizationUpload = round((bitsUpload / round(device['uploadMax'] * 1000000)), 4) + + p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag( + "Type", "Device").field("Download", bitsDownload) + queriesToSend.append(p) + p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag( + "Type", "Device").field("Upload", bitsUpload) + queriesToSend.append(p) + p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag( + "Type", "Device").field("Download", percentUtilizationDownload) + queriesToSend.append(p) + p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag( + "Type", "Device").field("Upload", percentUtilizationUpload) + queriesToSend.append(p) + + write_api.write(bucket=influxDBBucket, record=queriesToSend) + # print("Added " + str(len(queriesToSend)) + " points to InfluxDB.") + queriesToSendCount += len(queriesToSend) + queriesToSend = [] - for device in devices: - bitsDownload = int(device['bitsDownloadSinceLastQuery']) - bitsUpload = int(device['bitsUploadSinceLastQuery']) - if (bitsDownload > 0) and (bitsUpload > 0): - percentUtilizationDownload = round((bitsDownload / round(device['downloadMax']*1000000)),4) - percentUtilizationUpload = round((bitsUpload / round(device['uploadMax']*1000000)),4) - - p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", bitsDownload) - queriesToSend.append(p) - p = Point('Bandwidth').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", bitsUpload) - queriesToSend.append(p) - p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Download", percentUtilizationDownload) - queriesToSend.append(p) - p = Point('Utilization').tag("Device", device['hostname']).tag("ParentNode", device['ParentNode']).tag("Type", "Device").field("Upload", percentUtilizationUpload) - queriesToSend.append(p) - for parentNode in parentNodes: bitsDownload = int(parentNode['bitsDownloadSinceLastQuery']) bitsUpload = int(parentNode['bitsUploadSinceLastQuery']) if (bitsDownload > 0) and (bitsUpload > 0): - percentUtilizationDownload = round((bitsDownload / round(parentNode['downloadMax']*1000000)),4) - percentUtilizationUpload = round((bitsUpload / round(parentNode['uploadMax']*1000000)),4) + percentUtilizationDownload = round((bitsDownload / round(parentNode['downloadMax'] * 1000000)), 4) + percentUtilizationUpload = round((bitsUpload / round(parentNode['uploadMax'] * 1000000)), 4) - p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", bitsDownload) + p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", + parentNode['parentNodeName']).tag( + "Type", "Parent Node").field("Download", bitsDownload) queriesToSend.append(p) - p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", bitsUpload) + p = Point('Bandwidth').tag("Device", parentNode['parentNodeName']).tag("ParentNode", + parentNode['parentNodeName']).tag( + "Type", "Parent Node").field("Upload", bitsUpload) queriesToSend.append(p) - p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Download", percentUtilizationDownload) + p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", + parentNode['parentNodeName']).tag( + "Type", "Parent Node").field("Download", percentUtilizationDownload) queriesToSend.append(p) - p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", parentNode['parentNodeName']).tag("Type", "Parent Node").field("Upload", percentUtilizationUpload) + p = Point('Utilization').tag("Device", parentNode['parentNodeName']).tag("ParentNode", + parentNode['parentNodeName']).tag( + "Type", "Parent Node").field("Upload", percentUtilizationUpload) queriesToSend.append(p) - write_api.write(bucket=bucket, record=queriesToSend) - print("Added " + str(len(queriesToSend)) + " points to InfluxDB.") + write_api.write(bucket=influxDBBucket, record=queriesToSend) + # print("Added " + str(len(queriesToSend)) + " points to InfluxDB.") + queriesToSendCount += len(queriesToSend) + print("Added " + str(queriesToSendCount) + " points to InfluxDB.") + client.close() + with open('statsByParentNode.json', 'w') as infile: json.dump(parentNodes, infile) with open('statsByDevice.json', 'w') as infile: json.dump(devices, infile) + endTime = datetime.now() - durationSeconds = round((endTime - startTime).total_seconds(),2) + durationSeconds = round((endTime - startTime).total_seconds(), 2) print("Graphs updated within " + str(durationSeconds) + " seconds.") if __name__ == '__main__':