SUMO - Simulation of Urban MObility
FXWorkerThread.h
Go to the documentation of this file.
1 /****************************************************************************/
7 // A thread class together with a pool and a task for parallelized computation
8 /****************************************************************************/
9 // SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
10 // Copyright (C) 2004-2016 DLR (http://www.dlr.de/) and contributors
11 /****************************************************************************/
12 //
13 // This file is part of SUMO.
14 // SUMO is free software: you can redistribute it and/or modify
15 // it under the terms of the GNU General Public License as published by
16 // the Free Software Foundation, either version 3 of the License, or
17 // (at your option) any later version.
18 //
19 /****************************************************************************/
20 
21 #ifndef FXWorkerThread_h
22 #define FXWorkerThread_h
23 
24 
25 // ===========================================================================
26 // included modules
27 // ===========================================================================
#ifdef _MSC_VER
#include <windows_config.h>
#else
#include <config.h>
#endif

#include <list>
#include <stdexcept>
#include <vector>
#include <fx.h>
#include <FXThread.h>
39 
40 // ===========================================================================
41 // class definitions
42 // ===========================================================================
47 class FXWorkerThread : public FXThread {
48 
49 public:
54  class Task {
55  public:
57  virtual ~Task() {};
58 
67  virtual void run(FXWorkerThread* context) = 0;
68 
75  void setIndex(const int newIndex) {
76  myIndex = newIndex;
77  }
78  private:
80  int myIndex;
81  };
82 
87  class Pool {
88  public:
95  Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myNumFinished(0) {
96  while (numThreads > 0) {
97  new FXWorkerThread(*this);
98  numThreads--;
99  }
100  }
101 
106  virtual ~Pool() {
107  clear();
108  }
109 
112  void clear() {
113  for (std::vector<FXWorkerThread*>::iterator it = myWorkers.begin(); it != myWorkers.end(); ++it) {
114  delete *it;
115  }
116  myWorkers.clear();
117  }
118 
123  void addWorker(FXWorkerThread* const w) {
124 // if (myWorkers.empty()) std::cout << "created pool at " << SysUtils::getCurrentMillis() << std::endl;
125  myWorkers.push_back(w);
126  }
127 
134  void add(Task* const t, int index = -1) {
135  t->setIndex(myRunningIndex++);
136  if (index < 0) {
137  index = myRunningIndex % myWorkers.size();
138  }
139  myWorkers[index]->add(t);
140  }
141 
148  void addFinished(Task* const t) {
149  myMutex.lock();
150  myNumFinished++;
151  myFinishedTasks.push_back(t);
152  myCondition.signal();
153  myMutex.unlock();
154  }
155 
157  void waitAll() {
158  myMutex.lock();
159  while (myNumFinished < myRunningIndex) {
160  myCondition.wait(myMutex);
161  }
162 // if (myRunningIndex > 0) std::cout << "finished waiting for " << myRunningIndex << " tasks at " << SysUtils::getCurrentMillis() << std::endl;
163  for (std::list<Task*>::iterator it = myFinishedTasks.begin(); it != myFinishedTasks.end(); ++it) {
164  delete *it;
165  }
166  myFinishedTasks.clear();
167  myRunningIndex = 0;
168  myNumFinished = 0;
169  myMutex.unlock();
170  }
171 
179  bool isFull() const {
180  return myRunningIndex - myNumFinished >= size();
181  }
182 
187  int size() const {
188  return (int)myWorkers.size();
189  }
190 
192  void lock() {
193  myPoolMutex.lock();
194  }
195 
197  void unlock() {
198  myPoolMutex.unlock();
199  }
200 
201  private:
203  std::vector<FXWorkerThread*> myWorkers;
205  FXMutex myMutex;
207  FXMutex myPoolMutex;
209  FXCondition myCondition;
211  std::list<Task*> myFinishedTasks;
216  };
217 
218 public:
225  FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false), myCounter(0) {
226  pool.addWorker(this);
227  start();
228  }
229 
234  virtual ~FXWorkerThread() {
235  stop();
236  }
237 
242  void add(Task* t) {
243  myMutex.lock();
244  myTasks.push_back(t);
245  myCondition.signal();
246  myMutex.unlock();
247  }
248 
255  FXint run() {
256  while (!myStopped) {
257  myMutex.lock();
258  while (!myStopped && myTasks.empty()) {
259  myCondition.wait(myMutex);
260  }
261  if (myStopped) {
262  myMutex.unlock();
263  break;
264  }
265  Task* t = myTasks.front();
266  myTasks.pop_front();
267  myMutex.unlock();
268  t->run(this);
269  myCounter++;
270 // if (myCounter % 1000 == 0) std::cout << (size_t)this << " ran " << myCounter << " tasks " << std::endl;
271  myPool.addFinished(t);
272  }
273 // std::cout << "ran " << myCounter << " tasks " << std::endl;
274  return 0;
275  }
276 
281  void stop() {
282  myMutex.lock();
283  myStopped = true;
284  myCondition.signal();
285  myMutex.unlock();
286  join();
287  }
288 
289 private:
293  FXMutex myMutex;
295  FXCondition myCondition;
297  std::list<Task*> myTasks;
299  bool myStopped;
302 };
303 
304 
305 #endif
std::vector< FXWorkerThread * > myWorkers
the current worker threads
int myNumFinished
the number of finished tasks (is reset when the pool runs empty)
bool isFull() const
Checks whether there are currently at least as many pending tasks as threads.
virtual ~FXWorkerThread()
Destructor.
FXCondition myCondition
the semaphore when waiting for new tasks
std::list< Task * > myTasks
the list of pending tasks
int myRunningIndex
the running index for the next task
FXWorkerThread(Pool &pool)
Constructor.
virtual ~Pool()
Destructor.
void addFinished(Task *const t)
Adds the given task to the list of finished tasks and signals the condition waitAll() is waiting on.
void add(Task *t)
Adds the given task to this thread to be calculated.
FXMutex myMutex
the internal mutex for the task list
FXMutex myPoolMutex
the pool mutex for external sync
FXint run()
Main execution method of this thread.
void lock()
locks the pool mutex
bool myStopped
whether the thread has been asked to stop (set by stop(), checked in run())
void waitAll()
waits for all tasks to be finished
int myIndex
the index of the task, valid only after the task has been added to the pool
int myCounter
counting completed tasks for debugging / profiling
std::list< Task * > myFinishedTasks
list of finished tasks
Pool(int numThreads=0)
Constructor.
FXMutex myMutex
the mutex for the task list
void clear()
Stops and deletes all worker threads.
virtual ~Task()
Destructor.
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index. If the index is negative, assign to the next (round robin) one.
int size() const
Returns the number of threads in the pool.
A pool of worker threads which distributes the tasks and collects the results.
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
Pool & myPool
the pool for this thread
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
void unlock()
unlocks the pool mutex
void stop()
Stops the thread.
Abstract superclass of a task to be run with an index to keep track of pending tasks.
A thread repeatedly calculating incoming tasks.
FXCondition myCondition
the semaphore to wait on for finishing all tasks
void setIndex(const int newIndex)
Sets the running index of this task.