Commit 4743a565 authored by Jakub Klinkovský
Browse files

Optimized parallel OpenMP scan algorithm for expensive inputs

This adds back the original approach (prescan + uniform shift) which
was removed too early.
parent 8f8c301b
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -30,7 +30,8 @@ struct Scan< Devices::Sequential, Type, PhaseType >
   template< typename InputArray,
             typename OutputArray,
             typename Reduction >
   static void
   // returns the last value of inclusive scan (reduction of the whole input)
   static typename OutputArray::ValueType
   perform( const InputArray& input,
            OutputArray& output,
            typename InputArray::IndexType begin,
+130 −25
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ template< ScanType Type, ScanPhaseType PhaseType >
   template< typename InputArray,
             typename OutputArray,
             typename Reduction >
void
typename OutputArray::ValueType
Scan< Devices::Sequential, Type, PhaseType >::
perform( const InputArray& input,
         OutputArray& output,
@@ -57,6 +57,8 @@ perform( const InputArray& input,
         aux = reduction( aux, x );
      }
   }
   // return the last value of inclusive scan (reduction of the whole input)
   return aux;
}

template< ScanType Type, ScanPhaseType PhaseType >
@@ -72,6 +74,25 @@ performFirstPhase( const InputArray& input,
                   typename OutputArray::IndexType outputBegin,
                   Reduction&& reduction,
                   typename OutputArray::ValueType zero )
{
   if( end <= begin ) {
      Containers::Array< typename OutputArray::ValueType, Devices::Sequential > block_results( 1 );
      block_results.setValue( zero );
      return block_results;
   }

   switch( PhaseType )
   {
      case ScanPhaseType::WriteInFirstPhase:
      {
         // artificial first phase - pre-scan the block
         Containers::Array< typename OutputArray::ValueType, Devices::Sequential > block_results( 2 );
         block_results[ 0 ] = zero;
         block_results[ 1 ] = perform( input, output, begin, end, outputBegin, reduction, zero );
         return block_results;
      }

      case ScanPhaseType::WriteInSecondPhase:
      {
         // artificial first phase - only reduce the block
         Containers::Array< typename OutputArray::ValueType, Devices::Sequential > block_results( 2 );
@@ -79,6 +100,8 @@ performFirstPhase( const InputArray& input,
         block_results[ 1 ] = reduce< Devices::Sequential >( begin, end, input, reduction, zero );
         return block_results;
      }
   };
}

template< ScanType Type, ScanPhaseType PhaseType >
   template< typename InputArray,
@@ -96,9 +119,26 @@ performSecondPhase( const InputArray& input,
                    Reduction&& reduction,
                    typename OutputArray::ValueType zero,
                    typename OutputArray::ValueType shift )
{
   switch( PhaseType )
   {
      case ScanPhaseType::WriteInFirstPhase:
      {
         // artificial second phase - uniform shift of a pre-scanned block
         shift = reduction( shift, blockShifts[ 0 ] );
         typename InputArray::IndexType outputEnd = outputBegin + end - begin;
         for( typename InputArray::IndexType i = outputBegin; i < outputEnd; i++ )
            output[ i ] = reduction( output[ i ], shift );
         break;
      }

      case ScanPhaseType::WriteInSecondPhase:
      {
         // artificial second phase - only one block, use the shift as the initial value
   perform( input, output, begin, end, outputBegin, reduction, reduction( zero, reduction( shift, blockShifts[ 0 ] ) ) );
         perform( input, output, begin, end, outputBegin, reduction, reduction( shift, blockShifts[ 0 ] ) );
         break;
      }
   }
}

template< ScanType Type, ScanPhaseType PhaseType >
@@ -139,6 +179,32 @@ perform( const InputArray& input,
         const IndexType block_end = TNL::min( block_begin + block_size, end );
         const IndexType block_output_begin = outputBegin + block_offset;

         switch( PhaseType )
         {
            case ScanPhaseType::WriteInFirstPhase:
            {
               // step 1: pre-scan the block and save the result of the block reduction
               block_results[ block_idx ] = Scan< Devices::Sequential, Type >::perform( input, output, block_begin, block_end, block_output_begin, reduction, zero );

               #pragma omp barrier

               // step 2: scan the block results
               #pragma omp single
               {
                  Scan< Devices::Sequential, ScanType::Exclusive >::perform( block_results, block_results, 0, blocks + 1, 0, reduction, zero );
               }

               // step 3: uniform shift of the pre-scanned block
               const ValueType block_shift = block_results[ block_idx ];
               const IndexType block_output_end = block_output_begin + block_end - block_begin;
               for( IndexType i = block_output_begin; i < block_output_end; i++ )
                  output[ i ] = reduction( output[ i ], block_shift );

               break;
            }

            case ScanPhaseType::WriteInSecondPhase:
            {
               // step 1: per-block reductions, write the result into the buffer
               block_results[ block_idx ] = reduce< Devices::Sequential >( block_begin, block_end, input, reduction, zero );

@@ -152,6 +218,10 @@ perform( const InputArray& input,

               // step 3: per-block scan using the block results as initial values
               Scan< Devices::Sequential, Type >::perform( input, output, block_begin, block_end, block_output_begin, reduction, block_results[ block_idx ] );

               break;
            }
         }
      }
   }
   else
@@ -195,14 +265,30 @@ performFirstPhase( const InputArray& input,
      #pragma omp parallel num_threads(threads)
      {
         const int block_idx = omp_get_thread_num();
         const IndexType block_begin = begin + block_idx * block_size;
         const IndexType block_offset = block_idx * block_size;
         const IndexType block_begin = begin + block_offset;
         const IndexType block_end = TNL::min( block_begin + block_size, end );
         const IndexType block_output_begin = outputBegin + block_offset;

         // step 1: per-block reductions, write the result into the buffer
         switch( PhaseType )
         {
            case ScanPhaseType::WriteInFirstPhase:
            {
               // pre-scan the block, write the result of the block reduction into the buffer
               block_results[ block_idx ] = Scan< Devices::Sequential, Type >::perform( input, output, block_begin, block_end, block_output_begin, reduction, zero );
               break;
            }

            case ScanPhaseType::WriteInSecondPhase:
            {
               // upsweep: per-block reductions, write the result into the buffer
               block_results[ block_idx ] = reduce< Devices::Sequential >( block_begin, block_end, input, reduction, zero );
               break;
            }
         }
      }

      // step 2: scan the block results
      // spine step: scan the block results
      Scan< Devices::Sequential, ScanType::Exclusive >::perform( block_results, block_results, 0, blocks + 1, 0, reduction, zero );

      // block_results now contains shift values for each block - to be used in the second phase
@@ -231,6 +317,7 @@ performSecondPhase( const InputArray& input,
                    typename OutputArray::ValueType shift )
{
#ifdef HAVE_OPENMP
   using ValueType = typename OutputArray::ValueType;
   using IndexType = typename InputArray::IndexType;

   if( end <= begin )
@@ -251,8 +338,26 @@ performSecondPhase( const InputArray& input,
         const IndexType block_end = TNL::min( block_begin + block_size, end );
         const IndexType block_output_begin = outputBegin + block_offset;

         // phase 2: per-block scan using the block results as initial values
         Scan< Devices::Sequential, Type >::perform( input, output, block_begin, block_end, block_output_begin, reduction, reduction( zero, reduction( shift, blockShifts[ block_idx ] ) ) );
         const ValueType block_shift = reduction( shift, blockShifts[ block_idx ] );

         switch( PhaseType )
         {
            case ScanPhaseType::WriteInFirstPhase:
            {
               // uniform shift of a pre-scanned block
               const IndexType block_output_end = block_output_begin + block_end - block_begin;
               for( IndexType i = block_output_begin; i < block_output_end; i++ )
                  output[ i ] = reduction( output[ i ], block_shift );
               break;
            }

            case ScanPhaseType::WriteInSecondPhase:
            {
               // downsweep: per-block scan using the block results as initial values
               Scan< Devices::Sequential, Type >::perform( input, output, block_begin, block_end, block_output_begin, reduction, block_shift );
               break;
            }
         }
      }
   }
   else