# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import enum
from pathlib import Path
import re
import sys
from typing import (
Any,
Callable,
Dict,
Generator,
List,
Iterable,
Optional,
Tuple,
Union,
) # noqa
from . import metrics
from . import parser
from . import pings
from . import tags
from . import util
# Yield only an error message
LintGenerator = Generator[str, None, None]
# Yield fully constructed GlinterNits
NitGenerator = Generator["GlinterNit", None, None]
class CheckType(enum.Enum):
warning = 0
error = 1
def _split_words(name: str) -> List[str]:
"""
    Helper function to split words on `.`, `_`, or `-`.
"""
return re.split("[._-]", name)
def _english_list(items: List[str]) -> str:
"""
Helper function to format a list [A, B, C] as "'A', 'B', or 'C'".
"""
if len(items) == 0:
return ""
elif len(items) == 1:
return f"'{items[0]}'"
else:
return "{}, or '{}'".format(
", ".join([f"'{x}'" for x in items[:-1]]), items[-1]
)
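# Illustrative results for the helper above (these examples are not part of
# the original module):
#   _english_list([])              -> ""
#   _english_list(["a"])           -> "'a'"
#   _english_list(["a", "b", "c"]) -> "'a', 'b', or 'c'"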
def _hamming_distance(str1: str, str2: str) -> int:
"""
Count the # of differences between strings str1 and str2,
padding the shorter one with whitespace
"""
diffs = 0
if len(str1) < len(str2):
str1, str2 = str2, str1
len_dist = len(str1) - len(str2)
str2 += " " * len_dist
for ch1, ch2 in zip(str1, str2):
if ch1 != ch2:
diffs += 1
return diffs
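# Illustrative results (not part of the original module). Note this is a
# plain Hamming distance over aligned characters, not an edit distance:
#   _hamming_distance("baseline", "baselin") -> 1  (shorter string is padded)
#   _hamming_distance("baseline", "baselne") -> 3  (a dropped middle letter
#                                                   shifts every later character)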
def check_common_prefix(
category_name: str, metrics: Iterable[metrics.Metric]
) -> LintGenerator:
"""
Check if all metrics begin with a common prefix.
"""
metric_words = sorted([_split_words(metric.name) for metric in metrics])
if len(metric_words) < 2:
return
first = metric_words[0]
last = metric_words[-1]
for i in range(min(len(first), len(last))):
if first[i] != last[i]:
break
if i > 0:
common_prefix = "_".join(first[:i])
yield (
f"Within category '{category_name}', all metrics begin with "
f"prefix '{common_prefix}'."
"Remove the prefixes on the metric names and (possibly) "
"rename the category."
)
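# Sketch of what this check flags, using hypothetical metric names: a category
# "network" containing only "network_request_count" and "network_request_time"
# would be reported with common prefix "network_request", suggesting the prefix
# be dropped from the metric names and folded into the category.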
def check_unit_in_name(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
"""
The metric name ends in a unit.
"""
TIME_UNIT_ABBREV = {
"nanosecond": "ns",
"microsecond": "us",
"millisecond": "ms",
"second": "s",
"minute": "m",
"hour": "h",
"day": "d",
}
MEMORY_UNIT_ABBREV = {
"byte": "b",
"kilobyte": "kb",
"megabyte": "mb",
"gigabyte": "gb",
}
name_words = _split_words(metric.name)
unit_in_name = name_words[-1]
time_unit = getattr(metric, "time_unit", None)
memory_unit = getattr(metric, "memory_unit", None)
unit = getattr(metric, "unit", None)
if time_unit is not None:
if (
unit_in_name == TIME_UNIT_ABBREV.get(time_unit.name)
or unit_in_name == time_unit.name
):
yield (
f"Suffix '{unit_in_name}' is redundant with time_unit "
f"'{time_unit.name}'. Only include time_unit."
)
elif (
unit_in_name in TIME_UNIT_ABBREV.keys()
or unit_in_name in TIME_UNIT_ABBREV.values()
):
yield (
f"Suffix '{unit_in_name}' doesn't match time_unit "
f"'{time_unit.name}'. "
"Confirm the unit is correct and only include time_unit."
)
elif memory_unit is not None:
if (
unit_in_name == MEMORY_UNIT_ABBREV.get(memory_unit.name)
or unit_in_name == memory_unit.name
):
yield (
f"Suffix '{unit_in_name}' is redundant with memory_unit "
f"'{memory_unit.name}'. "
"Only include memory_unit."
)
elif (
unit_in_name in MEMORY_UNIT_ABBREV.keys()
or unit_in_name in MEMORY_UNIT_ABBREV.values()
):
yield (
f"Suffix '{unit_in_name}' doesn't match memory_unit "
f"{memory_unit.name}'. "
"Confirm the unit is correct and only include memory_unit."
)
elif unit is not None:
if unit_in_name == unit:
yield (
f"Suffix '{unit_in_name}' is redundant with unit param "
f"'{unit}'. "
"Only include unit."
)
def check_category_generic(
category_name: str, metrics: Iterable[metrics.Metric]
) -> LintGenerator:
"""
The category name is too generic.
"""
GENERIC_CATEGORIES = ["metrics", "events"]
if category_name in GENERIC_CATEGORIES:
yield (
f"Category '{category_name}' is too generic. "
f"Don't use {_english_list(GENERIC_CATEGORIES)} for category names"
)
def check_bug_number(
    metric: Union[metrics.Metric, pings.Ping], parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Bugs should be referenced by full URL rather than by bare bug number.
    """
    number_bugs = [str(bug) for bug in metric.bugs if isinstance(bug, int)]
    if len(number_bugs):
        yield (
            f"For bugs {', '.join(number_bugs)}: "
            "Bug numbers are deprecated and should be changed to full URLs. "
            f"For example, use 'https://bugzilla.mozilla.org/{number_bugs[0]}' "
            f"instead of '{number_bugs[0]}'."
        )
def check_valid_in_baseline(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Custom metrics may not be sent in the Glean-internal 'baseline' ping
    unless `allow_reserved` is set.
    """
allow_reserved = parser_config.get("allow_reserved", False)
if not allow_reserved and "baseline" in metric.send_in_pings:
yield (
"The baseline ping is Glean-internal. "
"Remove 'baseline' from the send_in_pings array."
)
def check_misspelled_pings(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check for ping names that look like misspellings of reserved ping names.
    """
for ping in metric.send_in_pings:
for builtin in pings.RESERVED_PING_NAMES:
distance = _hamming_distance(ping, builtin)
if distance == 1:
yield f"Ping '{ping}' seems misspelled. Did you mean '{builtin}'?"
def check_tags_required(
metric_or_ping: Union[metrics.Metric, pings.Ping], parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    When `require_tags` is enabled, every metric and ping must specify at
    least one tag.
    """
if parser_config.get("require_tags", False) and not len(
metric_or_ping.metadata.get("tags", [])
):
yield "Tags are required but no tags specified"
def check_user_lifetime_expiration(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
if metric.lifetime == metrics.Lifetime.user and metric.expires != "never":
yield (
"Metrics with 'user' lifetime cannot have an expiration date. "
"They live as long as the user profile does. "
"Set expires to 'never'."
)
def check_expired_date(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
try:
metric.validate_expires()
except ValueError as e:
yield (str(e))
def check_expired_metric(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
if metric.is_expired():
yield ("Metric has expired. Please consider removing it.")
def check_old_event_api(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
# Glean v52.0.0 removed the old events API.
# The metrics-2-0-0 schema still supports it.
# We want to warn about it.
# This can go when we introduce 3-0-0
if not isinstance(metric, metrics.Event):
return
if not all("type" in x for x in metric.extra_keys.values()):
yield ("The old event API is gone. Extra keys require a type.")
def check_metric_on_events_lifetime(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
"""A non-event metric on the Events ping only makes sense if its value
is immutable over the life of the ping."""
if (
"events" in metric.send_in_pings
and "all_pings" not in metric.send_in_pings
and metric.type != "event"
and metric.lifetime == metrics.Lifetime.ping
):
yield (
"Non-event metrics sent on the Events ping should not have the ping"
" lifetime."
)
def check_unexpected_unit(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
"""
    `unit` was historically allowed on all metric types but is now restricted
    to quantity and custom distribution metrics. We warn about its use
    anywhere else.
"""
allowed_types = [metrics.Quantity, metrics.CustomDistribution]
if not any([isinstance(metric, ty) for ty in allowed_types]) and metric.unit:
yield (
"The `unit` property is only allowed for quantity "
+ "and custom distribution metrics."
)
def check_empty_datareview(
metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
disallowed_datareview = ["", "todo"]
data_reviews = [dr.lower() in disallowed_datareview for dr in metric.data_reviews]
if any(data_reviews):
yield "List of data reviews should not contain empty strings or TODO markers."
def check_redundant_ping(
    ping: pings.Ping, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check if the ping name redundantly contains the word 'ping' (as a prefix,
    a suffix, or anywhere else) or the word 'custom'.
    """
    ping_words = _split_words(ping.name)
if len(ping_words) != 0:
ping_first_word = ping_words[0]
ping_last_word = ping_words[-1]
if ping_first_word == "ping":
yield ("The prefix 'ping' is redundant.")
elif ping_last_word == "ping":
yield ("The suffix 'ping' is redundant.")
elif "ping" in ping_words:
yield ("The word 'ping' is redundant.")
elif "custom" in ping_words:
yield ("The word 'custom' is redundant.")
def check_unknown_ping(
check_name: str,
check_type: CheckType,
all_pings: Dict[str, pings.Ping],
metrics: Dict[str, metrics.Metric],
parser_config: Dict[str, Any],
) -> NitGenerator:
"""
Check that all pings in `send_in_pings` for all metrics are either a builtin ping
or in the list of defined custom pings.
"""
available_pings = [p for p in all_pings]
for _, metric in metrics.items():
if check_name in metric.no_lint:
continue
send_in_pings = metric.send_in_pings
for target_ping in send_in_pings:
if target_ping in pings.RESERVED_PING_NAMES:
continue
if target_ping not in available_pings:
msg = f"Ping `{target_ping} `in `send_in_pings` is unknown."
name = ".".join([metric.category, metric.name])
nit = GlinterNit(
check_name,
name,
msg,
check_type,
)
yield nit
# The checks that operate on an entire category of metrics:
# {NAME: (function, check_type)}
CATEGORY_CHECKS: Dict[
str, Tuple[Callable[[str, Iterable[metrics.Metric]], LintGenerator], CheckType]
] = {
"COMMON_PREFIX": (check_common_prefix, CheckType.error),
"CATEGORY_GENERIC": (check_category_generic, CheckType.error),
}
# The checks that operate on individual metrics:
# {NAME: (function, check_type)}
METRIC_CHECKS: Dict[
str, Tuple[Callable[[metrics.Metric, dict], LintGenerator], CheckType]
] = {
"UNIT_IN_NAME": (check_unit_in_name, CheckType.error),
"BUG_NUMBER": (check_bug_number, CheckType.error),
"BASELINE_PING": (check_valid_in_baseline, CheckType.error),
"MISSPELLED_PING": (check_misspelled_pings, CheckType.error),
"TAGS_REQUIRED": (check_tags_required, CheckType.error),
"EXPIRATION_DATE_TOO_FAR": (check_expired_date, CheckType.warning),
"USER_LIFETIME_EXPIRATION": (check_user_lifetime_expiration, CheckType.warning),
"EXPIRED": (check_expired_metric, CheckType.warning),
"OLD_EVENT_API": (check_old_event_api, CheckType.warning),
"METRIC_ON_EVENTS_LIFETIME": (check_metric_on_events_lifetime, CheckType.error),
"UNEXPECTED_UNIT": (check_unexpected_unit, CheckType.warning),
"EMPTY_DATAREVIEW": (check_empty_datareview, CheckType.warning),
}
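# Illustrative note (not from the original file): these table keys are the
# names a metric's `no_lint` list refers to. For example, a metric listing
# "UNIT_IN_NAME" in its no_lint has that single check skipped by
# lint_metrics() below.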
# The checks that operate on individual pings:
# {NAME: (function, check_type)}
PING_CHECKS: Dict[
str, Tuple[Callable[[pings.Ping, dict], LintGenerator], CheckType]
] = {
"BUG_NUMBER": (check_bug_number, CheckType.error),
"TAGS_REQUIRED": (check_tags_required, CheckType.error),
"REDUNDANT_PING": (check_redundant_ping, CheckType.error),
}
ALL_OBJECT_CHECKS: Dict[
str,
Tuple[
Callable[
# check name, check type, pings, metrics, config
[str, CheckType, dict, dict, dict],
NitGenerator,
],
CheckType,
],
] = {
"UNKNOWN_PING_REFERENCED": (check_unknown_ping, CheckType.error),
}
class GlinterNit:
def __init__(self, check_name: str, name: str, msg: str, check_type: CheckType):
self.check_name = check_name
self.name = name
self.msg = msg
self.check_type = check_type
def format(self):
return (
f"{self.check_type.name.upper()}: {self.check_name}: "
f"{self.name}: {self.msg}"
)
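# Illustrative example (hypothetical values): a nit constructed as
#   GlinterNit("BUG_NUMBER", "category.metric", "Bug numbers are deprecated.",
#              CheckType.error)
# formats as
#   "ERROR: BUG_NUMBER: category.metric: Bug numbers are deprecated."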
def _lint_item_tags(
item_name: str,
item_type: str,
item_tag_names: List[str],
valid_tag_names: List[str],
) -> List[GlinterNit]:
invalid_tags = [tag for tag in item_tag_names if tag not in valid_tag_names]
return (
[
GlinterNit(
"INVALID_TAGS",
item_name,
f"Invalid tags specified in {item_type}: {', '.join(invalid_tags)}",
CheckType.error,
)
]
if len(invalid_tags)
else []
)
def _lint_pings(
category: Dict[str, Union[metrics.Metric, pings.Ping, tags.Tag]],
parser_config: Dict[str, Any],
valid_tag_names: List[str],
) -> List[GlinterNit]:
nits: List[GlinterNit] = []
for ping_name, ping in sorted(list(category.items())):
assert isinstance(ping, pings.Ping)
for check_name, (check_func, check_type) in PING_CHECKS.items():
new_nits = list(check_func(ping, parser_config))
if len(new_nits):
if check_name not in ping.no_lint:
nits.extend(
GlinterNit(
check_name,
ping_name,
msg,
check_type,
)
for msg in new_nits
)
nits.extend(
_lint_item_tags(
ping_name,
"ping",
ping.metadata.get("tags", []),
valid_tag_names,
)
)
return nits
def _lint_all_objects(
objects: Dict[str, Dict[str, Union[metrics.Metric, pings.Ping, tags.Tag]]],
parser_config: Dict[str, Any],
) -> List[GlinterNit]:
nits: List[GlinterNit] = []
pings = objects.get("pings")
if not pings:
return []
metrics = objects.get("all_metrics")
if not metrics:
return []
for check_name, (check_func, check_type) in ALL_OBJECT_CHECKS.items():
new_nits = list(
check_func(check_name, check_type, pings, metrics, parser_config)
)
nits.extend(new_nits)
return nits
def lint_metrics(
objs: metrics.ObjectTree,
parser_config: Optional[Dict[str, Any]] = None,
file=sys.stderr,
) -> List[GlinterNit]:
"""
Performs glinter checks on a set of metrics objects.
    :param objs: Tree of metric objects, as returned by `parser.parse_objects`.
    :param parser_config: Parser configuration object, passed to the
        individual checks.
:param file: The stream to write errors to.
:returns: List of nits.
"""
if parser_config is None:
parser_config = {}
nits: List[GlinterNit] = []
valid_tag_names = [tag for tag in objs.get("tags", [])]
nits.extend(_lint_all_objects(objs, parser_config))
for category_name, category in sorted(list(objs.items())):
if category_name == "pings":
nits.extend(_lint_pings(category, parser_config, valid_tag_names))
continue
if category_name == "tags":
# currently we have no linting for tags
continue
        # Only consider Metric objects; skip any Pings or Tags in the category.
category_metrics = dict(
(name, metric)
for (name, metric) in category.items()
if isinstance(metric, metrics.Metric)
)
for cat_check_name, (cat_check_func, check_type) in CATEGORY_CHECKS.items():
if any(
cat_check_name in metric.no_lint for metric in category_metrics.values()
):
continue
nits.extend(
GlinterNit(cat_check_name, category_name, msg, check_type)
for msg in cat_check_func(category_name, category_metrics.values())
)
for _metric_name, metric in sorted(list(category_metrics.items())):
for check_name, (check_func, check_type) in METRIC_CHECKS.items():
new_nits = list(check_func(metric, parser_config))
if len(new_nits):
if check_name not in metric.no_lint:
nits.extend(
GlinterNit(
check_name,
".".join([metric.category, metric.name]),
msg,
check_type,
)
for msg in new_nits
)
# also check that tags for metric are valid
nits.extend(
_lint_item_tags(
".".join([metric.category, metric.name]),
"metric",
metric.metadata.get("tags", []),
valid_tag_names,
)
)
if len(nits):
print("Sorry, Glean found some glinter nits:", file=file)
for nit in nits:
print(nit.format(), file=file)
print("", file=file)
print("Please fix the above nits to continue.", file=file)
print(
"To disable a check, add a `no_lint` parameter "
"with a list of check names to disable.\n"
"This parameter can appear with each individual metric, or at the "
"top-level to affect the entire file.",
file=file,
)
return nits
def glinter(
input_filepaths: Iterable[Path],
parser_config: Optional[Dict[str, Any]] = None,
file=sys.stderr,
) -> int:
"""
Commandline helper for glinter.
:param input_filepaths: List of Path objects to load metrics from.
:param parser_config: Parser configuration object, passed to
`parser.parse_objects`.
:param file: The stream to write the errors to.
:return: Non-zero if there were any glinter errors.
"""
if parser_config is None:
parser_config = {}
errors = 0
objs = parser.parse_objects(input_filepaths, parser_config)
errors += util.report_validation_errors(objs)
nits = lint_metrics(objs.value, parser_config=parser_config, file=file)
errors += len([nit for nit in nits if nit.check_type == CheckType.error])
if errors == 0:
print("✨ Your metrics are Glean! ✨", file=file)
return 0
print(f"❌ Found {errors} errors.")
return 1
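# Minimal usage sketch (assuming this module is importable as
# `glean_parser.lint`; the YAML paths below are hypothetical):
#
#   from pathlib import Path
#   from glean_parser import lint
#
#   exit_code = lint.glinter([Path("metrics.yaml"), Path("pings.yaml")])
#   # 0 when no glinter errors were found, 1 otherwise.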