18 #ifndef FXWorkerThread_h 19 #define FXWorkerThread_h 24 #define WORKLOAD_INTERVAL 100 34 #ifdef WORKLOAD_PROFILING 97 Pool(
int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
98 #ifdef WORKLOAD_PROFILING
99 , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
102 #ifdef WORKLOAD_PROFILING 103 long long int timeDiff = 0;
104 for (
int i = 0; i < 100; i++) {
105 const auto begin = std::chrono::high_resolution_clock::now();
106 const auto end = std::chrono::high_resolution_clock::now();
107 timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
111 while (numThreads > 0) {
139 myWorkers.push_back(w);
150 index = myRunningIndex % myWorkers.size();
152 #ifdef WORKLOAD_PROFILING 153 if (myRunningIndex == 0) {
155 worker->startProfile();
157 myProfileStart = std::chrono::high_resolution_clock::now();
161 myWorkers[index]->add(t);
172 myFinishedTasks.splice(myFinishedTasks.end(), tasks);
179 if (myException ==
nullptr) {
186 void waitAll(
const bool deleteFinished =
true) {
188 while ((
int)myFinishedTasks.size() < myRunningIndex) {
191 #ifdef WORKLOAD_PROFILING 192 if (myRunningIndex > 0) {
193 const auto end = std::chrono::high_resolution_clock::now();
194 const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
195 double minLoad = std::numeric_limits<double>::max();
198 const double load = worker->endProfile(elapsed);
199 minLoad =
MIN2(minLoad, load);
200 maxLoad =
MAX2(maxLoad, load);
202 #ifdef WORKLOAD_INTERVAL 203 myTotalMaxLoad += maxLoad;
204 myTotalSpread += maxLoad / minLoad;
214 if (deleteFinished) {
215 for (
Task* task : myFinishedTasks) {
220 myException =
nullptr;
221 myFinishedTasks.clear();
224 if (toRaise !=
nullptr) {
237 return myRunningIndex - (int)myFinishedTasks.size() >= size();
245 return (
int)myWorkers.size();
255 myPoolMutex.unlock();
273 #ifdef WORKLOAD_PROFILING 277 double myTotalMaxLoad;
279 double myTotalSpread;
281 std::chrono::high_resolution_clock::time_point myProfileStart;
293 #ifdef WORKLOAD_PROFILING
294 , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
307 #ifdef WORKLOAD_PROFILING 308 const double load = 100. * myTotalBusyTime / myTotalTime;
310 " tasks and had a load of " +
toString(load) +
"% (" +
toString(myTotalBusyTime) +
311 "us / " +
toString(myTotalTime) +
"us), " +
toString(myTotalBusyTime / (
double)myCounter) +
" per task.");
346 #ifdef WORKLOAD_PROFILING 347 const auto before = std::chrono::high_resolution_clock::now();
350 #ifdef WORKLOAD_PROFILING 351 const auto after = std::chrono::high_resolution_clock::now();
352 myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
376 #ifdef WORKLOAD_PROFILING 377 void startProfile() {
381 double endProfile(
const long long int time) {
383 myTotalBusyTime += myBusyTime;
384 return time == 0 ? 100. : 100. * myBusyTime / time;
401 #ifdef WORKLOAD_PROFILING 405 long long int myBusyTime;
407 long long int myTotalBusyTime;
409 long long int myTotalTime;
std::vector< FXWorkerThread * > myWorkers
the current worker threads
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
virtual ~FXWorkerThread()
Destructor.
FXCondition myCondition
the semaphore when waiting for new tasks
std::list< Task * > myTasks
the list of pending tasks
int myRunningIndex
the running index for the next task
FXWorkerThread(Pool &pool)
Constructor.
virtual ~Pool()
Destructor.
void add(Task *t)
Adds the given task to this thread to be calculated.
FXMutex myMutex
the internal mutex for the task list
FXMutex myPoolMutex
the pool mutex for external sync
FXint run()
Main execution method of this thread.
void lock()
locks the pool mutex
bool myStopped
whether we are still running
int myIndex
the index of the task, valid only after the task has been added to the pool
std::list< Task * > myFinishedTasks
list of finished tasks
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Pool(int numThreads=0)
Constructor.
void addFinished(std::list< Task *> &tasks)
Adds the given tasks to the list of finished tasks.
int size() const
Returns the number of threads in the pool.
FXMutex myMutex
the mutex for the task list
void clear()
Stops and deletes all worker threads.
virtual ~Task()
Desctructor.
ProcessError * myException
the exception from a child thread
void setException(ProcessError &e)
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index. If the index is negative, assign to the next (round robin) one.
#define WORKLOAD_INTERVAL
A pool of worker threads which distributes the tasks and collects the results.
bool isFull() const
Checks whether there are currently more pending tasks than threads.
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
Pool & myPool
the pool for this thread
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
void unlock()
unlocks the pool mutex
std::list< Task * > myCurrentTasks
the list of tasks which are currently calculated
void stop()
Stops the thread.
Abstract superclass of a task to be run with an index to keep track of pending tasks.
A thread repeatingly calculating incoming tasks.
FXCondition myCondition
the semaphore to wait on for finishing all tasks
#define WRITE_MESSAGE(msg)
void setIndex(const int newIndex)
Sets the running index of this task.