# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Code for parsing metrics.yaml files.
"""
import functools
from pathlib import Path
import textwrap
from typing import Any, cast, Dict, Generator, Iterable, Optional, Set, Tuple, Union
import jsonschema # type: ignore
from jsonschema.exceptions import ValidationError # type: ignore
from .metrics import Metric, ObjectTree
from .pings import Ping, RESERVED_PING_NAMES
from .tags import Tag
from . import util
from .util import DictWrapper
ROOT_DIR = Path(__file__).parent
SCHEMAS_DIR = ROOT_DIR / "schemas"
def _update_validator(validator):
"""
Adds some custom validators to the jsonschema validator that produce
nicer error messages.
"""
def required(validator, required, instance, schema):
if not validator.is_type(instance, "object"):
return
missing_properties = set(
property for property in required if property not in instance
)
if len(missing_properties):
missing_properties = sorted(list(missing_properties))
yield ValidationError(
f"Missing required properties: {', '.join(missing_properties)}"
)
validator.VALIDATORS["required"] = required
def _load_file(
    filepath: Path, parser_config: Dict[str, Any]
) -> Generator[str, None, Tuple[Dict[str, util.JSONType], Optional[str]]]:
    """
    Load a metrics.yaml or pings.yaml format file.

    Yields formatted error strings; the generator's return value (captured
    via ``yield from``) is a ``(content, filetype)`` tuple, where
    ``filetype`` is one of "metrics", "pings", "tags", or `None` when it
    cannot be determined from the `$schema` key.

    If the `filepath` does not exist, raises `FileNotFoundError`, unless
    `parser_config["allow_missing_files"]` is `True`.
    """
    try:
        content = util.load_yaml_or_json(filepath)
    except FileNotFoundError:
        if not parser_config.get("allow_missing_files", False):
            raise
        else:
            # Missing files are tolerated: behave like an empty file.
            return {}, None
    except Exception as e:
        # Any other load/parse failure is reported as a formatted error
        # message rather than propagated.
        yield util.format_error(filepath, "", textwrap.fill(str(e)))
        return {}, None
    if content is None:
        yield util.format_error(filepath, "", f"'{filepath}' file can not be empty.")
        return {}, None
    if not isinstance(content, dict):
        return {}, None
    if content == {}:
        return {}, None
    # A missing or non-string `$schema` is a hard error (not yielded).
    schema_key = content.get("$schema")
    if not isinstance(schema_key, str):
        raise TypeError(f"Invalid schema key {schema_key}")
    # The filetype is encoded in the schema URL, e.g.
    # ".../metrics/2-0-0" -> "metrics".
    filetype: Optional[str] = None
    try:
        filetype = schema_key.split("/")[-2]
    except IndexError:
        filetype = None
    if filetype not in ("metrics", "pings", "tags"):
        filetype = None
    for error in validate(content, filepath):
        # On any schema validation error, discard the content entirely.
        content = {}
        yield error
    return content, filetype
@functools.lru_cache(maxsize=1)
def _load_schemas() -> Dict[str, Tuple[Any, Any]]:
    """
    Read every schema file shipped in `SCHEMAS_DIR` and index it by its
    `$id`, mapping to a `(schema, validator)` pair. Cached after the first
    call.
    """
    result: Dict[str, Tuple[Any, Any]] = {}
    for path in SCHEMAS_DIR.glob("*.yaml"):
        schema = util.load_yaml_or_json(path)
        validator_class = jsonschema.validators.validator_for(schema)
        _update_validator(validator_class)
        validator_class.check_schema(schema)
        validator = validator_class(
            schema, resolver=util.get_null_resolver(schema)
        )
        result[schema["$id"]] = (schema, validator)
    return result
def _get_schema(
    schema_id: str, filepath: Union[str, Path] = "<input>"
) -> Tuple[Any, Any]:
    """
    Look up the `(schema, validator)` pair registered under `schema_id`.

    Raises `ValueError` (with a formatted error message naming the valid
    ids) when the id is unknown.
    """
    known = _load_schemas()
    if schema_id in known:
        return known[schema_id]
    message = f"$schema key must be one of {', '.join(known.keys())}"
    raise ValueError(util.format_error(filepath, "", message))
def _get_schema_for_content(
    content: Dict[str, util.JSONType], filepath: Union[str, Path]
) -> Tuple[Any, Any]:
    """
    Get the appropriate schema for the given JSON content.

    :param content: The parsed YAML/JSON document; its `$schema` key selects
        the schema.
    :param filepath: Used only when formatting error messages.
    :raises TypeError: If the `$schema` key is missing or not a string.
    :raises ValueError: If the `$schema` key names an unknown schema.
    """
    schema_url = content.get("$schema")
    if not isinstance(schema_url, str):
        # Bug fix: this message was missing its `f` prefix, so it printed
        # the literal text "{schema_url}" instead of the offending value.
        raise TypeError(f"Invalid $schema type {schema_url}")
    return _get_schema(schema_url, filepath)
def validate(
    content: Dict[str, util.JSONType], filepath: Union[str, Path] = "<input>"
) -> Generator[str, None, None]:
    """
    Check `content` against the schema named by its `$schema` key, yielding
    one formatted message per validation problem.
    """
    try:
        schema, validator = _get_schema_for_content(content, filepath)
    except ValueError as e:
        # Unknown $schema: report that single error and stop.
        yield str(e)
        return
    for err in validator.iter_errors(content):
        yield util.format_error(filepath, "", util.pprint_validation_error(err))
def _instantiate_metrics(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of metrics.yaml files, convert the JSON information into Metric
    objects, and merge them into a single tree.

    Yields formatted error strings; successfully created metrics are merged
    into `all_objects`, and `sources` maps each `(category, metric)` pair to
    the file it came from (used for cross-file duplicate detection).
    """
    # File-level directives applied to every metric defined in this file.
    global_no_lint = content.get("no_lint", [])
    global_tags = content.get("$tags", [])
    assert isinstance(global_tags, list)
    for category_key, category_val in sorted(content.items()):
        # `$`-prefixed keys ($schema, $tags) and `no_lint` are directives,
        # not metric categories.
        if category_key.startswith("$"):
            continue
        if category_key == "no_lint":
            continue
        if not config.get("allow_reserved") and category_key.split(".")[0] == "glean":
            yield util.format_error(
                filepath,
                f"For category '{category_key}'",
                "Categories beginning with 'glean' are reserved for "
                "Glean internal use.",
            )
            continue
        all_objects.setdefault(category_key, DictWrapper())
        if not isinstance(category_val, dict):
            raise TypeError(f"Invalid content for {category_key}")
        for metric_key, metric_val in sorted(category_val.items()):
            try:
                metric_obj = Metric.make_metric(
                    category_key, metric_key, metric_val, validated=True, config=config
                )
            except Exception as e:
                yield util.format_error(
                    filepath,
                    f"On instance {category_key}.{metric_key}",
                    str(e),
                    # NOTE(review): assumes `metric_val` carries a
                    # `defined_in` attribute with the source line number —
                    # confirm against the YAML loader.
                    metric_val.defined_in["line"],
                )
                metric_obj = None
            else:
                # "all-pings" is reserved for Glean-internal metrics.
                if (
                    not config.get("allow_reserved")
                    and "all-pings" in metric_obj.send_in_pings
                ):
                    yield util.format_error(
                        filepath,
                        f"On instance {category_key}.{metric_key}",
                        'Only internal metrics may specify "all-pings" '
                        'in "send_in_pings"',
                        metric_val.defined_in["line"],
                    )
                    metric_obj = None
            if metric_obj is not None:
                # Merge file-level lint suppressions and tags into the metric.
                metric_obj.no_lint = sorted(set(metric_obj.no_lint + global_no_lint))
                if len(global_tags):
                    metric_obj.metadata["tags"] = sorted(
                        set(metric_obj.metadata.get("tags", []) + global_tags)
                    )
                if isinstance(filepath, Path):
                    metric_obj.defined_in["filepath"] = str(filepath)
                already_seen = sources.get((category_key, metric_key))
                if already_seen is not None:
                    # We've seen this metric name already
                    yield util.format_error(
                        filepath,
                        "",
                        (
                            f"Duplicate metric name '{category_key}.{metric_key}' "
                            f"already defined in '{already_seen}'"
                        ),
                        metric_obj.defined_in["line"],
                    )
                else:
                    all_objects[category_key][metric_key] = metric_obj
                    sources[(category_key, metric_key)] = filepath
