Major cleanup. Logging added. Will not reload xdp prog if already loaded.

This commit is contained in:
Robert Chacón 2022-09-08 20:51:08 -06:00 committed by GitHub
parent 6f89e327b2
commit 360c0a0485
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6,25 +6,45 @@ import io
import ipaddress
import json
import os
import os.path
import subprocess
from datetime import datetime
import multiprocessing
import warnings
import psutil
import argparse
import logging
from ispConfig import fqOrCAKE, upstreamBandwidthCapacityDownloadMbps, upstreamBandwidthCapacityUploadMbps, \
defaultClassCapacityDownloadMbps, defaultClassCapacityUploadMbps, interfaceA, interfaceB, enableActualShellCommands, \
runShellCommandsAsSudo, generatedPNDownloadMbps, generatedPNUploadMbps
def shell(command):
if enableActualShellCommands:
if runShellCommandsAsSudo:
command = 'sudo ' + command
logging.info(command)
commands = command.split(' ')
#print(command)
proc = subprocess.Popen(commands, stdout=subprocess.PIPE)
for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"): # or another encoding
result = line
logging.info(result)
else:
logging.info(command)
def checkIfFirstRunSinceBoot():
if os.path.isfile("lastRun.txt"):
with open("lastRun.txt", 'r') as file:
lastRun = datetime.strptime(file.read(), "%d-%b-%Y (%H:%M:%S.%f)")
systemRunningSince = datetime.fromtimestamp(psutil.boot_time())
if systemRunningSince > lastRun:
print("First time run since system boot. Will load xdp-cpumap-tc from scratch.")
return True
else:
print("Not first time run since system boot. Will clear xdp filter rules and reload.")
return False
else:
print("First time run since system boot. Will attach xdp.")
return True
def clearPriorSettings(interfaceA, interfaceB):
if enableActualShellCommands:
@ -37,10 +57,42 @@ def clearPriorSettings(interfaceA, interfaceB):
shell('tc qdisc delete dev ' + interfaceB + ' root')
shell('tc qdisc delete dev ' + interfaceB)
def findQueuesAvailable():
# Find queues and CPU cores available. Use min between those two as queuesAvailable
if enableActualShellCommands:
queuesAvailable = 0
path = '/sys/class/net/' + interfaceA + '/queues/'
directory_contents = os.listdir(path)
for item in directory_contents:
if "tx-" in str(item):
queuesAvailable += 1
print("NIC queues:\t\t\t" + str(queuesAvailable))
cpuCount = multiprocessing.cpu_count()
print("CPU cores:\t\t\t" + str(cpuCount))
queuesAvailable = min(queuesAvailable,cpuCount)
print("queuesAvailable set to:\t" + str(cpuCount))
else:
print("As enableActualShellCommands is False, CPU core / queue count has been set to 12")
logging.info("NIC queues:\t\t\t" + str(12))
cpuCount = multiprocessing.cpu_count()
logging.info("CPU cores:\t\t\t" + str(12))
logging.info("queuesAvailable set to:\t" + str(12))
queuesAvailable = 12
return queuesAvailable
def refreshShapers():
# Warn user if enableActualShellCommands is False, because that would mean no actual commands are executing
if enableActualShellCommands == False:
warnings.warn("enableActualShellCommands is set to False. None of the commands below will actually be executed. Simulated run.", stacklevel=2)
# Check if first run since boot
isThisFirstRunSinceBoot = checkIfFirstRunSinceBoot()
# Automatically account for TCP overhead of plans. For example a 100Mbps plan needs to be set to 109Mbps for the user to ever see that result on a speed test
# Does not apply to nodes of any sort, just endpoint devices
tcpOverheadFactor = 1.09
#Verify ShapedDevices.csv is valid
# Verify ShapedDevices.csv is valid
rowNum = 2
with open('ShapedDevices.csv') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
@ -53,6 +105,7 @@ def refreshShapers():
commentsRemoved.pop(0)
for row in commentsRemoved:
circuitID, circuitName, deviceID, deviceName, ParentNode, mac, ipv4_input, ipv6_input, downloadMin, uploadMin, downloadMax, uploadMax, comment = row
# Each entry in ShapedDevices.csv can have multiple IPv4s or IPv6s seperated by commas. Split them up and parse each to ensure valid
ipv4_hosts = []
ipv6_hosts = []
if ipv4_input != "":
@ -114,22 +167,24 @@ def refreshShapers():
except ValueError as e:
raise Exception("Provided uploadMax '" + uploadMax + "' in ShapedDevices.csv at row " + str(rowNum) + " is not a valid integer.") from e
rowNum += 1
print("ShapedDevices.csv passed validation")
# Load Subscriber Circuits & Devices
subscriberCircuits = []
knownCircuitIDs = []
with open('ShapedDevices.csv') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
#Remove comments if any
# Remove comments if any
commentsRemoved = []
for row in csv_reader:
if not row[0].startswith('#'):
commentsRemoved.append(row)
#Remove header
# Remove header
commentsRemoved.pop(0)
for row in commentsRemoved:
circuitID, circuitName, deviceID, deviceName, ParentNode, mac, ipv4_input, ipv6_input, downloadMin, uploadMin, downloadMax, uploadMax, comment = row
ipv4_hosts = []
# Each entry in ShapedDevices.csv can have multiple IPv4s or IPv6s seperated by commas. Split them up and parse each
if ipv4_input != "":
ipv4_input = ipv4_input.replace(' ','')
if "," in ipv4_input:
@ -164,7 +219,9 @@ def refreshShapers():
ipv6_hosts.append(str(host))
else:
ipv6_hosts.append(ipEntry)
# If there is something in the circuit ID field
if circuitID != "":
# Seen circuit before
if circuitID in knownCircuitIDs:
for circuit in subscriberCircuits:
if circuit['circuitID'] == circuitID:
@ -187,6 +244,7 @@ def refreshShapers():
}
devicesListForCircuit.append(thisDevice)
circuit['devices'] = devicesListForCircuit
# Have not seen circuit before
else:
knownCircuitIDs.append(circuitID)
if ParentNode == "":
@ -213,7 +271,9 @@ def refreshShapers():
"qdisc": '',
}
subscriberCircuits.append(thisCircuit)
# If there is nothing in the circuit ID field
else:
# Copy deviceName to circuitName if none defined already
if circuitName == "":
circuitName = deviceName
if ParentNode == "":
@ -241,7 +301,7 @@ def refreshShapers():
}
subscriberCircuits.append(thisCircuit)
#Verify Network.json is valid json
# Verify Network.json is valid json
with open('network.json') as file:
try:
temporaryVariable = json.load(file) # put JSON-data to a variable
@ -250,24 +310,14 @@ def refreshShapers():
else:
print("network.json appears to be a valid JSON file") # in case json is valid
#Load network heirarchy
# Load network heirarchy
with open('network.json', 'r') as j:
network = json.loads(j.read())
# Find queues and CPU cores available. Use min between those two as queuesAvailable
queuesAvailable = 0
path = '/sys/class/net/' + interfaceA + '/queues/'
directory_contents = os.listdir(path)
for item in directory_contents:
if "tx-" in str(item):
queuesAvailable += 1
# Pull rx/tx queues / CPU cores avaialble
queuesAvailable = findQueuesAvailable()
print("NIC queues:\t" + str(queuesAvailable))
cpuCount = multiprocessing.cpu_count()
print("CPU cores:\t" + str(cpuCount))
queuesAvailable = min(queuesAvailable,cpuCount)
#Generate Parent Nodes. Spread ShapedDevices.csv which lack defined ParentNode across these (balance across CPUs)
# Generate Parent Nodes. Spread ShapedDevices.csv which lack defined ParentNode across these (balance across CPUs)
generatedPNs = []
for x in range(queuesAvailable):
genPNname = "Generated_PN_" + str(x+1)
@ -284,7 +334,7 @@ def refreshShapers():
if genPNcounter >= queuesAvailable:
genPNcounter = 0
#Find the bandwidth minimums for each node by combining mimimums of devices lower in that node's heirarchy
# Find the bandwidth minimums for each node by combining mimimums of devices lower in that node's heirarchy
def findBandwidthMins(data, depth):
tabs = ' ' * depth
minDownload = 0
@ -304,7 +354,7 @@ def refreshShapers():
minDownload, minUpload = findBandwidthMins(network, 0)
#Parse network structure. For each tier, create corresponding HTB and leaf classes. Prepare for execution later
# Parse network structure. For each tier, create corresponding HTB and leaf classes. Prepare for execution later
linuxTCcommands = []
xdpCPUmapCommands = []
devicesShaped = []
@ -312,22 +362,17 @@ def refreshShapers():
def traverseNetwork(data, depth, major, minor, queue, parentClassID, parentMaxDL, parentMaxUL):
tabs = ' ' * depth
for elem in data:
#print(tabs + elem)
elemClassID = hex(major) + ':' + hex(minor)
#Cap based on this node's max bandwidth, or parent node's max bandwidth, whichever is lower
# Cap based on this node's max bandwidth, or parent node's max bandwidth, whichever is lower
elemDownloadMax = min(data[elem]['downloadBandwidthMbps'],parentMaxDL)
elemUploadMax = min(data[elem]['uploadBandwidthMbps'],parentMaxUL)
#Based on calculations done in findBandwidthMins(), determine optimal HTB rates (mins) and ceils (maxs)
#The max calculation is to avoid 0 values, and the min calculation is to ensure rate is not higher than ceil
# Calculations are done in findBandwidthMins(), determine optimal HTB rates (mins) and ceils (maxs)
# For some reason that doesn't always yield the expected result, so it's better to play with ceil more than rate
# Here we override the rate as 95% of ceil.
elemDownloadMin = round(elemDownloadMax*.95)
elemUploadMin = round(elemUploadMax*.95)
#print(tabs + "Download: " + str(elemDownloadMin) + " to " + str(elemDownloadMax) + " Mbps")
#print(tabs + "Upload: " + str(elemUploadMin) + " to " + str(elemUploadMax) + " Mbps")
#print(tabs, end='')
linuxTCcommands.append('class add dev ' + interfaceA + ' parent ' + parentClassID + ' classid ' + hex(minor) + ' htb rate '+ str(round(elemDownloadMin)) + 'mbit ceil '+ str(round(elemDownloadMax)) + 'mbit prio 3')
#print(tabs, end='')
linuxTCcommands.append('class add dev ' + interfaceB + ' parent ' + parentClassID + ' classid ' + hex(minor) + ' htb rate '+ str(round(elemUploadMin)) + 'mbit ceil '+ str(round(elemUploadMax)) + 'mbit prio 3')
#print()
thisParentNode = {
"parentNodeName": elem,
"classID": elemClassID,
@ -337,22 +382,15 @@ def refreshShapers():
parentNodes.append(thisParentNode)
minor += 1
for circuit in subscriberCircuits:
#If a device from Shaper.csv lists this elem as its Parent Node, attach it as a leaf to this elem HTB
#If a device from ShapedDevices.csv lists this elem as its Parent Node, attach it as a leaf to this elem HTB
if elem == circuit['ParentNode']:
maxDownload = min(circuit['downloadMax'],elemDownloadMax)
maxUpload = min(circuit['uploadMax'],elemUploadMax)
minDownload = min(circuit['downloadMin'],maxDownload)
minUpload = min(circuit['uploadMin'],maxUpload)
#print(tabs + ' ' + circuit['circuitName'])
#print(tabs + ' ' + "Download: " + str(minDownload) + " to " + str(maxDownload) + " Mbps")
#print(tabs + ' ' + "Upload: " + str(minUpload) + " to " + str(maxUpload) + " Mbps")
#print(tabs + ' ', end='')
linuxTCcommands.append('class add dev ' + interfaceA + ' parent ' + elemClassID + ' classid ' + hex(minor) + ' htb rate '+ str(minDownload) + 'mbit ceil '+ str(maxDownload) + 'mbit prio 3')
#print(tabs + ' ', end='')
linuxTCcommands.append('qdisc add dev ' + interfaceA + ' parent ' + hex(major) + ':' + hex(minor) + ' ' + fqOrCAKE)
#print(tabs + ' ', end='')
linuxTCcommands.append('class add dev ' + interfaceB + ' parent ' + elemClassID + ' classid ' + hex(minor) + ' htb rate '+ str(minUpload) + 'mbit ceil '+ str(maxUpload) + 'mbit prio 3')
#print(tabs + ' ', end='')
linuxTCcommands.append('qdisc add dev ' + interfaceB + ' parent ' + hex(major) + ':' + hex(minor) + ' ' + fqOrCAKE)
parentString = hex(major) + ':'
flowIDstring = hex(major) + ':' + hex(minor)
@ -360,17 +398,15 @@ def refreshShapers():
for device in circuit['devices']:
if device['ipv4s']:
for ipv4 in device['ipv4s']:
#print(tabs + ' ', end='')
xdpCPUmapCommands.append('./xdp-cpumap-tc/src/xdp_iphash_to_cpu_cmdline --add --ip ' + str(ipv4) + ' --cpu ' + hex(queue-1) + ' --classid ' + flowIDstring)
if device['deviceName'] not in devicesShaped:
devicesShaped.append(device['deviceName'])
#print()
minor += 1
#Recursive call this function for children nodes attached to this node
# Recursive call this function for children nodes attached to this node
if 'children' in data[elem]:
#We need to keep tabs on the minor counter, because we can't have repeating class IDs. Here, we bring back the minor counter from the recursive function
# We need to keep tabs on the minor counter, because we can't have repeating class IDs. Here, we bring back the minor counter from the recursive function
minor = traverseNetwork(data[elem]['children'], depth+1, major, minor+1, queue, elemClassID, elemDownloadMax, elemUploadMax)
#If top level node, increment to next queue / cpu core
# If top level node, increment to next queue / cpu core
if depth == 0:
if queue >= queuesAvailable:
queue = 1
@ -380,24 +416,34 @@ def refreshShapers():
major += 1
return minor
#Here is the actual call to the recursive traverseNetwork() function. finalMinor is not used.
# Print structure of network.json in debug or verbose mode
logging.info(json.dumps(network, indent=4))
# Here is the actual call to the recursive traverseNetwork() function. finalMinor is not used.
finalMinor = traverseNetwork(network, 0, major=1, minor=3, queue=1, parentClassID="1:1", parentMaxDL=upstreamBandwidthCapacityDownloadMbps, parentMaxUL=upstreamBandwidthCapacityUploadMbps)
#Record start time of actual filter reload
# Record start time of actual filter reload
reloadStartTime = datetime.now()
#Clear Prior Settings
# Clear Prior Settings
clearPriorSettings(interfaceA, interfaceB)
# If this is the first time LibreQoS.py has run since system boot, load the XDP program and disable XPS
# Otherwise, just clear the existing IP filter rules for xdp
if isThisFirstRunSinceBoot:
# Set up XDP-CPUMAP-TC
shell('./xdp-cpumap-tc/bin/xps_setup.sh -d ' + interfaceA + ' --default --disable')
shell('./xdp-cpumap-tc/bin/xps_setup.sh -d ' + interfaceB + ' --default --disable')
shell('./xdp-cpumap-tc/src/xdp_iphash_to_cpu --dev ' + interfaceA + ' --lan')
shell('./xdp-cpumap-tc/src/xdp_iphash_to_cpu --dev ' + interfaceB + ' --wan')
if enableActualShellCommands:
# Here we use os.system for the command, because otherwise it sometimes gltiches out with Popen in shell()
result = os.system('./xdp-cpumap-tc/src/xdp_iphash_to_cpu_cmdline --clear')
shell('./xdp-cpumap-tc/src/tc_classify --dev-egress ' + interfaceA)
shell('./xdp-cpumap-tc/src/tc_classify --dev-egress ' + interfaceB)
else:
if enableActualShellCommands:
result = os.system('./xdp-cpumap-tc/src/xdp_iphash_to_cpu_cmdline --clear')
# Create MQ qdisc for each interface
thisInterface = interfaceA
@ -423,40 +469,61 @@ def refreshShapers():
# Default class can use up to defaultClassCapacityUploadMbps when that bandwidth isn't used by known hosts.
shell('tc class add dev ' + thisInterface + ' parent ' + hex(queue+1) + ':1 classid ' + hex(queue+1) + ':2 htb rate ' + str(defaultClassCapacityUploadMbps/4) + 'mbit ceil ' + str(defaultClassCapacityUploadMbps) + 'mbit prio 5')
shell('tc qdisc add dev ' + thisInterface + ' parent ' + hex(queue+1) + ':2 ' + fqOrCAKE)
#print()
#Execute actual Linux TC and XDP-CPUMAP-TC filter commands
# Execute actual Linux TC and XDP-CPUMAP-TC filter commands
print("Executing linux TC class/qdisc commands")
with open('linux_tc.txt', 'w') as f:
for line in linuxTCcommands:
f.write(f"{line}\n")
print("Executing linux TC class/qdisc commands")
logging.info(line)
shell("/sbin/tc -f -b linux_tc.txt")
print("Executed " + str(len(linuxTCcommands)) + " linux TC class/qdisc commands")
print("Executing XDP-CPUMAP-TC IP filter commands")
for command in xdpCPUmapCommands:
logging.info(command)
shell(command)
print("Executed " + str(len(xdpCPUmapCommands)) + " XDP-CPUMAP-TC IP filter commands")
reloadEndTime = datetime.now()
#Recap
# Recap - warn operator if devices were skipped
for circuit in subscriberCircuits:
for device in circuit['devices']:
if device['deviceName'] not in devicesShaped:
warnings.warn('Device ' + device['deviceName'] + ' with device ID of ' + device['deviceID'] + ' was not shaped. Please check to ensure its Parent Node is listed in network.json.')
#Save for stats
# Save for stats
with open('statsByCircuit.json', 'w') as infile:
json.dump(subscriberCircuits, infile)
with open('statsByParentNode.json', 'w') as infile:
json.dump(parentNodes, infile)
#Report reload time
# Record time this run completed at
# filename = os.path.join(_here, 'lastRun.txt')
with open("lastRun.txt", 'w') as file:
file.write(datetime.now().strftime("%d-%b-%Y (%H:%M:%S.%f)"))
# Report reload time
reloadTimeSeconds = (reloadEndTime - reloadStartTime).seconds
print("Queue and IP filter reload completed in " + str(reloadTimeSeconds) + " seconds")
# Done
currentTimeString = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
print("Successful run completed on " + currentTimeString)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'-d', '--debug',
help="Print lots of debugging statements",
action="store_const", dest="loglevel", const=logging.DEBUG,
default=logging.WARNING,
)
parser.add_argument(
'-v', '--verbose',
help="Be verbose",
action="store_const", dest="loglevel", const=logging.INFO,
)
args = parser.parse_args()
logging.basicConfig(level=args.loglevel)
# Starting
print("refreshShapers starting at " + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
# Refresh and/or set up queues
refreshShapers()
print("Program complete")
# Done
print("refreshShapers completed on " + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))