Commit a4e15b08 authored by Jakub Klinkovský's avatar Jakub Klinkovský
Browse files

Refactored parallel OpenMP scan

The first phase performs only per-block reduction, not scan. The output
array elements are written only in the second phase, so overall we
perform only `n` instead of `2n` write operations.
parent 63d567e4
Loading
Loading
Loading
Loading
+72 −52
Original line number Diff line number Diff line
@@ -104,15 +104,42 @@ perform( Vector& v,
         const typename Vector::ValueType zero )
{
#ifdef HAVE_OPENMP
   if( Devices::Host::isOMPEnabled() && Devices::Host::getMaxThreadsCount() >= 2 ) {
      const auto blockShifts = performFirstPhase( v, begin, end, reduction, zero );
      performSecondPhase( v, blockShifts, begin, end, reduction, zero );
   using ValueType = typename Vector::ValueType;
   using IndexType = typename Vector::IndexType;

   const IndexType size = end - begin;
   const int max_threads = Devices::Host::getMaxThreadsCount();
   const IndexType block_size = TNL::max( 1024, TNL::roundUpDivision( size, max_threads ) );
   const IndexType blocks = TNL::roundUpDivision( size, block_size );

   if( Devices::Host::isOMPEnabled() && blocks >= 2 ) {
      const int threads = TNL::min( blocks, Devices::Host::getMaxThreadsCount() );
      Containers::Array< ValueType > block_results( blocks + 1 );

      #pragma omp parallel num_threads(threads)
      {
         const IndexType block_idx = omp_get_thread_num();
         const IndexType block_begin = begin + block_idx * block_size;
         const IndexType block_end = TNL::min( block_begin + block_size, end );

         // step 1: per-block reductions, write the result into the buffer
         block_results[ block_idx ] = reduce< Devices::Sequential >( block_begin, block_end, v, reduction, zero );

         #pragma omp barrier

         // step 2: scan the block results
         #pragma omp single
         {
            Scan< Devices::Sequential, ScanType::Exclusive >::perform( block_results, 0, blocks + 1, reduction, zero );
         }

         // step 3: per-block scan using the block results as initial values
         Scan< Devices::Sequential, Type >::perform( v, block_begin, block_end, reduction, block_results[ block_idx ] );
      }
   }
   else
      Scan< Devices::Sequential, Type >::perform( v, begin, end, reduction, zero );
#else
   Scan< Devices::Sequential, Type >::perform( v, begin, end, reduction, zero );
#endif
      Scan< Devices::Sequential, Type >::perform( v, begin, end, reduction, zero );
}

template< ScanType Type >
@@ -130,45 +157,34 @@ performFirstPhase( Vector& v,
   using ValueType = typename Vector::ValueType;
   using IndexType = typename Vector::IndexType;

   const int threads = Devices::Host::getMaxThreadsCount();
   Containers::Array< ValueType > block_results( threads + 1 );
   const IndexType size = end - begin;
   const int max_threads = Devices::Host::getMaxThreadsCount();
   const IndexType block_size = TNL::max( 1024, TNL::roundUpDivision( size, max_threads ) );
   const IndexType blocks = TNL::roundUpDivision( size, block_size );

   if( Devices::Host::isOMPEnabled() && blocks >= 2 ) {
      const int threads = TNL::min( blocks, Devices::Host::getMaxThreadsCount() );
      Containers::Array< ValueType, Devices::Sequential > block_results( blocks + 1 );

      #pragma omp parallel num_threads(threads)
      {
      // init
      const int thread_idx = omp_get_thread_num();
      ValueType block_result = zero;
         const IndexType block_idx = omp_get_thread_num();
         const IndexType block_begin = begin + block_idx * block_size;
         const IndexType block_end = TNL::min( block_begin + block_size, end );

      // perform scan on blocks statically assigned to threads
      if( Type == ScanType::Inclusive ) {
         #pragma omp for schedule(static)
         for( IndexType i = begin; i < end; i++ ) {
            block_result = reduction( block_result, v[ i ] );
            v[ i ] = block_result;
         }
      }
      else {
         #pragma omp for schedule(static)
         for( IndexType i = begin; i < end; i++ ) {
            const ValueType x = v[ i ];
            v[ i ] = block_result;
            block_result = reduction( block_result, x );
         }
      }

      // write the block result into the buffer
      block_results[ thread_idx ] = block_result;
         // step 1: per-block reductions, write the result into the buffer
         block_results[ block_idx ] = reduce< Devices::Sequential >( block_begin, block_end, v, reduction, zero );
      }

   // block_results now contains scan results for each block. The first phase
   // ends by computing an exclusive scan of this array.
   Scan< Devices::Sequential, ScanType::Exclusive >::perform( block_results, 0, threads + 1, reduction, zero );
      // step 2: scan the block results
      Scan< Devices::Sequential, ScanType::Exclusive >::perform( block_results, 0, blocks + 1, reduction, zero );

      // block_results now contains shift values for each block - to be used in the second phase
      return block_results;
#else
   return Scan< Devices::Sequential, Type >::performFirstPhase( v, begin, end, reduction, zero );
   }
   else
#endif
      return Scan< Devices::Sequential, Type >::performFirstPhase( v, begin, end, reduction, zero );
}

template< ScanType Type >
@@ -188,22 +204,26 @@ performSecondPhase( Vector& v,
   using ValueType = typename Vector::ValueType;
   using IndexType = typename Vector::IndexType;

   const int threads = blockShifts.getSize() - 1;
   const IndexType size = end - begin;
   const int max_threads = Devices::Host::getMaxThreadsCount();
   const IndexType block_size = TNL::max( 1024, TNL::roundUpDivision( size, max_threads ) );
   const IndexType blocks = TNL::roundUpDivision( size, block_size );

   // launch exactly the same number of threads as in the first phase
   if( Devices::Host::isOMPEnabled() && blocks >= 2 ) {
      const int threads = TNL::min( blocks, Devices::Host::getMaxThreadsCount() );
      #pragma omp parallel num_threads(threads)
      {
      const int thread_idx = omp_get_thread_num();
      const ValueType offset = reduction( zero, blockShifts[ thread_idx ] );
         const IndexType block_idx = omp_get_thread_num();
         const IndexType block_begin = begin + block_idx * block_size;
         const IndexType block_end = TNL::min( block_begin + block_size, end );

      // shift intermediate results by the offset
      #pragma omp for schedule(static)
      for( IndexType i = begin; i < end; i++ )
         v[ i ] = reduction( v[ i ], offset );
         // phase 2: per-block scan using the block results as initial values
         Scan< Devices::Sequential, Type >::perform( v, block_begin, block_end, reduction, reduction( zero, blockShifts[ block_idx ] ) );
      }
#else
   Scan< Devices::Sequential, Type >::performSecondPhase( v, blockShifts, begin, end, reduction, zero );
   }
   else
#endif
      Scan< Devices::Sequential, Type >::performSecondPhase( v, blockShifts, begin, end, reduction, zero );
}

template< ScanType Type >