DXR is a code search and navigation tool aimed at making sense of large projects. It supports full-text and regex searches as well as structural queries.

Mercurial (dcc6d7a0dc00)

VCS Links

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
import os

from mozharness.base.errors import HgErrorList
from mozharness.base.log import FATAL, INFO
from mozharness.base.vcs.mercurial import MercurialVCS


class MercurialRepoManipulationMixin(object):

    def get_version(self, repo_root,
                    version_file="browser/config/version.txt"):
        version_path = os.path.join(repo_root, version_file)
        contents = self.read_from_file(version_path, error_level=FATAL)
        lines = [l for l in contents.splitlines() if l and
                 not l.startswith("#")]
        return lines[-1].split(".")

    def replace(self, file_name, from_, to_):
        """ Replace text in a file.
            """
        text = self.read_from_file(file_name, error_level=FATAL)
        new_text = text.replace(from_, to_)
        if text == new_text:
            self.fatal("Cannot replace '%s' to '%s' in '%s'" %
                       (from_, to_, file_name))
        self.write_to_file(file_name, new_text, error_level=FATAL)

    def query_hg_revision(self, path):
        """ Look up the revision of the repo checked out at `path`.

            Reading it from the existing clone means 'pull' does not have
            to be a required action on every run.
            """
        vcs = MercurialVCS(log_obj=self.log_obj, config=self.config)
        return vcs.get_revision_from_path(path)

    def hg_commit(self, cwd, message, user=None, ignore_no_changes=False):
        """ Commit changes to hg.
            """
        cmd = self.query_exe('hg', return_type='list') + [
            'commit', '-m', message]
        if user:
            cmd.extend(['-u', user])
        success_codes = [0]
        if ignore_no_changes:
            success_codes.append(1)
        self.run_command(
            cmd, cwd=cwd, error_list=HgErrorList,
            halt_on_failure=True,
            success_codes=success_codes
        )
        return self.query_hg_revision(cwd)

    def clean_repos(self):
        """ We may end up with contaminated local repos at some point, but
            we don't want to have to clobber and reclone from scratch every
            time.

            This is an attempt to clean up the local repos without needing a
            clobber.
            """
        dirs = self.query_abs_dirs()
        hg = self.query_exe("hg", return_type="list")
        hg_repos = self.query_repos()
        hg_strip_error_list = [{
            'substr': r'''abort: empty revision set''', 'level': INFO,
            'explanation': "Nothing to clean up; we're good!",
        }] + HgErrorList
        for repo_config in hg_repos:
            repo_name = repo_config["dest"]
            repo_path = os.path.join(dirs['abs_work_dir'], repo_name)
            if os.path.exists(repo_path):
                # hg up -C to discard uncommitted changes
                self.run_command(
                    hg + ["up", "-C", "-r", repo_config['revision']],
                    cwd=repo_path,
                    error_list=HgErrorList,
                    halt_on_failure=True,
                )
                # discard unpushed commits
                status = self.retry(
                    self.run_command,
                    args=(hg + ["--config", "extensions.mq=", "strip",
                          "--no-backup", "outgoing()"], ),
                    kwargs={
                        'cwd': repo_path,
                        'error_list': hg_strip_error_list,
                        'return_type': 'num_errors',
                        'success_codes': (0, 255),
                    },
                )
                if status not in [0, 255]:
                    self.fatal("Issues stripping outgoing revisions!")
                # 2nd hg up -C to make sure we're not on a stranded head
                # which can happen when reverting debugsetparents
                self.run_command(
                    hg + ["up", "-C", "-r", repo_config['revision']],
                    cwd=repo_path,
                    error_list=HgErrorList,
                    halt_on_failure=True,
                )

    def commit_changes(self):
        """ Do the commit.
            """
        hg = self.query_exe("hg", return_type="list")
        for cwd in self.query_commit_dirs():
            self.run_command(hg + ["diff"], cwd=cwd)
            self.hg_commit(
                cwd, user=self.config['hg_user'],
                message=self.query_commit_message(),
                ignore_no_changes=self.config.get("ignore_no_changes", False)
            )
        self.info("Now verify |hg out| and |hg out --patch| if you're paranoid, and --push")

    def hg_tag(self, cwd, tags, user=None, message=None, revision=None,
               force=None, halt_on_failure=True):
        if isinstance(tags, basestring):
            tags = [tags]
        cmd = self.query_exe('hg', return_type='list') + ['tag']
        if not message:
            message = "No bug - Tagging %s" % os.path.basename(cwd)
            if revision:
                message = "%s %s" % (message, revision)
            message = "%s with %s" % (message, ', '.join(tags))
            message += " a=release DONTBUILD CLOSED TREE"
        self.info(message)
        cmd.extend(['-m', message])
        if user:
            cmd.extend(['-u', user])
        if revision:
            cmd.extend(['-r', revision])
        if force:
            cmd.append('-f')
        cmd.extend(tags)
        return self.run_command(
            cmd, cwd=cwd, halt_on_failure=halt_on_failure,
            error_list=HgErrorList
        )

    def push(self):
        """
            """
        error_message = """Push failed!  If there was a push race, try rerunning
the script (--clean-repos --pull --migrate).  The second run will be faster."""
        hg = self.query_exe("hg", return_type="list")
        for cwd in self.query_push_dirs():
            if not cwd:
                self.warning("Skipping %s" % cwd)
                continue
            push_cmd = hg + ['push'] + self.query_push_args(cwd)
            if self.config.get("push_dest"):
                push_cmd.append(self.config["push_dest"])
            status = self.run_command(
                push_cmd,
                cwd=cwd,
                error_list=HgErrorList,
                success_codes=[0, 1],
            )
            if status == 1:
                self.warning("No changes for %s!" % cwd)
            elif status:
                self.fatal(error_message)