Source code

Revision control

Copy as Markdown

Other Tools

# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Code for parsing metrics.yaml files.
"""
import functools
from pathlib import Path
import textwrap
from typing import Any, cast, Dict, Generator, Iterable, Optional, Set, Tuple, Union
import jsonschema # type: ignore
from jsonschema.exceptions import ValidationError # type: ignore
from .metrics import Metric, ObjectTree
from .pings import Ping, RESERVED_PING_NAMES
from .tags import Tag
from . import util
from .util import DictWrapper
# Directory that contains this module.
ROOT_DIR = Path(__file__).parents[0]
# Directory searched for the `*.yaml` schema definitions.
SCHEMAS_DIR = ROOT_DIR.joinpath("schemas")
def _update_validator(validator):
Adds some custom validators to the jsonschema validator that produce
nicer error messages.
def required(validator, required, instance, schema):
if not validator.is_type(instance, "object"):
missing_properties = set(
property for property in required if property not in instance
if len(missing_properties):
missing_properties = sorted(list(missing_properties))
yield ValidationError(
f"Missing required properties: {', '.join(missing_properties)}"
validator.VALIDATORS["required"] = required
def _load_file(
    filepath: Path, parser_config: Dict[str, Any]
) -> Generator[str, None, Tuple[Dict[str, util.JSONType], Optional[str]]]:
    """
    Load a metrics.yaml or pings.yaml format file.

    If the `filepath` does not exist, raises `FileNotFoundError`, unless
    `parser_config["allow_missing_files"]` is `True`.

    This is a generator: it yields formatted error strings and *returns* a
    `(content, filetype)` tuple, which callers receive through `yield from`.

    :param filepath: Path of the file to load.
    :param parser_config: Parser options; only `allow_missing_files` is read
        here.
    :returns: `(content, filetype)`. `filetype` is `"metrics"`, `"pings"`,
        `"tags"`, or `None` when it cannot be determined from `$schema`.
        `content` is `{}` when the file is missing, could not be loaded, is
        empty, is not a mapping, or fails schema validation.
    :raises FileNotFoundError: If the file is missing and missing files are
        not allowed.
    :raises TypeError: If the `$schema` key is absent or not a string.
    """
    try:
        content = util.load_yaml_or_json(filepath)
    except FileNotFoundError:
        if not parser_config.get("allow_missing_files", False):
            raise
        return {}, None
    except Exception as e:
        # Any other loading problem is reported as a regular error message.
        yield util.format_error(filepath, "", textwrap.fill(str(e)))
        return {}, None

    if content is None:
        yield util.format_error(filepath, "", f"'{filepath}' file can not be empty.")
        return {}, None

    # Non-mapping documents and empty mappings carry nothing to parse.
    if not isinstance(content, dict) or content == {}:
        return {}, None

    schema_key = content.get("$schema")
    if not isinstance(schema_key, str):
        raise TypeError(f"Invalid schema key {schema_key}")

    # The file type is the second-to-last path segment of the schema id.
    filetype: Optional[str] = None
    try:
        filetype = schema_key.split("/")[-2]
    except IndexError:
        filetype = None

    if filetype not in ("metrics", "pings", "tags"):
        filetype = None

    # Any validation error invalidates the whole file's content, but every
    # error is still reported.
    for error in validate(content, filepath):
        content = {}
        yield error

    return content, filetype
# Memoised: the schemas on disk do not change while the process runs, and
# this function is called for every validated file.
@functools.lru_cache(maxsize=1)
def _load_schemas() -> Dict[str, Tuple[Any, Any]]:
    """
    Load all of the known schemas from disk, and put them in a map based on the
    schema's $id.

    :returns: A mapping from schema `$id` to a `(schema, validator)` tuple.
        The mapping is cached and shared between callers, so it must be
        treated as read-only.
    """
    schemas = {}
    for schema_path in SCHEMAS_DIR.glob("*.yaml"):
        schema = util.load_yaml_or_json(schema_path)
        resolver = util.get_null_resolver(schema)
        validator_class = jsonschema.validators.validator_for(schema)
        # Install the friendlier `required` error message on the validator
        # class before instantiating it.
        _update_validator(validator_class)
        validator = validator_class(schema, resolver=resolver)
        schemas[schema["$id"]] = (schema, validator)
    return schemas
def _get_schema(
    schema_id: str, filepath: Union[str, Path] = "<input>"
) -> Tuple[Any, Any]:
    """
    Get the schema for the given schema $id.

    :param schema_id: The `$id` of the schema to look up.
    :param filepath: The file the `$schema` key came from; used only to
        format the error message.
    :returns: The `(schema, validator)` tuple registered for `schema_id`.
    :raises ValueError: If `schema_id` is not a known schema. The exception
        message is already formatted for display.
    """
    schemas = _load_schemas()
    if schema_id not in schemas:
        raise ValueError(
            util.format_error(
                filepath,
                "",
                f"$schema key must be one of {', '.join(schemas.keys())}",
            )
        )
    return schemas[schema_id]
def _get_schema_for_content(
content: Dict[str, util.JSONType], filepath: Union[str, Path]
) -> Tuple[Any, Any]:
Get the appropriate schema for the given JSON content.
schema_url = content.get("$schema")
if not isinstance(schema_url, str):
raise TypeError("Invalid $schema type {schema_url}")
return _get_schema(schema_url, filepath)
def validate(
    content: Dict[str, util.JSONType], filepath: Union[str, Path] = "<input>"
) -> Generator[str, None, None]:
    """
    Validate the given content against the appropriate schema.

    :param content: The parsed file content, including its `$schema` key.
    :param filepath: The file the content came from; used in error messages.
    :returns: A generator of formatted error strings; it yields nothing when
        the content is valid.
    :raises TypeError: If the `$schema` key is absent or not a string.
    """
    try:
        _, validator = _get_schema_for_content(content, filepath)
    except ValueError as e:
        # An unknown schema is reported as a single error; there is nothing
        # to validate against.
        yield str(e)
        return

    for error in validator.iter_errors(content):
        yield util.format_error(filepath, "", util.pprint_validation_error(error))
def _instantiate_metrics(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of metrics.yaml files, convert the JSON information into Metric
    objects, and merge them into a single tree.

    :param all_objects: The tree being built; updated in place with
        `all_objects[category][name] = Metric`.
    :param sources: Maps `(category, name)` to the file that first defined
        the metric; updated in place and used to detect duplicates.
    :param content: The validated content of one metrics.yaml file.
    :param filepath: The file `content` was loaded from.
    :param config: Parser options; `allow_reserved` is read here and the
        whole dictionary is passed on to `Metric.make_metric`.
    :returns: A generator of formatted error strings.
    :raises TypeError: If a category's value is not a mapping.
    """
    global_no_lint = content.get("no_lint", [])
    global_tags = content.get("$tags", [])
    assert isinstance(global_tags, list)

    for category_key, category_val in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are file-level settings, not
        # categories.
        if category_key.startswith("$"):
            continue
        if category_key == "no_lint":
            continue
        if not config.get("allow_reserved") and category_key.split(".")[0] == "glean":
            yield util.format_error(
                filepath,
                f"For category '{category_key}'",
                "Categories beginning with 'glean' are reserved for "
                "Glean internal use.",
            )
            # NOTE(review): reserved categories are skipped after being
            # reported; confirm this matches the intended behavior.
            continue
        all_objects.setdefault(category_key, DictWrapper())

        if not isinstance(category_val, dict):
            raise TypeError(f"Invalid content for {category_key}")

        for metric_key, metric_val in sorted(category_val.items()):
            try:
                metric_obj = Metric.make_metric(
                    category_key, metric_key, metric_val, validated=True, config=config
                )
            except Exception as e:
                yield util.format_error(
                    filepath,
                    f"On instance {category_key}.{metric_key}",
                    str(e),
                )
                metric_obj = None
            else:
                if (
                    not config.get("allow_reserved")
                    and "all-pings" in metric_obj.send_in_pings
                ):
                    yield util.format_error(
                        filepath,
                        f"On instance {category_key}.{metric_key}",
                        'Only internal metrics may specify "all-pings" '
                        'in "send_in_pings"',
                    )
                    metric_obj = None

            if metric_obj is not None:
                # Merge the file-level lint exemptions and tags into the
                # metric's own.
                metric_obj.no_lint = sorted(set(metric_obj.no_lint + global_no_lint))
                if global_tags:
                    metric_obj.metadata["tags"] = sorted(
                        set(metric_obj.metadata.get("tags", []) + global_tags)
                    )

                if isinstance(filepath, Path):
                    metric_obj.defined_in["filepath"] = str(filepath)

            already_seen = sources.get((category_key, metric_key))
            if already_seen is not None:
                # We've seen this metric name already
                yield util.format_error(
                    filepath,
                    "",
                    f"Duplicate metric name '{category_key}.{metric_key}' "
                    f"already defined in '{already_seen}'",
                )
            else:
                all_objects[category_key][metric_key] = metric_obj
                sources[(category_key, metric_key)] = filepath
