00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #ifndef __PION_SCHEDULER_HEADER__
00011 #define __PION_SCHEDULER_HEADER__
00012
00013 #include <vector>
00014 #include <boost/asio.hpp>
00015 #include <boost/assert.hpp>
00016 #include <boost/bind.hpp>
00017 #include <boost/function/function0.hpp>
00018 #include <boost/cstdint.hpp>
00019 #include <boost/shared_ptr.hpp>
00020 #include <boost/noncopyable.hpp>
00021 #include <boost/thread/thread.hpp>
00022 #include <boost/thread/mutex.hpp>
00023 #include <boost/thread/xtime.hpp>
00024 #include <boost/thread/condition.hpp>
00025 #include <pion/config.hpp>
00026 #include <pion/logger.hpp>
00027
00028
00029 namespace pion {
00030
00034 class PION_API scheduler :
00035 private boost::noncopyable
00036 {
00037 public:
00038
00040 scheduler(void)
00041 : m_logger(PION_GET_LOGGER("pion.scheduler")),
00042 m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false)
00043 {}
00044
00046 virtual ~scheduler() {}
00047
00049 virtual void startup(void) {}
00050
00052 virtual void shutdown(void);
00053
00055 void join(void);
00056
00060 void add_active_user(void);
00061
00063 void remove_active_user(void);
00064
00066 inline bool is_running(void) const { return m_is_running; }
00067
00069 inline void set_num_threads(const boost::uint32_t n) { m_num_threads = n; }
00070
00072 inline boost::uint32_t get_num_threads(void) const { return m_num_threads; }
00073
00075 inline void set_logger(logger log_ptr) { m_logger = log_ptr; }
00076
00078 inline logger get_logger(void) { return m_logger; }
00079
00081 virtual boost::asio::io_service& get_io_service(void) = 0;
00082
00088 virtual void post(boost::function0<void> work_func) {
00089 get_io_service().post(work_func);
00090 }
00091
00098 void keep_running(boost::asio::io_service& my_service,
00099 boost::asio::deadline_timer& my_timer);
00100
00107 inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) {
00108 boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
00109 boost::thread::sleep(wakeup_time);
00110 }
00111
00121 template <typename ConditionType, typename LockType>
00122 inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock,
00123 boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
00124 {
00125 boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
00126 wakeup_condition.timed_wait(wakeup_lock, wakeup_time);
00127 }
00128
00129
00131 void process_service_work(boost::asio::io_service& service);
00132
00133
00134 protected:
00135
00144 static boost::system_time get_wakeup_time(boost::uint32_t sleep_sec,
00145 boost::uint32_t sleep_nsec);
00146
00148 virtual void stop_services(void) {}
00149
00151 virtual void stop_threads(void) {}
00152
00154 virtual void finish_services(void) {}
00155
00157 virtual void finish_threads(void) {}
00158
00159
00161 static const boost::uint32_t DEFAULT_NUM_THREADS;
00162
00164 static const boost::uint32_t NSEC_IN_SECOND;
00165
00167 static const boost::uint32_t MICROSEC_IN_SECOND;
00168
00170 static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS;
00171
00172
00174 boost::mutex m_mutex;
00175
00177 logger m_logger;
00178
00180 boost::condition m_no_more_active_users;
00181
00183 boost::condition m_scheduler_has_stopped;
00184
00186 boost::uint32_t m_num_threads;
00187
00189 boost::uint32_t m_active_users;
00190
00192 bool m_is_running;
00193 };
00194
00195
00199 class PION_API multi_thread_scheduler :
00200 public scheduler
00201 {
00202 public:
00203
00205 multi_thread_scheduler(void) {}
00206
00208 virtual ~multi_thread_scheduler() {}
00209
00210
00211 protected:
00212
00214 virtual void stop_threads(void) {
00215 if (! m_thread_pool.empty()) {
00216 PION_LOG_DEBUG(m_logger, "Waiting for threads to shutdown");
00217
00218
00219 boost::thread current_thread;
00220 for (ThreadPool::iterator i = m_thread_pool.begin();
00221 i != m_thread_pool.end(); ++i)
00222 {
00223
00224
00225 if (**i != current_thread) (*i)->join();
00226 }
00227 }
00228 }
00229
00231 virtual void finish_threads(void) { m_thread_pool.clear(); }
00232
00233
00235 typedef std::vector<boost::shared_ptr<boost::thread> > ThreadPool;
00236
00237
00239 ThreadPool m_thread_pool;
00240 };
00241
00242
00246 class PION_API single_service_scheduler :
00247 public multi_thread_scheduler
00248 {
00249 public:
00250
00252 single_service_scheduler(void)
00253 : m_service(), m_timer(m_service)
00254 {}
00255
00257 virtual ~single_service_scheduler() { shutdown(); }
00258
00260 virtual boost::asio::io_service& get_io_service(void) { return m_service; }
00261
00263 virtual void startup(void);
00264
00265
00266 protected:
00267
00269 virtual void stop_services(void) { m_service.stop(); }
00270
00272 virtual void finish_services(void) { m_service.reset(); }
00273
00274
00276 boost::asio::io_service m_service;
00277
00279 boost::asio::deadline_timer m_timer;
00280 };
00281
00282
00286 class PION_API one_to_one_scheduler :
00287 public multi_thread_scheduler
00288 {
00289 public:
00290
00292 one_to_one_scheduler(void)
00293 : m_service_pool(), m_next_service(0)
00294 {}
00295
00297 virtual ~one_to_one_scheduler() { shutdown(); }
00298
00300 virtual boost::asio::io_service& get_io_service(void) {
00301 boost::mutex::scoped_lock scheduler_lock(m_mutex);
00302 while (m_service_pool.size() < m_num_threads) {
00303 boost::shared_ptr<service_pair_type> service_ptr(new service_pair_type());
00304 m_service_pool.push_back(service_ptr);
00305 }
00306 if (++m_next_service >= m_num_threads)
00307 m_next_service = 0;
00308 BOOST_ASSERT(m_next_service < m_num_threads);
00309 return m_service_pool[m_next_service]->first;
00310 }
00311
00318 virtual boost::asio::io_service& get_io_service(boost::uint32_t n) {
00319 BOOST_ASSERT(n < m_num_threads);
00320 BOOST_ASSERT(n < m_service_pool.size());
00321 return m_service_pool[n]->first;
00322 }
00323
00325 virtual void startup(void);
00326
00327
00328 protected:
00329
00331 virtual void stop_services(void) {
00332 for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
00333 (*i)->first.stop();
00334 }
00335 }
00336
00338 virtual void finish_services(void) { m_service_pool.clear(); }
00339
00340
00342 struct service_pair_type {
00343 service_pair_type(void) : first(), second(first) {}
00344 boost::asio::io_service first;
00345 boost::asio::deadline_timer second;
00346 };
00347
00349 typedef std::vector<boost::shared_ptr<service_pair_type> > service_pool_type;
00350
00351
00353 service_pool_type m_service_pool;
00354
00356 boost::uint32_t m_next_service;
00357 };
00358
00359
00360 }
00361
00362 #endif