Files
openvino/tests/stress_tests/scripts/memcheck_upload.py

166 lines
5.9 KiB
Python

#!/usr/bin/env python3
# Copyright (C) 2020 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#
"""
Upload metrics gathered by MemCheckTests into MongoDB.

Usage: ./scripts/memcheck_upload.py https://ci.intel.com/job/memcheck/1234/ \
    ./gtest-parallel-logs/**/*.log \
    --artifact_root ./gtest-parallel-logs --dryrun
"""
import json
import logging
from types import SimpleNamespace
import os
import re
import sys
import argparse
from glob import glob
import xml.etree.ElementTree as ET
import hashlib
from pymongo import MongoClient
# Name of the MongoDB database that the target collection is looked up in.
DATABASE = 'memcheck'

# Matches the opening ``<model ...>`` tag; the matched text is later parsed
# as XML to read its attributes.
RE_GTEST_MODEL_XML = re.compile(r'<model[^>]*>')

# Title lines of the two metric tables; the two lines following a title are
# read as metric names and metric values respectively.
RE_GTEST_CUR_MEASURE = re.compile(r'Current values of virtual memory consumption')
RE_GTEST_REF_MEASURE = re.compile(r'Reference values of virtual memory consumption')

# Result markers, tolerant to any amount of whitespace inside the brackets.
RE_GTEST_PASSED = re.compile(r'\[\s*PASSED\s*\]')
RE_GTEST_FAILED = re.compile(r'\[\s*FAILED\s*\]')

# Line prefix whose length is cut off metric table lines before splitting.
GTEST_INFO = '[ INFO ]'

# Precision names searched for (case-insensitively) in the model path.
PRECISSIONS = ('FP32', 'FP16', 'INT8')

# Record fields whose string values are concatenated and hashed into `_id`.
KEY_FIELDS = ('test_name', 'model', 'device', 'build_url')
def globber(paths):
    """Yield paths, expanding those that contain wildcard characters.

    A path containing any of ``*``, ``?``, ``!``, ``[`` or ``]`` is passed to
    ``glob`` with ``recursive=True`` and every match is yielded (possibly
    none). Any other path is yielded unchanged, whether or not it exists.
    """
    wildcard_chars = ['*', '?', '!', '[', ']']
    for candidate in paths:
        has_wildcard = any(char in candidate for char in wildcard_chars)
        if not has_wildcard:
            # Plain path: pass through as-is without touching the filesystem.
            yield candidate
            continue
        yield from glob(candidate, recursive=True)
def _parse_metrics_table(log_lines, title_index):
    """Build a metrics dict from the two lines following a table title line.

    The line right after the title is read as metric names (lower-cased) and
    the next one as their integer values. The first ``len(GTEST_INFO)``
    characters of both lines are cut off before splitting on whitespace.
    """
    prefix_len = len(GTEST_INFO)
    names = [name.lower()
             for name in log_lines[title_index + 1][prefix_len:].split()]
    values = [int(val)
              for val in log_lines[title_index + 2][prefix_len:].split()]
    return dict(zip(names, values))


def parse_memcheck_log(log_path):
    """Parse one memcheck log file into a flat record.

    The log must contain a ``<model ...>`` tag with ``test``, ``path`` and
    ``device`` attributes and a "Current values of virtual memory
    consumption" table. When a table title occurs several times, the last
    occurrence is the one that ends up in the record.

    Args:
        log_path: path of the log file to read.

    Returns:
        A dict with keys ``metrics``, ``test_name``, ``model_name``,
        ``precision``, ``model``, ``device`` and ``status`` ('passed',
        'failed' or 'started'), plus ``ref_metrics`` when a non-empty
        reference table was found. ``None`` if the log has no ``<model>``
        tag or no current-values table.

    Raises:
        StopIteration: if none of ``PRECISSIONS`` occurs in the model path.
    """
    with open(log_path, 'r') as log_file:
        log = log_file.read()

    passed_match = RE_GTEST_PASSED.search(log)
    failed_match = RE_GTEST_FAILED.search(log)
    model_match = RE_GTEST_MODEL_XML.search(log)
    if not model_match:
        return None
    model = ET.fromstring(model_match.group(0)).attrib

    log_lines = log.splitlines()

    # Reference values are optional: start from None so that a log without
    # a reference table does not fail on an unbound name further below.
    ref_metrics = None
    for index, line in enumerate(log_lines):
        if RE_GTEST_REF_MEASURE.search(line):
            ref_metrics = _parse_metrics_table(log_lines, index)

    # Walk backwards so that the last reported current-values table wins.
    for index in reversed(range(len(log_lines))):
        if not RE_GTEST_CUR_MEASURE.search(log_lines[index]):
            continue
        if passed_match:
            status = 'passed'
        elif failed_match:
            status = 'failed'
        else:
            # Neither marker was found anywhere in the log.
            status = 'started'
        model_path = model['path']
        record = {
            'metrics': _parse_metrics_table(log_lines, index),
            'test_name': model['test'],
            'model_name': os.path.splitext(os.path.basename(model_path))[0],
            # First known precision name found in the path, case-insensitive.
            'precision': next(pr for pr in PRECISSIONS
                              if pr.upper() in model_path.upper()),
            'model': model_path,
            'device': model['device'].upper(),
            'status': status,
        }
        if ref_metrics:
            record['ref_metrics'] = ref_metrics
        return record
    return None
