WModuleContainer.cpp 14.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
//---------------------------------------------------------------------------
//
// Project: OpenWalnut ( http://www.openwalnut.org )
//
// Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
// For more information see http://www.openwalnut.org/copying
//
// This file is part of OpenWalnut.
//
// OpenWalnut is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// OpenWalnut is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
//
//---------------------------------------------------------------------------

25
#include <list>
Sebastian Eichelbaum's avatar
[STYLE]  
Sebastian Eichelbaum committed
26
#include <set>
Sebastian Eichelbaum's avatar
[STYLE]  
Sebastian Eichelbaum committed
27
#include <vector>
28 29
#include <string>
#include <sstream>
30
#include <algorithm>
Sebastian Eichelbaum's avatar
[STYLE]  
Sebastian Eichelbaum committed
31
#include <utility>
32 33 34

#include "WModule.h"
#include "exceptions/WModuleUninitialized.h"
35
#include "exceptions/WModuleAlreadyAssociated.h"
36
#include "exceptions/WModuleSignalSubscriptionFailed.h"
37
#include "../common/WLogger.h"
38
#include "../common/WThreadedRunner.h"
39
#include "WKernel.h"
40
#include "WModuleFactory.h"
41
#include "WModuleTypes.h"
42 43
#include "WModuleInputConnector.h"
#include "WModuleOutputConnector.h"
44
#include "WBatchLoader.h"
45

46 47
#include "../modules/data/WMData.h"

48 49
#include "WModuleContainer.h"

50
WModuleContainer::WModuleContainer( std::string name, std::string description ):
51
    WModule(),
52
    m_moduleAccess( m_modules.getAccessObject() ),
53
    m_name( name ),
54
    m_description( description ),
55 56
    m_crashIfModuleCrashes( true ),
    m_moduleSubscriptionsAccess( m_moduleSubscriptions.getAccessObject() )
57
{
58
    WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_INFO );
59 60 61 62 63 64 65 66
    // initialize members
}

WModuleContainer::~WModuleContainer()
{
    // cleanup
}

67 68 69 70 71 72 73 74 75 76 77 78 79
void WModuleContainer::moduleMain()
{
    // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it.
    // Only set the ready flag.
    ready();
}

boost::shared_ptr< WModule > WModuleContainer::factory() const
{
    // this factory is not used actually.
    return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
}

80
void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
81
{
82 83
    WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
            "ModuleContainer (" + getName() + ")", LL_INFO );
84

85
    if ( !module->isInitialized()() )
86 87
    {
        std::ostringstream s;
88
        s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
89 90 91 92 93 94 95 96 97 98 99

        throw WModuleUninitialized( s.str() );
    }

    // already associated with this container?
    if ( module->getAssociatedContainer() == shared_from_this() )
    {
        return;
    }

    // is this module already associated?
100
    if ( module->isAssociated()() )
101 102 103 104 105
    {
        module->getAssociatedContainer()->remove( module );
    }

    // get write lock
106 107 108 109
    m_moduleAccess->beginWrite();
    m_moduleAccess->get().insert( module );
    m_moduleAccess->endWrite();

110 111
    module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
    WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
112
            LL_INFO );
113 114

    // now module->isUsable() is true
115 116 117

    // Connect the error handler and all default handlers:
    m_moduleSubscriptionsAccess->beginWrite();
118

119 120
    // connect the containers signal handler explicitly
    t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
121 122
    boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
    m_moduleSubscriptionsAccess->get().insert( ModuleSubscription( module, signalCon ) );
123

124
    // connect default ready/error notifiers
125
    boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
126 127
    for ( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
    {
128 129
        signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
        m_moduleSubscriptionsAccess->get().insert( ModuleSubscription( module, signalCon ) );
130
    }
131 132 133 134 135 136 137 138 139
    slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
    for ( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
    {
        // call associated notifier
        ( *iter )( module );
    }
    slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
    for ( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
    {
140 141
        signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
        m_moduleSubscriptionsAccess->get().insert( ModuleSubscription( module, signalCon ) );
142
    }
143
    slock.unlock();
144
    m_moduleSubscriptionsAccess->endWrite();
145

146 147 148
    // add the modules progress to local progress combiner
    m_progress->addSubProgress( module->getRootProgressCombiner() );

149 150 151 152 153
    // run it
    if ( run )
    {
        module->run();
    }
154 155 156 157
}

void WModuleContainer::remove( boost::shared_ptr< WModule > module )
{
158 159
    // simple flat removal.

160
    WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
161
            LL_DEBUG );
162

163 164 165 166 167
    if ( module->getAssociatedContainer() != shared_from_this() )
    {
        return;
    }

168 169
    // remove connections inside this container
    module->disconnect();
170

171 172 173
    // remove progress combiner
    m_progress->removeSubProgress( module->getRootProgressCombiner() );

174 175 176 177 178 179 180 181 182 183 184 185 186
    // remove signal subscriptions to this containers default notifiers
    m_moduleSubscriptionsAccess->beginWrite();

    // find all subscriptions for this module
    std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = m_moduleSubscriptionsAccess->get().equal_range( module );
    for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
    {
        // disconnect subscription.
        ( *it ).second.disconnect();
    }
    // erase them
    m_moduleSubscriptionsAccess->get().erase( subscriptions.first, subscriptions.second );
    m_moduleSubscriptionsAccess->endWrite();
187

188
    // get write lock
189 190 191 192
    m_moduleAccess->beginWrite();
    m_moduleAccess->get().erase( module );
    m_moduleAccess->endWrite();

193
    module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
194 195
}

196
WModuleContainer::DataModuleListType WModuleContainer::getDataModules()
197 198 199
{
    DataModuleListType l;

200
    m_moduleAccess->beginRead();
201 202

    // iterate module list
203
    for( ModuleConstIterator iter = m_moduleAccess->get().begin(); iter != m_moduleAccess->get().end(); ++iter )
204 205 206 207 208 209 210
    {
        // is this module a data module?
        if ( ( *iter )->getType() == MODULE_DATA )
        {
            boost::shared_ptr< WMData > dm = boost::shared_static_cast< WMData >( *iter );

            // now check the contained dataset ( isTexture and whether it is ready )
211
            if ( dm->isReady()() )
212 213 214 215 216
            {
                l.insert( dm );
            }
        }
    }
217
    m_moduleAccess->endRead();
218 219 220 221 222 223

    // now sort the list using the sorter

    return l;
}

224 225
void WModuleContainer::stop()
{
226
    WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
227 228 229

    // read lock
    boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
Sebastian Eichelbaum's avatar
[STYLE]  
Sebastian Eichelbaum committed
230 231
    for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
            ++listIter )
232 233 234 235 236
    {
        ( *listIter )->wait( true );
    }
    slock.unlock();

237
    WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
238 239

    // read lock
240 241
    m_moduleAccess->beginRead();
    for( ModuleConstIterator listIter = m_moduleAccess->get().begin(); listIter != m_moduleAccess->get().end(); ++listIter )
242
    {
243
        WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
244
                "ModuleContainer (" + getName() + ")", LL_INFO );
245
        ( *listIter )->wait( true );
246
        ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );   // remove last refs to this container inside the module
