DXR is a code search and navigation tool aimed at making sense of large projects. It supports full-text and regex searches as well as structural queries.

Mercurial (d38398e5144e)

VCS Links

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import requests
import requests.adapters
import json
import os
import logging

from slugid import nice as slugid
from taskgraph.util.time import (
    current_json_time,
    json_time_from_now
)

logger = logging.getLogger(__name__)

# the maximum number of parallel createTask calls to make
CONCURRENCY = 50


def create_tasks(taskgraph, label_to_taskid, params):
    taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}

    session = requests.Session()

    # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
    # that limit. Connections are established as needed, so using a large value
    # should not negatively impact performance.
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=CONCURRENCY,
                                                 pool_maxsize=CONCURRENCY)
    session.mount('https://', http_adapter)
    session.mount('http://', http_adapter)

    decision_task_id = os.environ.get('TASK_ID')

    # when running as an actual decision task, we use the decision task's
    # taskId as the taskGroupId.  The process that created the decision task
    # helpfully placed it in this same taskGroup.  If there is no $TASK_ID,
    # fall back to a slugid
    task_group_id = decision_task_id or slugid()
    scheduler_id = 'gecko-level-{}'.format(params['level'])

    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        #
        # Using visit_postorder() here isn't the most efficient: we'll
        # block waiting for dependencies of task N to submit even though
        # dependencies for task N+1 may be finished. If we need to optimize
        # this further, we can build a graph of task dependencies and walk
        # that.
        for task_id in taskgraph.graph.visit_postorder():
            task_def = taskgraph.tasks[task_id].task
            attributes = taskgraph.tasks[task_id].attributes
            # if this task has no dependencies, make it depend on this decision
            # task so that it does not start immediately; and so that if this loop
            # fails halfway through, none of the already-created tasks run.
            if decision_task_id and not task_def.get('dependencies'):
                task_def['dependencies'] = [decision_task_id]

            task_def['taskGroupId'] = task_group_id
            task_def['schedulerId'] = scheduler_id

            # Wait for dependencies before submitting this.
            deps_fs = [fs[dep] for dep in task_def.get('dependencies', [])
                       if dep in fs]
            for f in futures.as_completed(deps_fs):
                f.result()

            fs[task_id] = e.submit(create_task, session, task_id,
                                   taskid_to_label[task_id], task_def)

            # Schedule tasks as many times as task_duplicates indicates
            for i in range(1, attributes.get('task_duplicates', 1)):
                # We use slugid() since we want a distinct task id
                fs[task_id] = e.submit(create_task, session, slugid(),
                                       taskid_to_label[task_id], task_def)

        # Wait for all futures to complete.
        for f in futures.as_completed(fs.values()):
            f.result()


def create_task(session, task_id, label, task_def):
    # create the task using 'http://taskcluster/queue', which is proxied to the queue service
    # with credentials appropriate to this job.

    # Resolve timestamps
    now = current_json_time(datetime_format=True)
    task_def = resolve_timestamps(now, task_def)

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
                      data=json.dumps(task_def))
    if res.status_code != 200:
        try:
            logger.error(res.json()['message'])
        except:
            logger.error(res.text)
        res.raise_for_status()


def resolve_timestamps(now, task_def):
    def recurse(val):
        if isinstance(val, list):
            return [recurse(v) for v in val]
        elif isinstance(val, dict):
            if val.keys() == ['relative-datestamp']:
                return json_time_from_now(val['relative-datestamp'], now)
            else:
                return {k: recurse(v) for k, v in val.iteritems()}
        else:
            return val
    return recurse(task_def)