Source code

Revision control

Copy as Markdown

Other Tools

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import time
from cProfile import Profile
from pathlib import Path
import six
from .base import MachError
INVALID_COMMAND_CONTEXT = r"""
It looks like you tried to run a mach command from an invalid context. The %s
command failed to meet the following conditions: %s
Run |mach help| to show a list of all commands available to the current context.
""".lstrip()
class MachRegistrar(object):
"""Container for mach command and config providers."""
def __init__(self):
self.command_handlers = {}
self.commands_by_category = {}
self.settings_providers = set()
self.categories = {}
self.require_conditions = False
self.command_depth = 0
def register_command_handler(self, handler):
name = handler.name
if not handler.category:
raise MachError(
"Cannot register a mach command without a " "category: %s" % name
)
if handler.category not in self.categories:
raise MachError(
"Cannot register a command to an undefined "
"category: %s -> %s" % (name, handler.category)
)
self.command_handlers[name] = handler
self.commands_by_category[handler.category].add(name)
def register_settings_provider(self, cls):
self.settings_providers.add(cls)
def register_category(self, name, title, description, priority=50):
self.categories[name] = (title, description, priority)
self.commands_by_category[name] = set()
@classmethod
def _condition_failed_message(cls, name, conditions):
msg = ["\n"]
for c in conditions:
part = [" %s" % getattr(c, "__name__", c)]
if c.__doc__ is not None:
part.append(c.__doc__)
msg.append(" - ".join(part))
return INVALID_COMMAND_CONTEXT % (name, "\n".join(msg))
@classmethod
def _instance(_, handler, context, **kwargs):
if context is None:
raise ValueError("Expected a non-None context.")
prerun = getattr(context, "pre_dispatch_handler", None)
if prerun:
prerun(context, handler, args=kwargs)
context.handler = handler
return handler.create_instance(context, handler.virtualenv_name)
@classmethod
def _fail_conditions(_, handler, instance):
fail_conditions = []
if handler.conditions:
for c in handler.conditions:
if not c(instance):
fail_conditions.append(c)
return fail_conditions
def _run_command_handler(
self,
handler,
context,
command_site_manager=None,
debug_command=False,
profile_command=False,
**kwargs,
):
instance = MachRegistrar._instance(handler, context, **kwargs)
fail_conditions = MachRegistrar._fail_conditions(handler, instance)
if fail_conditions:
print(
MachRegistrar._condition_failed_message(handler.name, fail_conditions)
)
return 1
self.command_depth += 1
fn = handler.func
if handler.virtualenv_name:
if command_site_manager:
instance.virtualenv_manager = command_site_manager
else:
instance.activate_virtualenv()
profile = None
if profile_command:
profile = Profile()
profile.enable()
start_time = time.monotonic()
if debug_command:
import pdb
result = pdb.runcall(fn, instance, **kwargs)
else:
result = fn(instance, **kwargs)
end_time = time.monotonic()
if profile_command:
profile.disable()
profile_file = (
Path(context.topdir) / f"mach_profile_{handler.name}.cProfile"
)
profile.dump_stats(profile_file)
print(
f'Mach command profile created at "{profile_file}". To visualize, use '
f"snakeviz:"
)
print("python3 -m pip install snakeviz")
print(f"python3 -m snakeviz {profile_file.name}")
result = result or 0
assert isinstance(result, six.integer_types)
if not debug_command:
postrun = getattr(context, "post_dispatch_handler", None)
if postrun:
postrun(
context,
handler,
instance,
not result,
start_time,
end_time,
self.command_depth,
args=kwargs,
)
self.command_depth -= 1
return result
def dispatch(self, name, context, argv=None, subcommand=None, **kwargs):
"""Dispatch/run a command.
Commands can use this to call other commands.
"""
from mach.command_util import load_command_module_from_command_name
handler = self.command_handlers.get(name)
if not handler:
load_command_module_from_command_name(name, context.topdir)
handler = self.command_handlers.get(name)
if not handler:
raise MachError(
f"Mach was not able to load the module for the '{name}' command."
)
if subcommand:
handler = handler.subcommand_handlers[subcommand]
if handler.parser:
parser = handler.parser
# save and restore existing defaults so **kwargs don't persist across
# subsequent invocations of Registrar.dispatch()
old_defaults = parser._defaults.copy()
parser.set_defaults(**kwargs)
kwargs, unknown = parser.parse_known_args(argv or [])
kwargs = vars(kwargs)
parser._defaults = old_defaults
if unknown:
if subcommand:
name = "{} {}".format(name, subcommand)
parser.error(
"unrecognized arguments for {}: {}".format(
name, ", ".join(["'{}'".format(arg) for arg in unknown])
)
)
return self._run_command_handler(handler, context, **kwargs)
Registrar = MachRegistrar()