# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Code for parsing metrics.yaml files.
"""

import functools
from pathlib import Path
import textwrap
from typing import Any, cast, Dict, Generator, Iterable, Optional, Set, Tuple, Union
import jsonschema  # type: ignore
from jsonschema.exceptions import ValidationError  # type: ignore

from .metrics import Metric, ObjectTree
from .pings import Ping, RESERVED_PING_NAMES
from .tags import Tag
from . import util
from .util import DictWrapper

ROOT_DIR = Path(__file__).parent
SCHEMAS_DIR = ROOT_DIR / "schemas"


def _update_validator(validator):
"""
Adds some custom validators to the jsonschema validator that produce
nicer error messages.
"""
def required(validator, required, instance, schema):
if not validator.is_type(instance, "object"):
return
missing_properties = set(
property for property in required if property not in instance
)
if len(missing_properties):
missing_properties = sorted(list(missing_properties))
yield ValidationError(
f"Missing required properties: {', '.join(missing_properties)}"
)
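
    # Report every missing required property in a single, readable error
    # message instead of one ValidationError per property.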
validator.VALIDATORS["required"] = required


def _load_file(
filepath: Path, parser_config: Dict[str, Any]
) -> Generator[str, None, Tuple[Dict[str, util.JSONType], Optional[str]]]:
"""
Load a metrics.yaml or pings.yaml format file.
If the `filepath` does not exist, raises `FileNotFoundError`, unless
`parser_config["allow_missing_files"]` is `True`.
"""
try:
content = util.load_yaml_or_json(filepath)
except FileNotFoundError:
if not parser_config.get("allow_missing_files", False):
raise
else:
return {}, None
except Exception as e:
yield util.format_error(filepath, "", textwrap.fill(str(e)))
return {}, None
if content is None:
        yield util.format_error(filepath, "", f"'{filepath}' file cannot be empty.")
return {}, None
if not isinstance(content, dict):
return {}, None
if content == {}:
return {}, None
schema_key = content.get("$schema")
if not isinstance(schema_key, str):
raise TypeError(f"Invalid schema key {schema_key}")
filetype: Optional[str] = None
try:
filetype = schema_key.split("/")[-2]
except IndexError:
filetype = None
if filetype not in ("metrics", "pings", "tags"):
filetype = None
for error in validate(content, filepath):
content = {}
yield error
return content, filetype
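

# Loading and compiling the schemas touches the disk and is comparatively
# expensive, so the result is cached; `maxsize=1` suffices because the
# function takes no arguments.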
@functools.lru_cache(maxsize=1)
def _load_schemas() -> Dict[str, Tuple[Any, Any]]:
"""
Load all of the known schemas from disk, and put them in a map based on the
schema's $id.
"""
schemas = {}
for schema_path in SCHEMAS_DIR.glob("*.yaml"):
schema = util.load_yaml_or_json(schema_path)
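        # A "null" resolver keeps `$ref` resolution local to the loaded
        # schema rather than fetching anything over the network.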
resolver = util.get_null_resolver(schema)
validator_class = jsonschema.validators.validator_for(schema)
_update_validator(validator_class)
validator_class.check_schema(schema)
validator = validator_class(schema, resolver=resolver)
schemas[schema["$id"]] = (schema, validator)
return schemas


def _get_schema(
schema_id: str, filepath: Union[str, Path] = "<input>"
) -> Tuple[Any, Any]:
"""
Get the schema for the given schema $id.
"""
schemas = _load_schemas()
if schema_id not in schemas:
raise ValueError(
util.format_error(
filepath,
"",
f"$schema key must be one of {', '.join(schemas.keys())}",
)
)
return schemas[schema_id]


def _get_schema_for_content(
content: Dict[str, util.JSONType], filepath: Union[str, Path]
) -> Tuple[Any, Any]:
"""
Get the appropriate schema for the given JSON content.
"""
schema_url = content.get("$schema")
if not isinstance(schema_url, str):
        raise TypeError(f"Invalid $schema type {schema_url}")
return _get_schema(schema_url, filepath)


def validate(
content: Dict[str, util.JSONType], filepath: Union[str, Path] = "<input>"
) -> Generator[str, None, None]:
"""
Validate the given content against the appropriate schema.
"""
try:
schema, validator = _get_schema_for_content(content, filepath)
except ValueError as e:
yield str(e)
else:
yield from (
util.format_error(filepath, "", util.pprint_validation_error(e))
for e in validator.iter_errors(content)
)


def _instantiate_metrics(
all_objects: ObjectTree,
sources: Dict[Any, Path],
content: Dict[str, util.JSONType],
filepath: Path,
config: Dict[str, Any],
) -> Generator[str, None, None]:
"""
Load a list of metrics.yaml files, convert the JSON information into Metric
objects, and merge them into a single tree.
"""
global_no_lint = content.get("no_lint", [])
global_tags = content.get("$tags", [])
assert isinstance(global_tags, list)
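    # File-level `no_lint` and `$tags` entries apply to every metric in the
    # file; they are merged into each individual metric object below.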
for category_key, category_val in sorted(content.items()):
if category_key.startswith("$"):
continue
if category_key == "no_lint":
continue
if not config.get("allow_reserved") and category_key.split(".")[0] == "glean":
yield util.format_error(
filepath,
f"For category '{category_key}'",
"Categories beginning with 'glean' are reserved for "
"Glean internal use.",
)
continue
all_objects.setdefault(category_key, DictWrapper())
if not isinstance(category_val, dict):
raise TypeError(f"Invalid content for {category_key}")
for metric_key, metric_val in sorted(category_val.items()):
try:
metric_obj = Metric.make_metric(
category_key, metric_key, metric_val, validated=True, config=config
)
except Exception as e:
yield util.format_error(
filepath,
f"On instance {category_key}.{metric_key}",
str(e),
metric_val.defined_in["line"],
)
metric_obj = None
else:
if (
not config.get("allow_reserved")
and "all-pings" in metric_obj.send_in_pings
):
yield util.format_error(
filepath,
f"On instance {category_key}.{metric_key}",
'Only internal metrics may specify "all-pings" '
'in "send_in_pings"',
metric_val.defined_in["line"],
)
metric_obj = None
if metric_obj is not None:
metric_obj.no_lint = sorted(set(metric_obj.no_lint + global_no_lint))
if len(global_tags):
metric_obj.metadata["tags"] = sorted(
set(metric_obj.metadata.get("tags", []) + global_tags)
)
if isinstance(filepath, Path):
metric_obj.defined_in["filepath"] = str(filepath)
already_seen = sources.get((category_key, metric_key))
if already_seen is not None:
# We've seen this metric name already
yield util.format_error(
filepath,
"",
(
f"Duplicate metric name '{category_key}.{metric_key}' "
f"already defined in '{already_seen}'"
),
metric_obj.defined_in["line"],
)
else:
all_objects[category_key][metric_key] = metric_obj
sources[(category_key, metric_key)] = filepath


def _instantiate_pings(
all_objects: ObjectTree,
sources: Dict[Any, Path],
content: Dict[str, util.JSONType],
filepath: Path,
config: Dict[str, Any],
) -> Generator[str, None, None]:
"""
Load a list of pings.yaml files, convert the JSON information into Ping
objects.
"""
global_no_lint = content.get("no_lint", [])
assert isinstance(global_no_lint, list)
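    # Maps a scheduler ping's name to the set of pings it schedules, so
    # that `schedules_pings` can be filled in once every ping in the file
    # has been instantiated.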
ping_schedule_reverse_map: Dict[str, Set[str]] = dict()
for ping_key, ping_val in sorted(content.items()):
if ping_key.startswith("$"):
continue
if ping_key == "no_lint":
continue
if not config.get("allow_reserved"):
if ping_key in RESERVED_PING_NAMES:
yield util.format_error(
filepath,
f"For ping '{ping_key}'",
f"Ping uses a reserved name ({RESERVED_PING_NAMES})",
)
continue
if not isinstance(ping_val, dict):
raise TypeError(f"Invalid content for ping {ping_key}")
ping_val["name"] = ping_key
if "metadata" in ping_val and "ping_schedule" in ping_val["metadata"]:
if ping_key in ping_val["metadata"]["ping_schedule"]:
yield util.format_error(
filepath,
f"For ping '{ping_key}'",
"ping_schedule contains its own ping name",
)
continue
for ping_schedule in ping_val["metadata"]["ping_schedule"]:
if ping_schedule not in ping_schedule_reverse_map:
ping_schedule_reverse_map[ping_schedule] = set()
ping_schedule_reverse_map[ping_schedule].add(ping_key)
try:
ping_obj = Ping(
defined_in=getattr(ping_val, "defined_in", None),
_validated=True,
**ping_val,
)
except Exception as e:
yield util.format_error(filepath, f"On instance '{ping_key}'", str(e))
continue
if ping_obj is not None:
ping_obj.no_lint = sorted(set(ping_obj.no_lint + global_no_lint))
if isinstance(filepath, Path) and ping_obj.defined_in is not None:
ping_obj.defined_in["filepath"] = str(filepath)
already_seen = sources.get(ping_key)
if already_seen is not None:
# We've seen this ping name already
yield util.format_error(
filepath,
"",
f"Duplicate ping name '{ping_key}' "
f"already defined in '{already_seen}'",
)
else:
all_objects.setdefault("pings", {})[ping_key] = ping_obj
sources[ping_key] = filepath
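
    # With all pings registered, record on each scheduler ping the sorted
    # list of pings it triggers.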
for scheduler, scheduled in ping_schedule_reverse_map.items():
if scheduler in all_objects["pings"] and isinstance(
all_objects["pings"][scheduler], Ping
):
scheduler_obj: Ping = cast(Ping, all_objects["pings"][scheduler])
scheduler_obj.schedules_pings = sorted(list(scheduled))


def _instantiate_tags(
all_objects: ObjectTree,
sources: Dict[Any, Path],
content: Dict[str, util.JSONType],
filepath: Path,
config: Dict[str, Any],
) -> Generator[str, None, None]:
"""
Load a list of tags.yaml files, convert the JSON information into Tag
objects.
"""
global_no_lint = content.get("no_lint", [])
assert isinstance(global_no_lint, list)
for tag_key, tag_val in sorted(content.items()):
if tag_key.startswith("$"):
continue
if tag_key == "no_lint":
continue
if not isinstance(tag_val, dict):
raise TypeError(f"Invalid content for tag {tag_key}")
tag_val["name"] = tag_key
try:
tag_obj = Tag(
defined_in=getattr(tag_val, "defined_in", None),
_validated=True,
**tag_val,
)
except Exception as e:
yield util.format_error(filepath, f"On instance '{tag_key}'", str(e))
continue
if tag_obj is not None:
tag_obj.no_lint = sorted(set(tag_obj.no_lint + global_no_lint))
if isinstance(filepath, Path) and tag_obj.defined_in is not None:
tag_obj.defined_in["filepath"] = str(filepath)
already_seen = sources.get(tag_key)
if already_seen is not None:
# We've seen this tag name already
yield util.format_error(
filepath,
"",
f"Duplicate tag name '{tag_key}' "
f"already defined in '{already_seen}'",
)
else:
all_objects.setdefault("tags", {})[tag_key] = tag_obj
sources[tag_key] = filepath


def _preprocess_objects(objs: ObjectTree, config: Dict[str, Any]) -> ObjectTree:
"""
Preprocess the object tree to better set defaults.
"""
for category in objs.values():
for obj in category.values():
if not isinstance(obj, Metric):
continue
if not config.get("do_not_disable_expired", False) and hasattr(
obj, "is_disabled"
):
obj.disabled = obj.is_disabled()
if hasattr(obj, "send_in_pings"):
if "default" in obj.send_in_pings:
obj.send_in_pings = obj.default_store_names + [
x for x in obj.send_in_pings if x != "default"
]
obj.send_in_pings = sorted(list(set(obj.send_in_pings)))
return objs


@util.keep_value
def parse_objects(
filepaths: Iterable[Path], config: Optional[Dict[str, Any]] = None
) -> Generator[str, None, ObjectTree]:
"""
Parse one or more metrics.yaml and/or pings.yaml files, returning a tree of
`metrics.Metric`, `pings.Ping`, and `tags.Tag` instances.
The result is a generator over any errors. If there are no errors, the
actual metrics can be obtained from `result.value`. For example::
      result = parse_objects(filepaths)
      for err in result:
          print(err)
      all_objects = result.value

    The result value is a dictionary of category names to categories, where
    each category is a dictionary from metric name to `metrics.Metric`
    instances. There are also the special categories `pings` and `tags`
    containing all of the `pings.Ping` and `tags.Tag` instances, respectively.

    :param filepaths: list of Path objects to metrics.yaml, pings.yaml, and/or
        tags.yaml files
    :param config: A dictionary of options that change parsing behavior.
        Supported keys are:

- `allow_reserved`: Allow values reserved for internal Glean use.
- `do_not_disable_expired`: Don't mark expired metrics as disabled.
This is useful when you want to retain the original "disabled"
value from the `metrics.yaml`, rather than having it overridden when
the metric expires.
- `allow_missing_files`: Do not raise a `FileNotFoundError` if any of
the input `filepaths` do not exist.
"""
if config is None:
config = {}
all_objects: ObjectTree = DictWrapper()
sources: Dict[Any, Path] = {}
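    # `util.ensure_list` allows callers to pass a single Path as well as an
    # iterable of paths.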
filepaths = util.ensure_list(filepaths)
for filepath in filepaths:
content, filetype = yield from _load_file(filepath, config)
if filetype == "metrics":
yield from _instantiate_metrics(
all_objects, sources, content, filepath, config
)
elif filetype == "pings":
yield from _instantiate_pings(
all_objects, sources, content, filepath, config
)
elif filetype == "tags":
yield from _instantiate_tags(
all_objects, sources, content, filepath, config
)
return _preprocess_objects(all_objects, config)
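

# A minimal usage sketch (hypothetical file names; assumes this module is
# importable as `glean_parser.parser`):
#
#   from pathlib import Path
#   from glean_parser.parser import parse_objects
#
#   result = parse_objects([Path("metrics.yaml"), Path("pings.yaml")])
#   for error in result:
#       print(error)
#   all_objects = result.value  # populated thanks to @util.keep_value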