def create_memcheck_records(logs, build_url, artifact_root, append=None):
    """Turn memcheck log files into records ready for MongoDB.

    Each log is parsed with ``parse_memcheck_log``; logs that yield no data
    are skipped. Every record gets ``build_url``, a ``log_path`` relative to
    ``artifact_root``, the optional ``append`` items merged on top, and an
    ``_id`` that is the SHA-256 hex digest of the concatenated ``KEY_FIELDS``
    values.

    Args:
        logs: iterable of log file paths.
        build_url: value stored in every record under ``build_url``.
        artifact_root: directory that ``log_path`` is made relative to.
        append: optional mapping merged into every record.

    Returns:
        List of record dicts, one per successfully parsed log.
    """
    records = []
    for log_path in logs:
        record = parse_memcheck_log(log_path)
        if record:
            record['build_url'] = build_url
            record['log_path'] = os.path.relpath(log_path, artifact_root)
            if append:
                record.update(append)
            # The id is computed last so it reflects values set by `append`.
            key_material = ''.join(str(record[field]) for field in KEY_FIELDS)
            record['_id'] = hashlib.sha256(key_material.encode()).hexdigest()
            records.append(record)
    return records
def upload_memcheck_records(records, db_url, db_collection):
    """Upload records created by ``create_memcheck_records``.

    Every record replaces the document with the same ``_id`` in collection
    ``db_collection`` of the ``DATABASE`` database, or is inserted when no
    such document exists (``upsert=True``).

    Args:
        records: iterable of dicts, each with an ``_id`` key.
        db_url: URL passed to ``MongoClient``.
        db_collection: name of the collection to write to.
    """
    client = MongoClient(db_url)
    try:
        collection = client[DATABASE][db_collection]
        for record in records:
            collection.replace_one({'_id': record['_id']}, record, upsert=True)
    finally:
        # Release the client's connections even if an upload fails midway.
        client.close()
def main():
    """Main entry point.

    Parses the command line, builds records from the given logs and either
    uploads them to MongoDB or, with ``--dryrun``, prints them as JSON.
    ``--db_url`` and ``--db_collection`` are required unless ``--dryrun`` is
    given.
    """
    # Whether the DB options are required depends on --dryrun, so that flag
    # is probed first. A separate help-less parser is used for the probe:
    # probing with the real parser would let -h/--help fire before the other
    # arguments are registered and print a truncated help message.
    dryrun_probe = argparse.ArgumentParser(add_help=False)
    dryrun_probe.add_argument('--dryrun', action="store_true")
    is_dryrun = dryrun_probe.parse_known_args()[0].dryrun

    parser = argparse.ArgumentParser(
        description='Upload metrics gathered by memcheck into Mongo DB',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--dryrun', action="store_true",
                        help='Parse logs, not modify database.')
    parser.add_argument('build_url', help='A place where memcheck execution logs can be found.')
    parser.add_argument('log', nargs='+', help='Local path to log. Extended wildcards supported.')
    parser.add_argument('--db_url', required=not is_dryrun,
                        help='MongoDB URL in a for "mongodb://server:port".')
    parser.add_argument('--db_collection', required=not is_dryrun,
                        help=f'Collection name in {DATABASE} database to upload')
    parser.add_argument('--artifact_root', required=True,
                        help='A root directory to strip from log path before upload.')
    parser.add_argument('--append', help='JSON to append to each item.')
    args = parser.parse_args()

    logging.basicConfig(format="{file}: [ %(levelname)s ] %(message)s".format(
        file=os.path.basename(__file__)), level=logging.INFO, stream=sys.stdout)

    # --append names a JSON file whose content is merged into every record.
    if args.append:
        with open(args.append, 'r') as append_file:
            append = json.load(append_file)
    else:
        append = None

    logs = list(globber(args.log))
    records = create_memcheck_records(
        logs, args.build_url, args.artifact_root, append=append)
    logging.info('Prepared %d records', len(records))
    if len(records) != len(logs):
        # Some logs produced no record.
        logging.warning(
            'Skipped %d logs of %d', len(logs) - len(records), len(logs))

    if not args.dryrun:
        upload_memcheck_records(records, args.db_url, args.db_collection)
        logging.info('Uploaded to %s', args.db_url)
    else:
        print(json.dumps(records, sort_keys=True, indent=4))
# Run the command-line interface only when executed as a script, so that
# importing this module has no side effects.
if __name__ == "__main__":
    main()