WModuleContainer.cpp 12.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
//---------------------------------------------------------------------------
//
// Project: OpenWalnut ( http://www.openwalnut.org )
//
// Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
// For more information see http://www.openwalnut.org/copying
//
// This file is part of OpenWalnut.
//
// OpenWalnut is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// OpenWalnut is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
//
//---------------------------------------------------------------------------

25
#include <list>
Sebastian Eichelbaum's avatar
[STYLE]  
Sebastian Eichelbaum committed
26
#include <set>
Sebastian Eichelbaum's avatar
[STYLE]  
Sebastian Eichelbaum committed
27
#include <vector>
28 29
#include <string>
#include <sstream>
30
#include <algorithm>
31 32 33

#include "WModule.h"
#include "exceptions/WModuleUninitialized.h"
34
#include "exceptions/WModuleAlreadyAssociated.h"
35
#include "exceptions/WModuleSignalSubscriptionFailed.h"
36
#include "../common/WLogger.h"
37
#include "../common/WThreadedRunner.h"
38
#include "WKernel.h"
39
#include "WModuleFactory.h"
40
#include "WModuleTypes.h"
41 42
#include "WModuleInputConnector.h"
#include "WModuleOutputConnector.h"
43
#include "WBatchLoader.h"
44

45 46
#include "../modules/data/WMData.h"

47 48
#include "WModuleContainer.h"

49
// Creates an empty container. The given name and description are stored and later handed out by
// getName() and getDescription().
WModuleContainer::WModuleContainer( std::string name, std::string description ):
    WModule(),
    m_name( name ),
    m_description( description )
{
    // all members are set up by the initializer list above; only announce the construction here
    const std::string logSource = "ModuleContainer (" + getName() + ")";
    WLogger::getLogger()->addLogMessage( "Constructing module container." , logSource, LL_INFO );
}

// Destructor. No explicit work is done here; the members clean up after themselves.
WModuleContainer::~WModuleContainer()
{
    // cleanup
}

63 64 65 66 67 68 69 70 71 72 73 74 75
// Main method demanded by the WModule base class. A container has no work of its own, so the
// only thing done here is the call to ready().
void WModuleContainer::moduleMain()
{
    // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it.
    // Only set the ready flag.
    ready();
}

// Returns a freshly allocated container which carries the same name and description as this one.
// According to the original author this factory is not actually used.
boost::shared_ptr< WModule > WModuleContainer::factory() const
{
    boost::shared_ptr< WModule > instance( new WModuleContainer( getName(), getDescription() ) );
    return instance;
}

76
// Adds a module to this container and associates it with the container.
//
// module: the module to add. Must be non-NULL and initialized.
// run:    if true, module->run() is called after the module has been added.
//
// Throws WModuleUninitialized if the pointer is NULL or the module reports that it is not initialized.
//
// If the module is already associated with this container nothing happens. If it is associated with
// another container it is removed from that one first. After insertion, the default error and ready
// notifiers are subscribed to the module, the "associated" notifiers are called directly, and the
// module's root progress combiner is added to the container's progress.
void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
{
    // a NULL pointer can neither be logged nor checked for initialization -> reject it before the first dereference
    if ( !module )
    {
        throw WModuleUninitialized( "Could not add module to container \"" + getName() + "\". Reason: module pointer is NULL." );
    }

    WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
            "ModuleContainer (" + getName() + ")", LL_INFO );

    if ( !module->isInitialized()() )
    {
        std::ostringstream s;
        s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";

        throw WModuleUninitialized( s.str() );
    }

    // already associated with this container?
    if ( module->getAssociatedContainer() == shared_from_this() )
    {
        return;
    }

    // is this module already associated? Then take it out of its current container first.
    if ( module->isAssociated()() )
    {
        module->getAssociatedContainer()->remove( module );
    }

    // get write lock, but hold it only for the actual insertion
    boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_moduleSetLock );
    m_modules.insert( module );
    lock.unlock();
    module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
    WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
            LL_INFO );

    // connect default ready/error notifiers; each notifier list is read under its own read lock
    boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
    for ( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
    {
        module->subscribeSignal( WM_ERROR, ( *iter ) );
    }
    slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
    for ( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
    {
        // the association has just happened, so the associated notifiers are called directly instead of being subscribed
        ( *iter )( module );
    }
    slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
    for ( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
    {
        module->subscribeSignal( WM_READY, ( *iter ) );
    }
    slock.unlock();

    // add the modules progress to local progress combiner
    m_progress->addSubProgress( module->getRootProgressCombiner() );

    // run it
    if ( run )
    {
        module->run();
    }
}

// Removes a module from this container.
//
// module: the module to remove. A NULL pointer or a module which is not associated with this
//         container is ignored.
//
// The module is waited for via wait( true ) before it is erased from the module set and its
// associated container is reset to an empty pointer. Signal subscriptions and the module's
// progress are not yet removed (see the TODOs at the end).
void WModuleContainer::remove( boost::shared_ptr< WModule > module )
{
    // a NULL pointer cannot be part of this container; return before it gets dereferenced for logging
    if ( !module )
    {
        return;
    }

    WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
            LL_DEBUG );

    // modules belonging to another container (or to none) are not touched
    if ( module->getAssociatedContainer() != shared_from_this() )
    {
        return;
    }

    // stop module
    WLogger::getLogger()->addLogMessage( "Waiting for module \"" + module->getName() + "\" to finish." , "ModuleContainer (" + getName() + ")",
            LL_DEBUG );
    module->wait( true );

    // get write lock, but hold it only for the actual erase
    boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_moduleSetLock );
    m_modules.erase( module );
    lock.unlock();
    module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );

    // TODO(ebaum): remove signal subscriptions
    // TODO(ebaum): remove progress from combiner
    // TODO(ebaum): flat or deep removal? What to do with associated modules?
}

