
# ***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
# ***** END LICENSE BLOCK *****
import json
from collections import defaultdict, namedtuple
from mozsystemmonitor.resourcemonitor import SystemResourceMonitor
from mozharness.base import log
from mozharness.base.log import ERROR, INFO, WARNING, OutputParser
from mozharness.mozilla.automation import (
TBPL_FAILURE,
TBPL_RETRY,
TBPL_SUCCESS,
TBPL_WARNING,
TBPL_WORST_LEVEL_TUPLE,
)
from mozharness.mozilla.testing.errors import TinderBoxPrintRe
from mozharness.mozilla.testing.unittest import tbox_print_summary
class StructuredOutputParser(OutputParser):
    """Output parser for harnesses that emit structured (mozlog) output.

    Each line of child-process output is parsed as line-delimited JSON and,
    when it is a recognized mozlog message, fed through a mozlog
    ``StatusHandler`` to track run status and re-emitted in human-readable
    (TBPL) form.  The worst log level and TBPL status seen so far are
    maintained and folded into a final verdict by :meth:`evaluate_parser`.

    The script class using this must inherit the MozbaseMixin to ensure
    that mozlog is available.
    """

    def __init__(self, **kwargs):
        """Object that tracks the overall status of the test run"""
        # The 'strict' argument dictates whether the presence of output
        # from the harness process other than line-delimited json indicates
        # failure. If it does not, the errors_list parameter may be used
        # to detect additional failure output from the harness process.
        if "strict" in kwargs:
            self.strict = kwargs.pop("strict")
        else:
            self.strict = True

        self.suite_category = kwargs.pop("suite_category", None)
        tbpl_compact = kwargs.pop("log_compact", False)
        # Pop 'allow_crashes' *before* delegating to the superclass;
        # previously it was popped afterwards, so a caller-supplied
        # 'allow_crashes' keyword leaked into super().__init__(**kwargs).
        self.allow_crashes = kwargs.pop("allow_crashes", False)
        super(StructuredOutputParser, self).__init__(**kwargs)

        mozlog = self._get_mozlog_module()
        self.formatter = mozlog.formatters.TbplFormatter(compact=tbpl_compact)
        self.handler = mozlog.handlers.StatusHandler()
        self.log_actions = mozlog.structuredlog.log_actions()

        # Worst log level / TBPL status observed so far; both are updated
        # incrementally via update_levels() as lines are parsed.
        self.worst_log_level = INFO
        self.tbpl_status = TBPL_SUCCESS
        self.harness_retry_re = TinderBoxPrintRe["harness_error"]["retry_regex"]
        # True while we are inside a run of unstructured lines, so the
        # "not a valid structured log message" notice is printed only once
        # per run rather than for every line.
        self.prev_was_unstructured = False

    def _get_mozlog_module(self):
        """Import and return mozlog, aborting the script if it is missing."""
        try:
            import mozlog
        except ImportError:
            # self.fatal() terminates the script, so the return below is
            # only reached on successful import.
            self.fatal(
                "A script class using structured logging must inherit "
                "from the MozbaseMixin to ensure that mozlog is available."
            )
        return mozlog

    def _handle_unstructured_output(self, line, log_output=True):
        """Run a non-mozlog line through the regex-based parent parser.

        Returns the error level the parent parser assigned, if any.
        """
        self.log_output = log_output
        return super(StructuredOutputParser, self).parse_single_line(line)

    def parse_single_line(self, line):
        """Parses a line of log output from the child process and passes
        it to mozlog to update the overall status of the run.
        Re-emits the logged line in human-readable format.
        """
        level = INFO
        tbpl_level = TBPL_SUCCESS

        data = None
        try:
            candidate_data = json.loads(line)
            # Only treat the line as structured when it is a JSON object
            # carrying a recognized mozlog action.
            if (
                isinstance(candidate_data, dict)
                and "action" in candidate_data
                and candidate_data["action"] in self.log_actions
            ):
                data = candidate_data
        except ValueError:
            pass

        if data is None:
            if self.strict:
                if not self.prev_was_unstructured:
                    self.info(
                        "Test harness output was not a valid structured log message"
                    )
                    self.info(line)
                else:
                    self.info(line)
                self.prev_was_unstructured = True
            else:
                self._handle_unstructured_output(line)
            return

        self.prev_was_unstructured = False

        self.handler(data)

        action = data["action"]
        # Emit resource-monitor markers so test/suite/group boundaries show
        # up in the system resource timeline.
        if action == "test_start":
            SystemResourceMonitor.begin_marker("test", data["test"])
        elif action == "test_end":
            SystemResourceMonitor.end_marker("test", data["test"])
        elif action == "suite_start":
            SystemResourceMonitor.begin_marker("suite", data["source"])
        elif action == "suite_end":
            SystemResourceMonitor.end_marker("suite", data["source"])
        elif action == "group_start":
            SystemResourceMonitor.begin_marker("test", data["name"])
        elif action == "group_end":
            SystemResourceMonitor.end_marker("test", data["name"])

        if line.startswith("TEST-UNEXPECTED-FAIL"):
            SystemResourceMonitor.record_event(line)

        if action in ("log", "process_output"):
            if action == "log":
                message = data["message"]
                level = getattr(log, data["level"].upper())
            else:
                message = data["data"]

            # Run log and process_output actions through the error lists, but make sure
            # the super parser doesn't print them to stdout (they should go through the
            # log formatter).
            error_level = self._handle_unstructured_output(message, log_output=False)
            if error_level is not None:
                level = self.worst_level(error_level, level)

            if self.harness_retry_re.search(message):
                self.update_levels(TBPL_RETRY, log.CRITICAL)
                tbpl_level = TBPL_RETRY
                level = log.CRITICAL

        log_data = self.formatter(data)
        if log_data is not None:
            self.log(log_data, level=level)

        self.update_levels(tbpl_level, level)

    def _subtract_tuples(self, old, new):
        """Return per-key ``new - old`` counts, dropping keys that end up <= 0."""
        items = set(list(old.keys()) + list(new.keys()))
        merged = defaultdict(int)
        for item in items:
            merged[item] = new.get(item, 0) - old.get(item, 0)
            if merged[item] <= 0:
                del merged[item]
        return merged

    def evaluate_parser(self, return_code, success_codes=None, previous_summary=None):
        """Compute the overall result of the run.

        :param return_code: exit code of the harness process.
        :param success_codes: exit codes considered successful; defaults to [0].
        :param previous_summary: summary from an earlier evaluate_parser call
            (or {} to request delta tracking), used to subtract already-seen
            results from the new summary.
        :returns: tuple of (tbpl_status, worst_log_level, joined_summary).
        """
        success_codes = success_codes or [0]
        summary = self.handler.summarize()

        # We can run evaluate_parser multiple times, it will duplicate failures
        # and status which can mean that future tests will fail if a previous
        # test fails. When we have a previous summary, we want to do 2 things:
        # 1) Remove previous data from the new summary to only look at new data
        # 2) Build a joined summary to include the previous + new data
        # (This was previously a misplaced string literal mid-function — a
        # no-op expression — and is now a real comment.)
        RunSummary = namedtuple(
            "RunSummary",
            (
                "unexpected_statuses",
                "expected_statuses",
                "known_intermittent_statuses",
                "log_level_counts",
                "action_counts",
            ),
        )
        if previous_summary == {}:
            previous_summary = RunSummary(
                defaultdict(int),
                defaultdict(int),
                defaultdict(int),
                defaultdict(int),
                defaultdict(int),
            )
        if previous_summary:
            # Always preserve retry status: if any failure triggers retry, the script
            # must exit with TBPL_RETRY to trigger task retry.
            if self.tbpl_status != TBPL_RETRY:
                self.tbpl_status = TBPL_SUCCESS
            joined_summary = summary

            # Remove previously known status messages
            if "ERROR" in summary.log_level_counts:
                summary.log_level_counts["ERROR"] -= self.handler.no_tests_run_count

            summary = RunSummary(
                self._subtract_tuples(
                    previous_summary.unexpected_statuses, summary.unexpected_statuses
                ),
                self._subtract_tuples(
                    previous_summary.expected_statuses, summary.expected_statuses
                ),
                self._subtract_tuples(
                    previous_summary.known_intermittent_statuses,
                    summary.known_intermittent_statuses,
                ),
                self._subtract_tuples(
                    previous_summary.log_level_counts, summary.log_level_counts
                ),
                summary.action_counts,
            )

            # If we have previous data to ignore,
            # cache it so we don't parse the log multiple times
            self.summary = summary
        else:
            joined_summary = summary

        fail_pair = TBPL_WARNING, WARNING
        error_pair = TBPL_FAILURE, ERROR

        # These are warning/orange statuses.
        failure_conditions = [
            (sum(summary.unexpected_statuses.values()), 0, "statuses", False),
            (
                summary.action_counts.get("crash", 0),
                summary.expected_statuses.get("CRASH", 0),
                "crashes",
                self.allow_crashes,
            ),
            (
                summary.action_counts.get("valgrind_error", 0),
                0,
                "valgrind errors",
                False,
            ),
        ]
        for value, limit, type_name, allow in failure_conditions:
            if value > limit:
                msg = "%d unexpected %s" % (value, type_name)
                if limit != 0:
                    msg += " expected at most %d" % (limit)
                if not allow:
                    self.update_levels(*fail_pair)
                    msg = "Got " + msg
                    # Force level to be WARNING as message is not necessary in Treeherder
                    self.warning(msg)
                else:
                    msg = "Ignored " + msg
                    self.warning(msg)

        # These are error/red statuses. A message is output here every time something
        # wouldn't otherwise be highlighted in the UI.
        required_actions = {
            "suite_end": "No suite end message was emitted by this harness.",
            "test_end": "No checks run.",
        }
        for action, diagnostic_message in required_actions.items():
            if action not in summary.action_counts:
                self.log(diagnostic_message, ERROR)
                self.update_levels(*error_pair)

        failure_log_levels = ["ERROR", "CRITICAL"]
        for level in failure_log_levels:
            if level in summary.log_level_counts:
                self.update_levels(*error_pair)

        # If a superclass was used to detect errors with a regex based output parser,
        # this will be reflected in the status here.
        if self.num_errors:
            self.update_levels(*error_pair)

        # Harnesses typically return non-zero on test failure, so don't promote
        # to error if we already have a failing status.
        if return_code not in success_codes and self.tbpl_status == TBPL_SUCCESS:
            self.update_levels(*error_pair)

        return self.tbpl_status, self.worst_log_level, joined_summary

    def update_levels(self, tbpl_level, log_level):
        """Fold a new (tbpl_level, log_level) pair into the running worst levels."""
        self.worst_log_level = self.worst_level(log_level, self.worst_log_level)
        self.tbpl_status = self.worst_level(
            tbpl_level, self.tbpl_status, levels=TBPL_WORST_LEVEL_TUPLE
        )

    def print_summary(self, suite_name):
        """Emit a TinderboxPrint summary line for *suite_name*."""
        # Summary text provided for compatibility. Counts are currently
        # in the format <pass count>/<fail count>/<todo count>,
        # <expected count>/<unexpected count>/<expected fail count> will yield the
        # expected info from a structured log (fail count from the prior implementation
        # includes unexpected passes from "todo" assertions).
        try:
            summary = self.summary
        except AttributeError:
            # No cached delta summary: summarize the full run.
            summary = self.handler.summarize()

        unexpected_count = sum(summary.unexpected_statuses.values())
        expected_count = sum(summary.expected_statuses.values())
        expected_failures = summary.expected_statuses.get("FAIL", 0)

        if unexpected_count:
            fail_text = '<em class="testfail">%s</em>' % unexpected_count
        else:
            fail_text = "0"

        text_summary = "%s/%s/%s" % (expected_count, fail_text, expected_failures)
        self.info("TinderboxPrint: %s<br/>%s\n" % (suite_name, text_summary))

    def append_tinderboxprint_line(self, suite_name):
        """Emit a TinderboxPrint line built via tbox_print_summary()."""
        try:
            summary = self.summary
        except AttributeError:
            # No cached delta summary: summarize the full run.
            summary = self.handler.summarize()

        unexpected_count = sum(summary.unexpected_statuses.values())
        expected_count = sum(summary.expected_statuses.values())
        expected_failures = summary.expected_statuses.get("FAIL", 0)
        crashed = 0
        if "crash" in summary.action_counts:
            crashed = summary.action_counts["crash"]
        text_summary = tbox_print_summary(
            expected_count, unexpected_count, expected_failures, crashed > 0, False
        )
        self.info("TinderboxPrint: %s<br/>%s\n" % (suite_name, text_summary))