//--------------------------------------------------------------------------- // // Project: OpenWalnut ( http://www.openwalnut.org ) // // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS // For more information see http://www.openwalnut.org/copying // // This file is part of OpenWalnut. // // OpenWalnut is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // OpenWalnut is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with OpenWalnut. If not, see . // //--------------------------------------------------------------------------- #include #include #include #include #include #include #include #include "WModule.h" #include "exceptions/WModuleUninitialized.h" #include "exceptions/WModuleAlreadyAssociated.h" #include "exceptions/WModuleSignalSubscriptionFailed.h" #include "../common/WLogger.h" #include "../common/WThreadedRunner.h" #include "WKernel.h" #include "WModuleFactory.h" #include "WModuleTypes.h" #include "WModuleInputConnector.h" #include "WModuleOutputConnector.h" #include "WBatchLoader.h" #include "../modules/data/WMData.h" #include "WModuleContainer.h" WModuleContainer::WModuleContainer( std::string name, std::string description ): WModule(), m_moduleAccess( m_modules.getAccessObject() ), m_name( name ), m_description( description ), m_crashIfModuleCrashes( true ), m_moduleSubscriptionsAccess( m_moduleSubscriptions.getAccessObject() ) { WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_INFO ); // initialize members } WModuleContainer::~WModuleContainer() { // cleanup } void WModuleContainer::moduleMain() { // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it. // Only set the ready flag. ready(); } boost::shared_ptr< WModule > WModuleContainer::factory() const { // this factory is not used actually. return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) ); } void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run ) { WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." , "ModuleContainer (" + getName() + ")", LL_INFO ); if ( !module->isInitialized()() ) { std::ostringstream s; s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized."; throw WModuleUninitialized( s.str() ); } // already associated with this container? if ( module->getAssociatedContainer() == shared_from_this() ) { return; } // is this module already associated? if ( module->isAssociated()() ) { module->getAssociatedContainer()->remove( module ); } // get write lock m_moduleAccess->beginWrite(); m_moduleAccess->get().insert( module ); m_moduleAccess->endWrite(); module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) ); WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")", LL_INFO ); // now module->isUsable() is true // Connect the error handler and all default handlers: m_moduleSubscriptionsAccess->beginWrite(); // connect the containers signal handler explicitly t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 ); boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func ); m_moduleSubscriptionsAccess->get().insert( ModuleSubscription( module, signalCon ) ); // connect default ready/error notifiers boost::shared_lock slock = boost::shared_lock( m_errorNotifiersLock ); for ( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter) { signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) ); m_moduleSubscriptionsAccess->get().insert( ModuleSubscription( module, signalCon ) ); } slock = boost::shared_lock( m_associatedNotifiersLock ); for ( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter) { // call associated notifier ( *iter )( module ); } slock = boost::shared_lock( m_readyNotifiersLock ); for ( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter) { signalCon = module->subscribeSignal( WM_READY, ( *iter ) ); m_moduleSubscriptionsAccess->get().insert( ModuleSubscription( module, signalCon ) ); } slock.unlock(); m_moduleSubscriptionsAccess->endWrite(); // add the modules progress to local progress combiner m_progress->addSubProgress( module->getRootProgressCombiner() ); // run it if ( run ) { module->run(); } } void WModuleContainer::remove( boost::shared_ptr< WModule > module ) { // simple flat removal. WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")", LL_DEBUG ); if ( module->getAssociatedContainer() != shared_from_this() ) { return; } // remove connections inside this container module->disconnect(); // remove progress combiner m_progress->removeSubProgress( module->getRootProgressCombiner() ); // remove signal subscriptions to this containers default notifiers m_moduleSubscriptionsAccess->beginWrite(); // find all subscriptions for this module std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = m_moduleSubscriptionsAccess->get().equal_range( module ); for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it ) { // disconnect subscription. ( *it ).second.disconnect(); } // erase them m_moduleSubscriptionsAccess->get().erase( subscriptions.first, subscriptions.second ); m_moduleSubscriptionsAccess->endWrite(); // get write lock m_moduleAccess->beginWrite(); m_moduleAccess->get().erase( module ); m_moduleAccess->endWrite(); module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); } WModuleContainer::DataModuleListType WModuleContainer::getDataModules() { DataModuleListType l; m_moduleAccess->beginRead(); // iterate module list for( ModuleConstIterator iter = m_moduleAccess->get().begin(); iter != m_moduleAccess->get().end(); ++iter ) { // is this module a data module? if ( ( *iter )->getType() == MODULE_DATA ) { boost::shared_ptr< WMData > dm = boost::shared_static_cast< WMData >( *iter ); // now check the contained dataset ( isTexture and whether it is ready ) if ( dm->isReady()() ) { l.insert( dm ); } } } m_moduleAccess->endRead(); // now sort the list using the sorter return l; } void WModuleContainer::stop() { WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO ); // read lock boost::shared_lock slock = boost::shared_lock( m_pendingThreadsLock ); for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end(); ++listIter ) { ( *listIter )->wait( true ); } slock.unlock(); WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO ); // read lock m_moduleAccess->beginRead(); for( ModuleConstIterator listIter = m_moduleAccess->get().begin(); listIter != m_moduleAccess->get().end(); ++listIter ) { WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." , "ModuleContainer (" + getName() + ")", LL_INFO ); ( *listIter )->wait( true ); ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); // remove last refs to this container inside the module } m_moduleAccess->endRead(); // get write lock m_moduleAccess->beginWrite(); m_moduleAccess->get().clear(); m_moduleAccess->endWrite(); } const std::string WModuleContainer::getName() const { return m_name; } const std::string WModuleContainer::getDescription() const { return m_description; } void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier ) { boost::unique_lock lock; switch (signal) { case WM_ASSOCIATED: lock = boost::unique_lock( m_associatedNotifiersLock ); m_associatedNotifiers.push_back( notifier ); lock.unlock(); break; case WM_READY: lock = boost::unique_lock( m_readyNotifiersLock ); m_readyNotifiers.push_back( notifier ); lock.unlock(); break; default: std::ostringstream s; s << "Could not subscribe to unknown signal."; throw WModuleSignalSubscriptionFailed( s.str() ); break; } } void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier ) { boost::unique_lock lock; switch (signal) { case WM_ERROR: lock = boost::unique_lock( m_errorNotifiersLock ); m_errorNotifiers.push_back( notifier ); lock.unlock(); break; default: std::ostringstream s; s << "Could not subscribe to unknown signal."; throw WModuleSignalSubscriptionFailed( s.str() ); break; } } boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly ) { boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >(); if ( tryOnly ) { // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what ); if( !prototype ) { return prototype; } } else { prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what ); } return applyModule( applyOn, prototype ); } boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, boost::shared_ptr< WModule > prototype ) { // is this module already associated with another container? if ( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) ) { throw WModuleAlreadyAssociated( "The specified module \"" + applyOn->getName() + "\" is associated with another container." ); } // create a new initialized instance of the module boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype ); // add it add( m, true ); applyOn->isReadyOrCrashed().wait(); m->isReadyOrCrashed().wait(); // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here. // get offered outputs std::set > ins = m->getInputConnectors(); // get offered inputs std::set > outs = applyOn->getOutputConnectors(); // TODO(ebaum): search best matching instead of simply connecting both if ( !ins.empty() && !outs.empty() ) { ( *ins.begin() )->connect( ( *outs.begin() ) ); } return m; } boost::shared_ptr< WBatchLoader > WModuleContainer::loadDataSets( std::vector< std::string > fileNames ) { // create thread which actually loads the data boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames, boost::shared_static_cast< WModuleContainer >( shared_from_this() ) ) ); t->run(); return t; } void WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > fileNames ) { // create thread which actually loads the data boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames, boost::shared_static_cast< WModuleContainer >( shared_from_this() ) ) ); t->run(); t->wait(); } void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread ) { boost::unique_lock lock = boost::unique_lock( m_pendingThreadsLock ); m_pendingThreads.insert( thread ); lock.unlock(); } void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread ) { boost::unique_lock lock = boost::unique_lock( m_pendingThreadsLock ); m_pendingThreads.erase( thread ); lock.unlock(); } void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception ) { errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container."; // simply forward it to the other signal handler signal_error( module, exception ); if ( m_crashIfModuleCrashes ) { infoLog() << "Crash caused this container to shutdown."; requestStop(); m_isCrashed( true ); } } void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed ) { m_crashIfModuleCrashes = crashIfCrashed; } WModuleContainer::ModuleSharedContainerType::WSharedAccess WModuleContainer::getAccessObject() { return m_moduleAccess; }