00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include <boost/exception/diagnostic_information.hpp>
00011 #include <boost/date_time/posix_time/posix_time_duration.hpp>
00012 #include <pion/scheduler.hpp>
00013
00014 namespace pion {
00015
00016
00017
00018
00019 const boost::uint32_t scheduler::DEFAULT_NUM_THREADS = 8;
00020 const boost::uint32_t scheduler::NSEC_IN_SECOND = 1000000000;
00021 const boost::uint32_t scheduler::MICROSEC_IN_SECOND = 1000000;
00022 const boost::uint32_t scheduler::KEEP_RUNNING_TIMER_SECONDS = 5;
00023
00024
00025
00026
00027 void scheduler::shutdown(void)
00028 {
00029
00030 boost::mutex::scoped_lock scheduler_lock(m_mutex);
00031
00032 if (m_is_running) {
00033
00034 PION_LOG_INFO(m_logger, "Shutting down the thread scheduler");
00035
00036 while (m_active_users > 0) {
00037
00038 PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish");
00039 m_no_more_active_users.wait(scheduler_lock);
00040 }
00041
00042
00043 m_is_running = false;
00044 stop_services();
00045 stop_threads();
00046 finish_services();
00047 finish_threads();
00048
00049 PION_LOG_INFO(m_logger, "The thread scheduler has shutdown");
00050
00051
00052 m_scheduler_has_stopped.notify_all();
00053
00054 } else {
00055
00056
00057 stop_services();
00058 stop_threads();
00059 finish_services();
00060 finish_threads();
00061
00062
00063
00064 m_scheduler_has_stopped.notify_all();
00065 }
00066 }
00067
00068 void scheduler::join(void)
00069 {
00070 boost::mutex::scoped_lock scheduler_lock(m_mutex);
00071 while (m_is_running) {
00072
00073 m_scheduler_has_stopped.wait(scheduler_lock);
00074 }
00075 }
00076
00077 void scheduler::keep_running(boost::asio::io_service& my_service,
00078 boost::asio::deadline_timer& my_timer)
00079 {
00080 if (m_is_running) {
00081
00082 my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS));
00083 my_timer.async_wait(boost::bind(&scheduler::keep_running, this,
00084 boost::ref(my_service), boost::ref(my_timer)));
00085 }
00086 }
00087
00088 void scheduler::add_active_user(void)
00089 {
00090 if (!m_is_running) startup();
00091 boost::mutex::scoped_lock scheduler_lock(m_mutex);
00092 ++m_active_users;
00093 }
00094
00095 void scheduler::remove_active_user(void)
00096 {
00097 boost::mutex::scoped_lock scheduler_lock(m_mutex);
00098 if (--m_active_users == 0)
00099 m_no_more_active_users.notify_all();
00100 }
00101
00102 boost::system_time scheduler::get_wakeup_time(boost::uint32_t sleep_sec,
00103 boost::uint32_t sleep_nsec)
00104 {
00105 return boost::get_system_time() + boost::posix_time::seconds(sleep_sec) + boost::posix_time::microseconds(sleep_nsec / 1000);
00106 }
00107
00108 void scheduler::process_service_work(boost::asio::io_service& service) {
00109 while (m_is_running) {
00110 try {
00111 service.run();
00112 } catch (std::exception& e) {
00113 PION_LOG_ERROR(m_logger, boost::diagnostic_information(e));
00114 } catch (...) {
00115 PION_LOG_ERROR(m_logger, "caught unrecognized exception");
00116 }
00117 }
00118 }
00119
00120
00121
00122
00123 void single_service_scheduler::startup(void)
00124 {
00125
00126 boost::mutex::scoped_lock scheduler_lock(m_mutex);
00127
00128 if (! m_is_running) {
00129 PION_LOG_INFO(m_logger, "Starting thread scheduler");
00130 m_is_running = true;
00131
00132
00133 m_service.reset();
00134 keep_running(m_service, m_timer);
00135
00136
00137 for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
00138 boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
00139 this, boost::ref(m_service)) ));
00140 m_thread_pool.push_back(new_thread);
00141 }
00142 }
00143 }
00144
00145
00146
00147
00148 void one_to_one_scheduler::startup(void)
00149 {
00150
00151 boost::mutex::scoped_lock scheduler_lock(m_mutex);
00152
00153 if (! m_is_running) {
00154 PION_LOG_INFO(m_logger, "Starting thread scheduler");
00155 m_is_running = true;
00156
00157
00158 while (m_service_pool.size() < m_num_threads) {
00159 boost::shared_ptr<service_pair_type> service_ptr(new service_pair_type());
00160 m_service_pool.push_back(service_ptr);
00161 }
00162
00163
00164 for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
00165 keep_running((*i)->first, (*i)->second);
00166 }
00167
00168
00169 for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
00170 boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
00171 this, boost::ref(m_service_pool[n]->first)) ));
00172 m_thread_pool.push_back(new_thread);
00173 }
00174 }
00175 }
00176
00177
00178 }