Mercurial (c68fe15a81fc)

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#ifndef mozilla_dom_MPSCQueue_h
#define mozilla_dom_MPSCQueue_h

#include <atomic>
#include <cstddef>

#include "mozilla/MathAlgorithms.h"  // mozilla::IsPowerOfTwo

namespace mozilla {

// This class implements a lock-free multiple-producer single-consumer queue of
// fixed-size log messages, with the following characteristics:
// - Unbounded (uses an intrusive linked list)
// - Allocates on Push. Push can be called on any thread.
// - Deallocates on Pop. Pop MUST always be called on the same thread for the
//   lifetime of the queue.
//
// In our scenario, the producer threads are real-time and can't block. The
// consumer thread runs every now and then and empties the queue to a log
// file on disk.
//
// Having fixed-size messages and relying on jemalloc is probably not the
// fastest approach, but it allows a simpler design. We count on the fact that
// jemalloc will get the memory from a thread-local source most of the time.
// We'll replace this with a fixed-size ring buffer if this becomes an issue.
const size_t MPSC_MSG_RESERVERD = sizeof(void*);

template <typename T>
class MPSCQueue {
 public:
  struct Message {
    Message() { mNext.store(nullptr, std::memory_order_relaxed); }
    Message(const Message& aMessage) = delete;
    void operator=(const Message& aMessage) = delete;

    T data;
    std::atomic<Message*> mNext;
  };

  // The goal here is to make it easy on the allocator. We pack a pointer in the
  // message struct, and we still want to do power-of-two allocations to
  // minimize allocator slop. The allocation size is going to be constant, so
  // the allocation is probably going to hit the thread-local cache in jemalloc,
  // making it cheap and, more importantly, lock-free enough. This has been
  // measured to be cheap and reliable enough, but will be replaced in the
  // longer run.
#if !defined(XP_WIN) && !defined(MOZ_WIDGET_ANDROID)
  static_assert(IsPowerOfTwo(sizeof(MPSCQueue<T>::Message)),
                "MPSCQueue internal allocations must have a size that is a "
                "power of two");
#endif

  // Creates a new MPSCQueue. Initially, the queue has a single sentinel node,
  // pointed to by both mHead and mTail.
  MPSCQueue()
      // At construction, the initial message points to nullptr (it has no
      // successor). It is a sentinel node, that does not contain meaningful
      // data.
      : mHead(new Message()), mTail(mHead.load(std::memory_order_relaxed)) {}

  ~MPSCQueue() {
    Message dummy;
    while (Pop(&dummy.data)) {
    }
    Message* front = mHead.load(std::memory_order_relaxed);
    delete front;
  }

  void Push(MPSCQueue<T>::Message* aMessage) {
    // The next two non-comment lines are called A and B in this paragraph.
    // Producer threads i, i-1, etc. are numbered in the order they reached
    // A in time, thread i being the thread that reached A first.
    // Atomically, on line A, the new `mHead` is set to be the node that was
    // just allocated, with strong memory ordering. From now on, any thread
    // that reaches A will see that the node just allocated is
    // effectively the head of the list, and will make itself the new head
    // of the list.
    // In a bad case (when thread i executes A and then
    // is not scheduled for a long time), it is possible that thread i-1 and
    // subsequent threads create a seemingly disconnected set of nodes, but
    // each of them holds, on its own stack (in `prev`), the node whose mNext
    // it must set, and this is always correct. When the scheduler resumes
    // thread i and line B is executed, the correct linkage is restored.
    // Before line B executes, the node that was the last element of the
    // queue still has an mNext of nullptr, so Pop will not see the node
    // being added.
    // For line A, it's critical to have strong ordering both ways (since
    // mHead is possibly going to be read and written repeatedly by multiple
    // threads).
    // Line B can have weaker guarantees: it's only going to be written by a
    // single thread, and we just need to ensure it's read properly by a
    // single other one.
    Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
    prev->mNext.store(aMessage, std::memory_order_release);
  }

  // Copies the content of the first message of the queue to aOutput, and
  // frees the message. Returns true if there was a message, in which case
  // `aOutput` contains a valid value. If the queue was empty, returns false,
  // in which case `aOutput` is left untouched.
  bool Pop(T* aOutput) {
    // Similarly, in this paragraph, the two following lines are called A
    // and B, and threads are called thread i, i-1, etc. in order of
    // execution of line A.
    // On line A, the first element of the queue is acquired. It is simply a
    // sentinel node.
    // On line B, we acquire the node that has the data we want. If the
    // pointer loaded on line B is null, then only the sentinel node was
    // present in the queue, and we can safely return false.
    // mTail can be loaded with relaxed ordering, since it's neither written
    // nor read by any other thread (this queue is single consumer).
    // mNext can be written to by one of the producers, so it's necessary to
    // ensure those writes are seen, hence the stricter ordering.
    Message* tail = mTail.load(std::memory_order_relaxed);
    Message* next = tail->mNext.load(std::memory_order_acquire);

    if (next == nullptr) {
      return false;
    }

    *aOutput = next->data;

    // Simply shift the queue one node further, so that the node we just read
    // becomes the new sentinel, pointing to the oldest remaining node. It
    // contains stale data, but this data will never be read again.
    // It's only necessary to ensure the previous load on this thread is not
    // reordered past this line, so release ordering is sufficient here.
    mTail.store(next, std::memory_order_release);

    // This thread is now the only thing that points to `tail`, so it can be
    // safely deleted.
    delete tail;

    return true;
  }

 private:
  // An atomic pointer to the most recent message in the queue.
  std::atomic<Message*> mHead;
  // An atomic pointer to a sentinel node, that points to the oldest message
  // in the queue.
  std::atomic<Message*> mTail;

  MPSCQueue(const MPSCQueue&) = delete;
  void operator=(const MPSCQueue&) = delete;
};

}  // namespace mozilla

#endif  // mozilla_dom_MPSCQueue_h
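
// The "bad case" described in the Push comment (a producer stalls between
// lines A and B) can be replayed deterministically on a single thread. The
// sketch below is not part of this header: `Node`, `MiniQueue`, `LineA`,
// `LineB` and `ReplayStalledProducer` are hypothetical names, and Push is
// split into its two lines only so that the interleaving can be written out
// by hand. It assumes the same memory orderings as MPSCQueue above.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical stand-in for MPSCQueue<T>::Message, reduced to the two fields
// the protocol needs.
struct Node {
  int data = 0;
  std::atomic<Node*> mNext{nullptr};
};

// Hypothetical miniature queue that follows the same Push/Pop protocol.
struct MiniQueue {
  std::atomic<Node*> mHead;
  std::atomic<Node*> mTail;

  // Starts with a single sentinel node shared by mHead and mTail.
  MiniQueue()
      : mHead(new Node()), mTail(mHead.load(std::memory_order_relaxed)) {}

  ~MiniQueue() {
    int ignored;
    while (Pop(&ignored)) {
    }
    delete mTail.load(std::memory_order_relaxed);
  }

  // Line A of Push: make aNode the new head and return the previous head.
  Node* LineA(Node* aNode) {
    return mHead.exchange(aNode, std::memory_order_acq_rel);
  }

  // Line B of Push: link the previous head to aNode.
  static void LineB(Node* aPrev, Node* aNode) {
    aPrev->mNext.store(aNode, std::memory_order_release);
  }

  // Same logic as MPSCQueue::Pop, specialized to int.
  bool Pop(int* aOutput) {
    Node* tail = mTail.load(std::memory_order_relaxed);
    Node* next = tail->mNext.load(std::memory_order_acquire);
    if (next == nullptr) {
      return false;
    }
    *aOutput = next->data;
    mTail.store(next, std::memory_order_release);
    delete tail;
    return true;
  }
};

// Replays the stalled-producer interleaving and records what the consumer
// observes. A failed Pop is recorded as -1.
std::vector<int> ReplayStalledProducer() {
  MiniQueue queue;
  std::vector<int> observed;
  int value = 0;

  Node* first = new Node();
  first->data = 1;
  Node* second = new Node();
  second->data = 2;

  Node* prevOfFirst = queue.LineA(first);    // producer i runs A, then stalls
  Node* prevOfSecond = queue.LineA(second);  // producer i-1 runs A...
  MiniQueue::LineB(prevOfSecond, second);    // ...and B: first -> second

  // The sentinel's mNext is still nullptr, so the queue looks empty.
  observed.push_back(queue.Pop(&value) ? value : -1);

  MiniQueue::LineB(prevOfFirst, first);  // producer i resumes and runs B

  // Both messages are now reachable, in the order the producers reached A.
  while (queue.Pop(&value)) {
    observed.push_back(value);
  }
  return observed;
}
```

// Calling ReplayStalledProducer() shows that the consumer first finds the
// queue empty, and only sees the two messages once the stalled producer has
// executed line B. Nothing is lost or reordered in the meantime.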