167
// Collects all modules of this container whose type is MODULE_DATA and which report to be ready.
// The module set is only read, so a shared (read) lock is held during the iteration.
WModuleContainer::DataModuleListType WModuleContainer::getDataModules()
{
    typedef std::set< boost::shared_ptr< WModule > >::iterator ModuleIterator;

    DataModuleListType readyDataModules;

    boost::shared_lock<boost::shared_mutex> readLock( m_moduleSetLock );
    for( ModuleIterator current = m_modules.begin(); current != m_modules.end(); ++current )
    {
        // everything which is not a data module is of no interest
        if ( ( *current )->getType() != MODULE_DATA )
        {
            continue;
        }

        boost::shared_ptr< WMData > dataModule = boost::shared_static_cast< WMData >( *current );

        // only data modules which are ready are handed out
        if ( dataModule->isReady()() )
        {
            readyDataModules.insert( dataModule );
        }
    }
    readLock.unlock();

    // NOTE: the result is not sorted explicitly here

    return readyDataModules;
}

195 196
void WModuleContainer::stop()
{
197
    WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
198 199 200

    // read lock
    boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
Sebastian Eichelbaum's avatar
[STYLE]  
Sebastian Eichelbaum committed
201 202
    for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
            ++listIter )
203 204 205 206 207
    {
        ( *listIter )->wait( true );
    }
    slock.unlock();

208
    WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
209 210

    // read lock
211
    slock = boost::shared_lock<boost::shared_mutex>( m_moduleSetLock );
212 213
    for( std::set< boost::shared_ptr< WModule > >::iterator listIter = m_modules.begin(); listIter != m_modules.end(); ++listIter )
    {
214
        WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
215
                "ModuleContainer (" + getName() + ")", LL_INFO );
216 217 218
        ( *listIter )->wait( true );
    }
    slock.unlock();
219 220 221 222 223

    // get write lock
    boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_moduleSetLock );
    m_modules.clear();
    lock.unlock();
224 225 226 227 228 229 230 231 232 233 234 235
}

// Returns a copy of the container name which was handed to the constructor.
const std::string WModuleContainer::getName() const
{
    return m_name;
}

// Returns a copy of the container description which was handed to the constructor.
const std::string WModuleContainer::getDescription() const
{
    return m_description;
}

236
// Registers a default notifier for generic module signals. add() applies these notifiers to every
// module which gets added afterwards.
//
// signal:   WM_ASSOCIATED or WM_READY. Every other value raises WModuleSignalSubscriptionFailed.
// notifier: the handler to append to the matching notifier list.
void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
{
    if ( signal == WM_ASSOCIATED )
    {
        // write lock is released when the guard leaves this scope
        boost::unique_lock<boost::shared_mutex> guard( m_associatedNotifiersLock );
        m_associatedNotifiers.push_back( notifier );
    }
    else if ( signal == WM_READY )
    {
        // write lock is released when the guard leaves this scope
        boost::unique_lock<boost::shared_mutex> guard( m_readyNotifiersLock );
        m_readyNotifiers.push_back( notifier );
    }
    else
    {
        throw WModuleSignalSubscriptionFailed( std::string( "Could not subscribe to unknown signal." ) );
    }
}

