DXR is a code search and navigation tool aimed at making sense of large projects. It supports full-text and regex searches as well as structural queries.

Mercurial (c584e29f6ac2)

VCS Links

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/* Single producer single consumer lock-free and wait-free queue. */

#ifndef mozilla_LockFreeQueue_h
#define mozilla_LockFreeQueue_h

#include "mozilla/Assertions.h"
#include "mozilla/Attributes.h"
#include "mozilla/PodOperations.h"
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>
#include <type_traits>

namespace mozilla {

namespace details {
/**
 * Bulk element helpers used by the ring buffer.
 *
 * The primary template only declares the interface. One of the two partial
 * specializations below is picked through the `IsPod` parameter, which
 * defaults to `std::is_trivial<T>::value`.
 */
template <typename T, bool IsPod = std::is_trivial<T>::value>
struct MemoryOperations {
  /**
   * Reset `aCount` elements starting at `aDestination`, either by zeroing the
   * memory or by assigning default-constructed values, depending on `T`.
   */
  static void ConstructDefault(T* aDestination, size_t aCount);
  /**
   * Transfer `aCount` elements from `aSource` to `aDestination`. Trivial types
   * are copied with PodCopy; other types are moved (or copied, when `T` has no
   * move assignment) element by element.
   */
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
};

/** Trivial types: defer to the Pod helpers. */
template <typename T>
struct MemoryOperations<T, true> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    PodZero(aDestination, aCount);
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    PodCopy(aDestination, aSource, aCount);
  }
};

/** Non-trivial types: go through the assignment operators of `T`. */
template <typename T>
struct MemoryOperations<T, false> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    /* Every slot receives its own freshly constructed temporary. */
    T* const end = aDestination + aCount;
    for (T* slot = aDestination; slot != end; ++slot) {
      *slot = T();
    }
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    T* const sourceEnd = aSource + aCount;
    std::move(aSource, sourceEnd, aDestination);
  }
};
}  // namespace details

