Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions ntcore/src/main/native/cpp/InstanceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ int InstanceImpl::AllocImpl() {
}

void InstanceImpl::Destroy(int inst) {
std::scoped_lock lock(s_mutex);
if (inst < 0 || inst >= kNumInstances) {
return;
}
if (auto impl = Get(inst)) {
impl->listenerStorage.Stop();

delete s_instances[inst].exchange(nullptr);
std::scoped_lock lock{s_mutex};
delete s_instances[inst].exchange(nullptr);
}
}

void InstanceImpl::StartLocal() {
Expand Down Expand Up @@ -230,6 +230,12 @@ void InstanceImpl::AddTimeSyncListener(NT_Listener listener,
}

void InstanceImpl::Reset() {
// Listeners sometimes call NetworkTables APIs, so reset listenerStorage
// first. Note that if there are queued events, this will wait until
// listeners are called, and listeners may use this instance, so holding
// the lock could result in deadlock.
listenerStorage.Reset();

std::scoped_lock lock{m_mutex};
m_networkServer.reset();
m_networkClient.reset();
Expand All @@ -238,7 +244,6 @@ void InstanceImpl::Reset() {
m_serverTimeOffset.reset();
m_rtt2 = 0;

listenerStorage.Reset();
// connectionList should have been cleared by destroying networkClient/server
localStorage.Reset();
}
16 changes: 14 additions & 2 deletions ntcore/src/main/native/cpp/ListenerStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ bool ListenerStorage::WaitForListenerQueue(double timeout) {
}

void ListenerStorage::Reset() {
Stop(); // If a callback is currently running, wait for it to complete.

std::scoped_lock lock{m_mutex};
m_pollers.clear();
m_listeners.clear();
Expand All @@ -355,9 +357,19 @@ void ListenerStorage::Reset() {
m_valueListeners.clear();
m_logListeners.clear();
m_timeSyncListeners.clear();
if (m_thread) {
m_thread.Stop();
}

void ListenerStorage::Stop() {
{
std::scoped_lock lock{m_mutex};
if (auto thr = m_thread.GetThread()) {
thr->m_waitQueueWakeup.Set();
} else {
return;
}
}

m_thread.Join();
}

std::vector<std::pair<NT_Listener, unsigned int>>
Expand Down
2 changes: 2 additions & 0 deletions ntcore/src/main/native/cpp/ListenerStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class ListenerStorage final : public IListenerStorage {

void Reset();

void Stop();

private:
// these assume the mutex is already held
NT_Listener DoAddListener(NT_ListenerPoller pollerHandle);
Expand Down
134 changes: 133 additions & 1 deletion ntcore/src/test/native/cpp/TableListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.

#include <atomic>
#include <memory>

#include <gtest/gtest.h>

#include "TestPrinters.h"
#include "Handle.h"
#include "InstanceImpl.h"
#include "gmock/gmock.h"
#include "networktables/DoubleTopic.h"
#include "networktables/IntegerTopic.h"
#include "networktables/NetworkTableInstance.h"
#include "ntcore_cpp.h"

Expand Down Expand Up @@ -41,6 +44,23 @@ void TableListenerTest::PublishTopics() {
m_bazvalue = m_inst.GetDoubleTopic("/baz/bazvalue").Publish();
}

// Matchers for checking the validity of a NetworkTableInstance.
//
// Note: These tests cannot simply rely on inst.m_handle since
// NetworkTableInstance::Destroy() zeroeds it after it calls
// DestroyInstance().
MATCHER(HasHandle, "a non-zero handle") {
return !!arg.GetHandle();
}
MATCHER(MapsToInstanceImpl, "a handle mapping to an InstanceImpl") {
auto handle = arg.GetHandle();
if (!handle) {
return ExplainMatchResult(HasHandle(), arg, result_listener);
}
int inst = nt::Handle{handle}.GetTypedInst(nt::Handle::kInstance);
return nt::InstanceImpl::Get(inst) != nullptr;
}

TEST_F(TableListenerTest, AddListener) {
auto table = m_inst.GetTable("/foo");
MockTableEventListener listener;
Expand All @@ -51,6 +71,118 @@ TEST_F(TableListenerTest, AddListener) {
EXPECT_TRUE(m_inst.WaitForListenerQueue(1.0));
}

TEST_F(TableListenerTest, DestroyInstanceWhileInCallack) {
std::atomic_bool destroyCalled;
std::atomic_bool destroyReturned;
std::atomic_bool majorFailureDetected;
std::atomic_bool callbackWokeUp;
std::atomic_bool callbackSuccessful;
std::atomic_bool destroyerSuccessful;
auto listenerCalledEvent = WPI_CreateEvent(false, false);
auto listenerDoneEvent = WPI_CreateEvent(false, false);
auto destroyerThreadStartedEvent = WPI_CreateEvent(false, false);
auto destroyerThreadReadyEvent = WPI_CreateEvent(false, false);
auto destroyerThreadDoneEvent = WPI_CreateEvent(false, false);
auto exitListenerEvent = WPI_CreateEvent(false, false);
auto table = m_inst.GetTable("/Preferences");

table->AddListener(
NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE,
[&](auto table, auto key, auto& event) {
EXPECT_THAT(m_inst, HasHandle());
EXPECT_THAT(m_inst, MapsToInstanceImpl());
wpi::SetEvent(listenerCalledEvent);
SCOPED_TRACE(
"[Listener] Sent listenerCalledEvent; waiting for "
"destroyerThreadStartedEvent");

bool timedOut;
EXPECT_TRUE(
wpi::WaitForObject(destroyerThreadStartedEvent, 1.0, &timedOut));
if (!timedOut) {
SCOPED_TRACE(
"[Listener] Received destroyerThreadStartedEvent; waiting for "
"exitListenerEvent");

// Block Destory()
EXPECT_TRUE(wpi::WaitForObject(exitListenerEvent, 2.0, &timedOut));
callbackWokeUp = true;

if (!timedOut) {
SCOPED_TRACE("[Listener] Received exitListenerEvent");
EXPECT_TRUE(destroyCalled);
if (destroyCalled) {
EXPECT_FALSE(destroyReturned);
if (!destroyReturned) {
EXPECT_THAT(m_inst, HasHandle());
EXPECT_THAT(m_inst, MapsToInstanceImpl());
}
}
}
}

wpi::SetEvent(listenerDoneEvent);
SCOPED_TRACE("[Listener] Sent listenerDoneEvent; exiting");
ASSERT_FALSE(timedOut || majorFailureDetected);
callbackSuccessful = true;
});

auto publisher = m_inst.GetIntegerTopic("/Preferences/key").Publish();
ASSERT_TRUE(wpi::WaitForObject(listenerCalledEvent, 1.0, NULL));

ASSERT_THAT(m_inst, HasHandle());
ASSERT_THAT(m_inst, MapsToInstanceImpl());

// Call Destroy() in a separate thread, in case in hangs.
// Note: After the thread is created, use EXPECT_*() tests until thread
// joined.
SCOPED_TRACE("[Test thread] Starting destroyer thread");
auto destroyerThread = std::thread([&]() {
wpi::SetEvent(destroyerThreadStartedEvent);
EXPECT_FALSE(callbackWokeUp);
destroyCalled = true;
wpi::SetEvent(destroyerThreadReadyEvent);
SCOPED_TRACE("[Destroyer thread] Calling Destroy()");

nt::NetworkTableInstance::Destroy(m_inst);
SCOPED_TRACE("[Destroyer thread] Returned from Destroy()");
destroyReturned = true;

EXPECT_FALSE(m_inst);
EXPECT_TRUE(callbackWokeUp);
destroyerSuccessful = callbackWokeUp && !m_inst;
wpi::SetEvent(destroyerThreadDoneEvent);
});

bool timedOut;
EXPECT_TRUE(wpi::WaitForObject(destroyerThreadReadyEvent, 2.0, &timedOut));
if (timedOut) {
SCOPED_TRACE(
"[Test thread] Timed out waiting for destroyerThreadReadyEvent");
majorFailureDetected = true; // Ensure traces from the listener are shown
wpi::SetEvent(exitListenerEvent);
wpi::WaitForObject(listenerDoneEvent, 3.0, NULL);
ASSERT_TRUE(majorFailureDetected);
return;
}
SCOPED_TRACE("[Test thread] Received destroyerThreadReadyEvent");

// Wait long enough to ensure destroyerThread is blocked inside Destroy()
std::this_thread::sleep_for(std::chrono::milliseconds(4));
SCOPED_TRACE("[Test thread] Sending exitListenerEvent");
wpi::SetEvent(exitListenerEvent);

EXPECT_TRUE(wpi::WaitForObject(listenerDoneEvent, 3.0, NULL));
EXPECT_TRUE(wpi::WaitForObject(destroyerThreadDoneEvent, 1.0, &timedOut));
if (!timedOut && destroyerThread.joinable()) {
destroyerThread.join();
}

EXPECT_TRUE(callbackSuccessful);
EXPECT_TRUE(destroyerSuccessful);
EXPECT_FALSE(m_inst);
}

TEST_F(TableListenerTest, AddSubTableListener) {
auto table = m_inst.GetTable("/foo");
MockSubTableListener listener;
Expand Down
Loading