Skip to content

Commit

Permalink
engine/activity: fixed blocking calls to ExecutionEngine::process()
Browse files Browse the repository at this point in the history
Broadcasting condition variables without holding the associated mutex can block for some reasons.
This patch adds MutexLocks whereever the msg_cond is triggered.

From http://pubs.opengroup.org/onlinepubs/009695399/functions/pthread_cond_broadcast.html:
> The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether
> or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait()
> have associated with the condition variable during their waits; however, if predictable scheduling
> behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or
> pthread_cond_signal().

This patch also clears the Thread active flag in Thread::terminate() is called and forces the thread
function to be terminated in the destructor of class Activity. This solves some issues with pure virtual
method calls during the destruction phase if threads are still running and triggering each other.

Signed-off-by: Johannes Meyer <johannes@intermodalics.eu>
  • Loading branch information
meyerj committed Apr 13, 2015
1 parent d0f7919 commit 58cb4bb
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
10 changes: 8 additions & 2 deletions rtt/Activity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ namespace RTT
Activity::~Activity()
{
stop();

// We need to join the activity's thread before destruction as the thread function might still
// access member variables. Activity::stop() does not guarantuee to stop the underlying thread.
terminate();
}

os::ThreadInterface* Activity::thread() {
Expand Down Expand Up @@ -142,7 +146,10 @@ namespace RTT
if ( ! Thread::isActive() )
return false;
//a trigger is always allowed when active
msg_cond.broadcast();
{
os::MutexLock lock(msg_lock);
msg_cond.broadcast();
}
Thread::start();
return true;
}
Expand All @@ -158,7 +165,6 @@ namespace RTT
return false;
}
mtimeout = true;
msg_cond.broadcast();
Thread::start();
return true;
}
Expand Down
24 changes: 14 additions & 10 deletions rtt/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ namespace RTT
assert(foo);
if ( foo->execute() == false ){
foo->unloaded();
os::MutexLock lock(msg_lock);
msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
} else {
f_queue->enqueue( foo );
Expand Down Expand Up @@ -211,16 +212,16 @@ namespace RTT
assert( com );
com->executeAndDispose();
}
// there's no need to hold the lock during
// emptying the queue. But we must hold the
// lock once between excuteAndDispose and the
// broadcast to avoid the race condition in
// waitForMessages().
// This allows us to recurse into processMessages.
MutexLock locker( msg_lock );
}
if ( com )
msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)

// there's no need to hold the lock during
// emptying the queue. But we must hold the
// lock once between excuteAndDispose and the
// broadcast to avoid the race condition in
// waitForMessages().
// This allows us to recurse into processMessages.
MutexLock locker( msg_lock );
msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
}

bool ExecutionEngine::process( DisposableInterface* c )
Expand All @@ -237,7 +238,10 @@ namespace RTT

bool result = mqueue->enqueue( c );
this->getActivity()->trigger();
msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
{
os::MutexLock lock(msg_lock);
msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
}
return result;
}
return false;
Expand Down
1 change: 1 addition & 0 deletions rtt/os/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ namespace RTT {
rtos_sem_signal(&sem);

rtos_task_delete(&rtos_task); // this must join the thread.
active = false;
}

const char* Thread::getName() const
Expand Down

0 comments on commit 58cb4bb

Please # to comment.