OpenWalnut  1.3.1
WModuleContainer.cpp
1 //---------------------------------------------------------------------------
2 //
3 // Project: OpenWalnut ( http://www.openwalnut.org )
4 //
5 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
6 // For more information see http://www.openwalnut.org/copying
7 //
8 // This file is part of OpenWalnut.
9 //
10 // OpenWalnut is free software: you can redistribute it and/or modify
11 // it under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 3 of the License, or
13 // (at your option) any later version.
14 //
15 // OpenWalnut is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
22 //
23 //---------------------------------------------------------------------------
24 
25 #include <list>
26 #include <set>
27 #include <vector>
28 #include <string>
29 #include <sstream>
30 #include <algorithm>
31 #include <utility>
32 
33 #include "../common/WLogger.h"
34 #include "../common/WThreadedRunner.h"
35 #include "../common/exceptions/WSignalSubscriptionFailed.h"
36 #include "WBatchLoader.h"
37 #include "WModuleCombiner.h"
38 #include "WModuleFactory.h"
39 #include "WModuleInputConnector.h"
40 #include "WModuleOutputConnector.h"
41 #include "WModuleTypes.h"
42 #include "combiner/WApplyCombiner.h"
43 #include "exceptions/WModuleAlreadyAssociated.h"
44 #include "exceptions/WModuleUninitialized.h"
45 #include "WDataModule.h"
46 
47 #include "WModuleContainer.h"
48 
49 WModuleContainer::WModuleContainer( std::string name, std::string description ):
50  WModule(),
51  m_name( name ),
52  m_description( description ),
53  m_crashIfModuleCrashes( true )
54 {
55  WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG );
56  // initialize members
57 }
58 
60 {
61  // cleanup
62 }
63 
65 {
66  // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it.
67  // Only set the ready flag.
68  ready();
69 }
70 
71 boost::shared_ptr< WModule > WModuleContainer::factory() const
72 {
73  // this factory is not used actually.
74  return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
75 }
76 
77 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
78 {
79  if( !module )
80  {
81  // just ignore NULL Pointer
82  return;
83  }
84 
85  WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
86  "ModuleContainer (" + getName() + ")", LL_INFO );
87 
88  if( !module->isInitialized()() )
89  {
90  std::ostringstream s;
91  s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
92 
93  throw WModuleUninitialized( s.str() );
94  }
95 
96  // already associated with this container?
97  if( module->getAssociatedContainer() == shared_from_this() )
98  {
99  WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." ,
100  "ModuleContainer (" + getName() + ")", LL_INFO );
101  return;
102  }
103 
104  // is this module already associated?
105  if( module->isAssociated()() )
106  {
107  module->getAssociatedContainer()->remove( module );
108  }
109 
110  // get write lock
112  wlock->get().insert( module );
113  wlock.reset();
114 
115  module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
116  WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
117  LL_INFO );
118 
119  // now module->isUsable() is true
120 
121  // Connect the error handler and all default handlers:
123 
124  // connect the containers signal handler explicitly
125  t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
126  boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
127  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
128 
129  // connect default notifiers:
130  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
131  for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
132  {
133  signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
134  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
135  }
136  slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
137  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
138  {
139  // call associated notifier
140  ( *iter )( module );
141  }
142  slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock );
143  for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin();
144  iter != m_connectorEstablishedNotifiers.end(); ++iter )
145  {
146  // subscribe on each input
147  for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
148  {
149  signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
150  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
151  }
152  }
153  for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin();
154  iter != m_connectorClosedNotifiers.end(); ++iter )
155  {
156  // subscribe on each input
157  for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
158  {
159  signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
160  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
161  }
162  }
163  slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
164  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
165  {
166  signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
167  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
168  }
169  slock.unlock();
170 
171  // free the subscriptions lock
172  subscriptionsLock.reset();
173 
174  // add the modules progress to local progress combiner
175  m_progress->addSubProgress( module->getRootProgressCombiner() );
176 
177  // run it
178  if( run )
179  {
180  module->run();
181  }
182 }
183 
185 {
187  WModuleFactory::getModuleFactory()->getPrototypeByName( name )
188  );
189 
190  // add to the container
191  add( module );
192  module->isReady().wait();
193 
194  return module;
195 }
196 
197 void WModuleContainer::remove( boost::shared_ptr< WModule > module )
198 {
199  // simple flat removal.
200 
201  WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
202  LL_DEBUG );
203 
204  if( module->getAssociatedContainer() != shared_from_this() )
205  {
206  return;
207  }
208 
209  // remove connections inside this container
210  module->disconnect();
211 
212  // remove progress combiner
213  m_progress->removeSubProgress( module->getRootProgressCombiner() );
214 
215  // remove signal subscriptions to this containers default notifiers
217 
218  // find all subscriptions for this module
219  std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
220  for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
221  {
222  // disconnect subscription.
223  ( *it ).second.disconnect();
224  }
225  // erase them
226  subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
227  subscriptionsLock.reset();
228 
229  // get write lock
231  wlock->get().erase( module );
232  wlock.reset();
233 
234  module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
235 
236  // tell all interested about removal
237  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock );
238  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter)
239  {
240  // call associated notifier
241  ( *iter )( module );
242  }
243  slock.unlock();
244 }
245 
247 {
249 
250  // lock, unlocked if l looses focus
252 
253  // iterate module list
254  for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter )
255  {
256  // is this module a data module?
257  if( ( *iter )->getType() == MODULE_DATA )
258  {
259  boost::shared_ptr< WDataModule > dm = boost::shared_static_cast< WDataModule >( *iter );
260 
261  // now check the contained dataset ( isTexture and whether it is ready )
262  if( dm->isReady()() )
263  {
264  l.insert( dm );
265  }
266  }
267  }
268 
269  return l;
270 }
271 
273 {
274  WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
275 
276  // read lock
277  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
278  for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
279  ++listIter )
280  {
281  ( *listIter )->wait( true );
282  }
283  slock.unlock();
284 
285  WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
286 
287  // lock, unlocked if l looses focus
289 
290  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
291  {
292  WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
293  "ModuleContainer (" + getName() + ")", LL_INFO );
294  ( *listIter )->wait( true );
295  ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); // remove last refs to this container inside the module
296  }
297  lock.reset();
298 
299  // get write lock
300  // lock, unlocked if l looses focus
302  wlock->get().clear();
303 }
304 
305 const std::string WModuleContainer::getName() const
306 {
307  return m_name;
308 }
309 
310 const std::string WModuleContainer::getDescription() const
311 {
312  return m_description;
313 }
314 
315 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
316 {
317  boost::unique_lock<boost::shared_mutex> lock;
318  switch( signal)
319  {
320  case WM_ASSOCIATED:
321  lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
322  m_associatedNotifiers.push_back( notifier );
323  lock.unlock();
324  break;
325  case WM_READY:
326  lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
327  m_readyNotifiers.push_back( notifier );
328  lock.unlock();
329  break;
330  case WM_REMOVED:
331  lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock );
332  m_removedNotifiers.push_back( notifier );
333  lock.unlock();
334  break;
335  default:
336  std::ostringstream s;
337  s << "Could not subscribe to unknown signal.";
338  throw WSignalSubscriptionFailed( s.str() );
339  break;
340  }
341 }
342 
343 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
344 {
345  boost::unique_lock<boost::shared_mutex> lock;
346  switch( signal)
347  {
348  case WM_ERROR:
349  lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
350  m_errorNotifiers.push_back( notifier );
351  lock.unlock();
352  break;
353  default:
354  std::ostringstream s;
355  s << "Could not subscribe to unknown signal.";
356  throw WSignalSubscriptionFailed( s.str() );
357  break;
358  }
359 }
360 
361 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier )
362 {
363  boost::unique_lock<boost::shared_mutex> lock;
364  switch( signal)
365  {
366  case CONNECTION_ESTABLISHED:
367  lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
368  m_connectorEstablishedNotifiers.push_back( notifier );
369  lock.unlock();
370  break;
371  case CONNECTION_CLOSED:
372  lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
373  m_connectorClosedNotifiers.push_back( notifier );
374  lock.unlock();
375  break;
376  default:
377  std::ostringstream s;
378  s << "Could not subscribe to unknown signal.";
379  throw WSignalSubscriptionFailed( s.str() );
380  break;
381  }
382 }
383 
384 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
385 {
386  boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
387  if( tryOnly )
388  {
389  // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
390  prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
391  if( !prototype )
392  {
393  return prototype;
394  }
395  }
396  else
397  {
398  prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
399  }
400 
401  return applyModule( applyOn, prototype );
402 }
403 
404 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
405  boost::shared_ptr< WModule > prototype )
406 {
407  // is this module already associated with another container?
408  if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
409  {
410  throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() +
411  std::string( "\" is associated with another container." ) );
412  }
413 
414  // create a new initialized instance of the module
415  boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );
416 
417  // add it
418  add( m, true );
419  applyOn->isReadyOrCrashed().wait();
420  m->isReadyOrCrashed().wait();
421 
422  // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a
423  // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here.
424 
425  // get offered outputs
426  WModule::InputConnectorList ins = m->getInputConnectors();
427  // get offered inputs
428  WModule::OutputConnectorList outs = applyOn->getOutputConnectors();
429 
430  // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners.
431  if( !ins.empty() && !outs.empty() )
432  {
433  ( *ins.begin() )->connect( ( *outs.begin() ) );
434  }
435 
436  return m;
437 }
438 
439 WBatchLoader::SPtr WModuleContainer::loadDataSets( std::vector< std::string > filenames, bool suppressColormaps )
440 {
441  // create thread which actually loads the data
442  boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( filenames,
443  boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
444  );
445  t->setSuppressColormaps( suppressColormaps );
446  t->run();
447  return t;
448 }
449 
450 WBatchLoader::SPtr WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > filenames, bool suppressColormaps )
451 {
452  // create thread which actually loads the data
453  boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( filenames,
454  boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
455  );
456  t->setSuppressColormaps( suppressColormaps );
457  t->run();
458  t->wait();
459  return t;
460 }
461 
462 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
463 {
464  boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
465  m_pendingThreads.insert( thread );
466  lock.unlock();
467 }
468 
469 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
470 {
471  boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
472  m_pendingThreads.erase( thread );
473  lock.unlock();
474 }
475 
476 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
477 {
478  errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";
479 
480  // simply forward it to the other signal handler
481  signal_error( module, exception );
482 
484  {
485  infoLog() << "Crash caused this container to shutdown.";
486  requestStop();
487  m_isCrashed( true );
488  }
489 }
490 
491 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
492 {
493  m_crashIfModuleCrashes = crashIfCrashed;
494 }
495 
497 {
498  return m_modules.getReadTicket();
499 }
500 
502 {
503  // get the list of all first.
505 
506  // put results in here
508 
509  // handle each module
510  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
511  {
512  // check name
513  if( name == ( *listIter )->getName() )
514  {
515  result.push_back( ( *listIter ) );
516  }
517  }
518 
519  return result;
520 }
521 
522 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module )
523 {
524  WCombinerTypes::WCompatiblesList complist;
525 
526  if( !module )
527  {
528  // be nice in case of a null pointer
529  return complist;
530  }
531 
532  // read lock the container
534 
535  // handle each module
536  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
537  {
538  WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
539 
540  if( lComp.size() != 0 )
541  {
542  complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
543  }
544  }
545 
546  // sort the compatibles
547  std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );
548 
549  return complist;
550 }
551