DXR is a code search and navigation tool aimed at making sense of large projects. It supports full-text and regex searches as well as structural queries.

Mercurial (5216dd412535)

VCS Links

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
# Multiprocess activities with a push-driven divide-process-collect model.

import os, sys, time
from threading import Thread, Lock
from Queue import Queue, Empty
from datetime import datetime

class Source:
    def __init__(self, task_list, results, verbose = False):
        self.tasks = Queue()
        for task in task_list:
            self.tasks.put_nowait(task)

        self.results = results
        self.verbose = verbose
    
    def start(self, worker_count):
        t0 = datetime.now()

        sink = Sink(self.results)
        self.workers = [ Worker(_+1, self.tasks, sink, self.verbose) for _ in range(worker_count) ]
        if self.verbose: print '[P] Starting workers.'
        for w in self.workers:
            w.t0 = t0
            w.start()
        ans = self.join_workers()
        if self.verbose: print '[P] Finished.'

        t1 = datetime.now()
        dt = t1-t0

        return ans

    def join_workers(self):
        try:
            for w in self.workers:
                w.thread.join(20000)
            return True
        except KeyboardInterrupt:
            for w in self.workers:
                w.stop = True
            return False

class Sink:
    def __init__(self, results):
        self.results = results
        self.lock = Lock()

    def push(self, result):
        self.lock.acquire()
        try:
            self.results.push(result)
        finally:
            self.lock.release()

class Worker(object):
    def __init__(self, id, tasks, sink, verbose):
        self.id = id
        self.tasks = tasks
        self.sink = sink
        self.verbose = verbose

        self.thread = None
        self.stop = False

    def log(self, msg):
        dd = datetime.now() - self.t0
        dt = dd.seconds + 1e-6 * dd.microseconds
        
        if self.verbose:
            print '[W%d %.3f] %s' % (self.id, dt, msg)

    def start(self):
        self.thread = Thread(target=self.run)
        self.thread.setDaemon(True)
        self.thread.start()

    def run(self):
        try:
            while True:
                if self.stop:
                    break
                self.log('Get next task.')
                task = self.tasks.get(False)
                self.log('Start task %s.'%str(task))
                result = task()
                self.log('Finished task.')
                self.sink.push(result)
                self.log('Pushed result.')
        except Empty:
            pass