Tutorial #15: Multithreading

Multithreading means executing multiple threads concurrently, so that they appear to run at the same time.

Usually, it is not necessary to create your own threads. However, if a time-consuming calculation needs to be done in the background, a separate thread can be spawned using the System::Thread class.

Additionally, the Murl Engine provides the classes System::Semaphore, System::Mutex, System::Locker and System::AtomicSInt32 to synchronize threads.

Header files used for this tutorial:

#include "murl_system.h"
#include "murl_system_thread.h"
#include "murl_system_time.h"
#include "murl_system_semaphore.h"
#include "murl_system_mutex.h"

Thread

The class System::Thread can be used to spawn a new thread. To do so, we create an inner class and derive it from System::Thread.

class MyThread : public System::Thread
{
public:
    MyThread();
    virtual ~MyThread();
protected:
    virtual Bool Run();
};

MyThread mMyThread;

For this class, we define a constructor and a destructor and override the method Run().

App::MultithreadingLogic::MyThread::MyThread()
: Thread("MyThread")
{
}

App::MultithreadingLogic::MyThread::~MyThread()
{
    Stop();
    Join();
}

Bool App::MultithreadingLogic::MyThread::Run()
{
    System::Time t(0.1);
    while (mIsRunning)
    {
        Debug::Trace(".");
        System::Sleep(t);
    }
    Debug::Trace("Thread finished");
    return true;
}
Note
Attention! To prevent the release of a thread object while it is running, it is strongly advised to implement a destructor which calls the methods Stop() and Join().

Now we can start and stop our new thread with the methods Start() and Stop(), respectively. The method Join() blocks the calling thread until the thread has finished its execution; it must be called before the thread can be restarted.

if (deviceHandler->WasRawKeyPressed(RAWKEY_T) || mButtonT->WasReleasedInside())
{
    if (mMyThread.Start())
        Debug::Trace("Thread started");
    else
    {
        mMyThread.Stop();
        mMyThread.Join();
        Debug::Trace("Thread stopped");
    }
}

Our new thread can be started and stopped with T (key press or button press). While running, it prints a dot character to the debug window every 100 ms.

Thread started
.
.
.
Thread finished
Thread stopped
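
To make the Start()/Stop()/Join() life cycle easier to follow outside the engine, here is a minimal sketch built only on the C++11 standard library. It is not how System::Thread is implemented; the class StoppableWorker and all of its members are hypothetical names chosen for illustration, and the assumption that Start() fails until Join() has been called is taken from the behavior described above.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical stand-in for a System::Thread subclass, built on std::thread.
class StoppableWorker
{
public:
    StoppableWorker() : mIsRunning(false), mTicks(0) {}

    // Stop and join in the destructor so the object is never destroyed
    // while its thread is still running.
    ~StoppableWorker()
    {
        Stop();
        Join();
    }

    // Returns false if a previous run has not been joined yet.
    bool Start()
    {
        if (mThread.joinable())
            return false;
        mIsRunning = true;
        mThread = std::thread(&StoppableWorker::Run, this);
        return true;
    }

    // Only requests termination; the loop in Run() notices the flag.
    void Stop() { mIsRunning = false; }

    // Blocks the caller until Run() has returned.
    void Join()
    {
        if (mThread.joinable())
            mThread.join();
    }

    int GetTicks() const { return mTicks; }

private:
    void Run()
    {
        while (mIsRunning)
        {
            mTicks++;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

    std::atomic<bool> mIsRunning;
    std::atomic<int> mTicks;
    std::thread mThread;
};
```

In this sketch Stop() only clears a flag, so the worker ends at its next loop iteration, and Join() is what actually waits for it. This is why the destructor calls both, in that order.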

Semaphore

The class System::Semaphore implements a binary semaphore and provides the methods Signal(), Wait() and Try().

Wait() blocks the current thread until Signal() is called (optionally with a timeout). Try() checks whether Signal() has been called but does not block the current thread.

For example, we can use a semaphore to determine whether a thread has finished its execution.

class MyThread2 : public System::Thread
{
public:
    MyThread2();
    virtual ~MyThread2();
    System::Semaphore mSemaphore;
protected:
    virtual Bool Run();
};

MyThread2 mMyThread2;

App::MultithreadingLogic::MyThread2::MyThread2()
: Thread("MyThread2")
{
}

App::MultithreadingLogic::MyThread2::~MyThread2()
{
    Stop();
    Join();
}

Bool App::MultithreadingLogic::MyThread2::Run()
{
    UInt32 progress = 0;
    System::Time t(0.02);
    while (progress < 100)
    {
        progress++;
        System::Sleep(t);
    }
    mSemaphore.Signal();
    return true;
}

if (deviceHandler->WasRawKeyPressed(RAWKEY_S) || mButtonS->WasReleasedInside())
{
    if (mMyThread2.Start())
        Debug::Trace("Thread2 started");
}

Bool threadHasFinished = mMyThread2.mSemaphore.Try();
if (threadHasFinished)
    Debug::Trace("Thread2 has finished");

The thread can be started with S and prints an info message to the debug window when it has finished.

Thread2 started
Thread2 has finished
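
The difference between Wait() and Try() can be illustrated with a small binary semaphore written in standard C++11. This is only a sketch of the concept, not the implementation of System::Semaphore; the class name BinarySemaphore is hypothetical, and it assumes that a successful Wait() or Try() consumes the signal, which matches the single "Thread2 has finished" line in the output above.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical binary semaphore; mirrors the Signal()/Wait()/Try() idea only.
class BinarySemaphore
{
public:
    BinarySemaphore() : mSignaled(false) {}

    // Marks the semaphore as signaled and wakes one waiting thread.
    void Signal()
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mSignaled = true;
        }
        mCondition.notify_one();
    }

    // Blocks until the semaphore is signaled, then consumes the signal.
    void Wait()
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mCondition.wait(lock, [this] { return mSignaled; });
        mSignaled = false;
    }

    // Like Wait(), but gives up after the timeout and returns false then.
    bool Wait(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mMutex);
        if (!mCondition.wait_for(lock, timeout, [this] { return mSignaled; }))
            return false;
        mSignaled = false;
        return true;
    }

    // Never blocks: consumes the signal if present, otherwise returns false.
    bool Try()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        if (!mSignaled)
            return false;
        mSignaled = false;
        return true;
    }

private:
    std::mutex mMutex;
    std::condition_variable mCondition;
    bool mSignaled;
};
```

Polling with Try() once per logic tick, as in the tutorial code, keeps the calling thread responsive; Wait() is the better fit when the caller has nothing else to do until the signal arrives.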

Mutex

A mutex (mutual exclusion) can be used to protect critical sections. It ensures that no two concurrent threads are inside the same critical section at the same time.

The class System::Mutex provides the methods Lock() and Unlock() for this purpose.

mMutex.Lock();   // critical section start
...
mMutex.Unlock(); // critical section end

The class System::Locker is a useful helper class for mutex objects.

A System::Locker object automatically calls the Lock() method at object creation and automatically calls Unlock() when the Locker object goes out of scope.

If a critical section is moved into its own method and a System::Locker object is used to lock the mutex, Unlock() is guaranteed to be called automatically when the method returns, regardless of which path leaves the method.

void App::MultithreadingLogic::MyThread3::ClearProgress()
{
    System::Locker locker(mMutex);
    //critical section start, Unlock() is called automatically on return
    SInt32 temp = -1 * mProgress;
    mProgress.ExchangeAndAdd(temp);
}
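
Why a scope-bound helper is safer than manual Lock()/Unlock() calls can be sketched in standard C++11. The class ScopedLocker, the globals and the function names below are hypothetical and only illustrate the idea behind System::Locker; in the standard library, std::lock_guard plays the same role.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical RAII helper in the spirit of System::Locker.
class ScopedLocker
{
public:
    explicit ScopedLocker(std::mutex& mutex) : mMutex(mutex) { mMutex.lock(); }
    ~ScopedLocker() { mMutex.unlock(); }

    // Copying would unlock the mutex twice, so it is disabled.
    ScopedLocker(const ScopedLocker&) = delete;
    ScopedLocker& operator=(const ScopedLocker&) = delete;

private:
    std::mutex& mMutex;
};

std::mutex gMutex;
int gValue = 0;

// The critical section can be left through an early return or an exception;
// the destructor of 'locker' releases the mutex on every path.
void SetValue(int value)
{
    ScopedLocker locker(gMutex);
    if (value < 0)
        throw std::invalid_argument("value must not be negative");
    if (value == gValue)
        return;
    gValue = value;
}

// Returns true if the mutex is currently free (used only for this demo).
bool IsMutexFree()
{
    if (!gMutex.try_lock())
        return false;
    gMutex.unlock();
    return true;
}
```

With manual locking, each of the three exits from SetValue() would need its own Unlock() call, and forgetting one would leave the mutex locked forever.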

Atomic Integer

The System::AtomicSInt32 class can be used to create an atomic counter without the need for an extra mutex object.

For example, we can use the System::AtomicSInt32 class for a progress bar.

class MyThread3 : public System::Thread
{
public:
    MyThread3();
    virtual ~MyThread3();
    SInt32 GetProgress();
protected:
    virtual Bool Run();
    void ClearProgress();
    System::AtomicSInt32 mProgress;
    System::Mutex mMutex;
};

MyThread3 mMyThread3;

App::MultithreadingLogic::MyThread3::MyThread3()
: Thread("MyThread3")
, mProgress(0)
{
}

App::MultithreadingLogic::MyThread3::~MyThread3()
{
    Stop();
    Join();
}

Bool App::MultithreadingLogic::MyThread3::Run()
{
    ClearProgress();
    System::Time t(0.02);
    while ((mIsRunning) && (mProgress < 100))
    {
        mProgress++;
        System::Sleep(t);
    }
    return true;
}

void App::MultithreadingLogic::MyThread3::ClearProgress()
{
    System::Locker locker(mMutex);
    //critical section start, unlock is called automatically on return
    SInt32 temp = -1 * mProgress;
    mProgress.ExchangeAndAdd(temp);
}

SInt32 App::MultithreadingLogic::MyThread3::GetProgress()
{
    return mProgress;
}

if (deviceHandler->WasRawKeyPressed(RAWKEY_P) || mButtonStart->WasReleasedInside())
{
    if (!mMyThread3.Start())
    {
        mMyThread3.Stop();
        mMyThread3.Join();
    }
}

mPlane->SetScaleFactorX(mMyThread3.GetProgress()*6);

The thread can be started and stopped with P. The progress is visualized with the mPlane object.
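
The benefit of an atomic counter can be demonstrated with std::atomic from the C++11 standard library. This is a sketch of the general concept, not of System::AtomicSInt32 itself; the function names CountWithAtomic and ResetCounter are hypothetical.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Increments a shared atomic counter from several threads and returns the total.
int CountWithAtomic(int threadCount, int incrementsPerThread)
{
    std::atomic<int> counter(0);
    std::vector<std::thread> threads;
    for (int t = 0; t < threadCount; t++)
    {
        threads.emplace_back([&counter, incrementsPerThread]
        {
            for (int i = 0; i < incrementsPerThread; i++)
                counter++;   // atomic read-modify-write, no mutex required
        });
    }
    for (std::thread& thread : threads)
        thread.join();
    return counter;
}

// Resets the counter in one atomic step and returns the previous value.
int ResetCounter(std::atomic<int>& counter)
{
    return counter.exchange(0);
}
```

Every increment is a single indivisible operation, so no update is lost even when several threads count at once. Reading a value and then adding its negative, as ClearProgress() does, consists of two separate operations, which is why that method is additionally guarded by a mutex.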

MessageThread

The class Util::MessageThread implements a thread with Util::MessageQueue and Util::MessageDispatch objects. The class can be used to write your own message dispatcher.

To create a simple example, we first define two message classes derived from Util::Message.

class MyMessage1 : public Util::Message
{
public:
    MyMessage1(UInt32 messageId, UInt32 data);
    UInt32 mData;
};
class MyMessage2 : public Util::Message
{
public:
    MyMessage2(UInt32 messageId, UInt32 data1, UInt32 data2);
    UInt32Array mData;
};

Next, we need a Util::MessageThread class.

class MyMessageThread: public Util::MessageThread
{
public:
    enum MyMessageIds
    {
        MY_MESSAGE_1 = 1,
        MY_MESSAGE_2,
        MY_MESSAGE_3,
        MY_MESSAGE_4,
    };
    MyMessageThread();
    virtual ~MyMessageThread();
    void Send1(UInt32 data);
    void Send2(UInt32 data1, UInt32 data2);
    void Send3();
    void Send4();
protected:
    void ReceiveTimeout(Util::Message::AutoPtr message);
    void ReceiveDefault(Util::Message::AutoPtr message);
    void Receive1(AutoPointer<MyMessage1> message);
    void Receive2(AutoPointer<MyMessage2> message);
};

MyMessageThread mMessageThread;

We use Register() to specify which method should process which message.

App::MultithreadingLogic::MyMessage1::MyMessage1(UInt32 messageId, UInt32 data)
: Util::Message(messageId)
, mData(data)
{
}
App::MultithreadingLogic::MyMessage2::MyMessage2(UInt32 messageId, UInt32 data1, UInt32 data2)
: Util::Message(messageId)
{
    mData.Add(data1);
    mData.Add(data2);
}

App::MultithreadingLogic::MyMessageThread::MyMessageThread()
: MessageThread("MyMessageThread")
{
    Util::MessageDispatch& disp = GetMessageDispatch();
    disp.RegisterTimeout(this, &MyMessageThread::ReceiveTimeout);
    disp.RegisterDefault(this, &MyMessageThread::ReceiveDefault);
    disp.Register<MyMessage1>(MY_MESSAGE_1, this, &MyMessageThread::Receive1);
    disp.Register<MyMessage2>(MY_MESSAGE_2, this, &MyMessageThread::Receive2);
}
App::MultithreadingLogic::MyMessageThread::~MyMessageThread()
{
    Stop();
    Join();
}

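The methods SendMessage() and SendSyncMessage() can be used to send a message. As the debug output further below shows, a call to SendSyncMessage() returns only after the message has been received.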

void App::MultithreadingLogic::MyMessageThread::Send1(UInt32 data)
{
    Debug::Trace(("%s Message1(%d) Send"), mName.Begin(), data);
    SendMessage(AutoPointer<MyMessage1>(new MyMessage1(MY_MESSAGE_1, data)));
}
void App::MultithreadingLogic::MyMessageThread::Send2(UInt32 data1, UInt32 data2)
{
    Debug::Trace(("%s Message2(%d, %d) SendSync"), mName.Begin(), data1, data2);
    SendSyncMessage(AutoPointer<MyMessage2>(new MyMessage2(MY_MESSAGE_2, data1, data2)));
    Debug::Trace(("%s Message2(%d, %d) returned"), mName.Begin(), data1, data2);
}
void App::MultithreadingLogic::MyMessageThread::Send3()
{
    Debug::Trace("Send Non-dispatched id %d", MY_MESSAGE_3);
    SendId(MY_MESSAGE_3);
}
void App::MultithreadingLogic::MyMessageThread::Send4()
{
    Debug::Trace("Send Non-dispatched id %d", MY_MESSAGE_4);
    SendId(MY_MESSAGE_4);
}

If a message is received, it is evaluated and the corresponding receive method is called.

void App::MultithreadingLogic::MyMessageThread::ReceiveTimeout(Util::Message::AutoPtr message)
{
    Debug::Trace(("%s Timeout Message(%d) received"), mName.Begin(), message->GetId());
}
void App::MultithreadingLogic::MyMessageThread::ReceiveDefault(Util::Message::AutoPtr message)
{
    Debug::Trace(("%s Default Message(%d) received"), mName.Begin(), message->GetId());
}
void App::MultithreadingLogic::MyMessageThread::Receive1(AutoPointer<MyMessage1> message)
{
    Debug::Trace(("%s Message1(%d) received"), mName.Begin(), message->mData);
}
void App::MultithreadingLogic::MyMessageThread::Receive2(AutoPointer<MyMessage2> message)
{
    Debug::Trace(("%s Message2(%d, %d) received"), mName.Begin(), message->mData[0], message->mData[1]);
}

The test sequence can be started with M.

if (deviceHandler->WasRawKeyPressed(RAWKEY_M) || mButtonM->WasReleasedInside())
{
    mMessageThread.Start();
    mMessageThread.Send2(16, 17);
    mMessageThread.Send1(42);
    mMessageThread.Send2(19, 18);
    mMessageThread.Send1(43);
    mMessageThread.Send3();
    mMessageThread.Send4();
    mMessageThread.SendQuit();
    mMessageThread.Join();
}

As a result, we get the expected output in the debug window:

MyMessageThread Message2(16, 17) SendSync
MyMessageThread Message2(16, 17) received
MyMessageThread Message2(16, 17) returned
MyMessageThread Message1(42) Send
MyMessageThread Message2(19, 18) SendSync
MyMessageThread Message1(42) received
MyMessageThread Message2(19, 18) received
MyMessageThread Message2(19, 18) returned
MyMessageThread Message1(43) Send
Send Non-dispatched id 3
MyMessageThread Message1(43) received
Send Non-dispatched id 4
MyMessageThread Default Message(3) received
MyMessageThread Default Message(4) received
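
The interplay of queue, dispatcher, asynchronous send and synchronous send can be sketched in standard C++11. The following is not the implementation of Util::MessageThread; the class MessageLoop, its methods and the reserved quit id are hypothetical and reduced to message ids without payload.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical message loop: one thread drains a queue and dispatches by id.
class MessageLoop
{
public:
    typedef std::function<void(int)> Handler;
    typedef std::shared_ptr<std::promise<void>> Done;

    ~MessageLoop() { Quit(); Join(); }

    // Handlers must be registered before Start() is called.
    void Register(int id, Handler handler) { mHandlers[id] = handler; }
    void RegisterDefault(Handler handler) { mDefault = handler; }
    void Start() { mThread = std::thread(&MessageLoop::Run, this); }

    // Queues a message and returns immediately.
    void Send(int id) { Push(id, nullptr); }

    // Queues a message and blocks until its handler has returned.
    // Must only be called while the loop is running.
    void SendSync(int id)
    {
        Done done = std::make_shared<std::promise<void>>();
        std::future<void> finished = done->get_future();
        Push(id, done);
        finished.wait();
    }

    void Quit() { Push(kQuitId, nullptr); }
    void Join() { if (mThread.joinable()) mThread.join(); }

private:
    enum { kQuitId = -1 };
    struct Entry { int id; Done done; };

    void Push(int id, Done done)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mQueue.push_back(Entry{id, done});
        }
        mCondition.notify_one();
    }

    void Run()
    {
        for (;;)
        {
            Entry entry;
            {
                std::unique_lock<std::mutex> lock(mMutex);
                mCondition.wait(lock, [this] { return !mQueue.empty(); });
                entry = mQueue.front();
                mQueue.pop_front();
            }
            if (entry.id == kQuitId)
                return;
            std::map<int, Handler>::iterator it = mHandlers.find(entry.id);
            if (it != mHandlers.end())
                it->second(entry.id);      // registered handler
            else if (mDefault)
                mDefault(entry.id);        // fallback for unregistered ids
            if (entry.done)
                entry.done->set_value();   // release a waiting SendSync()
        }
    }

    std::mutex mMutex;
    std::condition_variable mCondition;
    std::deque<Entry> mQueue;
    std::map<int, Handler> mHandlers;
    Handler mDefault;
    std::thread mThread;
};

// Sends a few messages and returns the order in which the handlers ran.
std::vector<std::string> RunDemo()
{
    std::vector<std::string> log;   // written only by the loop thread until Join()
    MessageLoop loop;
    loop.Register(1, [&log](int) { log.push_back("handler 1"); });
    loop.RegisterDefault([&log](int id) { log.push_back("default " + std::to_string(id)); });
    loop.Start();
    loop.SendSync(1);   // returns only after "handler 1" was logged
    loop.Send(3);       // returns immediately, handled later by the default handler
    loop.Quit();
    loop.Join();
    return log;
}
```

Because the queue is processed strictly in order, an asynchronous message sent before a synchronous one is always handled first, which corresponds to the interleaving of Message1(42) and Message2(19, 18) in the output above.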

C++11

C++11 adds standard library support for threads, atomic variables, mutexes etc. If only platforms with modern compilers and C++11 support are targeted, these C++11 features can be used for multithreading as an alternative.

Header files for the C++11 example:

#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <sstream>

Example:

std::atomic<SInt32> mCPP11_Atomic_SInt32;
std::mutex mCPP11_Mutex;

void CPP11_Test();
void CPP11_Method_1();
void CPP11_Method_2(UInt32& val);

void CPP11_Function()
{
    for (UInt32 i = 0; i < 5; i++)
    {
        Debug::Trace("CPP11_Function %d", i);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

void App::MultithreadingLogic::CPP11_Test()
{
    UInt32 val = 40;
    mCPP11_Atomic_SInt32 = 100;

    std::thread t1(CPP11_Function);
    std::thread t2(&MultithreadingLogic::CPP11_Method_1, this);
    std::thread t3(&MultithreadingLogic::CPP11_Method_1, this);
    std::thread t4(&MultithreadingLogic::CPP11_Method_2, this, std::ref(val));

    mCPP11_Mutex.lock();
    // critical section start
    // ...
    // critical section end
    mCPP11_Mutex.unlock();

    t1.join();
    t2.join();
    t3.join();
    t4.join();
    Debug::Trace("CPP11_Test val: %d", val);
}

void App::MultithreadingLogic::CPP11_Method_1()
{
    std::thread::id threadId = std::this_thread::get_id();
    std::stringstream ss;
    ss << threadId;
    std::string stringId = ss.str();

    for (UInt32 i = 0; i < 5; i++)
    {
        mCPP11_Atomic_SInt32++;
        SInt32 val = mCPP11_Atomic_SInt32;
        Debug::Trace("CPP11_Method_1 thread %s, value %d", stringId.c_str() , val);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

void App::MultithreadingLogic::CPP11_Method_2(UInt32& val)
{
    val += 2;
    Debug::Trace("CPP11_Method_2 finished");
}

The class std::thread can be used to spawn a new thread. The function to be executed by the new thread is passed as a parameter (see t1).

If a member function should be executed, a pointer to the member function and the object need to be passed as parameters (see t2, t3 and t4).

By default, the thread constructor copies all arguments passed to it. To pass a variable by reference, the argument must be wrapped in std::ref or std::cref (see t4).
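
The effect of std::ref can be isolated in a few lines. The function names AddTwo, AddTwoToCopy and RunRefDemo are hypothetical and not part of the tutorial project.

```cpp
#include <functional>
#include <thread>

// Adds 2 to whatever object the reference is bound to.
void AddTwo(int& value)
{
    value += 2;
}

// Adds 2 to a private copy; the caller's variable is not affected.
void AddTwoToCopy(int value)
{
    value += 2;
}

// Runs both functions on their own threads and returns the final value.
int RunRefDemo()
{
    int val = 40;

    std::thread byCopy(AddTwoToCopy, val);      // argument is copied into the thread
    std::thread byRef(AddTwo, std::ref(val));   // std::ref passes a reference wrapper

    byCopy.join();
    byRef.join();
    return val;   // only the thread started with std::ref changed val
}
```

Passing val directly to AddTwo, without std::ref, does not compile, because the copied argument cannot bind to a non-const reference parameter.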

As a result, we again get the expected output in the debug window:

CPP11_Function 0
CPP11_Method_1 thread 3864, value 101
CPP11_Method_1 thread 9284, value 102
CPP11_Method_2 finished
CPP11_Method_1 thread 9284, value 103
CPP11_Function 1
CPP11_Method_1 thread 3864, value 104
CPP11_Method_1 thread 9284, value 105
CPP11_Method_1 thread 3864, value 106
CPP11_Function 2
CPP11_Method_1 thread 3864, value 107
CPP11_Function 3
CPP11_Method_1 thread 9284, value 108
CPP11_Method_1 thread 3864, value 109
CPP11_Function 4
CPP11_Method_1 thread 9284, value 110
CPP11_Test val: 42

Attention! To use C++11 features with the Android NDK, the C++11 compiler flag must be set in the Makefile (APP_CPPFLAGS += -std=c++11). If the NDK version is older than revision 10d, the toolchain version must additionally be changed to NDK_TOOLCHAIN_VERSION=4.8 or NDK_TOOLCHAIN_VERSION=clang, because the default is GCC 4.6. Since Android NDK revision 10d, GCC 4.8 is the default compiler for all 32-bit ABIs.

Attention! Visual Studio 2010 does not provide support for C++11 threads. Thus, to be able to compile this tutorial with VS2010, the C++11 tests have to be excluded:

// comment the next line out to exclude the CPP11 code segments
// #define INCLUDE_CPP11_CODE

Screenshot

[Image: tut0115_multithreading.png, caption "Multithreading"]


Copyright © 2011-2017 Spraylight GmbH.