Skip to content

Daisyden/artifacts #1630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 229 additions & 78 deletions .github/scripts/check-ut.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,46 @@
import argparse
import sys
import os
import re
from junitparser import JUnitXml, Error, Failure, Skipped

parser = argparse.ArgumentParser()
parser.add_argument('junitxml', nargs='+')
parser = argparse.ArgumentParser(description='Test results analyzer')
parser.add_argument('input_files', nargs='+', help='JUnit XML files or log files')
args = parser.parse_args()

failures = []
suites = []
summaries = []

error_types = [
"RuntimeError",
"ValueError",
"TypeError",
"AttributeError",
"KeyError",
"IndexError",
"ImportError",
"AssertionError",
"OSError",
"Failed",
"TimeoutError",
"asyncio.TimeoutError",
"FileNotFoundError",
"PermissionError",
"NotImplementedError",
]

def get_classname(case):
return ' '.join(case.classname.split())
return ' '.join(case.classname.split()) if hasattr(case, 'classname') else case.get('classname', '')

def get_name(case):
if isinstance(case, dict):
return case.get('name', '')
return ' '.join(case.name.split())

def get_result(case):
if isinstance(case, dict):
return case.get('status', 'failed')

result = "passed"
if case.result:
if isinstance(case.result[0], Error):
Expand All @@ -28,88 +52,215 @@ def get_result(case):
return result

def get_message(case):
if isinstance(case, dict):
return case.get('error', '')

if not case.result:
return ""
return f"{case.result[0].message.splitlines()[0]}"
full_text = case.result[0].text if hasattr(case.result[0], 'text') else case.result[0].message
if not full_text:
return ""

error_messages = []
capture_next_lines = False
indent_level = 0

def print_md_row(row, print_header):
for line in full_text.splitlines():
stripped_line = line.strip()
if not stripped_line:
continue

for error_type in error_types:
if stripped_line.startswith(error_type + ": "):
error_msg = stripped_line[len(error_type)+2:]
error_messages.append(f"{error_type}: {error_msg}")
capture_next_lines = True
indent_level = 0
break
elif f"{error_type}:" in stripped_line and "Traceback" not in stripped_line:
error_msg = stripped_line.split(f'{error_type}:')[-1].strip()
error_messages.append(f"{error_type}: {error_msg}")
capture_next_lines = True
indent_level = 0
break

return " ; ".join(error_messages) if error_messages else f"{case.result[0].message.splitlines()[0]}"

def print_md_row(row, print_header=False, fail_list=None):
if print_header:
header = " | ".join([f"{key}" for key, _ in row.items()])
header = " | ".join([f"{key}" for key in row.keys()])
print(f"| {header} |")
header = " | ".join(["-"*len(key) for key, _ in row.items()])
header = " | ".join(["---"] * len(row))
print(f"| {header} |")
row = " | ".join([f"{value}" for _, value in row.items()])
print(f"| {row} |")
row_values = " | ".join([f"{value}" for value in row.values()])
print(f"| {row_values} |")

def print_cases(cases):
print_header = True
for case in cases:
classname = get_classname(case)
name = get_name(case)
result = get_result(case)
message = get_message(case)
row = {
'Class name': classname,
'Test name': name,
'Status': result,
'Message': message,
}
print_md_row(row, print_header)
print_header = False
if fail_list is not None:
fail_list.write(f"| {row_values} |\n")



def print_failures():
if not failures:
return

with open("ut_failure_list.csv", "w") as fail_list:
fail_list.write("sep=\'|\''.\n")

print("### Test Failures")
print_header = True
for case in failures:
print_md_row({
'Class name': get_classname(case),
'Test name': get_name(case),
'Status': get_result(case),
'Message': get_message(case),
'Source': case['source'] if isinstance(case, dict) else 'XML'
}, print_header, fail_list)
print_header = False

def parse_log_file(log_file):
with open(log_file, encoding='utf-8') as f:
content = f.read()

ut_name = os.path.splitext(os.path.basename(log_file))[0]
summary = {
'Category': determine_category(ut_name),
'UT': ut_name,
'Test cases': 0,
'Passed': 0,
'Skipped': 0,
'Failures': 0,
'Errors': 0,
'Source': 'Log'
}

# Extract test counts
test_run_match = re.search(r"Ran (\d+) tests in [\d.]+s", content)
if test_run_match:
summary['Test cases'] = int(test_run_match.group(1))

def print_suite(suite):
# Extract skipped case number
skipped_match = re.search(r"skipped[ =](\d+)", content, re.IGNORECASE)
if skipped_match:
summary['Skipped'] = int(skipped_match.group(1))
else:
skipped_match = re.search(r"skipped (\d+) cases?", content, re.IGNORECASE)
if skipped_match:
summary['Skipped'] = int(skipped_match.group(1))