247
    }
248
    m_moduleAccess->endRead();
249 250

    // get write lock
251 252 253
    m_moduleAccess->beginWrite();
    m_moduleAccess->get().clear();
    m_moduleAccess->endWrite();
254 255 256 257 258 259 260 261 262 263 264 265
}

const std::string WModuleContainer::getName() const
{
    return m_name;
}

const std::string WModuleContainer::getDescription() const
{
    return m_description;
}

266
void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
267
{
268 269 270
    boost::unique_lock<boost::shared_mutex> lock;
    switch (signal)
    {
271 272 273 274 275
        case WM_ASSOCIATED:
            lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
            m_associatedNotifiers.push_back( notifier );
            lock.unlock();
            break;
276
        case WM_READY:
277 278 279 280 281 282 283 284 285 286
            lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
            m_readyNotifiers.push_back( notifier );
            lock.unlock();
            break;
        default:
            std::ostringstream s;
            s << "Could not subscribe to unknown signal.";
            throw WModuleSignalSubscriptionFailed( s.str() );
            break;
    }
287 288
}

289
void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
290
{
291 292 293
    boost::unique_lock<boost::shared_mutex> lock;
    switch (signal)
    {
294
        case WM_ERROR:
295 296 297 298 299 300 301 302 303 304
            lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
            m_errorNotifiers.push_back( notifier );
            lock.unlock();
            break;
        default:
            std::ostringstream s;
            s << "Could not subscribe to unknown signal.";
            throw WModuleSignalSubscriptionFailed( s.str() );
            break;
    }
305 306
}

307
boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
308
{
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
    boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
    if ( tryOnly )
    {
        // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
        prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
        if( !prototype )
        {
            return prototype;
        }
    }
    else
    {
        prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
    }

    return applyModule( applyOn, prototype );
325 326
}

327 328
boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
                                                                         boost::shared_ptr< WModule > prototype )
329 330 331 332 333 334 335 336 337 338 339 340
{
    // is this module already associated with another container?
    if ( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
    {
        throw WModuleAlreadyAssociated( "The specified module \"" + applyOn->getName() + "\" is associated with another container." );
    }

    // create a new initialized instance of the module
    boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );

    // add it
    add( m, true );
341 342 343 344 345
    applyOn->isReadyOrCrashed().wait();
    m->isReadyOrCrashed().wait();

    // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a
    // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here.
346

347 348 349 350 351 352
    // get offered outputs
    std::set<boost::shared_ptr<WModuleInputConnector> > ins = m->getInputConnectors();
    // get offered inputs
    std::set<boost::shared_ptr<WModuleOutputConnector> > outs = applyOn->getOutputConnectors();

    // TODO(ebaum): search best matching instead of simply connecting both
353 354 355 356
    if ( !ins.empty() && !outs.empty() )
    {
        ( *ins.begin() )->connect( ( *outs.begin() ) );
    }
357 358 359 360

    return m;
}

361 362 363
boost::shared_ptr< WBatchLoader > WModuleContainer::loadDataSets( std::vector< std::string > fileNames )
{
    // create thread which actually loads the data
364 365 366
    boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
                boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
    );
367 368 369 370 371 372 373
    t->run();
    return t;
}

void WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > fileNames )
{
    // create thread which actually loads the data
374 375 376
    boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
                boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
    );
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
    t->run();
    t->wait();
}

void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
{
    boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
    m_pendingThreads.insert( thread );
    lock.unlock();
}

void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
{
    boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
    m_pendingThreads.erase( thread );
    lock.unlock();
}

395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
{
    errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";

    // simply forward it to the other signal handler
    signal_error( module, exception );

    if ( m_crashIfModuleCrashes )
    {
        infoLog() << "Crash caused this container to shutdown.";
        requestStop();
        m_isCrashed( true );
    }
}

void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
{
    m_crashIfModuleCrashes = crashIfCrashed;
}

415 416 417 418 419
WModuleContainer::ModuleSharedContainerType::WSharedAccess WModuleContainer::getAccessObject()
{
    return m_moduleAccess;
}