Tutorial #15: Multithreading

Wenn mehrere, nebenläufige Threads gleichzeitig ausgeführt werden, spricht man von Multithreading.

Normalerweise besteht keine Notwendigkeit eigene Threads zu erzeugen. Wenn aber doch einmal eine längere Berechnung im Hintergrund ausgeführt werden soll, kann dafür mit der Klasse System::Thread ein eigener Thread aufgespannt werden.

Für die Synchronisation stellt die Murl Engine noch die Klassen System::Semaphore, System::Mutex, System::Locker und System::AtomicSInt32 zur Verfügung.

Header-Dateien:

#include "murl_system.h"
#include "murl_system_thread.h"
#include "murl_system_time.h"
#include "murl_system_semaphore.h"
#include "murl_system_mutex.h"

Quicklinks zu den einzelnen Abschnitten in diesem Tutorial:

Thread

Mit der Klasse System::Thread kann ein neuer Thread aufgespannt werden. Dafür erstellen wir eine innere Klasse und leiten diese von System::Thread ab.

class MyThread : public System::Thread
{
public:
    MyThread();
    virtual ~MyThread();
protected:
    virtual Bool Run();
};

MyThread mMyThread;

Wir definieren für die Klasse einen Konstruktor, einen Destruktor und überschreiben die Methode Run().

App::MultithreadingLogic::MyThread::MyThread()
: Thread("MyThread")
{
}

App::MultithreadingLogic::MyThread::~MyThread()
{
    Stop();
    Join();
}

Bool App::MultithreadingLogic::MyThread::Run()
{
    System::Time t(0.1);
    while (mIsRunning)
    {
        Debug::Trace(".");
        System::Sleep(t);
    }
    Debug::Trace("Thread finished");
    return true;
}
Zu beachten
Achtung! Damit ein Thread-Objekt nicht freigegeben wird, während es noch läuft, ist es notwendig einen Destruktor zu implementieren, der die Methoden Stop() und Join() aufruft.

Nun können wir unseren Thread mit Start() starten bzw. mit Stop() anhalten. Die Methode Join() blockiert den aufrufenden Thread, bis der Thread tatsächlich beendet wurde und muss aufgerufen werden, bevor der Thread erneut gestartet werden kann.

if (deviceHandler->WasRawKeyPressed(RAWKEY_T) || mButtonT->WasReleasedInside())
{
    if (mMyThread.Start())
        Debug::Trace("Thread started");
    else
    {
        mMyThread.Stop();
        mMyThread.Join();
        Debug::Trace("Thread stopped");
    }
}

Unser neuer Thread kann mit T (Tastatur oder Button) gestartet und gestoppt werden und schreibt lediglich alle 100ms ein Punkt-Zeichen in das Debug-Fenster.

Thread started
.
.
.
Thread finished
Thread stopped

Semaphore

Die Klasse System::Semaphore implementiert einen binären Semaphor mit den Methoden Signal(), Wait() und Try().

Wait() blockiert den aufrufenden Thread bis Signal() aufgerufen wird (wahlweise mit oder ohne Timeout). Try() blockiert den Thread nicht und prüft lediglich ob Signal() bereits aufgerufen wurde.

Wir können einen Semaphor beispielsweise dafür verwenden, um festzustellen ob ein Thread bereits fertig abgearbeitet wurde.

class MyThread2 : public System::Thread
{
public:
    MyThread2();
    virtual ~MyThread2();
    System::Semaphore mSemaphore;
protected:
    virtual Bool Run();
};
MyThread2 mMyThread2;
App::MultithreadingLogic::MyThread2::MyThread2()
: Thread("MyThread2")
{
}

App::MultithreadingLogic::MyThread2::~MyThread2()
{
    Stop();
    Join();
}

Bool App::MultithreadingLogic::MyThread2::Run()
{
    UInt32 progress = 0;
    System::Time t(0.02);
    while (progress < 100)
    {
        progress++;
        System::Sleep(t);
    }
    mSemaphore.Signal();
    return true;
}
if (deviceHandler->WasRawKeyPressed(RAWKEY_S) || mButtonS->WasReleasedInside())
{
    if (mMyThread2.Start())
        Debug::Trace("Thread2 started");
}

Bool threadHasFinished = mMyThread2.mSemaphore.Try();
if (threadHasFinished)
    Debug::Trace("Thread2 has finished");

Der Thread kann mit S gestartet werden und schreibt eine Infomeldungen in das Debug-Fenster sobald er fertig abgearbeitet wurde.

Thread2 started
Thread2 has finished

Mutex

