Commit 4fa44dcf authored by Mathias Goldau's avatar Mathias Goldau
Browse files

[FIX] Now the WThreadedPerVoxelOperation is able to use all cores, due to...

[FIX] Now the WThreadedPerVoxelOperation is able to use all cores, due to improved partitioning method (the simplest you could think of: just striping)
parent 2071941a
......@@ -25,12 +25,14 @@
#ifndef WTHREADEDJOBS_H
#define WTHREADEDJOBS_H
#include <iostream>
#include <string>
#include <boost/shared_ptr.hpp>
#include "WException.h"
#include "WFlag.h"
#include "WLogger.h"
/**
* \class WThreadedJobs
......@@ -117,7 +119,7 @@ WThreadedJobs< Input_T, Job_T >::~WThreadedJobs()
}
template< class Input_T, class Job_T >
void WThreadedJobs< Input_T, Job_T >::operator() ( std::size_t, std::size_t, WBoolFlag const& shutdown )
void WThreadedJobs< Input_T, Job_T >::operator() ( std::size_t /* id */, std::size_t /* numThreads */, WBoolFlag const& shutdown )
{
JobType job;
while( getJob( job ) && !shutdown() )
......@@ -126,4 +128,90 @@ void WThreadedJobs< Input_T, Job_T >::operator() ( std::size_t, std::size_t, WBo
}
}
/**
* Nearly the same class as WThreadedJobs, but this class is intended to be used for multithreaded operations on voxels and therefore it
* uses Striping to partition the data. This is necessarry since if the threads are not operating on blocks, they slow down!
*/
template< class Input_T, class Job_T >
class WThreadedStripingJobs
{
public:
//! the input type
typedef Input_T InputType;
//! the job type
typedef Job_T JobType;
/**
* Constructor.
*
* \param input The input.
*/
WThreadedStripingJobs( boost::shared_ptr< InputType const > input ); // NOLINT
/**
* Destructor.
*/
virtual ~WThreadedStripingJobs();
/**
* The threaded function operation. Pulls jobs and executes the \see compute()
* function.
*
* \param id The thread's ID.
* \param numThreads How many threads are working on the jobs.
* \param shutdown A shared flag indicating the thread should be stopped.
*/
void operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown );
/**
* Abstract function that performs the actual computation per voxel.
*
* \param input The input data.
* \param voxelNum The voxel number to operate on.
*/
virtual void compute( boost::shared_ptr< InputType const > input, std::size_t voxelNum ) = 0;
protected:
//! the input
boost::shared_ptr< InputType const > m_input;
private:
};
template< class Input_T, class Job_T >
WThreadedStripingJobs< Input_T, Job_T >::WThreadedStripingJobs( boost::shared_ptr< InputType const > input )
: m_input( input )
{
if( !m_input )
{
throw WException( std::string( "Invalid input." ) );
}
}
template< class Input_T, class Job_T >
WThreadedStripingJobs< Input_T, Job_T >::~WThreadedStripingJobs()
{
}
template< class Input_T, class Job_T >
void WThreadedStripingJobs< Input_T, Job_T >::operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown )
{
WAssert( m_input, "Bug: operations of an invalid input requested." );
size_t numElements = m_input->size();
// partition the voxels via simple striping
size_t start = numElements / numThreads * id;
size_t end = ( id + 1 ) * ( numElements / numThreads );
if( id == numThreads - 1 ) // last thread may have less elements to take care.
{
end = numElements;
}
for( size_t voxelNum = start; ( voxelNum < end ) && !shutdown(); ++voxelNum )
{
compute( m_input, voxelNum );
}
}
#endif // WTHREADEDJOBS_H
......@@ -66,13 +66,13 @@ class WThreadedPerVoxelOperationTest;
* The subarray will have exactly numInputs entries.
*/
template< typename Value_T, std::size_t numValues, typename Output_T, std::size_t numOutputs >
class WThreadedPerVoxelOperation : public WThreadedJobs< WValueSet< Value_T >, std::size_t >
class WThreadedPerVoxelOperation : public WThreadedStripingJobs< WValueSet< Value_T >, std::size_t >
{
//! the test is a friend
friend class WThreadedPerVoxelOperationTest;
//! the base class
typedef WThreadedJobs< WValueSet< Value_T >, std::size_t > BaseType;
typedef WThreadedStripingJobs< WValueSet< Value_T >, std::size_t > BaseType;
public:
//! the input valueset's type
......@@ -103,21 +103,13 @@ public:
*/
virtual ~WThreadedPerVoxelOperation();
/**
* Get a job for every voxel.
*
* \param job The job to be done (in this case the voxel to be processed).
* \return false, iff no more voxels need to be processed.
*/
virtual bool getJob( std::size_t& job ); // NOLINT
/**
* Perform the computation for a specific voxel.
*
* \param input The input dataset.
* \param job The job to be done (in this case the voxel to be processed).
* \param voxelNum The voxel number to operate on.
*/
virtual void compute( boost::shared_ptr< ValueSetType const > input, std::size_t const& job );
virtual void compute( boost::shared_ptr< ValueSetType const > input, std::size_t voxelNum );
/**
* Get the output dataset.
......@@ -126,6 +118,9 @@ public:
*/
boost::shared_ptr< WDataSetSingle > getResult();
protected:
using BaseType::m_input;
private:
//! a threadsafe vector (container)
typedef boost::shared_ptr< std::vector< Output_T > > OutputVectorType;
......@@ -133,12 +128,6 @@ private:
//! stores the output of the per-voxel-operation
OutputVectorType m_output;
//! the current position in the dataset
WSharedObject< std::size_t > m_position;
//! the size of the valueset
std::size_t m_size;
//! the function applied to every voxel
FunctionType m_func;
......@@ -178,12 +167,10 @@ WThreadedPerVoxelOperation< Value_T, numValues, Output_T, numOutputs >::WThreade
throw WException( std::string( "No valid function provided." ) );
}
m_position.getWriteTicket()->get() = 0;
m_size = dataset->getValueSet()->size();
try
{
// allocate enough memory for the output data
m_output = OutputVectorType( new std::vector< Output_T >( m_size * numOutputs ) );
m_output = OutputVectorType( new std::vector< Output_T >( m_input->size() * numOutputs ) );
}
catch( std::exception const& e )
{
......@@ -198,25 +185,15 @@ WThreadedPerVoxelOperation< Value_T, numValues, Output_T, numOutputs >::~WThread
{
}
template< typename Value_T, std::size_t numValues, typename Output_T, std::size_t numOutputs >
bool WThreadedPerVoxelOperation< Value_T, numValues, Output_T, numOutputs >::getJob( std::size_t& job ) // NOLINT
{
typename WSharedObject< std::size_t >::WriteTicket t = m_position.getWriteTicket();
bool b = t->get() < m_size;
job = t->get();
t->get() += 1 * static_cast< std::size_t >( b );
return b;
}
template< typename Value_T, std::size_t numValues, typename Output_T, std::size_t numOutputs >
void WThreadedPerVoxelOperation< Value_T, numValues, Output_T, numOutputs >::compute( boost::shared_ptr< ValueSetType const > input,
std::size_t const& job )
std::size_t voxelNum )
{
TransmitType t = input->getSubArray( job * numValues, numValues );
TransmitType t = input->getSubArray( voxelNum * numValues, numValues );
OutTransmitType o = m_func( t );
for( std::size_t k = 0; k < numOutputs; ++k )
{
( *m_output )[ job * numOutputs + k ] = o[ k ];
( *m_output )[ voxelNum * numOutputs + k ] = o[ k ];
}
}
......
......@@ -86,24 +86,6 @@ public:
TPVO t( ds, boost::bind( &WThreadedPerVoxelOperationTest::func, this, _1 ) );
TS_ASSERT_EQUALS( ds->getGrid(), t.m_grid );
TS_ASSERT_EQUALS( t.m_size, 8 );
TS_ASSERT_EQUALS( t.m_position.getWriteTicket()->get(), 0 );
}
/**
* Test if jobs get created correctly.
*/
void testGetJobs()
{
boost::shared_ptr< WDataSetSingle > ds = buildTestData();
TPVO t( ds, boost::bind( &WThreadedPerVoxelOperationTest::func, this, _1 ) );
std::size_t job;
for( std::size_t i = 0; i < 8; ++i )
{
TS_ASSERT( t.getJob( job ) );
}
TS_ASSERT( !t.getJob( job ) );
}
/**
......@@ -129,14 +111,14 @@ public:
boost::shared_ptr< WDataSetSingle > res = t->getResult();
float shouldBe[] = {
2.0f, 2.0f, 5.0f,
-5.0f, 4.0f, -7.0f,
8.0f, 8.0f, 23.0f,
-4.0f, 3.0f, -6.0f,
-28.0f, 13.0f, -44.0f,
3.0f, 4.0f, 9.0f,
-4.0f, 5.0f, -4.0f,
2.0f, -4.0f, -1.0f
2.0f, 2.0f, 5.0f,
-5.0f, 4.0f, -7.0f,
8.0f, 8.0f, 23.0f,
-4.0f, 3.0f, -6.0f,
-28.0f, 13.0f, -44.0f,
3.0f, 4.0f, 9.0f,
-4.0f, 5.0f, -4.0f,
2.0f, -4.0f, -1.0f
};
TS_ASSERT( res );
......
......@@ -44,7 +44,8 @@ class WDataSetSingle;
template< class T > class WModuleInputData;
template< class T > class WModuleOutputData;
#define WM_MORI_NUM_CORES W_AUTOMATIC_NB_THREADS
// #define WM_MORI_NUM_CORES W_AUTOMATIC_NB_THREADS
#define WM_MORI_NUM_CORES 16
/**
* \class WMDeterministicFTMori
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment