[Python API] Small fixes in tests and change of format string to f-string (#3865)

This commit is contained in:
Anastasia Kuporosova 2021-01-15 12:48:58 +03:00 committed by GitHub
parent 5d63b5f41f
commit 320aafbb59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 121 additions and 124 deletions

View File

@ -36,11 +36,11 @@ class InferReqWrap:
def callback(self, statusCode, userdata):
if (userdata != self.id):
log.error("Request ID {} does not correspond to user data {}".format(self.id, userdata))
log.error(f"Request ID {self.id} does not correspond to user data {userdata}")
elif statusCode != 0:
log.error("Request {} failed with status code {}".format(self.id, statusCode))
log.error(f"Request {self.id} failed with status code {statusCode}")
self.cur_iter += 1
log.info("Completed {} Async request execution".format(self.cur_iter))
log.info(f"Completed {self.cur_iter} Async request execution")
if self.cur_iter < self.num_iter:
# here a user can read output containing inference results and put new input
# to repeat async request again
@ -53,7 +53,7 @@ class InferReqWrap:
def execute(self, mode, input_data):
if (mode == "async"):
log.info("Start inference ({} Asynchronous executions)".format(self.num_iter))
log.info(f"Start inference ({self.num_iter} Asynchronous executions)")
self.input = input_data
# Start async request for the first time. Wait all repetitions of the async request
self.request.async_infer(input_data)
@ -61,12 +61,12 @@ class InferReqWrap:
self.cv.wait()
self.cv.release()
elif (mode == "sync"):
log.info("Start inference ({} Synchronous executions)".format(self.num_iter))
log.info(f"Start inference ({self.num_iter} Synchronous executions)")
for self.cur_iter in range(self.num_iter):
# here we start inference synchronously and wait for
# last inference request execution
self.request.infer(input_data)
log.info("Completed {} Sync request execution".format(self.cur_iter + 1))
log.info(f"Completed {self.cur_iter + 1} Sync request execution")
else:
log.error("wrong inference mode is chosen. Please use \"sync\" or \"async\" mode")
sys.exit(1)
@ -122,11 +122,11 @@ def main():
for i in range(n):
image = cv2.imread(args.input[i])
if image.shape[:-1] != (h, w):
log.warning("Image {} is resized from {} to {}".format(args.input[i], image.shape[:-1], (h, w)))
log.warning(f"Image {args.input[i]} is resized from {image.shape[:-1]} to {(h, w)}")
image = cv2.resize(image, (w, h))
image = image.transpose((2, 0, 1)) # Change data layout from HWC to CHW
images[i] = image
log.info("Batch size is {}".format(n))
log.info(f"Batch size is {n}")
# Loading model to the plugin
log.info("Loading model to the plugin")
@ -144,7 +144,7 @@ def main():
# Processing output blob
log.info("Processing output blob")
res = infer_request.output_blobs[out_blob]
log.info("Top {} results: ".format(args.number_top))
log.info(f"Top {args.number_top} results: ")
if args.labels:
with open(args.labels, 'r') as f:
labels_map = [x.split(sep=' ', maxsplit=1)[-1].strip() for x in f]
@ -155,18 +155,16 @@ def main():
for i, probs in enumerate(res.buffer):
probs = np.squeeze(probs)
top_ind = np.argsort(probs)[-args.number_top:][::-1]
print("Image {}\n".format(args.input[i]))
print(f"Image {args.input[i]}\n")
print(classid_str, probability_str)
print("{} {}".format('-' * len(classid_str), '-' * len(probability_str)))
print(f"{'-' * len(classid_str)} {'-' * len(probability_str)}")
for id in top_ind:
det_label = labels_map[id] if labels_map else "{}".format(id)
det_label = labels_map[id] if labels_map else f"{id}"
label_length = len(det_label)
space_num_before = (7 - label_length) // 2
space_num_after = 7 - (space_num_before + label_length) + 2
space_num_before_prob = (11 - len(str(probs[id]))) // 2
print("{}{}{}{}{:.7f}".format(' ' * space_num_before, det_label,
' ' * space_num_after, ' ' * space_num_before_prob,
probs[id]))
print(f"{' ' * space_num_before}{det_label}"
f"{' ' * space_num_after}{probs[id]:.7f}")
print("\n")
log.info("This sample is an API example, for any performance measurements please use the dedicated benchmark_app tool\n")

View File

@ -75,7 +75,7 @@ def main():
for i in range(n):
image = cv2.imread(args.input)
if image.shape[:-1] != (h, w):
log.warning("Image {} is resized from {} to {}".format(args.input[i], image.shape[:-1], (h, w)))
log.warning(f"Image {args.input} is resized from {image.shape[:-1]} to {(h, w)}")
image = cv2.resize(image, (w, h))
image = image.transpose((2, 0, 1)) # Change data layout from HWC to CHW
images[i] = image
@ -91,7 +91,7 @@ def main():
# Processing output blob
log.info("Processing output blob")
res = res[out_blob]
log.info("Top {} results: ".format(args.number_top))
log.info(f"Top {args.number_top} results: ")
if args.labels:
with open(args.labels, 'r') as f:
labels_map = [x.split(sep=' ', maxsplit=1)[-1].strip() for x in f]
@ -102,18 +102,16 @@ def main():
for i, probs in enumerate(res):
probs = np.squeeze(probs)
top_ind = np.argsort(probs)[-args.number_top:][::-1]
print("Image {}\n".format(args.input[i]))
print(f"Image {args.input}\n")
print(classid_str, probability_str)
print("{} {}".format('-' * len(classid_str), '-' * len(probability_str)))
print(f"{'-' * len(classid_str)} {'-' * len(probability_str)}")
for id in top_ind:
det_label = labels_map[id] if labels_map else "{}".format(id)
det_label = labels_map[id] if labels_map else f"{id}"
label_length = len(det_label)
space_num_before = (len(classid_str) - label_length) // 2
space_num_after = len(classid_str) - (space_num_before + label_length) + 2
space_num_before_prob = (len(probability_str) - len(str(probs[id]))) // 2
print("{}{}{}{}{:.7f}".format(' ' * space_num_before, det_label,
' ' * space_num_after, ' ' * space_num_before_prob,
probs[id]))
print(f"{' ' * space_num_before}{det_label}"
f"{' ' * space_num_after}{probs[id]:.7f}")
print("\n")
log.info("This sample is an API example, for any performance measurements please use the dedicated benchmark_app tool\n")

View File

@ -9,7 +9,7 @@ def param_to_string(metric):
elif isinstance(metric, dict):
str_param_repr = ""
for k, v in metric.items():
str_param_repr += "{}: {}\n".format(k, v)
str_param_repr += f"{k}: {v}\n"
return str_param_repr
else:
return str(metric)
@ -19,22 +19,22 @@ def main():
ie = IECore()
print("Available devices:")
for device in ie.available_devices:
print("\tDevice: {}".format(device))
print(f"\tDevice: {device}")
print("\tMetrics:")
for metric in ie.get_metric(device, "SUPPORTED_METRICS"):
try:
metric_val = ie.get_metric(device, metric)
print("\t\t{}: {}".format(metric, param_to_string(metric_val)))
print(f"\t\t{metric}: {param_to_string(metric_val)}")
except TypeError:
print("\t\t{}: UNSUPPORTED TYPE".format(metric))
print(f"\t\t{metric}: UNSUPPORTED TYPE")
print("\n\tDefault values for device configuration keys:")
for cfg in ie.get_metric(device, "SUPPORTED_CONFIG_KEYS"):
try:
cfg_val = ie.get_config(device, cfg)
print("\t\t{}: {}".format(cfg, param_to_string(cfg_val)))
print(f"\t\t{cfg}: {param_to_string(cfg_val)}")
except TypeError:
print("\t\t{}: UNSUPPORTED TYPE".format(cfg))
print(f"\t\t{cfg}: UNSUPPORTED TYPE")
if __name__ == '__main__':
sys.exit(main() or 0)

View File

@ -34,7 +34,8 @@ def build_argparser() -> ArgumentParser:
args.add_argument('-h', '--help', action='help', default=SUPPRESS, help='Show this help message and exit.')
args.add_argument('-i', '--input', help='Required. Path to a folder with images or path to an image files',
required=True, type=str, nargs="+")
args.add_argument('-m', '--model', help='Required. Path to file where weights for the network are located')
args.add_argument('-m', '--model', help='Required. Path to file where weights for the network are located',
required=True)
args.add_argument('-d', '--device',
help='Optional. Specify the target device to infer on; CPU, GPU, FPGA, HDDL, MYRIAD or HETERO: '
'is acceptable. The sample will look for a suitable plugin for device specified. Default '
@ -218,13 +219,13 @@ def main():
images = np.ndarray(shape=(n, c, h, w))
for i in range(n):
image = read_image(input_images[i])
assert image is not None, log.error("Can't open an image {}".format(input_images[i]))
assert image is not None, log.error(f"Can't open an image {input_images[i]}")
assert len(image.shape) == 2, log.error('Sample supports images with 1 channel only')
if image.shape[:] != (w, h):
log.warning("Image {} is resized from {} to {}".format(input_images[i], image.shape[:], (w, h)))
log.warning(f"Image {input_images[i]} is resized from {image.shape[:]} to {(w, h)}")
image = cv2.resize(image, (w, h))
images[i] = image
log.info("Batch size is {}".format(n))
log.info(f"Batch size is {n}")
log.info("Creating Inference Engine")
ie = IECore()
@ -239,7 +240,7 @@ def main():
# Processing results
log.info("Processing output blob")
res = res[out_blob]
log.info("Top {} results: ".format(args.number_top))
log.info(f"Top {args.number_top} results: ")
# Read labels file if it is provided as argument
labels_map = None
@ -252,18 +253,16 @@ def main():
for i, probs in enumerate(res):
probs = np.squeeze(probs)
top_ind = np.argsort(probs)[-args.number_top:][::-1]
print("Image {}\n".format(input_images[i]))
print(f"Image {input_images[i]}\n")
print(classid_str, probability_str)
print("{} {}".format('-' * len(classid_str), '-' * len(probability_str)))
print(f"{'-' * len(classid_str)} {'-' * len(probability_str)}")
for class_id in top_ind:
det_label = labels_map[class_id] if labels_map else "{}".format(class_id)
det_label = labels_map[class_id] if labels_map else f"{class_id}"
label_length = len(det_label)
space_num_before = (len(classid_str) - label_length) // 2
space_num_after = len(classid_str) - (space_num_before + label_length) + 2
space_num_before_prob = (len(probability_str) - len(str(probs[class_id]))) // 2
print("{}{}{}{}{:.7f}".format(' ' * space_num_before, det_label,
' ' * space_num_after, ' ' * space_num_before_prob,
probs[class_id]))
print(f"{' ' * space_num_before}{det_label}"
f"{' ' * space_num_after}{probs[class_id]:.7f}")
print("\n")
log.info('This sample is an API example, for any performance measurements '

View File

@ -63,14 +63,13 @@ def main():
# ------------- 2. Load Plugin for inference engine and extensions library if specified --------------
log.info("Device info:")
versions = ie.get_versions(args.device)
print("{}{}".format(" " * 8, args.device))
print("{}MKLDNNPlugin version ......... {}.{}".format(" " * 8, versions[args.device].major,
versions[args.device].minor))
print("{}Build ........... {}".format(" " * 8, versions[args.device].build_number))
print(f"{' ' * 8}{args.device}")
print(f"{' ' * 8}MKLDNNPlugin version ......... {versions[args.device].major}.{versions[args.device].minor}")
print(f"{' ' * 8}Build ........... {versions[args.device].build_number}")
if args.cpu_extension and "CPU" in args.device:
ie.add_extension(args.cpu_extension, "CPU")
log.info("CPU extension loaded: {}".format(args.cpu_extension))
log.info(f"CPU extension loaded: {args.cpu_extension}")
# -----------------------------------------------------------------------------------------------------
# --------------------------- 3. Read and preprocess input --------------------------------------------
@ -91,9 +90,9 @@ def main():
ih, iw = image.shape[:-1]
images_hw.append((ih, iw))
log.info("File was added: ")
log.info(" {}".format(args.input))
log.info(f" {args.input}")
if (ih, iw) != (h, w):
log.warning("Image {} is resized from {} to {}".format(args.input, image.shape[:-1], (h, w)))
log.warning(f"Image {args.input} is resized from {image.shape[:-1]} to {(h, w)}")
image = cv2.resize(image, (w, h))
image = image.transpose((2, 0, 1)) # Change data layout from HWC to CHW
images[i] = image
@ -177,8 +176,8 @@ def main():
ymin = np.int(ih * proposal[4])
xmax = np.int(iw * proposal[5])
ymax = np.int(ih * proposal[6])
print("[{},{}] element, prob = {:.6} ({},{})-({},{}) batch id : {}" \
.format(number, label, confidence, xmin, ymin, xmax, ymax, imid), end="")
print(f"[{number},{label}] element, prob = {confidence:.6f} ({xmin},{ymin})-({xmax},{ymax}) "
f"batch id : {imid}", end="")
if proposal[2] > 0.5:
print(" WILL BE PRINTED!")
if not imid in boxes.keys():

View File

@ -81,11 +81,11 @@ def main():
for i in range(n):
image = cv2.imread(args.input[i])
if image.shape[:-1] != (h, w):
log.warning("Image {} is resized from {} to {}".format(args.input[i], image.shape[:-1], (h, w)))
log.warning(f"Image {args.input[i]} is resized from {image.shape[:-1]} to {(h, w)}")
image = cv2.resize(image, (w, h))
image = image.transpose((2, 0, 1)) # Change data layout from HWC to CHW
images[i] = image
log.info("Batch size is {}".format(n))
log.info(f"Batch size is {n}")
# Loading model to the plugin
log.info("Loading model to the plugin")
@ -107,9 +107,9 @@ def main():
data[data < 0] = 0
data[data > 255] = 255
data = data[::] - (args.mean_val_r, args.mean_val_g, args.mean_val_b)
out_img = os.path.join(os.path.dirname(__file__), "out_{}.bmp".format(batch))
out_img = os.path.join(os.path.dirname(__file__), f"out_{batch}.bmp")
cv2.imwrite(out_img, data)
log.info("Result image was saved to {}".format(out_img))
log.info(f"Result image was saved to {out_img}")
log.info("This sample is an API example, for any performance measurements please use the dedicated benchmark_app tool\n")

View File

@ -71,8 +71,7 @@ cdef class TensorDesc:
# @return Instance of defines class
def __cinit__(self, precision : str, dims : [list, tuple], layout : str):
if precision not in supported_precisions:
raise ValueError("Unsupported precision {}! List of supported precisions: {}".format(precision,
supported_precisions))
raise ValueError(f"Unsupported precision {precision}! List of supported precisions: {supported_precisions}")
self.impl = C.CTensorDesc(C.Precision.FromStr(precision.encode()), dims, layout_str_to_enum[layout])
## Shape (dimensions) of the TensorDesc object
@property
@ -88,8 +87,7 @@ cdef class TensorDesc:
@precision.setter
def precision(self, precision : str):
if precision not in supported_precisions:
raise ValueError("Unsupported precision {}! List of supported precisions: {}".format(precision,
supported_precisions))
raise ValueError(f"Unsupported precision {precision}! List of supported precisions: {supported_precisions}")
self.impl.setPrecision(C.Precision.FromStr(precision.encode()))
## Layout of the TensorDesc object
@property
@ -98,8 +96,8 @@ cdef class TensorDesc:
@layout.setter
def layout(self, layout : str):
if layout not in layout_str_to_enum.keys():
raise ValueError("Unsupported layout {}! "
"List of supported layouts: {}".format(layout, list(layout_str_to_enum.keys())))
raise ValueError(f"Unsupported layout {layout}! "
f"List of supported layouts: {list(layout_str_to_enum.keys())}")
self.impl.setLayout(layout_str_to_enum[layout])
## This class represents Blob
@ -153,7 +151,7 @@ cdef class Blob:
elif precision == "I64":
self._ptr = C.make_shared_blob[int64_t](c_tensor_desc)
else:
raise AttributeError("Unsupported precision {} for blob".format(precision))
raise AttributeError(f"Unsupported precision {precision} for blob")
deref(self._ptr).allocate()
elif tensor_desc is not None and self._array_data is not None:
c_tensor_desc = tensor_desc.impl
@ -161,11 +159,11 @@ cdef class Blob:
size_arr = np.prod(array.shape)
size_td = np.prod(tensor_desc.dims)
if size_arr != size_td:
raise AttributeError("Number of elements in provided numpy array {} and "
"required by TensorDesc {} are not equal".format(size_arr, size_td))
raise AttributeError(f"Number of elements in provided numpy array {size_arr} and "
f"required by TensorDesc {size_td} are not equal")
if self._array_data.dtype != format_map[precision]:
raise ValueError("Data type {} of provided numpy array "
"doesn't match to TensorDesc precision {}".format(self._array_data.dtype, precision))
raise ValueError(f"Data type {self._array_data.dtype} of provided numpy array "
f"doesn't match to TensorDesc precision {precision}")
if not self._array_data.flags['C_CONTIGUOUS']:
self._array_data = np.ascontiguousarray(self._array_data)
if precision == "FP32":
@ -195,7 +193,7 @@ cdef class Blob:
I64_array_memview = self._array_data
self._ptr = C.make_shared_blob[int64_t](c_tensor_desc, &I64_array_memview[0], I64_array_memview.shape[0])
else:
raise AttributeError("Unsupported precision {} for blob".format(precision))
raise AttributeError(f"Unsupported precision {precision} for blob")
def __deepcopy__(self, memodict):
res = Blob(deepcopy(self.tensor_desc, memodict), deepcopy(self._array_data, memodict))
@ -277,13 +275,13 @@ cdef class IECore:
model = os.fspath(model)
if not os.path.isfile(model):
raise Exception("Path to the model {} doesn't exist or it's a directory".format(model))
raise Exception(f"Path to the model {model} doesn't exist or it's a directory")
model_ = model.encode()
if not (fnmatch(model, "*.onnx") or fnmatch(model, "*.prototxt")) and weights:
weights = os.fspath(weights)
if not os.path.isfile(weights):
raise Exception("Path to the weights {} doesn't exist or it's a directory".format(weights))
raise Exception(f"Path to the weights {weights} doesn't exist or it's a directory")
weights_ = weights.encode()
net.impl = self.impl.readNetwork(model_, weights_)
@ -311,8 +309,8 @@ cdef class IECore:
cdef ExecutableNetwork exec_net = ExecutableNetwork()
cdef map[string, string] c_config
if num_requests < 0:
raise ValueError("Incorrect number of requests specified: {}. Expected positive integer number "
"or zero for auto detection".format(num_requests))
raise ValueError(f"Incorrect number of requests specified: {num_requests}. Expected positive integer number "
"or zero for auto detection")
if config:
c_config = dict_to_c_map(config)
exec_net.ie_core_impl = self.impl
@ -341,8 +339,8 @@ cdef class IECore:
cdef ExecutableNetwork exec_net = ExecutableNetwork()
cdef map[string, string] c_config
if num_requests < 0:
raise ValueError("Incorrect number of requests specified: {}. Expected positive integer number "
"or zero for auto detection".format(num_requests))
raise ValueError(f"Incorrect number of requests specified: {num_requests}. Expected positive integer number "
"or zero for auto detection")
if config:
c_config = dict_to_c_map(config)
exec_net.ie_core_impl = self.impl
@ -600,8 +598,7 @@ cdef class InputInfoPtr:
@precision.setter
def precision(self, precision : str):
if precision not in supported_precisions:
raise ValueError("Unsupported precision {}! List of supported precisions: {}".format(precision,
supported_precisions))
raise ValueError(f"Unsupported precision {precision}! List of supported precisions: {supported_precisions}")
deref(self._ptr).setPrecision(C.Precision.FromStr(precision.encode()))
## Layout of this input
@ -612,8 +609,8 @@ cdef class InputInfoPtr:
@layout.setter
def layout(self, layout : str):
if layout not in layout_str_to_enum.keys():
raise ValueError("Unsupported layout {}! "
"List of supported layouts: {}".format(layout, list(layout_str_to_enum.keys())))
raise ValueError(f"Unsupported layout {layout}! "
f"List of supported layouts: {list(layout_str_to_enum.keys())}")
deref(self._ptr).setLayout(layout_str_to_enum[layout])
## Gets pre-process info for the input
@ -708,8 +705,7 @@ cdef class DataPtr:
@precision.setter
def precision(self, precision):
if precision not in supported_precisions:
raise ValueError("Unsupported precision {}! List of supported precisions: {}".format(precision,
supported_precisions))
raise ValueError(f"Unsupported precision {precision}! List of supported precisions: {supported_precisions}")
deref(self._ptr).setPrecision(C.Precision.FromStr(precision.encode()))
## Shape (dimensions) of the data object
@ -725,8 +721,8 @@ cdef class DataPtr:
@layout.setter
def layout(self, layout):
if layout not in layout_str_to_enum.keys():
raise ValueError("Unsupported layout {}! "
"List of supported layouts: {}".format(layout, list(layout_str_to_enum.keys())))
raise ValueError(f"Unsupported layout {layout}! "
f"List of supported layouts: {list(layout_str_to_enum.keys())}")
deref(self._ptr).setLayout(layout_str_to_enum[layout])
## Checks if the current data object is resolved
@ -979,7 +975,7 @@ cdef class InferRequest:
#
# Usage example:\n
# ```python
# callback = lambda status, py_data: print("Request with id {} finished with status {}".format(py_data, status))
# callback = lambda status, py_data: print(f"Request with id {py_data} finished with status {status}")
# ie = IECore()
# net = ie.read_network(model="./model.xml", weights="./model.bin")
# exec_net = ie.load_network(net, "CPU", num_requests=4)
@ -1220,12 +1216,12 @@ cdef class InferRequest:
# ```
def set_batch(self, size):
if size <= 0:
raise ValueError("Batch size should be positive integer number but {} specified".format(size))
raise ValueError(f"Batch size should be positive integer number but {size} specified")
deref(self.impl).setBatch(size)
def _fill_inputs(self, inputs):
for k, v in inputs.items():
assert k in self._inputs_list, "No input with name {} found in network".format(k)
assert k in self._inputs_list, f"No input with name {k} found in network"
self.input_blobs[k].buffer[:] = v
@ -1288,9 +1284,9 @@ cdef class IENetwork:
"Please, use IECore.read_network() method instead",
DeprecationWarning)
if not os.path.isfile(model):
raise Exception("Path to the model {} doesn't exist or it's a directory".format(model))
raise Exception(f"Path to the model {model} doesn't exist or it's a directory")
if not os.path.isfile(weights):
raise Exception("Path to the weights {} doesn't exist or it's a directory".format(weights))
raise Exception(f"Path to the weights {weights} doesn't exist or it's a directory")
model_ = model.encode()
weights_ = weights.encode()
self.impl = C.IENetwork(model_, weights_)
@ -1367,7 +1363,7 @@ cdef class IENetwork:
@batch_size.setter
def batch_size(self, batch: int):
if batch <= 0:
raise AttributeError("Invalid batch size {}! Batch size should be positive integer value".format(batch))
raise AttributeError(f"Invalid batch size {batch}! Batch size should be positive integer value")
self.impl.setBatch(batch)
@ -1392,9 +1388,9 @@ cdef class IENetwork:
elif isinstance(l, tuple) and len(l) == 2:
self.impl.addOutput(l[0].encode(), l[1])
else:
raise TypeError("Incorrect type {type} for layer to add at index {ind}. "
raise TypeError(f"Incorrect type {type(l)} for layer to add at index {i}. "
"Expected string with layer name or tuple with two elements: layer name as "
"first element and port id as second".format(type=type(l), ind=i))
"first element and port id as second")
## Serializes the network and stores it in files.
#
@ -1434,7 +1430,7 @@ cdef class IENetwork:
for input, shape in input_shapes.items():
c_shape = []
if input not in net_inputs:
raise AttributeError("Specified '{}' layer not in network inputs '{}'! ".format(input, net_inputs))
raise AttributeError(f"Specified '{input}' layer not in network inputs '{net_inputs}'! ")
for v in shape:
c_shape.push_back(v)
c_input_shapes[input.encode()] = c_shape
@ -1503,7 +1499,7 @@ cdef class BlobBuffer:
'U64': 'Q', # unsigned long int
}
if name not in precision_to_format:
raise ValueError("Unknown Blob precision: {}".format(name))
raise ValueError(f"Unknown Blob precision: {name}")
return precision_to_format[name].encode()

View File

@ -209,6 +209,7 @@ def test_wrong_num_requests(device):
in str(e.value)
del ie_core
def test_wrong_num_requests_core(device):
with pytest.raises(ValueError) as e:
ie_core = ie.IECore()
@ -218,6 +219,7 @@ def test_wrong_num_requests_core(device):
in str(e.value)
del ie_core
def test_plugin_accessible_after_deletion(device):
ie_core = ie.IECore()
net = ie_core.read_network(model=test_net_xml, weights=test_net_bin)
@ -229,6 +231,8 @@ def test_plugin_accessible_after_deletion(device):
del ie_core
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") == "ARM",
reason=f"Cannot run test on device {os.environ.get('TEST_DEVICE')}")
def test_exec_graph(device):
ie_core = ie.IECore()
net = ie_core.read_network(model=test_net_xml, weights=test_net_bin)
@ -245,8 +249,8 @@ def test_exec_graph(device):
del ie_core
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "MYRIAD", reason="Device specific test. "
"Only MYRIAD plugin implements network export")
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "MYRIAD",
reason="Device specific test. Only MYRIAD plugin implements network export")
def test_export_import():
ie_core = ie.IECore()
net = ie_core.read_network(model=test_net_xml, weights=test_net_bin)
@ -264,7 +268,7 @@ def test_export_import():
def test_multi_out_data(device):
# Regression test CVS-23965
# Regression test 23965
# Check that CDataPtr for all output layers not copied between outputs map items
ie_core = ie.IECore()
net = ie_core.read_network(model=test_net_xml, weights=test_net_bin)
@ -282,7 +286,7 @@ def test_multi_out_data(device):
def test_get_metric(device):
ie_core = ie.IECore()
net = ie_core.read_network(model=test_net_xml, weights=test_net_bin)
exec_net = ie_core.load_network(net, "CPU")
exec_net = ie_core.load_network(net, device)
network_name = exec_net.get_metric("NETWORK_NAME")
assert network_name == "test_model"

View File

@ -97,57 +97,56 @@ def test_unregister_plugin(device):
net = ie.read_network(model=test_net_xml, weights=test_net_bin)
with pytest.raises(RuntimeError) as e:
ie.load_network(net, device)
assert 'Device with "{}" name is not registered in the InferenceEngine'.format(device) in str(e.value)
assert f"Device with '{device}' name is not registered in the InferenceEngine" in str(e.value)
def test_available_devices(device):
ie = IECore()
devices = ie.available_devices
assert device in devices, "Current device '{}' is not listed in available devices '{}'".format(device,
', '.join(devices))
assert device in devices, f"Current device '{device}' is not listed in available devices '{', '.join(devices)}'"
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "CPU",
reason="Cannot run test on device {}," "Plugin specific test".format(os.environ.get("TEST_DEVICE")))
reason=f"Cannot run test on device {os.environ.get('TEST_DEVICE')}, Plugin specific test")
def test_get_metric_list_of_str():
ie = IECore()
param = ie.get_metric("CPU", "OPTIMIZATION_CAPABILITIES")
assert isinstance(param, list), "Parameter value for 'OPTIMIZATION_CAPABILITIES' " \
"metric must be a list but {} is returned".format(type(param))
f"metric must be a list but {type(param)} is returned"
assert all(isinstance(v, str) for v in param), "Not all of the parameter values for 'OPTIMIZATION_CAPABILITIES' " \
"metric are strings!"
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "CPU",
reason="Cannot run test on device {}," "Plugin specific test".format(os.environ.get("TEST_DEVICE")))
reason=f"Cannot run test on device {os.environ.get('TEST_DEVICE')}, Plugin specific test")
def test_get_metric_tuple_of_two_ints():
ie = IECore()
param = ie.get_metric("CPU", "RANGE_FOR_STREAMS")
assert isinstance(param, tuple), "Parameter value for 'RANGE_FOR_STREAMS' " \
"metric must be tuple but {} is returned".format(type(param))
f"metric must be tuple but {type(param)} is returned"
assert all(isinstance(v, int) for v in param), "Not all of the parameter values for 'RANGE_FOR_STREAMS' " \
"metric are integers!"
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "CPU",
reason="Cannot run test on device {}," "Plugin specific test".format(os.environ.get("TEST_DEVICE")))
reason=f"Cannot run test on device {os.environ.get('TEST_DEVICE')}, Plugin specific test")
def test_get_metric_tuple_of_three_ints():
ie = IECore()
param = ie.get_metric("CPU", "RANGE_FOR_ASYNC_INFER_REQUESTS")
assert isinstance(param, tuple), "Parameter value for 'RANGE_FOR_ASYNC_INFER_REQUESTS' " \
"metric must be tuple but {} is returned".format(type(param))
f"metric must be tuple but {type(param)} is returned"
assert all(isinstance(v, int) for v in param), "Not all of the parameter values for " \
"'RANGE_FOR_ASYNC_INFER_REQUESTS' metric are integers!"
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "CPU",
reason="Cannot run test on device {}," "Plugin specific test".format(os.environ.get("TEST_DEVICE")))
reason=f"Cannot run test on device {os.environ.get('TEST_DEVICE')}, Plugin specific test")
def test_get_metric_str():
ie = IECore()
param = ie.get_metric("CPU", "FULL_DEVICE_NAME")
assert isinstance(param, str), "Parameter value for 'FULL_DEVICE_NAME' " \
"metric must be string but {} is returned".format(type(param))
f"metric must be string but {type(param)} is returned"
def test_read_network_from_xml():

View File

@ -181,11 +181,20 @@ def test_batch_size_after_reshape():
def test_serialize():
with pytest.raises(RuntimeError) as excinfo:
ie = IECore()
net = ie.read_network(model=test_net_xml, weights=test_net_bin)
net.serialize("./serialized_net.xml", "./serialized_net.bin")
assert "The serialize for IR v10 is not implemented" in str(excinfo.value)
import ngraph as ng
ie = IECore()
net = ie.read_network(model=test_net_xml, weights=test_net_bin)
net.serialize("./serialized_net.xml", "./serialized_net.bin")
serialized_net = ie.read_network(model="./serialized_net.xml", weights="./serialized_net.bin")
func_net = ng.function_from_cnn(net)
ops_net = func_net.get_ordered_ops()
ops_net_names = [op.friendly_name for op in ops_net]
func_serialized_net = ng.function_from_cnn(serialized_net)
ops_serialized_net = func_serialized_net.get_ordered_ops()
ops_serialized_net_names = [op.friendly_name for op in ops_serialized_net]
assert ops_serialized_net_names == ops_net_names
os.remove("./serialized_net.xml")
os.remove("./serialized_net.bin")
def test_reshape():
@ -227,7 +236,7 @@ def test_net_from_buffer_valid_deprecated():
def test_multi_out_data():
# Regression test CVS-23965
# Regression test 23965
# Check that DataPtr for all output layers not copied between outputs map items
ie = IECore()
net = ie.read_network(model=test_net_xml, weights=test_net_bin)