Ein Mutex (Abk. für engl. mutual exclusion) kann verwendet werden um kritische Abschnitte zu definieren. Damit kann sichergestellt werden, dass sich immer nur maximal ein Thread in einem kritischen Abschnitt befindet.

Die Klasse System::Mutex stellt dafür die beiden Methoden Lock() und Unlock() zur Verfügung.

mMutex.Lock();   // critical section start
...
mMutex.Unlock(); // critical section end

Die Klasse System::Locker ist eine praktische Hilfsklasse für Mutex-Objekte.

Ein System::Locker-Objekt ruft die Methode Lock() automatisch auf, wenn das Locker-Objekt erzeugt wird und ruft automatisch wieder Unlock() auf, sobald das Locker-Objekt out-of-scope geht.

Wird der kritische Abschnitt in eine eigenen Methode verpackt und mit einem System::Locker gesperrt, so ist sichergestellt, dass jedenfalls am Ende der Methode immer automatisch ein Unlock() aufgerufen wird.

void App::MultithreadingLogic::MyThread3::ClearProgress()
{
    System::Locker locker(mMutex);
    //critical section start, Unlock() is called automatically on return
    SInt32 temp = -1 * mProgress;
    mProgress.ExchangeAndAdd(temp);
}

Atomic Integer

Ein System::AtomicSInt32 kann als atomarer Zähler verwendet werden, ohne dass dafür ein eigenes Mutex-Objekt benötigt wird.

Wir können die System::AtomicSInt32 Klasse z.B. für eine Progressbar verwenden.

class MyThread3 : public System::Thread
{
public:
    MyThread3();
    virtual ~MyThread3();
    SInt32 GetProgress();
protected:
    virtual Bool Run();
    void ClearProgress();
    System::AtomicSInt32 mProgress;
    System::Mutex mMutex;
};
MyThread3 mMyThread3;
App::MultithreadingLogic::MyThread3::MyThread3()
: Thread("MyThread3")
, mProgress(0)
{
}

App::MultithreadingLogic::MyThread3::~MyThread3()
{
    Stop();
    Join();
}

Bool App::MultithreadingLogic::MyThread3::Run()
{
    ClearProgress();
    System::Time t(0.02);
    while ((mIsRunning) && (mProgress < 100))
    {
        mProgress++;
        System::Sleep(t);
    }
    return true;
}

void App::MultithreadingLogic::MyThread3::ClearProgress()
{
    System::Locker locker(mMutex);
    //critical section start, unlock is called automatically on return
    SInt32 temp = -1 * mProgress;
    mProgress.ExchangeAndAdd(temp);
}

SInt32 App::MultithreadingLogic::MyThread3::GetProgress()
{
    return mProgress;
}
if (deviceHandler->WasRawKeyPressed(RAWKEY_P) || mButtonStart->WasReleasedInside())
{
    if (!mMyThread3.Start())
    {
        mMyThread3.Stop();
        mMyThread3.Join();
    }
}

mPlane->SetScaleFactorX(mMyThread3.GetProgress()*6);

Der Thread kann mit P gestartet und gestoppt werden. Der Berechnungs-Fortschritt wird über die mPlane visualisiert.

Message-Thread

Die Klasse Util::MessageThread implementiert einen Thread mit Util::MessageQueue und Util::MessageDispatch Objekten. Diese Klasse kann für eigene Message-Dispatcher verwendet werden.

Für ein einfaches Beispiel definieren wir zunächst zwei Message-Klassen, die wir von Util::Message ableiten.

class MyMessage1 : public Util::Message
{
public:
    MyMessage1(UInt32 messageId, UInt32 data);
    UInt32 mData;
};
class MyMessage2 : public Util::Message
{
public:
    MyMessage2(UInt32 messageId, UInt32 data1, UInt32 data2);
    UInt32Array mData;
};

Weiters benötigen wir unsere Util::MessageThread Klasse.

class MyMessageThread: public Util::MessageThread
{
public:
    enum MyMessageIds
    {
        MY_MESSAGE_1 = 1,
        MY_MESSAGE_2,
        MY_MESSAGE_3,
        MY_MESSAGE_4,
    };
    MyMessageThread();
    virtual ~MyMessageThread();
    void Send1(UInt32 data);
    void Send2(UInt32 data1, UInt32 data2);
    void Send3();
    void Send4();
protected:
    void ReceiveTimeout(Util::Message::AutoPtr message);
    void ReceiveDefault(Util::Message::AutoPtr message);
    void Receive1(AutoPointer<MyMessage1> message);
    void Receive2(AutoPointer<MyMessage2> message);
};
MyMessageThread mMessageThread;

