Save queueingStructure.json Useful for debugging

This commit is contained in:
Robert Chacón 2022-09-17 07:49:43 -06:00 committed by GitHub
parent 2161e3e4d8
commit 2beac8ce19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -74,11 +74,11 @@ def tearDown(interfaceA, interfaceB):
shell('ip link set dev ' + interfaceB + ' xdp off')
clearPriorSettings(interfaceA, interfaceB)
def printStatsFromIP(ipAddress):
with open('statsByCircuit.json', 'r') as j:
subscriberCircuits = json.loads(j.read())
def getQdiscForIPaddress(ipAddress):
qDiscID = ''
foundQdisc = False
with open('statsByCircuit.json', 'r') as j:
subscriberCircuits = json.loads(j.read())
for circuit in subscriberCircuits:
for device in circuit['devices']:
for ipv4 in device['ipv4s']:
@ -90,6 +90,13 @@ def printStatsFromIP(ipAddress):
qDiscID = circuit['qdisc']
foundQdisc = True
if foundQdisc:
return qDiscID
else:
return None
def printStatsFromIP(ipAddress):
qDiscID = getQdiscForIPaddress(ipAddress)
if qDiscID != None:
interfaces = [interfaceA, interfaceB]
for interface in interfaces:
command = 'tc -s qdisc show dev ' + interface + ' parent ' + qDiscID
@ -101,21 +108,8 @@ def printStatsFromIP(ipAddress):
print("Invalid IP address provided")
def printCircuitClassInfo(ipAddress):
with open('statsByCircuit.json', 'r') as j:
subscriberCircuits = json.loads(j.read())
qDiscID = ''
foundQdisc = False
for circuit in subscriberCircuits:
for device in circuit['devices']:
for ipv4 in device['ipv4s']:
if ipv4 == ipAddress:
qDiscID = circuit['qdisc']
foundQdisc = True
for ipv6 in device['ipv6s']:
if ipv6 == ipAddress:
qDiscID = circuit['qdisc']
foundQdisc = True
if foundQdisc:
qDiscID = getQdiscForIPaddress(ipAddress)
if qDiscID != None:
print("IP: " + ipAddress + " | Class ID: " + qDiscID)
print()
theClassID = ''
@ -300,24 +294,30 @@ def validateNetworkAndDevices():
return False
def refreshShapers():
# Starting
print("refreshShapers starting at " + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
# Warn user if enableActualShellCommands is False, because that would mean no actual commands are executing
if enableActualShellCommands == False:
warnings.warn("enableActualShellCommands is set to False. None of the commands below will actually be executed. Simulated run.", stacklevel=2)
# Check if first run since boot
isThisFirstRunSinceBoot = checkIfFirstRunSinceBoot()
# Automatically account for TCP overhead of plans. For example a 100Mbps plan needs to be set to 109Mbps for the user to ever see that result on a speed test
# Does not apply to nodes of any sort, just endpoint devices
tcpOverheadFactor = 1.09
# Files
shapedDevicesFile = 'ShapedDevices.csv'
networkJSONfile = 'network.json'
# Check validation
safeToRunRefresh = False
if (validateNetworkAndDevices() == True):
@ -336,6 +336,7 @@ def refreshShapers():
safeToRunRefresh = True
if safeToRunRefresh == True:
# Load Subscriber Circuits & Devices
subscriberCircuits = []
knownCircuitIDs = []
@ -475,10 +476,12 @@ def refreshShapers():
counterForCircuitsWithoutParentNodes += 1
subscriberCircuits.append(thisCircuit)
# Load network heirarchy
with open(networkJSONfile, 'r') as j:
network = json.loads(j.read())
# Pull rx/tx queues / CPU cores available
if usingXDP:
queuesAvailable = findQueuesAvailable()
@ -492,6 +495,7 @@ def refreshShapers():
else:
queuesAvailable = 1
# Generate Parent Nodes. Spread ShapedDevices.csv which lack defined ParentNode across these (balance across CPUs)
generatedPNs = []
for x in range(queuesAvailable):
@ -515,6 +519,7 @@ def refreshShapers():
if genPNcounter >= queuesAvailable:
genPNcounter = 0
# Find the bandwidth minimums for each node by combining mimimums of devices lower in that node's heirarchy
def findBandwidthMins(data, depth):
tabs = ' ' * depth
@ -532,16 +537,17 @@ def refreshShapers():
data[elem]['downloadBandwidthMbpsMin'] = minDownload
data[elem]['uploadBandwidthMbpsMin'] = minUpload
return minDownload, minUpload
minDownload, minUpload = findBandwidthMins(network, 0)
# Define lists for hash filters
ipv4FiltersSrc = []
ipv4FiltersDst = []
ipv6FiltersSrc = []
ipv6FiltersDst = []
# Parse network structure. For each tier, create corresponding HTB and leaf classes. Prepare for execution later
# Parse network structure. For each tier, create corresponding HTB and leaf classes. Prepare commands for execution later
linuxTCcommands = []
xdpCPUmapCommands = []
devicesShaped = []
@ -549,6 +555,7 @@ def refreshShapers():
def traverseNetwork(data, depth, major, minor, queue, parentClassID, parentMaxDL, parentMaxUL):
tabs = ' ' * depth
for elem in data:
circuitsForThisNetworkNode = {}
elemClassID = hex(major) + ':' + hex(minor)
# Cap based on this node's max bandwidth, or parent node's max bandwidth, whichever is lower
elemDownloadMax = min(data[elem]['downloadBandwidthMbps'],parentMaxDL)
@ -581,6 +588,25 @@ def refreshShapers():
maxUpload = min(circuit['uploadMax'],elemUploadMax)
minDownload = min(circuit['downloadMin'],maxDownload)
minUpload = min(circuit['uploadMin'],maxUpload)
parentString = hex(major) + ':'
flowIDstring = hex(major) + ':' + hex(minor)
circuit['qdisc'] = flowIDstring
thisNewCircuitItemForNetwork = {
'maxDownload' : maxDownload,
'maxUpload' : maxUpload,
'minDownload' : minDownload,
'minUpload' : minUpload,
"circuitID": circuit['circuitID'],
"circuitName": circuit['circuitName'],
"ParentNode": circuit['ParentNode'],
"devices": circuit['devices'],
"downloadMin": circuit['downloadMin'],
"uploadMin": circuit['uploadMin'],
"downloadMax": circuit['downloadMax'],
"uploadMax": circuit['uploadMax'],
"qdisc": flowIDstring,
"comment": circuit['comment']
}
comment = " # CircuitID: " + circuit['circuitID'] + " DeviceIDs: "
for device in circuit['devices']:
comment = comment + device['deviceID'] + ', '
@ -593,9 +619,8 @@ def refreshShapers():
linuxTCcommands.append(command)
command = 'qdisc add dev ' + interfaceB + ' parent ' + hex(major) + ':' + hex(minor) + ' ' + fqOrCAKE
linuxTCcommands.append(command)
parentString = hex(major) + ':'
flowIDstring = hex(major) + ':' + hex(minor)
circuit['qdisc'] = flowIDstring
thisNewCircuitItemForNetwork['devices'] = circuit['devices']
for device in circuit['devices']:
if device['ipv4s']:
for ipv4 in device['ipv4s']:
@ -611,6 +636,8 @@ def refreshShapers():
ipv6FiltersDst.append((ipv6, parentString, flowIDstring))
if device['deviceName'] not in devicesShaped:
devicesShaped.append(device['deviceName'])
circuitsForThisNetworkNode = circuitsForThisNetworkNode | thisNewCircuitItemForNetwork
data[elem]['circuits'] = circuitsForThisNetworkNode
minor += 1
# Recursive call this function for children nodes attached to this node
if 'children' in data[elem]:
@ -625,13 +652,17 @@ def refreshShapers():
queue += 1
major += 1
return minor
# Here is the actual call to the recursive traverseNetwork() function. finalMinor is not used.
finalMinor = traverseNetwork(network, 0, major=1, minor=3, queue=1, parentClassID="1:1", parentMaxDL=upstreamBandwidthCapacityDownloadMbps, parentMaxUL=upstreamBandwidthCapacityUploadMbps)
# Save queuingStructure
with open('queuingStructure.json', 'w') as infile:
json.dump(network, infile, indent=4)
# Print structure of network.json in debug or verbose mode
logging.info(json.dumps(network, indent=4))
# If XDP off - prepare commands for Hash Tables
# IPv4 Hash Filters
# Dst
@ -710,13 +741,16 @@ def refreshShapers():
linuxTCcommands.append(command)
filterHandleCounter += 1
# Record start time of actual filter reload
reloadStartTime = datetime.now()
# Clear Prior Settings
clearPriorSettings(interfaceA, interfaceB)
# Setup XDP and disable XPS regardless of whether it is first run or not (necessary to handle cases where systemctl stop was used)
# Root HTB Setup
# If using XDP, Setup XDP and disable XPS regardless of whether it is first run or not (necessary to handle cases where systemctl stop was used)
if usingXDP:
if enableActualShellCommands:
# Here we use os.system for the command, because otherwise it sometimes gltiches out with Popen in shell()
@ -729,7 +763,7 @@ def refreshShapers():
shell('./xdp-cpumap-tc/src/xdp_iphash_to_cpu --dev ' + interfaceB + ' --wan')
shell('./xdp-cpumap-tc/src/tc_classify --dev-egress ' + interfaceA)
shell('./xdp-cpumap-tc/src/tc_classify --dev-egress ' + interfaceB)
# If using XDP, Setup MQ
if usingXDP:
# Create MQ qdisc for each CPU core / rx-tx queue (XDP method - requires IPv4)
thisInterface = interfaceA
@ -757,6 +791,7 @@ def refreshShapers():
# Only 1/4 of defaultClassCapacity is guarenteed (to prevent hitting ceiling of upstream), for the most part it serves as an "up to" ceiling.
shell('tc class add dev ' + thisInterface + ' parent ' + hex(queue+1) + ':1 classid ' + hex(queue+1) + ':2 htb rate ' + str(round((upstreamBandwidthCapacityUploadMbps-1)/4)) + 'mbit ceil ' + str(upstreamBandwidthCapacityUploadMbps-1) + 'mbit prio 5')
shell('tc qdisc add dev ' + thisInterface + ' parent ' + hex(queue+1) + ':2 ' + fqOrCAKE)
# If not using XDP, Setup single HTB
else:
# Create single HTB qdisc (non XDP method - allows IPv6)
thisInterface = interfaceA
@ -783,6 +818,7 @@ def refreshShapers():
shell('tc class add dev ' + thisInterface + ' parent ' + hex(queue+1) + ':1 classid ' + hex(queue+1) + ':2 htb rate ' + str(round((upstreamBandwidthCapacityUploadMbps-1)/4)) + 'mbit ceil ' + str(upstreamBandwidthCapacityUploadMbps-1) + 'mbit prio 5')
shell('tc qdisc add dev ' + thisInterface + ' parent ' + hex(queue+1) + ':2 ' + fqOrCAKE)
# Execute actual Linux TC commands
print("Executing linux TC class/qdisc commands")
with open('linux_tc.txt', 'w') as f:
@ -796,6 +832,7 @@ def refreshShapers():
shell("/sbin/tc -f -b linux_tc.txt")
print("Executed " + str(len(linuxTCcommands)) + " linux TC class/qdisc commands")
# Execute actual XDP-CPUMAP-TC filter commands
if usingXDP:
print("Executing XDP-CPUMAP-TC IP filter commands")
@ -804,9 +841,11 @@ def refreshShapers():
shell(command)
print("Executed " + str(len(xdpCPUmapCommands)) + " XDP-CPUMAP-TC IP filter commands")
# Record end time
reloadEndTime = datetime.now()
# Recap - warn operator if devices were skipped
devicesSkipped = []
for circuit in subscriberCircuits:
@ -820,21 +859,25 @@ def refreshShapers():
name, idNum = entry
print('DeviceID: ' + idNum + '\t DeviceName: ' + name)
# Save for stats
with open('statsByCircuit.json', 'w') as infile:
json.dump(subscriberCircuits, infile)
with open('statsByParentNode.json', 'w') as infile:
json.dump(parentNodes, infile)
# Record time this run completed at
# filename = os.path.join(_here, 'lastRun.txt')
with open("lastRun.txt", 'w') as file:
file.write(datetime.now().strftime("%d-%b-%Y (%H:%M:%S.%f)"))
# Report reload time
reloadTimeSeconds = (reloadEndTime - reloadStartTime).seconds
print("Queue and IP filter reload completed in " + str(reloadTimeSeconds) + " seconds")
# Done
print("refreshShapers completed on " + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))