Drizzled Public API Documentation

barrier.h
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2010 Brian Aker
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  * * Neither the name of the <organization> nor the
15  * names of its contributors may be used to endorse or promote products
16  * derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
22  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  *
29  */
30 
31 #include <boost/thread/mutex.hpp>
32 #include <boost/thread/condition_variable.hpp>
33 #include <boost/shared_ptr.hpp>
34 #include <boost/foreach.hpp>
35 
36 #include "observer.h"
37 
38 #pragma once
39 
40 /*
41  Barrier was designed with the following concepts.
42 
43  1) A barrier can be set with an initial limit which can be used such that if the limit is met, it releases all waiters.
44  2) A barrier can be released at any time, even if the limit is not met by an outside caller.
45  3) An observer can register itself to the barrier, it will wait until some predicate action releases it.
46  4) Observers are always released by limit, or in the case where the barrier is released or destroyed.
47  5) Observers should be held by copy, not by reference in order to allow for correct deletion.
48 
49  @todo Although an owner is passed to a barrier, the barrier itself currently makes no use of it; instead, the layer
50  above protects against poor use, namely the owner of a barrier calling wait() on it. It may be a good idea to change this.
51 */
52 
53 namespace user_locks {
54 namespace barriers {
55 
56 // Barrier starts in a blocking posistion
57 class Barrier {
58 public:
59  typedef boost::shared_ptr<Barrier> shared_ptr;
60 
61  Barrier(drizzled::session_id_t owner_arg) :
62  owner(owner_arg),
63  limit(0),
64  current_wait(0),
65  generation(0)
66  { }
67 
68  Barrier(drizzled::session_id_t owner_arg, int64_t limit_arg) :
69  owner(owner_arg),
70  limit(limit_arg),
71  current_wait(limit),
72  generation(0)
73  {
74  }
75 
76  ~Barrier()
77  {
78  wakeAll();
79  }
80 
81  // Signal all of the observers to start
82  void signal()
83  {
84  boost::mutex::scoped_lock scopedBarrier(sleeper_mutex);
85  wakeAll();
86  }
87 
88  drizzled::session_id_t getOwner() const
89  {
90  return owner;
91  }
92 
93  void wait()
94  {
95  boost::mutex::scoped_lock scopedLock(sleeper_mutex);
96  int64_t my_generation= generation;
97 
98  --current_wait;
99  if (limit)
100  {
101  if (not current_wait)
102  {
103  wakeAll();
104 
105  return;
106  }
107 
108  }
109  checkObservers();
110 
111  // If we are interrupted we remove ourself from the list, and check on
112  // the observers.
113  try
114  {
115  while (my_generation == generation)
116  {
117  sleep_threshhold.wait(sleeper_mutex);
118  }
119  }
120  catch(boost::thread_interrupted const& error)
121  {
122  current_wait++;
123  checkObservers();
124  }
125  }
126 
127  // A call to either signal or a release will cause wait_for() to continue
128  void wait_until(int64_t wait_until_arg)
129  {
130  Observer::shared_ptr observer;
131  {
132  boost::mutex::scoped_lock scopedLock(sleeper_mutex);
133 
134  if (wait_until_arg <= count())
135  return;
136 
137  observer.reset(new Observer(wait_until_arg));
138  observers.push_back(observer);
139  }
140 
141  try {
142  observer->sleep();
143  }
144  catch(boost::thread_interrupted const& error)
145  {
146  boost::mutex::scoped_lock scopedLock(sleeper_mutex);
147  // Someone has interrupted us, we now try to remove ourself from the
148  // observer chain ourself
149 
150  observers.remove(observer);
151 
152  throw error;
153  }
154  }
155 
156  void wait(int64_t generation_arg)
157  {
158  boost::mutex::scoped_lock scopedLock(sleeper_mutex);
159  int64_t my_generation= generation;
160 
161  // If the generation is newer then we just return immediatly
162  if (my_generation > generation_arg)
163  return;
164 
165  --current_wait;
166 
167  if (limit)
168  {
169  if (not current_wait)
170  {
171  wakeAll();
172  return;
173  }
174 
175  }
176 
177  while (my_generation == generation)
178  {
179  sleep_threshhold.wait(sleeper_mutex);
180  }
181  }
182 
183  int64_t getGeneration()
184  {
185  boost::mutex::scoped_lock scopedLock(sleeper_mutex);
186  return generation;
187  }
188 
189  int64_t sizeObservers()
190  {
191  boost::mutex::scoped_lock scopedLock(sleeper_mutex);
192  return static_cast<int64_t>(observers.size());
193  }
194 
195  int64_t sizeWaiters()
196  {
197  boost::mutex::scoped_lock scopedLock(sleeper_mutex);
198  return count();
199  }
200 
201  int64_t getLimit() const
202  {
203  return limit;
204  }
205 
206 private:
207  void wakeAll()
208  {
209  generation++;
210  current_wait= limit;
211  sleep_threshhold.notify_all();
212 
213  checkObservers();
214  }
215 
216  struct isReady : public std::unary_function<Observer::list::const_reference, bool>
217  {
218  const int64_t count;
219 
220  isReady(int64_t arg) :
221  count(arg)
222  { }
223 
224  result_type operator() (argument_type observer)
225  {
226  if (observer->getLimit() <= count or count == 0)
227  {
228  observer->wake();
229  return true;
230  }
231 
232  return false;
233  }
234  };
235 
236  void checkObservers()
237  {
238  observers.remove_if(isReady(count()));
239  }
240 
241  int64_t count() const
242  {
243  if (limit)
244  {
245  return limit - current_wait;
246  }
247  return std::abs(static_cast<long int>(current_wait));
248  }
249 
250 
251  drizzled::session_id_t owner;
252 
253  const int64_t limit;
254  int64_t current_wait;
255  int64_t generation;
256 
257  Observer::list observers;
258 
259  boost::mutex sleeper_mutex;
260  boost::condition_variable_any sleep_threshhold;
261 
262 };
263 
264 } // namespace barriers
265 } // namespace user_locks
266