Files
openvino/tests/stress_tests/scripts/memcheck_upload.py
Vitaliy Urusovskij 6fcad95c71 [Stress] Redesign of MemCheckTests (#650)
* [Stress] Redesigned MemCheckTests: 1. Added MemCheckPipeline to encapsulate measures and logging. 2. Moved references to array

* [Stress] Added tracking of THREADS in MemCheckTests
2020-07-23 11:07:13 +03:00

274 lines
9.9 KiB
Python

#!/usr/bin/env python3
# Copyright (C) 2020 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#
"""
Upload metrics gathered by MemCheckTests into Mongo DB
Usage: ./scripts/memcheck_upload.py https://ci.intel.com/job/memchek/1234/ \
./gtest-parallel-logs/**/*.log \
--artifact_root ./gtest-parallel-logs --dryrun
"""
import json
import logging
from types import SimpleNamespace
import os
import re
import sys
import argparse
from inspect import getsourcefile
from glob import glob
import xml.etree.ElementTree as ET
import hashlib
import yaml
from pymongo import MongoClient
# ---------------------------------------------------------------------------
# Database arguments
# ---------------------------------------------------------------------------
# Database name for memcheck results.
DATABASE = 'memcheck'
# Collection names accepted by the --db_collection command line option.
DB_COLLECTIONS = ["commit", "nightly", "weekly"]
# Product name from build manifest.
PRODUCT_NAME = 'dldt'

# ---------------------------------------------------------------------------
# Patterns and markers used while parsing gtest logs
# ---------------------------------------------------------------------------
# Opening `<model ...>` tag; its attributes are read as the model description.
RE_GTEST_MODEL_XML = re.compile(r'<model[^>]*>')
# Line that precedes the table with currently measured values.
RE_GTEST_CUR_MEASURE = re.compile(r'\[\s*MEASURE\s*\]')
# Line that precedes the table with reference values.
RE_GTEST_REF_MEASURE = re.compile(r'Reference values of virtual memory consumption')
# Markers searched for to derive the test status.
RE_GTEST_PASSED = re.compile(r'\[\s*PASSED\s*\]')
RE_GTEST_FAILED = re.compile(r'\[\s*FAILED\s*\]')
# Prefix whose length is cut from the beginning of metrics table lines.
GTEST_INFO = '[ INFO ]'

# Precision names looked up (case-insensitively) inside the model path.
PRECISSIONS = ('FP32', 'FP16', 'INT8')
# Record fields concatenated and hashed to build the record `_id`.
KEY_FIELDS = ('test_name', 'model', 'device', 'build_url')
def abs_path(relative_path):
    """Resolve `relative_path` against the directory holding this source file.

    The source file is located by asking `getsourcefile` about a throwaway
    lambda defined right here; the joined path is then normalised with
    `os.path.realpath`.
    """
    this_file = getsourcefile(lambda: 0)
    base_dir = os.path.dirname(this_file)
    joined = os.path.join(base_dir, relative_path)
    return os.path.realpath(joined)
def metadata_from_manifest(manifest):
    """ Extract commit metadata for memcheck record from manifest

    `manifest` is a path to a YAML file. The entry
    `components[PRODUCT_NAME]` of that file supplies the repository list,
    the product type string and the build event.

    Returns a dict with `os_name`, `os_version`, `commit_sha`, `commit_date`,
    `repo_url`, `target_branch` and `event_type`. When the product type does
    not consist of five '_'-separated parts with 'ubuntu' as the third one,
    an error is logged and an empty dict is returned.

    Raises StopIteration when no repository entry has a truthy `trigger`.
    """
    with open(manifest, 'r') as manifest_file:
        content = yaml.safe_load(manifest_file)

    component = content['components'][PRODUCT_NAME]

    # The first repository flagged as the build trigger carries commit details.
    repo_trigger = next(
        repo for repo in component['repository'] if repo['trigger'])

    # parse OS name/version
    product_type_str = component['product_type']
    parts = product_type_str.split('_')
    is_supported = len(parts) == 5 and parts[2] == 'ubuntu'
    if not is_supported:
        logging.error('Product type %s is not supported', product_type_str)
        return {}

    os_name = parts[2]
    os_version = [parts[3], parts[4]]
    return {
        'os_name': os_name,
        'os_version': os_version,
        'commit_sha': repo_trigger['revision'],
        'commit_date': repo_trigger['commit_time'],
        'repo_url': repo_trigger['url'],
        'target_branch': repo_trigger['target_branch'],
        'event_type': component['build_event'].lower(),
    }
def _parse_metrics_table(log_lines, heading_index):
    """ Parse a two-line metrics table into a dict.

    `log_lines[heading_index]` holds whitespace-separated metric names and the
    following line holds the matching integer values. The first
    `len(GTEST_INFO)` characters of both lines are cut off before splitting.
    Names are lower-cased.

    Raises IndexError when the log ends before the table does and ValueError
    when a value is not an integer.
    """
    prefix_len = len(GTEST_INFO)
    heading = [name.lower()
               for name in log_lines[heading_index][prefix_len:].split()]
    values = [int(val)
              for val in log_lines[heading_index + 1][prefix_len:].split()]
    return dict(zip(heading, values))


def parse_memcheck_log(log_path):
    """ Parse memcheck log

    Returns a dict describing the last `[ MEASURE ]` section of the log:
    `metrics`, `test_name`, `model_name`, `precision`, `model`, `device`,
    `status` and, when a reference table was found, `ref_metrics`.
    `status` is 'passed' if a PASSED marker is present anywhere in the log,
    otherwise 'failed' if a FAILED marker is present, otherwise 'started'.

    Returns None when the file does not exist, when the log has no `<model>`
    tag, or when it has no `[ MEASURE ]` line.

    Raises StopIteration when none of PRECISSIONS occurs in the model path,
    and KeyError when the `<model>` tag lacks a `path` or `device` attribute.
    """
    try:
        with open(log_path, 'r') as log_file:
            log = log_file.read()
    except FileNotFoundError:
        # Skip read of broken files
        return None

    passed_match = RE_GTEST_PASSED.search(log)
    failed_match = RE_GTEST_FAILED.search(log)
    model_match = RE_GTEST_MODEL_XML.search(log)
    if not model_match:
        return None
    model = ET.fromstring(model_match.group(0)).attrib

    log_lines = log.splitlines()

    # Start from an empty dict: a log without a reference table previously
    # left `ref_metrics` unassigned and failed with UnboundLocalError below.
    ref_metrics = {}
    for index, line in enumerate(log_lines):
        if RE_GTEST_REF_MEASURE.search(line):
            # Later reference tables override earlier ones.
            ref_metrics = _parse_metrics_table(log_lines, index + 1)

    # Walk backwards so that the last measurement in the log is reported.
    for index in reversed(range(len(log_lines))):
        if not RE_GTEST_CUR_MEASURE.search(log_lines[index]):
            continue
        test_name = log_lines[index].split()[-1]
        entry = {
            'metrics': _parse_metrics_table(log_lines, index + 1),
            'test_name': test_name,
            'model_name': os.path.splitext(
                os.path.basename(model['path']))[0],
            'precision': next(pr for pr in PRECISSIONS if pr.upper()
                              in model['path'].upper()),
            'model': model['path'],
            'device': model['device'].upper(),
            'status': 'passed' if passed_match else 'failed' if failed_match else 'started',
        }
        if ref_metrics:
            entry['ref_metrics'] = ref_metrics
        return entry
    return None
def create_memcheck_records(logs, build_url, artifact_root, append=None):
    """ Parse memcheck logs and create records for MongoDB

    Every path in `logs` is parsed with `parse_memcheck_log`; logs for which
    it returns nothing are skipped. Each remaining record receives
    `build_url`, a `log_path` relative to `artifact_root`, the items of
    `append` (when given), and an `_id` equal to the SHA-256 hex digest of the
    concatenated KEY_FIELDS values.
    """
    records = []
    for log_path in logs:
        record = parse_memcheck_log(log_path)
        if record:
            record['build_url'] = build_url
            record['log_path'] = os.path.relpath(log_path, artifact_root)
            if append:
                record.update(append)
            # The id is derived after `append` is merged, so appended values
            # for key fields take part in the hash.
            key_material = ''.join(str(record[field]) for field in KEY_FIELDS)
            record['_id'] = hashlib.sha256(key_material.encode()).hexdigest()
            records.append(record)
    return records
def upload_memcheck_records(records, db_url, db_collection):
    """ Upload records created by create_memcheck_records

    Each record replaces the document with the same `_id` in collection
    `db_collection` of database DATABASE, or is inserted when no such
    document exists (upsert).

    The client is used as a context manager so that its connections are
    closed once all records are written, also when an upload fails.
    """
    with MongoClient(db_url) as client:
        collection = client[DATABASE][db_collection]
        for record in records:
            collection.replace_one({'_id': record['_id']}, record, upsert=True)
def _transpose_dicts(items, template=None):
""" Build dictionary of arrays from array of dictionaries
Example:
> in = [{'a':1, 'b':3}, {'a':2}]
> _transpose_dicts(in, template=in[0])
{'a':[1,2], 'b':[3, None]}
"""
result = {}
if not items:
return result
if not template:
template = items[0]
for key, template_val in template.items():
if isinstance(template_val, dict):
result[key] = _transpose_dicts(
[item[key] for item in items if key in item], template_val)
else:
result[key] = [item.get(key, None) for item in items]
return result
# Record fields that must be equal for two records to share a timeline.
TIMELINE_SIMILARITY = ('test_name', 'model', 'device', 'target_branch')


def query_timeline(records, db_url, db_collection, max_items=20, similarity=TIMELINE_SIMILARITY):
    """ Query database for similar memcheck items committed previously

    For every record, the collection is searched for documents that have the
    same values in all `similarity` fields and a `commit_date` lower than the
    record's. At most `max_items` of the latest ones are kept, ordered from
    the oldest to the newest, and the record itself is added at the end. The
    list is then transposed with `_transpose_dicts` using the record as the
    template.

    Returns a list with one timeline per record, in the order of `records`.
    A record that lacks one of the queried fields gets a timeline that
    contains only the record itself.

    The client is used as a context manager so that its connections are
    closed when the function returns or fails.
    """
    result = []
    with MongoClient(db_url) as client:
        collection = client[DATABASE][db_collection]
        for record in records:
            items = []
            try:
                query = {key: record[key] for key in similarity}
                query['commit_date'] = {'$lt': record['commit_date']}
                pipeline = [
                    {'$match': query},
                    {'$addFields': {
                        'commit_date': {'$dateFromString': {'dateString': '$commit_date'}}}},
                    # Take the newest `max_items` documents ...
                    {'$sort': {'commit_date': -1}},
                    {'$limit': max_items},
                    # ... and hand them over in chronological order.
                    {'$sort': {'commit_date': 1}},
                ]
                items += list(collection.aggregate(pipeline))
            except KeyError:
                pass  # keep only the record if timeline failed to generate
            items.append(record)
            result.append(_transpose_dicts(items, template=record))
    return result
def create_memcheck_report(records, db_url, db_collection, output_path):
    """ Create memcheck timeline HTML report for records.

    `records` is sorted in place by the concatenation of status, device,
    model and test name. Timelines for the sorted records are obtained from
    `query_timeline`, and the template `timeline_report.html` from the
    `memcheck-template` directory next to this file is rendered into
    `output_path`.
    """
    def _sort_key(item):
        return f"{item['status']}{item['device']}{item['model']}{item['test_name']}"

    records.sort(key=_sort_key)
    timelines = query_timeline(records, db_url, db_collection)

    import jinja2  # pylint: disable=import-outside-toplevel

    template_dir = os.path.join(abs_path('.'), 'memcheck-template')
    loader = jinja2.FileSystemLoader(searchpath=template_dir)
    env = jinja2.Environment(loader=loader, autoescape=False)
    report_template = env.get_template('timeline_report.html')
    rendered = report_template.stream(records=records, timelines=timelines)
    rendered.dump(output_path)
def globber(paths):
    """Generator extending paths with wildcards

    A path containing any of the characters `*`, `?`, `!`, `[` or `]` is
    expanded with a recursive `glob` and every match is yielded; any other
    path is yielded unchanged.
    """
    wildcard_chars = ['*', '?', '!', '[', ']']
    for pattern in paths:
        has_wildcard = any(char in pattern for char in wildcard_chars)
        if not has_wildcard:
            yield pattern
            continue
        yield from glob(pattern, recursive=True)
def main():
    """Main entry point.

    Parses the command line, builds memcheck records from the given logs and
    either uploads them to MongoDB or, with `--dryrun`, prints them as JSON.
    `--db_url` and `--db_collection` are required unless `--dryrun` is given.
    """
    # Whether the database options are required depends on --dryrun, so this
    # flag is read first with a separate parser. Help is disabled on it:
    # otherwise -h/--help would be handled here and print a help screen that
    # lacks all the other arguments.
    dryrun_parser = argparse.ArgumentParser(add_help=False)
    dryrun_parser.add_argument('--dryrun', action="store_true")
    is_dryrun = dryrun_parser.parse_known_args()[0].dryrun

    parser = argparse.ArgumentParser(
        description='Upload metrics gathered by memcheck into Mongo DB',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--dryrun', action="store_true",
                        help='Parse logs, not modify database.')
    parser.add_argument('build_url', help='A place where memcheck execution logs can be found.')
    parser.add_argument('log', nargs='+', help='Local path to log. Extended wildcards supported.')
    parser.add_argument('--db_url', required=not is_dryrun,
                        help='MongoDB URL in a for "mongodb://server:port".')
    parser.add_argument('--db_collection', required=not is_dryrun,
                        help=f'Collection name in {DATABASE} database to upload.',
                        choices=DB_COLLECTIONS)
    parser.add_argument('--artifact_root', required=True,
                        help='A root directory to strip from log path before upload.')
    parser.add_argument('--append', help='JSON to append to each item.')
    args = parser.parse_args()

    logging.basicConfig(format="{file}: [ %(levelname)s ] %(message)s".format(
        file=os.path.basename(__file__)), level=logging.INFO, stream=sys.stdout)

    # --append is a path to a JSON file whose content is merged into every record.
    if args.append:
        with open(args.append, 'r') as append_file:
            append = json.load(append_file)
    else:
        append = None

    logs = list(globber(args.log))
    records = create_memcheck_records(
        logs, args.build_url, args.artifact_root, append=append)
    logging.info('Prepared %d records', len(records))
    if len(records) != len(logs):
        logging.warning(
            'Skipped %d logs of %d', len(logs) - len(records), len(logs))

    if not args.dryrun:
        upload_memcheck_records(records, args.db_url, args.db_collection)
        logging.info('Uploaded to %s', args.db_url)
    else:
        print(json.dumps(records, sort_keys=True, indent=4))


if __name__ == "__main__":
    main()