/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/* Implementation of an asynchronous lock-free logging system. */

#ifndef mozilla_dom_AsyncLogger_h
#define mozilla_dom_AsyncLogger_h

#include <atomic>
#include <chrono>
#include <cstdarg>
#include <cstdint>
#include <cstring>
#include <memory>
#include <thread>
#include "mozilla/Logging.h"
#include "mozilla/Attributes.h"
#include "mozilla/MathAlgorithms.h"
#include "mozilla/Sprintf.h"

namespace mozilla {

namespace detail {

// This class implements a lock-free multiple-producer single-consumer queue
// of fixed-size log messages, with the following characteristics:
// - Unbounded (uses an intrusive linked list)
// - Allocates on Push. Push can be called on any thread.
// - Deallocates on Pop. Pop MUST always be called on the same thread for the
// life-time of the queue.
//
// In our scenario, the producer threads are real-time and can't block. The
// consumer thread runs every now and then and empties the queue to a log
// file, on disk.
//
// Having fixed-size messages and going through jemalloc is probably not the
// fastest option, but it allows for a simpler design, and we count on the
// fact that jemalloc will serve the allocation from a thread-local cache most
// of the time. A usage sketch follows the `detail` namespace below.
template <size_t MESSAGE_LENGTH>
class MPSCQueue {
 public:
  struct Message {
    Message() { mNext.store(nullptr, std::memory_order_relaxed); }
    Message(const Message& aMessage) = delete;
    void operator=(const Message& aMessage) = delete;

    char data[MESSAGE_LENGTH];
    std::atomic<Message*> mNext;
  };
  // Creates a new MPSCQueue. Initially, the queue has a single sentinel node,
  // pointed to by both mHead and mTail.
  MPSCQueue()
      // At construction, the initial message points to nullptr (it has no
      // successor). It is a sentinel node that does not contain meaningful
      // data.
      : mHead(new Message()), mTail(mHead.load(std::memory_order_relaxed)) {}

  ~MPSCQueue() {
    Message dummy;
    while (this->Pop(dummy.data)) {
    }
    Message* front = mHead.load(std::memory_order_relaxed);
    delete front;
  }

  void Push(MPSCQueue<MESSAGE_LENGTH>::Message* aMessage) {
    // The next two non-commented lines are called A and B in this paragraph.
    // Producer threads i, i-1, etc. are numbered in the order they reached
    // A in time, thread i being the thread that reached A first.
    // Atomically, on line A the new `mHead` is set to be the node that was
    // just allocated, with strong memory order. From now on, any thread
    // that reaches A will see that the node just allocated is
    // effectively the head of the list, and will make itself the new head
    // of the list.
    // In a bad case (when thread i executes A and then
    // is not scheduled for a long time), it is possible that thread i-1 and
    // subsequent threads create a seemingly disconnected set of nodes, but
    // they all have the correct value for the next node to set as their
    // mNext member on their respective stacks (in `prev`), and this is
    // always correct. When the scheduler resumes thread i, and line B is
    // executed, the correct linkage is restored.
    // Before line B, since the node that was the last element of the queue
    // still has an mNext of nullptr, Pop will not see the newly added node.
    // For line A, it's critical to have strong ordering both ways (since
    // the head is possibly going to be read and written repeatedly by
    // multiple threads).
    // Line B can have weaker guarantees: it's only going to be written by a
    // single thread, and we just need to ensure its write is seen properly
    // by a single other one.
    Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
    prev->mNext.store(aMessage, std::memory_order_release);
  }

  // Allocates a new node, copies aInput to the new memory location, and
  // pushes it to the end of the list.
  void Push(const char aInput[MESSAGE_LENGTH]) {
    // Create a new message, and copy the message passed as an argument to
    // the new memory location. We are not touching the queue right now. The
    // successor for this new node is set to be nullptr.
    Message* msg = new Message();
    strncpy(msg->data, aInput, MESSAGE_LENGTH);

    Push(msg);
  }

  // Copies the content of the first message of the queue to aOutput and
  // frees the message. Returns true if there was a message, in which case
  // `aOutput` contains a valid value. If the queue was empty, returns false,
  // in which case `aOutput` is left untouched.
  bool Pop(char aOutput[MESSAGE_LENGTH]) {
    // Similarly, in this paragraph, the two following lines are called A
    // and B, and threads are called thread i, i-1, etc. in order of
    // execution of line A.
    // On line A, the first element of the queue is acquired. It is simply a
    // sentinel node.
    // On line B, we acquire the node that has the data we want. If it is
    // null, then only the sentinel node was present in the queue, and we can
    // safely return false.
    // mTail can be loaded with relaxed ordering, since it's not written nor
    // read by any other thread (this queue is single-consumer).
    // mNext can be written to by one of the producers, so it's necessary to
    // ensure those writes are seen, hence the stricter ordering.
    Message* tail = mTail.load(std::memory_order_relaxed);
    Message* next = tail->mNext.load(std::memory_order_acquire);

    if (next == nullptr) {
      return false;
    }

    strncpy(aOutput, next->data, MESSAGE_LENGTH);

    // Simply shift the queue one node further: the node we just consumed
    // becomes the new sentinel. It still contains the data we just copied
    // out, but that data will never be read again.
    // It's only necessary to ensure the previous load on this thread is not
    // reordered past this line, so release ordering is sufficient here.
    mTail.store(next, std::memory_order_release);

    // This thread now holds the only pointer to `tail`, so it can be
    // safely deleted.
    delete tail;

    return true;
  }

 private:
  // An atomic pointer to the most recent message in the queue.
  std::atomic<Message*> mHead;
  // An atomic pointer to a sentinel node that points to the oldest message
  // in the queue.
  std::atomic<Message*> mTail;

  MPSCQueue(const MPSCQueue&) = delete;
  void operator=(const MPSCQueue&) = delete;

 public:
  // The goal here is to make it easy on the allocator. We pack a pointer in
  // the message struct, and we still want to do power-of-two allocations to
  // minimize allocator slop. The allocation size is going to be constant, so
  // the allocation is probably going to hit the thread-local cache in
  // jemalloc, making it cheap and, more importantly, lock-free enough.
  static const size_t MESSAGE_PADDING = sizeof(Message::mNext);

 private:
  static_assert(IsPowerOfTwo(MESSAGE_LENGTH + MESSAGE_PADDING),
                "MPSCQueue internal allocations must have a size that is a "
                "power of two");
};
}  // end namespace detail
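
// A minimal usage sketch for MPSCQueue (illustration only; `queue`, `producer`
// and `kLength` are placeholder names, and printf is assumed to be available
// via <cstdio>). The message length is chosen so that, once MESSAGE_PADDING is
// added, the allocation size is a power of two, as the static_assert above
// requires:
//
//   constexpr size_t kLength =
//       128 - detail::MPSCQueue<sizeof(void*)>::MESSAGE_PADDING;
//   detail::MPSCQueue<kLength> queue;
//
//   std::thread producer([&queue] {
//     char buf[kLength];
//     strncpy(buf, "message from a real-time thread", sizeof(buf));
//     queue.Push(buf);  // allocates a node; the queue itself takes no lock
//   });
//   producer.join();
//
//   // Pop must always be called from the same (consumer) thread.
//   char out[kLength];
//   while (queue.Pop(out)) {
//     printf("%s\n", out);
//   }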

// This class implements a lock-free asynchronous logger that outputs to
// MOZ_LOG.
// Any thread can use this logger without external synchronization and without
// being blocked. This makes it suitable for use in real-time audio threads.
// Log formatting is best done externally; this class implements the output
// mechanism only.
// This class uses a thread internally, and must be started and stopped
// manually.
// If logging is disabled, all the calls are no-ops. A usage sketch follows
// the class definition.
class AsyncLogger {
 public:
  static const uint32_t MAX_MESSAGE_LENGTH =
      512 - detail::MPSCQueue<sizeof(void*)>::MESSAGE_PADDING;
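  // For example, on a typical 64-bit platform MESSAGE_PADDING is
  // sizeof(std::atomic<Message*>) == 8, so MAX_MESSAGE_LENGTH is 504 and each
  // queued Message occupies exactly 512 bytes, a power of two.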

  // aLogModuleName is the name of the MOZ_LOG module.
  explicit AsyncLogger(const char* aLogModuleName)
      : mThread(nullptr), mLogModule(aLogModuleName), mRunning(false) {}

  ~AsyncLogger() {
    if (Enabled()) {
      Stop();
    }
  }

  void Start() {
    MOZ_ASSERT(!mRunning, "Double calls to AsyncLogger::Start");
    if (Enabled()) {
      mRunning = true;
      Run();
    }
  }

  void Stop() {
    if (Enabled()) {
      if (mRunning) {
        mRunning = false;
        mThread->join();
      }
    } else {
      MOZ_ASSERT(!mRunning && !mThread);
    }
  }

  void Log(const char* format, ...) MOZ_FORMAT_PRINTF(2, 3) {
    if (Enabled()) {
      auto* msg = new detail::MPSCQueue<MAX_MESSAGE_LENGTH>::Message();
      va_list args;
      va_start(args, format);
      VsprintfLiteral(msg->data, format, args);
      va_end(args);
      mMessageQueue.Push(msg);
    }
  }

  bool Enabled() {
    return MOZ_LOG_TEST(mLogModule, mozilla::LogLevel::Verbose);
  }

 private:
  void Run() {
    MOZ_ASSERT(Enabled());
    mThread.reset(new std::thread([this]() {
      while (mRunning) {
        char message[MAX_MESSAGE_LENGTH];
        while (mMessageQueue.Pop(message) && mRunning) {
          MOZ_LOG(mLogModule, mozilla::LogLevel::Verbose, ("%s", message));
        }
        Sleep();
      }
    }));
  }

  void Sleep() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); }

  std::unique_ptr<std::thread> mThread;
  mozilla::LazyLogModule mLogModule;
  detail::MPSCQueue<MAX_MESSAGE_LENGTH> mMessageQueue;
  std::atomic<bool> mRunning;
};
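
// A minimal usage sketch for AsyncLogger ("ExampleModule" and `elapsedMs` are
// placeholder names; messages are only emitted when the log module is enabled
// at Verbose level):
//
//   AsyncLogger logger("ExampleModule");
//   logger.Start();
//
//   // From any thread, including real-time audio threads:
//   double elapsedMs = 0.25;
//   logger.Log("audio callback took %f ms", elapsedMs);
//
//   // On shutdown; this joins the internal consumer thread:
//   logger.Stop();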

}  // end namespace mozilla

#endif  // mozilla_dom_AsyncLogger_h