35 #ifndef PTLIB_THREADPOOL_H
36 #define PTLIB_THREADPOOL_H
198 PThreadPoolBase(
unsigned maxWorkerCount = 10,
unsigned maxWorkUnitCount = 0);
200 virtual bool CheckWorker(WorkerThreadBase * worker);
214 template <
class Work_T>
233 : WorkerThreadBase(priority)
238 virtual void AddWork(Work_T * work) = 0;
240 virtual void Main() = 0;
253 : InternalWorkBase(group)
289 bool AddWork(Work_T * work,
const char * group = NULL)
296 if ((group == NULL) || (strlen(group) == 0)) {
307 worker = g->second.m_worker;
308 PTRACE(4,
"ThreadPool\tAllocated worker thread by group Id " << group);
323 if (!internalWork.m_group.empty()) {
324 typename GroupInfoMap_t::iterator r =
m_groupInfoMap.find(internalWork.m_group);
330 info.m_worker = worker;
331 m_groupInfoMap.insert(
typename GroupInfoMap_t::value_type(internalWork.m_group, info));
336 worker->AddWork(work);
349 typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work);
350 if (iterWork == m_externalToInternalWorkMap.end())
356 if (removeFromWorker)
360 if (!internalWork.m_group.empty()) {
361 typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group);
362 PAssert(iterGroup != m_groupInfoMap.end(),
"Attempt to find thread from unknown work group");
363 if (iterGroup != m_groupInfoMap.end()) {
364 if (--iterGroup->second.m_count == 0)
365 m_groupInfoMap.erase(iterGroup);
373 m_externalToInternalWorkMap.erase(iterWork);
382 template <
class Work_T>
413 Work_T * work =
m_queue.front();
421 return (
unsigned)
m_queue.size();
463 #endif // PTLIB_THREADPOOL_H
virtual bool CheckWorker(WorkerThreadBase *worker)
This class waits for the semaphore on construction and automatically signals the semaphore on destruc...
Definition: psync.h:86
void SetMaxUnits(unsigned count)
Definition: threadpool.h:193
This class defines a thread synchronisation object.
Definition: semaphor.h:78
Definition: threadpool.h:249
unsigned GetMaxUnits() const
Definition: threadpool.h:191
PMutex m_mutex
Definition: threadpool.h:451
Work_T * m_work
Definition: threadpool.h:260
PThreadPool(unsigned maxWorkers=10, unsigned maxWorkUnits=0)
Definition: threadpool.h:222
std::vector< WorkerThreadBase * > WorkerList_t
Definition: threadpool.h:204
void SetMaxWorkers(unsigned count)
Definition: threadpool.h:187
Definition: threadpool.h:393
WorkerList_t m_workers
Definition: threadpool.h:205
Definition: threadpool.h:168
unsigned m_count
Definition: threadpool.h:274
void StopWorker(WorkerThreadBase *worker)
PThreadPoolBase(unsigned maxWorkerCount=10, unsigned maxWorkUnitCount=0)
WorkerThreadBase(Priority priority=NormalPriority)
Definition: threadpool.h:156
bool RemoveWork(Work_T *work, bool removeFromWorker=true)
Definition: threadpool.h:344
WorkerThread * m_worker
Definition: threadpool.h:275
ExternalToInternalWorkMap_T m_externalToInternalWorkMap
Definition: threadpool.h:267
Priority
Codes for thread priorities.
Definition: thread.h:74
#define PTRACE(level, args)
Output trace.
Definition: object.h:530
GroupInfoMap_t m_groupInfoMap
Definition: threadpool.h:283
virtual void Signal()
If there are waiting (blocked) threads then unblock the first one that was blocked.
std::map< Work_T *, InternalWork > ExternalToInternalWorkMap_T
Definition: threadpool.h:266
virtual void Wait()
If the semaphore count is > 0, decrement the semaphore and return.
PMutex m_workerMutex
Definition: threadpool.h:165
virtual void RemoveWork(Work_T *work)=0
unsigned m_maxWorkUnitCount
Definition: threadpool.h:208
High Level (queued work item) thread pool.
Definition: threadpool.h:383
unsigned GetWorkSize() const
Definition: threadpool.h:419
These classes and templates implement a generic thread pooling mechanism.
Definition: threadpool.h:150
void AddWork(Work_T *work)
Definition: threadpool.h:402
virtual PThreadPoolBase::WorkerThreadBase * CreateWorkerThread()
Definition: threadpool.h:456
bool m_shutdown
Definition: threadpool.h:164
WorkerThread * m_worker
Definition: threadpool.h:259
void Shutdown()
Definition: threadpool.h:442
virtual WorkerThreadBase * CreateWorkerThread()=0
PQueuedThreadPool(unsigned maxWorkers=10, unsigned maxWorkUnits=0)
Definition: threadpool.h:389
Queue m_queue
Definition: threadpool.h:450
Don't delete thread as it may not be on heap.
Definition: thread.h:94
std::map< std::string, GroupInfo > GroupInfoMap_t
Definition: threadpool.h:282
Definition: threadpool.h:153
Definition: threadpool.h:273
PThreadPool & m_pool
Definition: threadpool.h:243
This class defines a thread of execution in the system.
Definition: thread.h:66
void Main()
Definition: threadpool.h:424
QueuedWorkerThread(PThreadPool< Work_T > &pool, PThread::Priority priority=PThread::NormalPriority)
Definition: threadpool.h:396
virtual WorkerThreadBase * AllocateWorker()
virtual void Shutdown()=0
bool AddWork(Work_T *work, const char *group=NULL)
Definition: threadpool.h:289
PMutex m_listMutex
Definition: threadpool.h:202
unsigned m_maxWorkerCount
Definition: threadpool.h:207
virtual WorkerThreadBase * NewWorker()
WorkerThread(PThreadPool &pool, Priority priority=NormalPriority)
Definition: threadpool.h:232
#define PAssert(b, msg)
This macro is used to assert that a condition must be true.
Definition: object.h:192
void RemoveWork(Work_T *)
Definition: threadpool.h:410
InternalWorkBase(const char *group)
Definition: threadpool.h:171
std::string m_group
Definition: threadpool.h:176
InternalWork(WorkerThread *worker, Work_T *work, const char *group)
Definition: threadpool.h:252
virtual void AddWork(Work_T *work)=0
Definition: threadpool.h:229
PSemaphore m_available
Definition: threadpool.h:452
Ultimate parent class for all objects in the class library.
Definition: object.h:1118
unsigned GetMaxWorkers() const
Definition: threadpool.h:185
Normal priority for a thread.
Definition: thread.h:79
std::queue< Work_T * > Queue
Definition: threadpool.h:449
Low Level thread pool.
Definition: threadpool.h:215
virtual unsigned GetWorkSize() const =0