Mit Register() definieren wir Methoden für die Verarbeitung der unterschiedlichen Messages.

App::MultithreadingLogic::MyMessage1::MyMessage1(UInt32 messageId, UInt32 data)
: Util::Message(messageId)
, mData(data)
{
}
App::MultithreadingLogic::MyMessage2::MyMessage2(UInt32 messageId, UInt32 data1, UInt32 data2)
: Util::Message(messageId)
{
    mData.Add(data1);
    mData.Add(data2);
}

App::MultithreadingLogic::MyMessageThread::MyMessageThread()
: MessageThread("MyMessageThread")
{
    Util::MessageDispatch& disp = GetMessageDispatch();
    disp.RegisterTimeout(this, &MyMessageThread::ReceiveTimeout);
    disp.RegisterDefault(this, &MyMessageThread::ReceiveDefault);
    disp.Register<MyMessage1>(MY_MESSAGE_1, this, &MyMessageThread::Receive1);
    disp.Register<MyMessage2>(MY_MESSAGE_2, this, &MyMessageThread::Receive2);
}
App::MultithreadingLogic::MyMessageThread::~MyMessageThread()
{
    Stop();
    Join();
}

Mit der Methode SendMessage() bzw. SendSyncMessage() kann dann eine Message versendet werden.

void App::MultithreadingLogic::MyMessageThread::Send1(UInt32 data)
{
    Debug::Trace(("%s Message1(%d) Send"), mName.Begin(), data);
    SendMessage(AutoPointer<MyMessage1>(new MyMessage1(MY_MESSAGE_1, data)));
}
void App::MultithreadingLogic::MyMessageThread::Send2(UInt32 data1, UInt32 data2)
{
    Debug::Trace(("%s Message2(%d, %d) SendSync"), mName.Begin(), data1, data2);
    SendSyncMessage(AutoPointer<MyMessage2>(new MyMessage2(MY_MESSAGE_2, data1, data2)));
    Debug::Trace(("%s Message2(%d, %d) returned"), mName.Begin(), data1, data2);
}
void App::MultithreadingLogic::MyMessageThread::Send3()
{
    Debug::Trace("Send Non-dispatched id %d", MY_MESSAGE_3);
    SendId(MY_MESSAGE_3);
}
void App::MultithreadingLogic::MyMessageThread::Send4()
{
    Debug::Trace("Send Non-dispatched id %d", MY_MESSAGE_4);
    SendId(MY_MESSAGE_4);
}

Die Message wird nach Empfang ausgewertet und die entsprechende Receive-Methode wird aufgerufen.

void App::MultithreadingLogic::MyMessageThread::ReceiveTimeout(Util::Message::AutoPtr message)
{
    Debug::Trace(("%s Timout Message(%d) received"), mName.Begin(), message->GetId());
}
void App::MultithreadingLogic::MyMessageThread::ReceiveDefault(Util::Message::AutoPtr message)
{
    Debug::Trace(("%s Default Message(%d) received"), mName.Begin(), message->GetId());
}
void App::MultithreadingLogic::MyMessageThread::Receive1(AutoPointer<MyMessage1> message)
{
    Debug::Trace(("%s Message1(%d) received"), mName.Begin(), message->mData);
}
void App::MultithreadingLogic::MyMessageThread::Receive2(AutoPointer<MyMessage2> message)
{
    Debug::Trace(("%s Message2(%d, %d) received"), mName.Begin(), message->mData[0], message->mData[1]);
}

Mit M kann der Thread gestartet und einzelne Test-Messages versandt werden.

if (deviceHandler->WasRawKeyPressed(RAWKEY_M) || mButtonM->WasReleasedInside())
{
    mMessageThread.Start();
    mMessageThread.Send2(16, 17);
    mMessageThread.Send1(42);
    mMessageThread.Send2(19, 18);
    mMessageThread.Send1(43);
    mMessageThread.Send3();
    mMessageThread.Send4();
    mMessageThread.SendQuit();
    mMessageThread.Join();
}

Als Ergebnis erhalten wir entsprechende Ausgaben im Debug-Fenster:

MyMessageThread Message2(16, 17) SendSync
MyMessageThread Message2(16, 17) received
MyMessageThread Message2(16, 17) returned
MyMessageThread Message1(42) Send
MyMessageThread Message2(19, 18) SendSync
MyMessageThread Message1(42) received
MyMessageThread Message2(19, 18) received
MyMessageThread Message2(19, 18) returned
MyMessageThread Message1(43) Send
Send Non-dispatched id 3
MyMessageThread Message1(43) received
Send Non-dispatched id 4
MyMessageThread Default Message(3) received
MyMessageThread Default Message(4) received

