Fix XML vulnerabilities found by bandit (#14415)

* Fix vulnerabilities found by bandir

* fix vulnerabilities in tests dir

* fix for mo

* some changes
This commit is contained in:
Anastasia Kuporosova 2022-12-06 11:46:26 +01:00 committed by GitHub
parent 17c55ada85
commit 09b24a1932
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 67 additions and 37 deletions

View File

@ -5,10 +5,17 @@ import argparse
import os import os
import glob import glob
import xml.etree.ElementTree as ET import defusedxml.ElementTree as ET
from defusedxml import defuse_stdlib
from utils import utils from utils import utils
# defuse_stdlib provide patched version of xml.etree.ElementTree which allows to use objects from xml.etree.ElementTree
# in a safe manner without including unsafe xml.etree.ElementTree
ET_defused = defuse_stdlib()[ET]
Element = ET_defused.Element
SubElement = ET_defused.SubElement
logger = utils.get_logger('XmlMerger') logger = utils.get_logger('XmlMerger')
def parse_arguments(): def parse_arguments():
@ -27,7 +34,7 @@ def parse_arguments():
return parser.parse_args() return parser.parse_args()
def update_result_node(xml_node: ET.SubElement, aggregated_res: ET.SubElement): def update_result_node(xml_node: SubElement, aggregated_res: SubElement):
for attr_name in xml_node.attrib: for attr_name in xml_node.attrib:
if attr_name == "passrate": if attr_name == "passrate":
continue continue
@ -44,7 +51,7 @@ def update_result_node(xml_node: ET.SubElement, aggregated_res: ET.SubElement):
aggregated_res.set(attr_name, str(xml_value + aggregated_value)) aggregated_res.set(attr_name, str(xml_value + aggregated_value))
def aggregate_test_results(aggregated_results: ET.SubElement, xml_reports: list, report_type: str): def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, report_type: str):
aggregated_timestamp = None aggregated_timestamp = None
for xml in xml_reports: for xml in xml_reports:
logger.info(f" Processing: {xml}") logger.info(f" Processing: {xml}")
@ -83,8 +90,8 @@ def aggregate_test_results(aggregated_results: ET.SubElement, xml_reports: list,
def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filename: str, report_type: str): def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filename: str, report_type: str):
logger.info(f" Processing is finished") logger.info(f" Processing is finished")
summary = ET.Element("report") summary = Element("report")
results = ET.SubElement(summary, "results") results = SubElement(summary, "results")
entity_name = None entity_name = None
if report_type == "OP": if report_type == "OP":
entity_name = "ops_list" entity_name = "ops_list"
@ -93,7 +100,7 @@ def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filenam
else: else:
raise Exception(f"Error to create aggregated report. Incorrect report type: {report_type}") raise Exception(f"Error to create aggregated report. Incorrect report type: {report_type}")
entity_list = ET.SubElement(summary, entity_name) entity_list = SubElement(summary, entity_name)
for folder_path in input_folder_paths: for folder_path in input_folder_paths:
if not os.path.exists(folder_path): if not os.path.exists(folder_path):
@ -121,7 +128,7 @@ def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filenam
logger.error(f'{folder_path} does not contain the correct xml files') logger.error(f'{folder_path} does not contain the correct xml files')
for entity in xml_root.find(entity_name): for entity in xml_root.find(entity_name):
if entity_list.find(entity.tag) is None: if entity_list.find(entity.tag) is None:
ET.SubElement(entity_list, entity.tag) SubElement(entity_list, entity.tag)
timestamp = aggregate_test_results(results, xml_reports, report_type) timestamp = aggregate_test_results(results, xml_reports, report_type)
if report_type == "OP": if report_type == "OP":
utils.update_passrates(results) utils.update_passrates(results)

View File

@ -1,7 +1,7 @@
# Copyright (C) 2022 Intel Corporation # Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import xml.etree.ElementTree as ET import defusedxml.ElementTree as ET
from argparse import ArgumentParser from argparse import ArgumentParser
from pathlib import Path from pathlib import Path

View File

