Add integrationCommon.py - shared functionality

Creates a new integrationCommon.py file for shared functionality
between integrations.

Move isInAllowedSubnets into the shared file. Add a reciprocal
isInIgnoredSubnets file to utilize the configuration file's
"ignoreSubnets" field.

Add a new function, isIpv4Permitted that combines calls to the
two to return true if an IP address is both allowed and not
ignored, and false otherwise.

Signed-off-by: Herbert Wolverson <herberticus@gmail.com>
This commit is contained in:
Herbert Wolverson
2022-10-27 08:52:24 -05:00
parent 981b2e5b19
commit 532be33d64
3 changed files with 37 additions and 22 deletions

32
v1.3/integrationCommon.py Normal file
View File

@@ -0,0 +1,32 @@
# Provides common functionality shared between
# integrations.
from ispConfig import allowedSubnets, ignoreSubnets
import ipaddress;
def isInAllowedSubnets(inputIP):
# Check whether an IP address occurs inside the allowedSubnets list
isAllowed = False
if '/' in inputIP:
inputIP = inputIP.split('/')[0]
for subnet in allowedSubnets:
if (ipaddress.ip_address(inputIP) in ipaddress.ip_network(subnet)):
isAllowed = True
return isAllowed
def isInIgnoredSubnets(inputIP):
# Check whether an IP address occurs within the ignoreSubnets list
isIgnored = False
if '/' in inputIP:
inputIP = inputIP.split('/')[0]
for subnet in ignoreSubnets:
if (ipaddress.ip_address(inputIP) in ipaddress.ip_network(subnet)):
isIgnored = True
return isIgnored
def isIpv4Permitted(inputIP):
# Checks whether an IP address is in Allowed Subnets.
# If it is, check that it isn't in Ignored Subnets.
# If it is allowed and not ignored, returns true.
# Otherwise, returns false.
return isInIgnoredSubnets(inputIP)==False and isInAllowedSubnets(inputIP)

View File

@@ -2,7 +2,8 @@ import requests
import os
import csv
import ipaddress
from ispConfig import allowedSubnets, ignoreSubnets, excludeSites, findIPv6usingMikrotik, bandwidthOverheadFactor, exceptionCPEs, splynx_api_key, splynx_api_secret, splynx_api_url
from ispConfig import excludeSites, findIPv6usingMikrotik, bandwidthOverheadFactor, exceptionCPEs, splynx_api_key, splynx_api_secret, splynx_api_url
from integrationCommon import isIpv4Permitted
import shutil
import json
import time
@@ -12,16 +13,6 @@ if findIPv6usingMikrotik == True:
from mikrotikFindIPv6 import pullMikrotikIPv6
def isInAllowedSubnets(inputIP):
isAllowed = False
if '/' in inputIP:
inputIP = inputIP.split('/')[0]
for subnet in allowedSubnets:
if (ipaddress.ip_address(inputIP) in ipaddress.ip_network(subnet)):
isAllowed = True
return isAllowed
def createShaper():
print("Creating ShapedDevices.csv")

View File

@@ -2,7 +2,8 @@ import requests
import os
import csv
import ipaddress
from ispConfig import UISPbaseURL, uispAuthToken, allowedSubnets, ignoreSubnets, excludeSites, findIPv6usingMikrotik, bandwidthOverheadFactor, exceptionCPEs
from ispConfig import UISPbaseURL, uispAuthToken, excludeSites, findIPv6usingMikrotik, bandwidthOverheadFactor, exceptionCPEs
from integrationCommon import isIpv4Permitted
import shutil
import json
if findIPv6usingMikrotik == True:
@@ -10,15 +11,6 @@ if findIPv6usingMikrotik == True:
knownRouterModels = ['ACB-AC', 'ACB-ISP']
knownAPmodels = ['LTU-Rocket', 'RP-5AC', 'RP-5AC-Gen2', 'LAP-GPS', 'Wave-AP']
def isInAllowedSubnets(inputIP):
isAllowed = False
if '/' in inputIP:
inputIP = inputIP.split('/')[0]
for subnet in allowedSubnets:
if (ipaddress.ip_address(inputIP) in ipaddress.ip_network(subnet)):
isAllowed = True
return isAllowed
def createTree(sites,accessPoints,bandwidthDL,bandwidthUL,siteParentDict,siteIDtoName,sitesWithParents,currentNode):
currentNodeName = list(currentNode.items())[0][0]
@@ -229,7 +221,7 @@ def createShaper():
ipv6 = ''
if ipv4 in ipv4ToIPv6.keys():
ipv6 = ipv4ToIPv6[ipv4]
if isInAllowedSubnets(ipv4):
if isIpv4Permitted(ipv4):
deviceModel = device['identification']['model']
deviceModelName = device['identification']['modelName']
maxSpeedDown = round(bandwidthOverheadFactor*downloadSpeedMbps)