C++11

C++11 bietet von sich aus Support für Threads, atomare Variablen, Mutexe etc. Werden nur Plattformen mit modernen Compilern und C++11 Unterstützung verwendet, können alternativ diese C++11 Features für Multithreading verwendet werden.

Weitere Informationen dazu gibt es z.B. hier:

Header-Dateien für das C++11 Beispiel:

#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <sstream>

Beispiel:

std::atomic<SInt32> mCPP11_Atomic_SInt32;
std::mutex mCPP11_Mutex;

void CPP11_Test();
void CPP11_Method_1();
void CPP11_Method_2(UInt32& val);
void CPP11_Function()
{
    for (UInt32 i = 0; i < 5; i++)
    {
        Debug::Trace("CPP11_Function %d", i);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

void App::MultithreadingLogic::CPP11_Test()
{
    UInt32 val = 40;
    mCPP11_Atomic_SInt32 = 100;

    std::thread t1(CPP11_Function);
    std::thread t2(&MultithreadingLogic::CPP11_Method_1, this);
    std::thread t3(&MultithreadingLogic::CPP11_Method_1, this);
    std::thread t4(&MultithreadingLogic::CPP11_Method_2, this, std::ref(val));

    mCPP11_Mutex.lock();
    // critical section start
    // ...
    // critical section end
    mCPP11_Mutex.unlock();

    t1.join();
    t2.join();
    t3.join();
    t4.join();
    Debug::Trace("CPP11_Test val: %d", val);
}

void App::MultithreadingLogic::CPP11_Method_1()
{
    std::thread::id threadId = std::this_thread::get_id();
    std::stringstream ss;
    ss << threadId;
    std::string stringId = ss.str();

    for (UInt32 i = 0; i < 5; i++)
    {
        mCPP11_Atomic_SInt32++;
        SInt32 val = mCPP11_Atomic_SInt32;
        Debug::Trace("CPP11_Method_1 thread %s, value %d", stringId.c_str() , val);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

void App::MultithreadingLogic::CPP11_Method_2(UInt32& val)
{
    val += 2;
    Debug::Trace("CPP11_Method_2 finished");
}

Mit std::thread kann ein neuer Thread aufgespannt werden. Als Parameter wird die Funktion übergeben, die der neue Thread ausführen soll (Zeile 273).

Bei Member-Methoden muss ein Pointer auf die Methode und das eigentliche Objekt als Parameter übergeben werden (Zeile 274-276).

Standardmäßig kopiert der Thread-Konstruktor alle übergebenen Argumente. Um Variablen als Referenz zu übergeben, müssen diese mit std::ref oder std::cref übergeben werden (Zeile 276).

Als Ergebnis erhalten wir wieder entsprechende Ausgaben im Debug-Fenster:

CPP11_Function 0
CPP11_Method_1 thread 3864, value 101
CPP11_Method_1 thread 9284, value 102
CPP11_Method_2 finished
CPP11_Method_1 thread 9284, value 103
CPP11_Function 1
CPP11_Method_1 thread 3864, value 104
CPP11_Method_1 thread 9284, value 105
CPP11_Method_1 thread 3864, value 106
CPP11_Function 2
CPP11_Method_1 thread 3864, value 107
CPP11_Function 3
CPP11_Method_1 thread 9284, value 108
CPP11_Method_1 thread 3864, value 109
CPP11_Function 4
CPP11_Method_1 thread 9284, value 110
CPP11_Test val: 42

Achtung! Um C++11 Features unter Android verwenden zu können muss im Makefile das Compiler-Flag für C++11 gesetzt werden (APP_CPPFLAGS += -std=c++11). Wenn die NDK Version älter als Revision 10d ist, muss außerdem die NDK_TOOLCHAIN_VERSION auf NDK_TOOLCHAIN_VERSION=4.8 oder NDK_TOOLCHAIN_VERSION=clang geändert werden (default ist GCC 4.6). Seit Android NDK, Revision 10d ist GCC 4.8 der Standardcompiler für alle 32-Bit ABIs.

Achtung! Visual Studio 2010 bietet keine Unterstützung für C++11 Threads. Daher müssen für VS2010 die C++11 Tests ausgeklammert werden:

// comment the next line out to exclude the CPP11 code segments
// #define INCLUDE_CPP11_CODE

Screenshot

tut0115_multithreading.png
Multithreading


Copyright © 2011-2017 Spraylight GmbH.