def _instantiate_pings(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of pings.yaml files, convert the JSON information into Ping
    objects.

    Yields formatted error strings; successfully created pings are merged
    into `all_objects["pings"]`, and `sources` maps each ping name to the
    file it came from (used for cross-file duplicate detection).
    """
    global_no_lint = content.get("no_lint", [])
    assert isinstance(global_no_lint, list)
    # Maps a scheduler ping name to the set of ping names it triggers
    # (the inverse of each ping's `metadata.ping_schedule` list).
    ping_schedule_reverse_map: Dict[str, Set[str]] = dict()
    for ping_key, ping_val in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are directives, not pings.
        if ping_key.startswith("$"):
            continue
        if ping_key == "no_lint":
            continue
        if not config.get("allow_reserved"):
            if ping_key in RESERVED_PING_NAMES:
                yield util.format_error(
                    filepath,
                    f"For ping '{ping_key}'",
                    f"Ping uses a reserved name ({RESERVED_PING_NAMES})",
                )
                continue
        if not isinstance(ping_val, dict):
            raise TypeError(f"Invalid content for ping {ping_key}")
        ping_val["name"] = ping_key
        if "metadata" in ping_val and "ping_schedule" in ping_val["metadata"]:
            # A ping must not schedule itself.
            if ping_key in ping_val["metadata"]["ping_schedule"]:
                yield util.format_error(
                    filepath,
                    f"For ping '{ping_key}'",
                    "ping_schedule contains its own ping name",
                )
                continue
            for ping_schedule in ping_val["metadata"]["ping_schedule"]:
                if ping_schedule not in ping_schedule_reverse_map:
                    ping_schedule_reverse_map[ping_schedule] = set()
                ping_schedule_reverse_map[ping_schedule].add(ping_key)
        try:
            ping_obj = Ping(
                defined_in=getattr(ping_val, "defined_in", None),
                _validated=True,
                **ping_val,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{ping_key}'", str(e))
            continue
        if ping_obj is not None:
            # Merge file-level lint suppressions into the ping.
            ping_obj.no_lint = sorted(set(ping_obj.no_lint + global_no_lint))
        if isinstance(filepath, Path) and ping_obj.defined_in is not None:
            ping_obj.defined_in["filepath"] = str(filepath)
        already_seen = sources.get(ping_key)
        if already_seen is not None:
            # We've seen this ping name already
            yield util.format_error(
                filepath,
                "",
                f"Duplicate ping name '{ping_key}' "
                f"already defined in '{already_seen}'",
            )
        else:
            all_objects.setdefault("pings", {})[ping_key] = ping_obj
            sources[ping_key] = filepath
    # Resolve the reverse map: record on each scheduler ping which pings it
    # schedules. Bug fix: use `.get` because "pings" may be absent from
    # `all_objects` when every ping that referenced a schedule failed to
    # instantiate (the map is populated before `Ping()` is constructed),
    # which previously raised a KeyError here.
    pings = all_objects.get("pings", {})
    for scheduler, scheduled in ping_schedule_reverse_map.items():
        if scheduler in pings and isinstance(pings[scheduler], Ping):
            scheduler_obj: Ping = cast(Ping, pings[scheduler])
            scheduler_obj.schedules_pings = sorted(list(scheduled))
def _instantiate_tags(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of tags.yaml files, convert the JSON information into Tag
    objects.

    Yields formatted error strings; successful tags are stored under
    `all_objects["tags"]`, with `sources` tracking the defining file for
    duplicate detection.
    """
    file_no_lint = content.get("no_lint", [])
    assert isinstance(file_no_lint, list)
    for name, definition in sorted(content.items()):
        # `$`-prefixed keys and `no_lint` are directives, not tags.
        if name.startswith("$") or name == "no_lint":
            continue
        if not isinstance(definition, dict):
            raise TypeError(f"Invalid content for tag {name}")
        definition["name"] = name
        try:
            tag = Tag(
                defined_in=getattr(definition, "defined_in", None),
                _validated=True,
                **definition,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{name}'", str(e))
            continue
        # Merge file-level lint suppressions into the tag.
        tag.no_lint = sorted(set(tag.no_lint + file_no_lint))
        if isinstance(filepath, Path) and tag.defined_in is not None:
            tag.defined_in["filepath"] = str(filepath)
        previous = sources.get(name)
        if previous is None:
            all_objects.setdefault("tags", {})[name] = tag
            sources[name] = filepath
        else:
            # We've seen this tag name already
            yield util.format_error(
                filepath,
                "",
                f"Duplicate tag name '{name}' "
                f"already defined in '{previous}'",
            )
def _preprocess_objects(objs: ObjectTree, config: Dict[str, Any]) -> ObjectTree:
    """
    Preprocess the object tree to better set defaults.

    Marks expired metrics as disabled (unless `do_not_disable_expired` is
    set) and expands the special "default" entry in `send_in_pings` to the
    metric type's default stores. Returns the same tree, mutated in place.
    """
    disable_expired = not config.get("do_not_disable_expired", False)
    for category in objs.values():
        for obj in category.values():
            # Only metrics are preprocessed; pings and tags pass through.
            if not isinstance(obj, Metric):
                continue
            if disable_expired and hasattr(obj, "is_disabled"):
                obj.disabled = obj.is_disabled()
            if hasattr(obj, "send_in_pings"):
                pings = obj.send_in_pings
                if "default" in pings:
                    pings = obj.default_store_names + [
                        name for name in pings if name != "default"
                    ]
                obj.send_in_pings = sorted(set(pings))
    return objs
@util.keep_value
def parse_objects(
    filepaths: Iterable[Path], config: Optional[Dict[str, Any]] = None
) -> Generator[str, None, ObjectTree]:
    """
    Parse one or more metrics.yaml and/or pings.yaml files, returning a tree of
    `metrics.Metric`, `pings.Ping`, and `tags.Tag` instances.

    The result is a generator over any errors. If there are no errors, the
    actual metrics can be obtained from `result.value`. For example::

      result = metrics.parse_metrics(filepaths)
      for err in result:
          print(err)
      all_metrics = result.value

    The result value is a dictionary of category names to categories, where
    each category is a dictionary from metric name to `metrics.Metric`
    instances. There are also the special categories `pings` and `tags`
    containing all of the `pings.Ping` and `tags.Tag` instances, respectively.

    :param filepaths: list of Path objects to metrics.yaml, pings.yaml, and/or
        tags.yaml files
    :param config: A dictionary of options that change parsing behavior.
        Supported keys are:

        - `allow_reserved`: Allow values reserved for internal Glean use.
        - `do_not_disable_expired`: Don't mark expired metrics as disabled.
          This is useful when you want to retain the original "disabled"
          value from the `metrics.yaml`, rather than having it overridden when
          the metric expires.
        - `allow_missing_files`: Do not raise a `FileNotFoundError` if any of
          the input `filepaths` do not exist.
    """
    if config is None:
        config = {}
    all_objects: ObjectTree = DictWrapper()
    sources: Dict[Any, Path] = {}
    filepaths = util.ensure_list(filepaths)
    for filepath in filepaths:
        # `_load_file` yields errors; `yield from` forwards them and
        # captures the generator's (content, filetype) return value.
        content, filetype = yield from _load_file(filepath, config)
        if filetype == "metrics":
            yield from _instantiate_metrics(
                all_objects, sources, content, filepath, config
            )
        elif filetype == "pings":
            yield from _instantiate_pings(
                all_objects, sources, content, filepath, config
            )
        elif filetype == "tags":
            yield from _instantiate_tags(
                all_objects, sources, content, filepath, config
            )
    # The tree is this generator's return value, exposed to callers as
    # `result.value` by the `util.keep_value` decorator.
    return _preprocess_objects(all_objects, config)