259
// Registers a default notifier for module errors. add() subscribes these notifiers to every module
// which gets added afterwards.
//
// signal:   has to be WM_ERROR. Every other value raises WModuleSignalSubscriptionFailed.
// notifier: the error handler to append to the list of error notifiers.
void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
{
    // this overload can only handle the error signal
    if ( signal != WM_ERROR )
    {
        throw WModuleSignalSubscriptionFailed( std::string( "Could not subscribe to unknown signal." ) );
    }

    // write lock is released when the guard leaves this scope
    boost::unique_lock<boost::shared_mutex> guard( m_errorNotifiersLock );
    m_errorNotifiers.push_back( notifier );
}

277
// Looks up the prototype named "what" and applies a new instance of it to the module "applyOn".
//
// applyOn: the module to which the new module gets applied.
// what:    the name of the prototype to instantiate.
// tryOnly: if true, a missing prototype leads to an empty pointer being returned. If false,
//          getPrototypeByName() is used for the lookup (presumably throwing for unknown names -
//          verify against WModuleFactory).
//
// Returns the result of applyModule( applyOn, prototype ), or an empty pointer as described above.
boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
{
    if ( !tryOnly )
    {
        boost::shared_ptr< WModule > required = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
        return applyModule( applyOn, required );
    }

    // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
    boost::shared_ptr< WModule > candidate = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
    if( !candidate )
    {
        // nothing found: hand the empty pointer back to the caller
        return candidate;
    }

    return applyModule( applyOn, candidate );
}

297 298
// Creates a new module from the given prototype, adds it to this container, runs it and connects
// its first input connector with the first output connector of "applyOn".
//
// applyOn:   the module whose output is used. It must not be associated with another container.
// prototype: the prototype from which the new module instance gets created.
//
// Returns the newly created module. If the new module offers no input connector or "applyOn" offers
// no output connector, the module is still created, added and returned, but nothing is connected.
//
// Throws WModuleAlreadyAssociated if "applyOn" is associated with a different container.
boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
                                                                         boost::shared_ptr< WModule > prototype )
{
    // is this module already associated with another container?
    if ( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
    {
        throw WModuleAlreadyAssociated( "The specified module \"" + applyOn->getName() + "\" is associated with another container." );
    }

    // create a new initialized instance of the module
    boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );

    // add it
    add( m, true );

    // both modules have to be ready before their connectors are queried
    applyOn->isReady().wait();
    m->isReady().wait();

    // inputs of the new module
    std::set<boost::shared_ptr<WModuleInputConnector> > ins = m->getInputConnectors();
    // outputs offered by the module we apply on
    std::set<boost::shared_ptr<WModuleOutputConnector> > outs = applyOn->getOutputConnectors();

    // TODO(ebaum): search best matching instead of simply connecting both
    // dereferencing begin() of an empty set is undefined behaviour, so only connect if both sides offer a connector
    if ( !ins.empty() && !outs.empty() )
    {
        ( *ins.begin() )->connect( ( *outs.begin() ) );
    }

    return m;
}

325 326 327
// Creates a WBatchLoader for the given files with this container as its target, starts it via
// run() and returns it without waiting for it.
boost::shared_ptr< WBatchLoader > WModuleContainer::loadDataSets( std::vector< std::string > fileNames )
{
    // the loader needs a pointer to this container
    boost::shared_ptr< WModuleContainer > self = boost::shared_static_cast< WModuleContainer >( shared_from_this() );

    // the batch loader does the actual loading
    boost::shared_ptr< WBatchLoader > loader( new WBatchLoader( fileNames, self ) );
    loader->run();
    return loader;
}

void WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > fileNames )
{
    // create thread which actually loads the data
338 339 340
    boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
                boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
    );
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
    t->run();
    t->wait();
}

// Inserts the given thread into the set of pending threads. stop() waits for all threads in this set.
void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
{
    // write lock; released when the guard goes out of scope
    boost::unique_lock<boost::shared_mutex> guard( m_pendingThreadsLock );
    m_pendingThreads.insert( thread );
}

// Erases the given thread from the set of pending threads.
void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
{
    // write lock; released when the guard goes out of scope
    boost::unique_lock<boost::shared_mutex> guard( m_pendingThreadsLock );
    m_pendingThreads.erase( thread );
}