def _instantiate_pings(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of pings.yaml files, convert the JSON information into Ping
    objects.

    :param all_objects: The tree being built; pings are stored in place under
        `all_objects["pings"][name]`.
    :param sources: Maps a ping name to the file that first defined it;
        updated in place and used to detect duplicates.
    :param content: The validated content of one pings.yaml file.
    :param filepath: The file `content` was loaded from.
    :param config: Parser options; only `allow_reserved` is read here.
    :returns: A generator of formatted error strings.
    :raises TypeError: If a ping's value is not a mapping.
    """
    global_no_lint = content.get("no_lint", [])
    assert isinstance(global_no_lint, list)
    # Maps a scheduling ping's name to the names of the pings that list it in
    # their `metadata.ping_schedule`.
    ping_schedule_reverse_map: Dict[str, Set[str]] = dict()

    for ping_key, ping_val in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are file-level settings, not pings.
        if ping_key.startswith("$"):
            continue
        if ping_key == "no_lint":
            continue
        if not config.get("allow_reserved"):
            if ping_key in RESERVED_PING_NAMES:
                yield util.format_error(
                    filepath,
                    f"For ping '{ping_key}'",
                    f"Ping uses a reserved name ({RESERVED_PING_NAMES})",
                )
                continue
        if not isinstance(ping_val, dict):
            raise TypeError(f"Invalid content for ping {ping_key}")
        ping_val["name"] = ping_key

        if "metadata" in ping_val and "ping_schedule" in ping_val["metadata"]:
            if ping_key in ping_val["metadata"]["ping_schedule"]:
                yield util.format_error(
                    filepath,
                    f"For ping '{ping_key}'",
                    "ping_schedule contains its own ping name",
                )
                continue
            for ping_schedule in ping_val["metadata"]["ping_schedule"]:
                if ping_schedule not in ping_schedule_reverse_map:
                    ping_schedule_reverse_map[ping_schedule] = set()
                ping_schedule_reverse_map[ping_schedule].add(ping_key)

        try:
            # NOTE(review): `_validated=True` and `**ping_val` are assumed
            # constructor arguments (the content was already schema-validated
            # on load) -- confirm against `pings.Ping`.
            ping_obj = Ping(
                defined_in=getattr(ping_val, "defined_in", None),
                _validated=True,
                **ping_val,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{ping_key}'", str(e))
            continue

        if ping_obj is not None:
            ping_obj.no_lint = sorted(set(ping_obj.no_lint + global_no_lint))

            if isinstance(filepath, Path) and ping_obj.defined_in is not None:
                ping_obj.defined_in["filepath"] = str(filepath)

        already_seen = sources.get(ping_key)
        if already_seen is not None:
            # We've seen this ping name already
            yield util.format_error(
                filepath,
                "",
                f"Duplicate ping name '{ping_key}' "
                f"already defined in '{already_seen}'",
            )
        else:
            all_objects.setdefault("pings", {})[ping_key] = ping_obj
            sources[ping_key] = filepath

    # Attach the reverse schedule to each scheduling ping. `.get` avoids a
    # KeyError when a schedule was recorded but no ping was registered.
    known_pings = all_objects.get("pings", {})
    for scheduler, scheduled in ping_schedule_reverse_map.items():
        if scheduler in known_pings and isinstance(known_pings[scheduler], Ping):
            scheduler_obj: Ping = cast(Ping, known_pings[scheduler])
            scheduler_obj.schedules_pings = sorted(scheduled)
def _instantiate_tags(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of tags.yaml files, convert the JSON information into Tag
    objects.

    :param all_objects: The tree being built; tags are stored in place under
        `all_objects["tags"][name]`.
    :param sources: Maps a tag name to the file that first defined it;
        updated in place and used to detect duplicates.
    :param content: The validated content of one tags.yaml file.
    :param filepath: The file `content` was loaded from.
    :param config: Parser options; not read by this function.
    :returns: A generator of formatted error strings.
    :raises TypeError: If a tag's value is not a mapping.
    """
    global_no_lint = content.get("no_lint", [])
    assert isinstance(global_no_lint, list)

    for tag_key, tag_val in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are file-level settings, not tags.
        if tag_key.startswith("$"):
            continue
        if tag_key == "no_lint":
            continue
        if not isinstance(tag_val, dict):
            raise TypeError(f"Invalid content for tag {tag_key}")
        tag_val["name"] = tag_key

        try:
            # NOTE(review): `_validated=True` and `**tag_val` are assumed
            # constructor arguments (the content was already schema-validated
            # on load) -- confirm against `tags.Tag`.
            tag_obj = Tag(
                defined_in=getattr(tag_val, "defined_in", None),
                _validated=True,
                **tag_val,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{tag_key}'", str(e))
            continue

        if tag_obj is not None:
            tag_obj.no_lint = sorted(set(tag_obj.no_lint + global_no_lint))

            if isinstance(filepath, Path) and tag_obj.defined_in is not None:
                tag_obj.defined_in["filepath"] = str(filepath)

        already_seen = sources.get(tag_key)
        if already_seen is not None:
            # We've seen this tag name already
            yield util.format_error(
                filepath,
                "",
                f"Duplicate tag name '{tag_key}' "
                f"already defined in '{already_seen}'",
            )
        else:
            all_objects.setdefault("tags", {})[tag_key] = tag_obj
            sources[tag_key] = filepath
def _preprocess_objects(objs: ObjectTree, config: Dict[str, Any]) -> ObjectTree:
    """
    Preprocess the object tree to better set defaults.

    Only `Metric` instances are touched; other objects in the tree are left
    as they are.

    :param objs: The object tree; its metrics are modified in place.
    :param config: Parser options; only `do_not_disable_expired` is read
        here.
    :returns: The same `objs` tree that was passed in.
    """
    for category in objs.values():
        for obj in category.values():
            if not isinstance(obj, Metric):
                continue

            if not config.get("do_not_disable_expired", False) and hasattr(
                obj, "is_disabled"
            ):
                obj.disabled = obj.is_disabled()

            if hasattr(obj, "send_in_pings"):
                # Replace the "default" placeholder with the metric's default
                # store names, then de-duplicate and sort.
                if "default" in obj.send_in_pings:
                    obj.send_in_pings = obj.default_store_names + [
                        x for x in obj.send_in_pings if x != "default"
                    ]
                obj.send_in_pings = sorted(set(obj.send_in_pings))
    return objs
# NOTE(review): the `result.value` access documented below needs a wrapper
# that keeps the generator's return value; `util.keep_value` is assumed to be
# that wrapper -- confirm it exists and is applied exactly once.
@util.keep_value
def parse_objects(
    filepaths: Iterable[Path], config: Optional[Dict[str, Any]] = None
) -> Generator[str, None, ObjectTree]:
    """
    Parse one or more metrics.yaml and/or pings.yaml files, returning a tree of
    `metrics.Metric`, `pings.Ping`, and `tags.Tag` instances.

    The result is a generator over any errors.  If there are no errors, the
    actual metrics can be obtained from `result.value`.  For example::

      result = parser.parse_objects(filepaths)
      for err in result:
          print(err)
      all_metrics = result.value

    The result value is a dictionary of category names to categories, where
    each category is a dictionary from metric name to `metrics.Metric`
    instances.  There are also the special categories `pings` and `tags`
    containing all of the `pings.Ping` and `tags.Tag` instances, respectively.

    :param filepaths: list of Path objects to metrics.yaml, pings.yaml, and/or
        tags.yaml files
    :param config: A dictionary of options that change parsing behavior.
        Supported keys are:

        - `allow_reserved`: Allow values reserved for internal Glean use.
        - `do_not_disable_expired`: Don't mark expired metrics as disabled.
          This is useful when you want to retain the original "disabled"
          value from the `metrics.yaml`, rather than having it overridden when
          the metric expires.
        - `allow_missing_files`: Do not raise a `FileNotFoundError` if any of
          the input `filepaths` do not exist.
    """
    if config is None:
        config = {}

    all_objects: ObjectTree = DictWrapper()
    sources: Dict[Any, Path] = {}
    # One loader per file type; the type is derived from the file's `$schema`.
    instantiators = {
        "metrics": _instantiate_metrics,
        "pings": _instantiate_pings,
        "tags": _instantiate_tags,
    }

    filepaths = util.ensure_list(filepaths)
    for filepath in filepaths:
        content, filetype = yield from _load_file(filepath, config)
        instantiate = instantiators.get(filetype)
        # Files whose type could not be determined contribute no objects.
        if instantiate is not None:
            yield from instantiate(all_objects, sources, content, filepath, config)

    return _preprocess_objects(all_objects, config)