/**
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization.
 *
 * The role for the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be on
 * another thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking if
 *   the ring buffer is empty or full.
 * - One slot of the storage is always left unused, so we can distinguish
 *   between an empty and a full ring buffer: an empty ring buffer is when the
 *   write index is at the same position as the read index. A full buffer is
 *   when the write index is exactly one position before the read index
 *   (modulo the size of the storage).
 * - We synchronize updates to the read index after having read the data, and
 *   the write index after having written the data. This means that each
 *   thread can only touch a portion of the buffer that is not touched by the
 *   other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
template <typename T>
class SPSCRingBufferBase {
 public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation on the heap, but is the only allocation that
   * will happen for the life time of a `SPSCRingBufferBase`.
   *
   * @param aCapacity The maximum number of elements this ring buffer will
   * hold. It must be non-negative and small enough that the storage size
   * (`aCapacity + 1`) stays below half of the largest `int`; both conditions
   * are asserted.
   */
  explicit SPSCRingBufferBase(int aCapacity)
      : mReadIndex(0),
        mWriteIndex(0)
        /* One more element to distinguish from empty and full buffer. */
        ,
        mCapacity(aCapacity + 1) {
    MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2,
               "buffer too large for the type of index used.");
    MOZ_ASSERT(mCapacity > 0 && aCapacity != std::numeric_limits<int>::max());

    mData = std::make_unique<T[]>(StorageCapacity());

    /* The namespace-scope `std::memory_order_*` constants are used throughout
     * this class: unlike `std::memory_order::memory_order_*`, they are valid
     * both before and after C++20 turned `std::memory_order` into a scoped
     * enumeration. */
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }
  /**
   * Push `aCount` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param aCount The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  MOZ_MUST_USE
  int EnqueueDefault(int aCount) { return Enqueue(nullptr, aCount); }
  /**
   * @brief Put an element in the queue.
   *
   * Only safely called on the producer thread.
   *
   * @param aElement The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  MOZ_MUST_USE
  int Enqueue(T& aElement) { return Enqueue(&aElement, 1); }
  /**
   * Push `aCount` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param aElements a pointer to a buffer containing at least `aCount`
   * elements. If `aElements` is nullptr, zero or default constructed elements
   * are enqueued.
   * @param aCount The number of elements to read from `aElements`. Must not be
   * negative.
   * @return The number of elements successfully copied from `aElements` and
   * inserted into the ring buffer. This is 0 when the buffer is full or when
   * `aCount` is not positive.
   */
  MOZ_MUST_USE
  int Enqueue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif

    /* A negative count would become a huge size_t length in the memory
     * operations below, so reject it before touching the storage. */
    MOZ_ASSERT(aCount >= 0, "cannot enqueue a negative number of elements");
    if (aCount <= 0) {
      return 0;
    }

    /* Acquire pairs with the release store of mReadIndex in Dequeue: the
     * consumer is done with every slot located before `rdIdx`. */
    int rdIdx = mReadIndex.load(std::memory_order_acquire);
    /* Only this thread writes mWriteIndex, relaxed is enough. */
    int wrIdx = mWriteIndex.load(std::memory_order_relaxed);

    if (IsFull(rdIdx, wrIdx)) {
      return 0;
    }

    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);

    /* First part, from the write index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
    /* Second part, from the beginning of the array */
    int secondPart = toWrite - firstPart;

    if (aElements) {
      details::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements,
                                               firstPart);
      details::MemoryOperations<T>::MoveOrCopy(
          mData.get(), aElements + firstPart, secondPart);
    } else {
      details::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx,
                                                     firstPart);
      details::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
    }

    /* Publish the new elements only once they are fully written. */
    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
                      std::memory_order_release);

    return toWrite;
  }
  /**
   * Retrieve at most `aCount` elements from the ring buffer, and copy them to
   * `aElements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param aElements A pointer to a buffer with space for at least `aCount`
   * elements. If `aElements` is `nullptr`, up to `aCount` elements are
   * discarded.
   * @param aCount The maximum number of elements to dequeue. Must not be
   * negative.
   * @return The number of elements removed from the ring buffer (and written
   * to `aElements` when it is non-null). This is 0 when the buffer is empty or
   * when `aCount` is not positive.
   */
  MOZ_MUST_USE
  int Dequeue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif

    /* A negative count would become a huge size_t length in the memory
     * operations below, so reject it before touching the storage. */
    MOZ_ASSERT(aCount >= 0, "cannot dequeue a negative number of elements");
    if (aCount <= 0) {
      return 0;
    }

    /* Acquire pairs with the release store of mWriteIndex in Enqueue: every
     * element located before `wrIdx` is fully written. */
    int wrIdx = mWriteIndex.load(std::memory_order_acquire);
    /* Only this thread writes mReadIndex, relaxed is enough. */
    int rdIdx = mReadIndex.load(std::memory_order_relaxed);

    if (IsEmpty(rdIdx, wrIdx)) {
      return 0;
    }

    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), aCount);

    /* First part, from the read index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
    /* Second part, from the beginning of the array. */
    int secondPart = toRead - firstPart;

    if (aElements) {
      details::MemoryOperations<T>::MoveOrCopy(aElements, mData.get() + rdIdx,
                                               firstPart);
      details::MemoryOperations<T>::MoveOrCopy(aElements + firstPart,
                                               mData.get(), secondPart);
    }

    /* Hand the slots back to the producer only once they have been read. */
    mReadIndex.store(IncrementIndex(rdIdx, toRead),
                     std::memory_order_release);

    return toRead;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * Only safely called on the consumer thread. This can be less than the actual
   * number of elements in the queue, since the mWriteIndex is updated at the
   * very end of the Enqueue method on the producer thread, but consequently
   * always returns a number of elements such that a call to Dequeue return this
   * number of elements.
   *
   * @return The number of available elements for reading.
   */
  int AvailableRead() const {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif
    return AvailableReadInternal(
        mReadIndex.load(std::memory_order_relaxed),
        mWriteIndex.load(std::memory_order_relaxed));
  }
  /**
   * Get the number of available elements for writing.
   *
   * Only safely called on the producer thread. This can be less than the
   * actual number of slots that are available, because mReadIndex is updated
   * at the very end of the Dequeue method. It always returns a number such
   * that a call to Enqueue with this number will succeed in enqueuing this
   * number of elements.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int AvailableWrite() const {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif
    return AvailableWriteInternal(
        mReadIndex.load(std::memory_order_relaxed),
        mWriteIndex.load(std::memory_order_relaxed));
  }
  /**
   * Get the total Capacity, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum Capacity of this ring buffer, i.e. the value passed to
   * the constructor.
   */
  int Capacity() const { return StorageCapacity() - 1; }
  /**
   * Reset the consumer and producer thread identifier, in case the threads are
   * being changed. This has to be externally synchronized. This is no-op when
   * asserts are disabled.
   */
  void ResetThreadIds() {
#ifdef DEBUG
    mConsumerId = mProducerId = std::thread::id();
#endif
  }

 private:
  /** Return true if the ring buffer is empty.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool IsEmpty(int aReadIndex, int aWriteIndex) const {
    return aWriteIndex == aReadIndex;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool IsFull(int aReadIndex, int aWriteIndex) const {
    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
  }
  /**
   * Return the size of the storage. It is one more than the number of elements
   * that can be stored in the buffer.
   *
   * This can be called from any thread.
   *
   * @return the number of slots in the underlying storage.
   */
  int StorageCapacity() const { return mCapacity; }
  /**
   * Returns the number of elements available for reading.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableRead`.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return the number of available elements for reading.
   */
  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const {
    if (aWriteIndex >= aReadIndex) {
      return aWriteIndex - aReadIndex;
    } else {
      /* The readable region wraps around the end of the storage. */
      return aWriteIndex + StorageCapacity() - aReadIndex;
    }
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableWrite`.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return the number of elements that can be written into the array.
   */
  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = aReadIndex - aWriteIndex - 1;
    if (aWriteIndex >= aReadIndex) {
      rv += StorageCapacity();
    }
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * Incrementing `mWriteIndex` can be done on the producer thread.
   * Incrementing `mReadIndex` can be done on the consumer thread.
   *
   * @param aIndex the index to increment; it is taken by value and not
   * modified.
   * @param aIncrement the number by which `aIndex` is incremented.
   * @return the new index.
   */
  int IncrementIndex(int aIndex, int aIncrement) const {
    MOZ_ASSERT(aIncrement >= 0 && aIncrement < StorageCapacity() &&
               aIndex < StorageCapacity());
    return (aIndex + aIncrement) % StorageCapacity();
  }
  /**
   * @brief This allows checking that Enqueue (resp. Dequeue) are always
   * called by the right thread.
   *
   * The role of the thread are assigned the first time they call Enqueue or
   * Dequeue, and cannot change, except when ResetThreadIds is called.
   *
   * @param aId the id of the thread that has called the calling method first.
   * If it is still a default-constructed id, it is set to the current thread.
   */
#ifdef DEBUG
  static void AssertCorrectThread(std::thread::id& aId) {
    if (aId == std::thread::id()) {
      aId = std::this_thread::get_id();
      return;
    }
    MOZ_ASSERT(aId == std::this_thread::get_id());
  }
#endif
  /** Index at which the oldest element is. */
  std::atomic<int> mReadIndex;
  /** Index at which to write new elements. It is equal to `mReadIndex` when
   * the buffer is empty, and one position before it (modulo the storage size)
   * when the buffer is full. */
  std::atomic<int> mWriteIndex;
  /** Size of the storage: one more than the maximum number of elements that
   * can be stored in the ring buffer. */
  const int mCapacity;
  /** Data storage, of size `mCapacity` */
  std::unique_ptr<T[]> mData;
#ifdef DEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id mConsumerId;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id mProducerId;
#endif
};

/**
 * Public name of the queue: `SPSCQueue<T>` is an alias for
 * `SPSCRingBufferBase<T>`. It can be shared between exactly two threads, one
 * that only produces and one that only consumes (the roles never swap),
 * without any explicit synchronization.
 */
template <class T>
using SPSCQueue = SPSCRingBufferBase<T>;

}  // namespace mozilla

#endif  // mozilla_LockFreeQueue_h