Source code

Revision control

Copy as Markdown

Other Tools

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include "MainThreadUtils.h"
#include "gtest/gtest.h"
#include "mozilla/Attributes.h"
#include "mozilla/CondVar.h"
#include "mozilla/gtest/MozAssertions.h"
#include "mozilla/Mutex.h"
#include "mozilla/RefPtr.h"
#include "mozilla/ThrottledEventQueue.h"
#include "nsCOMPtr.h"
#include "nsError.h"
#include "nsIRunnable.h"
#include "nsISerialEventTarget.h"
#include "nsIThread.h"
#include "nsThreadUtils.h"
#include "prinrval.h"
using mozilla::CondVar;
using mozilla::MakeRefPtr;
using mozilla::Mutex;
using mozilla::MutexAutoLock;
using mozilla::ThrottledEventQueue;
using std::function;
using std::string;
namespace TestThrottledEventQueue {
// A simple queue of runnables, to serve as the base target of
// ThrottledEventQueues in tests.
//
// This is much simpler than mozilla::TaskQueue, and so better for unit tests.
// It's about the same as mozilla::EventQueue, but that doesn't implement
// nsIEventTarget, so it can't be the base target of a ThrottledEventQueue.
struct RunnableQueue : nsISerialEventTarget {
std::queue<nsCOMPtr<nsIRunnable>> runnables;
bool IsEmpty() { return runnables.empty(); }
size_t Length() { return runnables.size(); }
[[nodiscard]] nsresult Run() {
while (!runnables.empty()) {
auto runnable = std::move(runnables.front());
runnables.pop();
nsresult rv = runnable->Run();
if (NS_FAILED(rv)) return rv;
}
return NS_OK;
}
// nsIEventTarget methods
[[nodiscard]] NS_IMETHODIMP Dispatch(already_AddRefed<nsIRunnable> aRunnable,
uint32_t aFlags) override {
MOZ_ALWAYS_TRUE(aFlags == nsIEventTarget::DISPATCH_NORMAL);
runnables.push(aRunnable);
return NS_OK;
}
[[nodiscard]] NS_IMETHODIMP DispatchFromScript(nsIRunnable* aRunnable,
uint32_t aFlags) override {
RefPtr<nsIRunnable> r = aRunnable;
return Dispatch(r.forget(), aFlags);
}
NS_IMETHOD_(bool)
IsOnCurrentThreadInfallible(void) override { return NS_IsMainThread(); }
[[nodiscard]] NS_IMETHOD IsOnCurrentThread(bool* retval) override {
*retval = IsOnCurrentThreadInfallible();
return NS_OK;
}
[[nodiscard]] NS_IMETHODIMP DelayedDispatch(
already_AddRefed<nsIRunnable> aEvent, uint32_t aDelay) override {
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD RegisterShutdownTask(nsITargetShutdownTask*) override {
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD UnregisterShutdownTask(nsITargetShutdownTask*) override {
return NS_ERROR_NOT_IMPLEMENTED;
}
// nsISupports methods
NS_DECL_THREADSAFE_ISUPPORTS
private:
virtual ~RunnableQueue() = default;
};
NS_IMPL_ISUPPORTS(RunnableQueue, nsIEventTarget, nsISerialEventTarget)
static void Enqueue(nsIEventTarget* target, function<void()>&& aCallable) {
nsresult rv = target->Dispatch(
NS_NewRunnableFunction("TEQ GTest", std::move(aCallable)));
MOZ_ALWAYS_TRUE(NS_SUCCEEDED(rv));
}
} // namespace TestThrottledEventQueue
using namespace TestThrottledEventQueue;
TEST(ThrottledEventQueue, RunnableQueue)
{
string log;
RefPtr<RunnableQueue> queue = MakeRefPtr<RunnableQueue>();
Enqueue(queue, [&]() { log += 'a'; });
Enqueue(queue, [&]() { log += 'b'; });
Enqueue(queue, [&]() { log += 'c'; });
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(queue->Run());
ASSERT_EQ(log, "abc");
}
TEST(ThrottledEventQueue, SimpleDispatch)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 1");
Enqueue(throttled, [&]() { log += 'a'; });
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "a");
ASSERT_TRUE(base->IsEmpty());
ASSERT_TRUE(throttled->IsEmpty());
}
TEST(ThrottledEventQueue, MixedDispatch)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 2");
// A ThrottledEventQueue limits its impact on the base target by only queuing
// its next event on the base once the prior event has been run. What it
// actually queues on the base is a sort of proxy event called an
// "executor": the base running the executor draws an event from the
// ThrottledEventQueue and runs that. If the ThrottledEventQueue has further
// events, it re-queues the executor on the base, effectively "going to the
// back of the line".
// Queue an event on the ThrottledEventQueue. This also queues the "executor"
// event on the base.
Enqueue(throttled, [&]() { log += 'a'; });
ASSERT_EQ(throttled->Length(), 1U);
ASSERT_EQ(base->Length(), 1U);
// Add a second event to the throttled queue. The executor is already queued.
Enqueue(throttled, [&]() { log += 'b'; });
ASSERT_EQ(throttled->Length(), 2U);
ASSERT_EQ(base->Length(), 1U);
// Add an event directly to the base, after the executor.
Enqueue(base, [&]() { log += 'c'; });
ASSERT_EQ(throttled->Length(), 2U);
ASSERT_EQ(base->Length(), 2U);
// Run the base target. This runs:
// - the executor, which runs the first event from the ThrottledEventQueue,
// and re-enqueues itself
// - the event queued directly on the base
// - the executor again, which runs the second event from the
// ThrottledEventQueue.
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "acb");
ASSERT_TRUE(base->IsEmpty());
ASSERT_TRUE(throttled->IsEmpty());
}
TEST(ThrottledEventQueue, EnqueueFromRun)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 3");
// When an event from the throttled queue dispatches a new event directly to
// the base target, it is queued after the executor, so the next event from
// the throttled queue will run before it.
Enqueue(base, [&]() { log += 'a'; });
Enqueue(throttled, [&]() {
log += 'b';
Enqueue(base, [&]() { log += 'c'; });
});
Enqueue(throttled, [&]() { log += 'd'; });
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "abdc");
ASSERT_TRUE(base->IsEmpty());
ASSERT_TRUE(throttled->IsEmpty());
}
TEST(ThrottledEventQueue, RunFromRun)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 4");
// Running the event queue from within an event (i.e., a nested event loop)
// does not stall the ThrottledEventQueue.
Enqueue(throttled, [&]() {
log += '(';
// This should run subsequent events from throttled.
ASSERT_NS_SUCCEEDED(base->Run());
log += ')';
});
Enqueue(throttled, [&]() { log += 'a'; });
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "(a)");
ASSERT_TRUE(base->IsEmpty());
ASSERT_TRUE(throttled->IsEmpty());
}
TEST(ThrottledEventQueue, DropWhileRunning)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
// If we drop the event queue while it still has events, they still run.
{
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 5");
Enqueue(throttled, [&]() { log += 'a'; });
}
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "a");
}
TEST(ThrottledEventQueue, AwaitIdle)
{
Mutex mutex MOZ_UNANNOTATED("TEQ AwaitIdle");
CondVar cond(mutex, "TEQ AwaitIdle");
string dequeue_await; // mutex
bool threadFinished = false; // mutex & cond
bool runnableFinished = false; // main thread only
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 6");
// Put an event in the queue so the AwaitIdle might block.
Enqueue(throttled, [&]() { runnableFinished = true; });
// Create a separate thread that waits for the queue to become idle, and
// then takes observable action.
nsCOMPtr<nsIRunnable> await = NS_NewRunnableFunction("TEQ AwaitIdle", [&]() {
throttled->AwaitIdle();
MutexAutoLock lock(mutex);
dequeue_await += " await";
threadFinished = true;
cond.Notify();
});
nsCOMPtr<nsIThread> thread;
nsresult rv =
NS_NewNamedThread("TEQ AwaitIdle", getter_AddRefs(thread), await);
ASSERT_NS_SUCCEEDED(rv);
// We can't guarantee that the thread has reached the AwaitIdle call, but we
// can get pretty close. Either way, it shouldn't affect the behavior of the
// test.
PR_Sleep(PR_MillisecondsToInterval(100));
// Drain the queue.
{
MutexAutoLock lock(mutex);
ASSERT_EQ(dequeue_await, "");
dequeue_await += "dequeue";
ASSERT_FALSE(threadFinished);
}
ASSERT_FALSE(runnableFinished);
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_TRUE(runnableFinished);
// Wait for the thread to finish.
{
MutexAutoLock lock(mutex);
while (!threadFinished) cond.Wait();
ASSERT_EQ(dequeue_await, "dequeue await");
}
ASSERT_NS_SUCCEEDED(thread->Shutdown());
}
TEST(ThrottledEventQueue, AwaitIdleMixed)
{
// Create a separate thread that waits for the queue to become idle, and
// then takes observable action.
nsCOMPtr<nsIThread> thread;
ASSERT_TRUE(NS_SUCCEEDED(
NS_NewNamedThread("AwaitIdleMixed", getter_AddRefs(thread))));
Mutex mutex MOZ_UNANNOTATED("AwaitIdleMixed");
CondVar cond(mutex, "AwaitIdleMixed");
// The following are protected by mutex and cond, above.
string log;
bool threadStarted = false;
bool threadFinished = false;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 7");
Enqueue(throttled, [&]() {
MutexAutoLock lock(mutex);
log += 'a';
});
Enqueue(throttled, [&]() {
MutexAutoLock lock(mutex);
log += 'b';
});
nsCOMPtr<nsIRunnable> await = NS_NewRunnableFunction("AwaitIdleMixed", [&]() {
{
MutexAutoLock lock(mutex);
// Note that we are about to begin awaiting. When the main thread sees
// this notification, it will begin draining the queue.
log += '(';
threadStarted = true;
cond.Notify();
}
// Wait for the main thread to drain the TEQ.
throttled->AwaitIdle();
{
MutexAutoLock lock(mutex);
// Note that we have finished awaiting.
log += ')';
threadFinished = true;
cond.Notify();
}
});
{
MutexAutoLock lock(mutex);
ASSERT_EQ(log, "");
}
ASSERT_NS_SUCCEEDED(thread->Dispatch(await.forget()));
// Wait for the thread to be ready to await. We can't be sure it will actually
// be blocking before we get around to draining the event queue, but that's
// the nature of the API; this test should work even if we drain the queue
// before it awaits.
{
MutexAutoLock lock(mutex);
while (!threadStarted) cond.Wait();
ASSERT_EQ(log, "(");
}
// Let the queue drain.
ASSERT_NS_SUCCEEDED(base->Run());
{
MutexAutoLock lock(mutex);
// The first runnable must always finish before AwaitIdle returns. But the
// TEQ notifies the condition variable as soon as it dequeues the last
// runnable, without waiting for that runnable to complete. So the thread
// and the last runnable could run in either order. Or, we might beat the
// thread to the mutex.
//
// (The only combination excluded here is "(a)": the 'b' runnable should
// definitely have run.)
ASSERT_TRUE(log == "(ab" || log == "(a)b" || log == "(ab)");
while (!threadFinished) cond.Wait();
ASSERT_TRUE(log == "(a)b" || log == "(ab)");
}
ASSERT_NS_SUCCEEDED(thread->Shutdown());
}
TEST(ThrottledEventQueue, SimplePauseResume)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 8");
ASSERT_FALSE(throttled->IsPaused());
Enqueue(throttled, [&]() { log += 'a'; });
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "a");
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(true));
ASSERT_TRUE(throttled->IsPaused());
Enqueue(throttled, [&]() { log += 'b'; });
ASSERT_EQ(log, "a");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "a");
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(false));
ASSERT_FALSE(throttled->IsPaused());
ASSERT_EQ(log, "a");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "ab");
ASSERT_TRUE(base->IsEmpty());
ASSERT_TRUE(throttled->IsEmpty());
}
TEST(ThrottledEventQueue, MixedPauseResume)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 9");
ASSERT_FALSE(throttled->IsPaused());
Enqueue(base, [&]() { log += 'A'; });
Enqueue(throttled, [&]() {
log += 'b';
MOZ_ALWAYS_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
});
Enqueue(throttled, [&]() { log += 'c'; });
Enqueue(base, [&]() { log += 'D'; });
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
// Since the 'b' event paused the throttled queue, 'c' should not have run.
// but 'D' was enqueued directly on the base, and should have run.
ASSERT_EQ(log, "AbD");
ASSERT_TRUE(base->IsEmpty());
ASSERT_FALSE(throttled->IsEmpty());
ASSERT_TRUE(throttled->IsPaused());
Enqueue(base, [&]() { log += 'E'; });
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(false));
Enqueue(base, [&]() { log += 'F'; });
ASSERT_FALSE(throttled->IsPaused());
ASSERT_NS_SUCCEEDED(base->Run());
// Since we've unpaused, 'c' should be able to run now. The executor should
// have been enqueued between 'E' and 'F'.
ASSERT_EQ(log, "AbDEcF");
ASSERT_TRUE(base->IsEmpty());
ASSERT_TRUE(throttled->IsEmpty());
}
TEST(ThrottledEventQueue, AwaitIdlePaused)
{
Mutex mutex MOZ_UNANNOTATED("AwaitIdlePaused");
CondVar cond(mutex, "AwaitIdlePaused");
string dequeue_await; // mutex
bool threadFinished = false; // mutex & cond
bool runnableFinished = false; // main thread only
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 10");
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(true));
// Put an event in the queue so the AwaitIdle might block. Since throttled is
// paused, this should not enqueue an executor in the base target.
Enqueue(throttled, [&]() { runnableFinished = true; });
ASSERT_TRUE(base->IsEmpty());
// Create a separate thread that waits for the queue to become idle, and
// then takes observable action.
nsCOMPtr<nsIRunnable> await =
NS_NewRunnableFunction("AwaitIdlePaused", [&]() {
throttled->AwaitIdle();
MutexAutoLock lock(mutex);
dequeue_await += " await";
threadFinished = true;
cond.Notify();
});
nsCOMPtr<nsIThread> thread;
nsresult rv =
NS_NewNamedThread("AwaitIdlePaused", getter_AddRefs(thread), await);
ASSERT_NS_SUCCEEDED(rv);
// We can't guarantee that the thread has reached the AwaitIdle call, but we
// can get pretty close. Either way, it shouldn't affect the behavior of the
// test.
PR_Sleep(PR_MillisecondsToInterval(100));
// The AwaitIdle call should be blocked, even though there is no executor,
// because throttled is paused.
{
MutexAutoLock lock(mutex);
ASSERT_EQ(dequeue_await, "");
dequeue_await += "dequeue";
ASSERT_FALSE(threadFinished);
}
// A paused TEQ contributes no events to its base target. (This is covered by
// other tests...)
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_TRUE(base->IsEmpty());
ASSERT_FALSE(throttled->IsEmpty());
// Resume and drain the queue.
ASSERT_FALSE(runnableFinished);
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(false));
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_TRUE(base->IsEmpty());
ASSERT_TRUE(throttled->IsEmpty());
ASSERT_TRUE(runnableFinished);
// Wait for the thread to finish.
{
MutexAutoLock lock(mutex);
while (!threadFinished) cond.Wait();
ASSERT_EQ(dequeue_await, "dequeue await");
}
ASSERT_NS_SUCCEEDED(thread->Shutdown());
}
TEST(ThrottledEventQueue, ExecutorTransitions)
{
string log;
auto base = MakeRefPtr<RunnableQueue>();
RefPtr<ThrottledEventQueue> throttled =
ThrottledEventQueue::Create(base, "test queue 11");
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(true));
// Since we're paused, queueing an event on throttled shouldn't queue the
// executor on the base target.
Enqueue(throttled, [&]() { log += 'a'; });
ASSERT_EQ(throttled->Length(), 1U);
ASSERT_EQ(base->Length(), 0U);
// Resuming throttled should create the executor, since throttled is not
// empty.
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(false));
ASSERT_EQ(throttled->Length(), 1U);
ASSERT_EQ(base->Length(), 1U);
// Pausing can't remove the executor from the base target since we've already
// queued it there, but it can ensure that it doesn't do anything.
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(true));
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "");
ASSERT_EQ(throttled->Length(), 1U);
ASSERT_EQ(base->Length(), 0U);
// As before, resuming must create the executor, since throttled is not empty.
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(false));
ASSERT_EQ(throttled->Length(), 1U);
ASSERT_EQ(base->Length(), 1U);
ASSERT_EQ(log, "");
ASSERT_NS_SUCCEEDED(base->Run());
ASSERT_EQ(log, "a");
ASSERT_EQ(throttled->Length(), 0U);
ASSERT_EQ(base->Length(), 0U);
// Since throttled is empty, pausing and resuming now should not enqueue an
// executor.
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(true));
ASSERT_NS_SUCCEEDED(throttled->SetIsPaused(false));
ASSERT_EQ(throttled->Length(), 0U);
ASSERT_EQ(base->Length(), 0U);
}