@ -1,2 +1,3 @@
jinja2==3.1.2 jinja2==3.1.2
gitpython gitpython
defusedxml>=0.7.1

View File

@ -12,7 +12,7 @@ from merge_xmls import merge_xml
from pathlib import Path, PurePath from pathlib import Path, PurePath
from sys import version, platform from sys import version, platform
import xml.etree.ElementTree as ET import defusedxml.ElementTree as ET
import os import os

View File

@ -4,12 +4,19 @@
import argparse import argparse
import os import os
import csv import csv
import xml.etree.ElementTree as ET import defusedxml.ElementTree as ET
from defusedxml import defuse_stdlib
from jinja2 import Environment, FileSystemLoader from jinja2 import Environment, FileSystemLoader
from utils import utils from utils import utils
# defuse_stdlib provide patched version of xml.etree.ElementTree which allows to use objects from xml.etree.ElementTree
# in a safe manner without including unsafe xml.etree.ElementTree
ET_defused = defuse_stdlib()[ET]
Element = ET_defused.Element
SubElement = ET_defused.SubElement
NOT_RUN = "NOT RUN" NOT_RUN = "NOT RUN"
NA = "N/A" NA = "N/A"
@ -50,10 +57,10 @@ def parse_arguments():
def merge_xmls(xml_paths: list): def merge_xmls(xml_paths: list):
logger.info("Merging XML files is started") logger.info("Merging XML files is started")
summary = ET.Element("report") summary = Element("report")
timestamp = None timestamp = None
summary_results = ET.SubElement(summary, "results") summary_results = SubElement(summary, "results")
ops_list = ET.SubElement(summary, "ops_list") ops_list = SubElement(summary, "ops_list")
for xml_path in xml_paths: for xml_path in xml_paths:
try: try:
xml_root = ET.parse(xml_path).getroot() xml_root = ET.parse(xml_path).getroot()
@ -67,7 +74,7 @@ def merge_xmls(xml_paths: list):
for op in xml_root.find("ops_list"): for op in xml_root.find("ops_list"):
if ops_list.find(op.tag) is None: if ops_list.find(op.tag) is None:
ET.SubElement(ops_list, op.tag) SubElement(ops_list, op.tag)
for device in xml_root.find("results"): for device in xml_root.find("results"):
device_results = summary_results.find(device.tag) device_results = summary_results.find(device.tag)
@ -103,7 +110,7 @@ def merge_xmls(xml_paths: list):
return summary return summary
def collect_statistic(root: ET.Element, is_conformance_mode: bool): def collect_statistic(root: Element, is_conformance_mode: bool):
logger.info("Statistic collecting is started") logger.info("Statistic collecting is started")
trusted_ops = dict() trusted_ops = dict()
pass_rate_avg = dict() pass_rate_avg = dict()
@ -218,7 +225,7 @@ def serialize_to_csv(report_filename: str, output_dir: os.path, op_list: list, d
logger.info(f'Final CSV report is saved to {csv_filename}') logger.info(f'Final CSV report is saved to {csv_filename}')
def create_summary(summary_root: ET.Element, output_folder: os.path, expected_devices:list, report_tag: str, report_version: str, def create_summary(summary_root: Element, output_folder: os.path, expected_devices:list, report_tag: str, report_version: str,
is_conformance_mode: bool, is_serialize_to_csv: bool, output_filename='report'): is_conformance_mode: bool, is_serialize_to_csv: bool, output_filename='report'):
if is_conformance_mode: if is_conformance_mode:
utils.update_conformance_test_counters(summary_root, logger) utils.update_conformance_test_counters(summary_root, logger)

View File

@ -9,9 +9,16 @@ from argparse import ArgumentParser
from typing import Union from typing import Union
import xml.etree.cElementTree as et import defusedxml.ElementTree as ET
import xml.dom.minidom as dom import defusedxml.minidom as dom
from defusedxml import defuse_stdlib
# defuse_stdlib provide patched version of xml.etree.ElementTree which allows to use objects from xml.etree.ElementTree
# in a safe manner without including unsafe xml.etree.ElementTree
ET_defused = defuse_stdlib()[ET]
Element = ET_defused.Element
dom_defused = defuse_stdlib()[dom]
Document = dom_defused.Document
def get_path(entry: Union[str, Path], is_directory=False, check_exists=True, file_or_directory=False): def get_path(entry: Union[str, Path], is_directory=False, check_exists=True, file_or_directory=False):
try: try:
@ -75,7 +82,7 @@ def build_argument_parser():
def tostring(layer, num_tabs=0): def tostring(layer, num_tabs=0):
return '\t' * num_tabs + str(et.tostring(layer, encoding='utf-8').decode()) return '\t' * num_tabs + str(ET.tostring(layer, encoding='utf-8').decode())
def tags(node): def tags(node):
@ -86,17 +93,17 @@ def tags(node):
def make_input(layer, port_id): def make_input(layer, port_id):
output = et.Element('output') output = Element('output')
output_port = next(port for port in tags(layer)['output'] if int(port.attrib['id']) == int(port_id)) output_port = next(port for port in tags(layer)['output'] if int(port.attrib['id']) == int(port_id))
output.append(output_port) output.append(output_port)
precision_to_element_type = {'FP16': 'f16', 'I32': 'i32', 'FP32': 'f32'} precision_to_element_type = {'FP16': 'f16', 'I32': 'i32', 'FP32': 'f32'}
data = et.Element('data', attrib={ data = Element('data', attrib={
'element_type': precision_to_element_type[output_port.attrib['precision']], 'element_type': precision_to_element_type[output_port.attrib['precision']],
'shape': ','.join([dim.text for dim in output_port])} 'shape': ','.join([dim.text for dim in output_port])}
) )
input_layer = et.Element('layer', attrib={ input_layer = Element('layer', attrib={
'id': layer.attrib['id'], 'id': layer.attrib['id'],
'name': layer.attrib['name'], 'name': layer.attrib['name'],
'type': 'Parameter', 'type': 'Parameter',
@ -109,11 +116,11 @@ def make_input(layer, port_id):
def make_output(layer, port_id): def make_output(layer, port_id):
input_section = et.Element('input') input_section = Element('input')
input_port = next(port for port in tags(layer)['input'] if int(port.attrib['id']) == int(port_id)) input_port = next(port for port in tags(layer)['input'] if int(port.attrib['id']) == int(port_id))
input_section.append(input_port) input_section.append(input_port)
output_layer = et.Element('layer', attrib={ output_layer = Element('layer', attrib={
'id': layer.attrib['id'], 'id': layer.attrib['id'],
'name': layer.attrib['name'], 'name': layer.attrib['name'],
'type': 'Result', 'type': 'Result',
@ -158,20 +165,20 @@ def extract_weights(source, layers, destination):
def prettify(element, indent=0): def prettify(element, indent=0):
header = dom.Document().toxml() header = Document().toxml()
string = dom.parseString(tostring(element)).toprettyxml()[len(header) + 1:] string = dom.parseString(tostring(element)).toprettyxml()[len(header) + 1:]
return '\n'.join(['\t' * indent + line for line in string.split('\n') if line.strip()]) return '\n'.join(['\t' * indent + line for line in string.split('\n') if line.strip()])
def dump(input_model, elements, output_model): def dump(input_model, elements, output_model):
root = et.parse(str(input_model)).getroot() root = ET.parse(str(input_model)).getroot()
net = et.Element('net', attrib={'name': root.attrib['name'], 'version': root.attrib['version']}) net = Element('net', attrib={'name': root.attrib['name'], 'version': root.attrib['version']})
layers = et.Element('layers') layers = Element('layers')
for layer in elements['layers']: for layer in elements['layers']:
layers.append(layer) layers.append(layer)
edges = et.Element('edges') edges = Element('edges')
for edge in elements['edges']: for edge in elements['edges']:
edges.append(edge) edges.append(edge)
@ -190,7 +197,7 @@ def main():
print('Error: only IR version 10 or newer is supported, IRv{} has been given'.format(model.version)) print('Error: only IR version 10 or newer is supported, IRv{} has been given'.format(model.version))
sys.exit(-1) sys.exit(-1)
layers, edges, _ = et.parse(str(arguments.model)).getroot() layers, edges, _ = ET.parse(str(arguments.model)).getroot()
layers_identifiers = {} layers_identifiers = {}
for layer in layers: for layer in layers:
layers_identifiers[int(layer.attrib['id'])] = layer layers_identifiers[int(layer.attrib['id'])] = layer

View File

@ -5,7 +5,7 @@ import itertools
import os import os
import re import re
import warnings import warnings
import xml.etree.ElementTree as ET import defusedxml.ElementTree as ET
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np

View File

@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import os import os
import xml.etree.ElementTree import defusedxml.ElementTree as ET
def mapping_parser(file): def mapping_parser(file):
@ -13,7 +13,7 @@ def mapping_parser(file):
""" """
mapping_dict = {} mapping_dict = {}
if os.path.splitext(file)[1] == '.mapping' and os.path.isfile(file): if os.path.splitext(file)[1] == '.mapping' and os.path.isfile(file):
xml_tree = xml.etree.ElementTree.parse(file) xml_tree = ET.parse(file)
xml_root = xml_tree.getroot() xml_root = xml_tree.getroot()
for child in xml_root: for child in xml_root:
framework_info = child.find('.//framework') framework_info = child.find('.//framework')

View File

@ -18,7 +18,7 @@ import sys
from distutils.dir_util import copy_tree from distutils.dir_util import copy_tree
from inspect import getsourcefile from inspect import getsourcefile
from pathlib import Path from pathlib import Path
from xml.etree import ElementTree as ET import defusedxml.ElementTree as ET
log.basicConfig(format="{file}: [ %(levelname)s ] %(message)s".format(file=os.path.basename(__file__)), log.basicConfig(format="{file}: [ %(levelname)s ] %(message)s".format(file=os.path.basename(__file__)),
level=log.INFO, stream=sys.stdout) level=log.INFO, stream=sys.stdout)

View File

@ -17,7 +17,7 @@ import logging
import os import os
import re import re
import sys import sys
import xml.etree.ElementTree as ET import defusedxml.ElementTree as ET
from glob import glob from glob import glob
from inspect import getsourcefile from inspect import getsourcefile
from types import SimpleNamespace from types import SimpleNamespace

View File

@ -6,3 +6,4 @@ pandas>=1.3.5
h5py>=3.1.0 h5py>=3.1.0
scipy~=1.7; python_version == '3.7' scipy~=1.7; python_version == '3.7'
scipy>=1.8; python_version >= '3.8' scipy>=1.8; python_version >= '3.8'
defusedxml>=0.7.1

View File

@ -3,7 +3,8 @@
import unittest import unittest
from unittest.mock import MagicMock from unittest.mock import MagicMock
from xml.etree.ElementTree import Element, tostring import defusedxml.ElementTree as ET
from defusedxml import defuse_stdlib
import numpy as np import numpy as np
@ -13,6 +14,12 @@ from openvino.tools.mo.utils.error import Error
from openvino.tools.mo.utils.runtime_info import RTInfo, OldAPIMapOrder, OldAPIMapElementType from openvino.tools.mo.utils.runtime_info import RTInfo, OldAPIMapOrder, OldAPIMapElementType
from unit_tests.utils.graph import build_graph, result, regular_op from unit_tests.utils.graph import build_graph, result, regular_op
# defuse_stdlib provide patched version of xml.etree.ElementTree which allows to use objects from xml.etree.ElementTree
# in a safe manner without including unsafe xml.etree.ElementTree
ET_defused = defuse_stdlib()[ET]
Element = ET_defused.Element
tostring = ET_defused.tostring
expected_result = b'<net><dim>2</dim><dim>10</dim><dim>50</dim><dim>50</dim></net>' expected_result = b'<net><dim>2</dim><dim>10</dim><dim>50</dim><dim>50</dim></net>'