# Extract failures
failure_blocks = re.findall(r"(FAIL:.*?)(?:\n\n|\n=+\n|\Z)", content, re.DOTALL)
exist_test_names = set()
failures_number = 0

for block in failure_blocks:
case_match = re.match(r"FAIL: (\w+) \(__mp_main__\.(\w+)\)", block)
if not case_match:
continue

test_name = case_match.group(1)
if test_name in exist_test_names:
continue
exist_test_names.add(test_name)

error_msg = []
error_pattern = r"(" + "|".join(error_types) + r"):.*?(?=\n\S|\n\n|\n=+\n|\Z)"
error_matches = re.finditer(error_pattern, block, re.DOTALL)
if not error_matches and "Traceback" in block:
error_msg.append("Unknown error (see traceback)")
else:
for match in error_matches:
error_msg.append(match.group(0).strip())

failures.append({
'classname': ut_name,
'name': f"{case_match.group(2)}:{test_name}",
'error': " ".join(error_msg),
'status': 'failed',
'source': 'Log'
})
failures_number += 1

if failures_number > summary['Failures']:
summary['Failures'] = failures_number
summary['Passed'] = summary['Test cases'] - summary['Failures'] - summary['Skipped']

return summary

def determine_category(ut):
if ut == 'op_regression':
return 'op_regression'
elif ut == 'op_regression_dev1':
return 'op_regression_dev1'
elif ut == 'op_extended':
return 'op_extended'
elif 'op_ut' in ut:
return 'op_ut'
else:
return 'unknown'

def process_log_file(log_file):
try:
summary = parse_log_file(log_file)
summaries.append(summary)
except Exception as e:
print(f"Error processing {log_file}: {e}", file=sys.stderr)

def process_xml_file(xml_file):
try:
xml = JUnitXml.fromfile(xml_file)
ut = os.path.basename(xml_file).split('.')[0]
category = determine_category(ut)

for suite in xml:
suite_summary = {
'Category': category,
'UT': ut,
'Test cases': suite.tests,
'Passed': suite.tests - suite.skipped - suite.failures - suite.errors,
'Skipped': suite.skipped,
'Failures': suite.failures,
'Errors': suite.errors,
'Source': 'XML'
}
summaries.append(suite_summary)

for case in suite:
if get_result(case) not in ["passed", "skipped"]:
failures.append(case)
except Exception as e:
print(f"Error processing {xml_file}: {e}", file=sys.stderr)

def print_summary():
print("### Results Summary")
print_header = True
for suite in suites:
ut = args.junitxml[0]
del(args.junitxml[0])
ut = os.path.basename(ut).split('.')[0]
tests = suite.tests
skipped = suite.skipped
failures = suite.failures
errors = suite.errors
if ut == 'op_regression':
category = 'op_regression'
elif ut == 'op_regression_dev1':
category = 'op_regression_dev1'
elif ut == 'op_extended':
category = 'op_extended'
elif 'op_ut' in ut:
category = 'op_ut'
row = {
'Category': category,
'UT': ut,
'Test cases': tests,
'Passed': tests-skipped-failures-errors,
'Skipped': skipped,
'Failures': failures,
'Errors': errors,
}
print_md_row(row, print_header)

for summary in summaries:
print_md_row({
'Category': summary['Category'],
'UT': summary['UT'],
'Test cases': summary['Test cases'],
'Passed': summary['Passed'],
'Skipped': summary['Skipped'],
'Failures': summary['Failures'],
'Errors': summary['Errors'],
'Source': summary['Source']
}, print_header)
print_header = False

xmls = [ JUnitXml.fromfile(f) for f in args.junitxml ]
for idx, xml in enumerate(xmls):
for suite in xml:
suites.append(suite)
for case in suite:
classname = get_classname(case)
name = get_name(case)
result = get_result(case)
if result not in ["passed", "skipped"]:
failures.append(case)

printed = False
def print_break(needed):
if needed:
print("")

if failures:
print_break(printed)
print("### Failures")
print_cases(failures)
printed = True

print("### Results Summary")
print_suite(suites)

sys.exit(0)
def main():
for input_file in args.input_files:
if input_file.endswith('.log'):
process_log_file(input_file)
elif input_file.endswith('.xml'):
process_xml_file(input_file)
else:
print(f"Skipping unknown file type: {input_file}", file=sys.stderr)

print_failures()
print_summary()


if __name__ == "__main__":
main()
Loading
Loading