View File

@ -387,9 +387,9 @@ def test_get_perf_counts(device):
del net
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "CPU", reason="Can't run test on device {},"
"Dynamic batch fully supported only on CPU".format(
os.environ.get("TEST_DEVICE", "CPU")))
@pytest.mark.skipif(os.environ.get("TEST_DEVICE", "CPU") != "CPU",
reason=f"Can't run test on device {os.environ.get('TEST_DEVICE', 'CPU')}, "
"Dynamic batch fully supported only on CPU")
def test_set_batch_size(device):
ie_core = ie.IECore()
ie_core.set_config({"DYN_BATCH_ENABLED": "YES"}, device)
@ -491,7 +491,7 @@ def test_resize_algorithm_work(device):
net.input_info['data'].preprocess_info.resize_algorithm = ie.ResizeAlgorithm.RESIZE_BILINEAR
exec_net_2 = ie_core.load_network(net, 'CPU')
exec_net_2 = ie_core.load_network(net, device)
import cv2

View File

@ -41,13 +41,8 @@ def test_get_ops_from_IENetwork():
func = ng.function_from_cnn(net)
ops = func.get_ordered_ops()
ops_names = [op.friendly_name for op in ops]
assert ops_names == ['data', '20/mean/Fused_Mul_614616_const', '19/WithoutBiases', 'data_add_575/copy_const',
'19/Fused_Add_', '21', '22', 'onnx_initializer_node_8/Output_0/Data__const',
'23/WithoutBiases', '23/Dims357/copy_const', '23', '25/mean/Fused_Mul_618620_const',
'24/WithoutBiases', 'data_add_578583/copy_const', '24/Fused_Add_', '26', '27',
'Constant_223', 'onnx_initializer_node_17/Output_0/Data__const', 'ShapeOf_219', 'Constant_221',
'Constant_220', 'Gather_222', '28/Reshape/Cast_1955_const', '28/Reshape', '29/WithoutBiases',
'onnx_initializer_node_18/Output_0/Data_/copy_const', '29', 'fc_out', 'fc_out/sink_port_0']
assert len(ops_names) != 0
assert 'data' in ops_names
def test_get_type_name():