Commit d7e0e175 authored by Tomáš Oberhuber's avatar Tomáš Oberhuber
Browse files

Refactoring CSR adaptive kernel.

parent 0bdbf8bb
Loading
Loading
Loading
Loading
+131 −70
Original line number Diff line number Diff line
@@ -28,6 +28,11 @@ enum class Type {
   VECTOR = 2
};

/*template< typename Index >
struct LongBlockDescription
{
   uint8_t type;
}*/
template< typename Index >
union Block
{
@@ -55,7 +60,7 @@ union Block

   Block() = default;

   Type getType() const
   __cuda_callable__ Type getType() const
   {
      if( byte[ sizeof( Index ) == 4 ? 7 : 15 ] & 0b1000000 )
         return Type::STREAM;
@@ -64,16 +69,27 @@ union Block
      return Type::LONG;
   }

   Index getFirstRow() const
   __cuda_callable__ const Index& getFirstSegment() const
   {
      return index[ 0 ];
   }

   Index getRowsInBlock() const
   /***
    * \brief Returns number of elements covered by the block.
    */
   __cuda_callable__ const Index getSize() const
   {
      return twobytes[ sizeof(Index) == 4 ? 2 : 4 ];
   }

   /***
    * \brief Returns number of segments covered by the block.
    */
   __cuda_callable__ const Index getSegmentsInBlock() const
   {
      return ( twobytes[ sizeof( Index ) == 4 ? 3 : 5 ] & 0x3FFF );
   }

   void print( std::ostream& str ) const
   {
      Type type = this->getType();
@@ -90,8 +106,8 @@ union Block
            str << " Long ";
            break;
      }
      str << " first row: " << getFirstRow();
      str << " rows per block: " << getRowsInBlock();
      str << " first segment: " << getFirstSegment();
      str << " block end: " << getSize();
      str << " index in warp: " << index[ 1 ];
   }
   Index index[2]; // index[0] is row pointer, index[1] is index in warp
@@ -109,10 +125,12 @@ std::ostream& operator<< ( std::ostream& str, const Block< Index >& block )

#ifdef HAVE_CUDA

template< int warpSize,
template< int CudaBlockSize,
          int warpSize,
          int WARPS,
          int SHARED_PER_WARP,
          int MAX_ELEM_PER_WARP,
          typename BlocksView,
          typename Offsets,
          typename Index,
          typename Fetch,
@@ -121,8 +139,7 @@ template< int warpSize,
          typename Real,
          typename... Args >
__global__ void
segmentsReductionCSRAdaptiveKernel( const Block< Index > *blocks,
                                    Index blocksSize,
segmentsReductionCSRAdaptiveKernel( BlocksView blocks,
                                    int gridIdx,
                                    Offsets offsets,
                                    Index first,
@@ -133,46 +150,51 @@ segmentsReductionCSRAdaptiveKernel( const Block< Index > *blocks,
                                    Real zero,
                                    Args... args )
{
   __shared__ Real shared[WARPS][SHARED_PER_WARP];
   __shared__ Real streamShared[WARPS][SHARED_PER_WARP];
   __shared__ Real multivectorShared[ CudaBlockSize / warpSize ];
   constexpr size_t MAX_X_DIM = 2147483647;
   const Index index = (gridIdx * MAX_X_DIM) + (blockIdx.x * blockDim.x) + threadIdx.x;
   const Index blockIdx = index / warpSize;
   if (blockIdx >= blocksSize)
   if( blockIdx >= blocks.getSize() - 1 )
      return;

   if( threadIdx.x < CudaBlockSize / warpSize )
      multivectorShared[ threadIdx.x ] = zero;
   Real result = zero;
   bool compute( true );
   const Index laneID = threadIdx.x & 31; // & is cheaper than %
   Block<Index> block = blocks[blockIdx];
   const Index minID = offsets[block.index[0]/* minRow */];
   Index to, maxID;
   const Block< Index > block = blocks[ blockIdx ];
   const Index& firstSegmentIdx = block.getFirstSegment();
   const Index begin = offsets[ firstSegmentIdx ];
   //Index to, maxID;

   if (block.byte[sizeof(Index) == 4 ? 7 : 15] & 0b1000000)
   const auto blockType = block.getType();
   if( blockType == Type::STREAM )
   {
      const Index warpID = threadIdx.x / 32;
      maxID = minID + /* maxID - minID */block.twobytes[sizeof(Index) == 4 ? 2 : 4];
      const Index end = begin + block.getSize();

      /* Stream data to shared memory */
      for( Index globalIdx = laneID + minID; globalIdx < maxID; globalIdx += warpSize )
      // Stream data to shared memory
      for( Index globalIdx = laneID + begin; globalIdx < end; globalIdx += warpSize )
      {
         shared[warpID][globalIdx - minID] = //fetch( globalIdx, compute );
         streamShared[warpID][globalIdx - begin ] = //fetch( globalIdx, compute );
            details::FetchLambdaAdapter< Index, Fetch >::call( fetch, -1, -1, globalIdx, compute );
         //printf( "Stream: Fetch at %d -> %d \n", globalIdx, shared[warpID][globalIdx - minID] );
         //details::FetchLambdaAdapter< Index, Fetch >::call( fetch, -1, -1, globalIdx, compute ) );
            // TODO:: fix this
         // TODO:: fix this by template specialization so that we can assume fetch lambda
         // with short parameters
      }

      const Index maxRow = block.index[0]/* minRow */ +
         /* maxRow - minRow */(block.twobytes[sizeof(Index) == 4 ? 3 : 5] & 0x3FFF);
      const Index maxRow = firstSegmentIdx + block.getSegmentsInBlock();
      /* minRow */ //+
         /* maxRow - minRow *///(block.twobytes[sizeof(Index) == 4 ? 3 : 5] & 0x3FFF);
      /// Calculate result 
      for( Index i = block.index[0]/* minRow */ + laneID; i < maxRow; i += warpSize )
      {
         to = offsets[i + 1] - minID; // end of preprocessed data
         const Index to = offsets[i + 1] - begin; // end of preprocessed data
         result = zero;
         // Scalar reduction
         for( Index sharedID = offsets[ i ] - minID; sharedID < to; ++sharedID)
         for( Index sharedID = offsets[ i ] - begin; sharedID < to; ++sharedID)
         {
            result = reduce( result, shared[warpID][sharedID] );
            result = reduce( result, streamShared[warpID][sharedID] );
            //printf( " threadIdx %d is adding %d in segment %d -> %d\n", threadIdx.x, shared[warpID][sharedID], i, result );
         }

@@ -180,16 +202,15 @@ segmentsReductionCSRAdaptiveKernel( const Block< Index > *blocks,
         keep( i, result );
      }
   }
   else //if (block.byte[sizeof(Index) == 4 ? 7 : 15] & 0b10000000)
   else if( blockType == Type::VECTOR )
   {
      printf( "Vector: threadIdx = %d \n", threadIdx );
      //printf( "Vector: threadIdx = %d \n", threadIdx );
      /////////////////////////////////////* CSR VECTOR *//////////////
      maxID = minID + /* maxID - minID */block.twobytes[sizeof(Index) == 4 ? 2 : 4];
      const Index end = begin + block.getSize(); //block.twobytes[sizeof(Index) == 4 ? 2 : 4];
      const Index segmentIdx = block.index[0];

      for( Index globalIdx = minID + laneID; globalIdx < maxID; globalIdx += warpSize )
      for( Index globalIdx = begin + laneID; globalIdx < end; globalIdx += warpSize )
         result = reduce( result, details::FetchLambdaAdapter< Index, Fetch >::call( fetch, segmentIdx, -1, globalIdx, compute ) ); // fix local idx
         //values[i] * inVector[columnIndexes[i]];

      /* Parallel reduction */
      result = reduce( result, __shfl_down_sync( 0xFFFFFFFF, result, 16 ) );
@@ -199,31 +220,65 @@ segmentsReductionCSRAdaptiveKernel( const Block< Index > *blocks,
      result = reduce( result, __shfl_down_sync( 0xFFFFFFFF, result,  1 ) );
      if( laneID == 0 )
      {
         printf( "Vector: threadIdx = %d result for segment %d is %d \n", threadIdx, segmentIdx, result );
         //printf( "Vector: threadIdx = %d result for segment %d is %f \n", threadIdx, segmentIdx, result );
         keep( segmentIdx, result );
          //outVector[block.index[0]/* minRow */] = result; // Write result
      }
   }/*
   else
   }
   else // blockType == Type::LONG
   {
      ///////////////////////////////////// CSR VECTOR L /////////////
      // Number of elements processed by previous warps
      const Index offset = block.index[1] * MAX_ELEM_PER_WARP;
      to = minID + (block.index[1]  + 1) * MAX_ELEM_PER_WARP;
      maxID = offsets[block.index[0] + 1];
      if( to > maxID )
         to = maxID;
      for( Index globalIdx = minID + offset + laneID; globalIdx < to; globalIdx += warpSize )
         result = reduce( result, details::FetchLambdaAdapter< Index, Fetch >::call( fetch, segmentIdx, localIdx, globalIdx, compute ) );
      Index to = begin + (block.index[1]  + 1) * MAX_ELEM_PER_WARP;
      const Index segmentIdx = block.index[0];
      //minID = offsets[block.index[0] ];
      const Index end = offsets[block.index[0] + 1];
      const int tid = threadIdx.x;
      
      if( to > end )
         to = end;
      result = zero;
      //printf( "tid %d : start = %d \n", tid, minID + laneID );
      for( Index globalIdx = begin + laneID + offset; globalIdx < to; globalIdx += warpSize )
      {
         result = reduce( result, details::FetchLambdaAdapter< Index, Fetch >::call( fetch, segmentIdx, -1, globalIdx, compute ) );
         //printf( "tid %d -> %d \n", tid, details::FetchLambdaAdapter< Index, Fetch >::call( fetch, segmentIdx, -1, globalIdx, compute ) );
         //result += values[i] * inVector[columnIndexes[i]];
      }


      result += __shfl_down_sync(0xFFFFFFFF, result, 16);
      result += __shfl_down_sync(0xFFFFFFFF, result, 8);
      result += __shfl_down_sync(0xFFFFFFFF, result, 4);
      result += __shfl_down_sync(0xFFFFFFFF, result, 2);
      result += __shfl_down_sync(0xFFFFFFFF, result, 1);
      if (laneID == 0) atomicAdd(&outVector[block.index[0] ], result);
   }*/
      const Index warpID = threadIdx.x / 32;
      if( laneID == 0 )
         multivectorShared[ warpID ] = result;
      __syncthreads();
      // Reduction in multivectorShared
      if( tid < 16 )
      {
         multivectorShared[ tid ] =  reduce( multivectorShared[ tid ], multivectorShared[ tid + 16 ] );
         __syncwarp();
         multivectorShared[ tid ] =  reduce( multivectorShared[ tid ], multivectorShared[ tid +  8 ] );
         __syncwarp();
         multivectorShared[ tid ] =  reduce( multivectorShared[ tid ], multivectorShared[ tid +  4 ] );
         __syncwarp();
         multivectorShared[ tid ] =  reduce( multivectorShared[ tid ], multivectorShared[ tid +  2 ] );
         __syncwarp();
         multivectorShared[ tid ] =  reduce( multivectorShared[ tid ], multivectorShared[ tid +  1 ] );
         __syncwarp();
         if( tid == 0 )
         {
            printf( "Long: segmentIdx %d -> %d \n", segmentIdx, multivectorShared[ 0 ] );
            keep( segmentIdx, multivectorShared[ 0 ] );
         }
      }

      //if (laneID == 0) atomicAdd(&outVector[block.index[0] ], result);
   }
}
#endif

@@ -278,10 +333,10 @@ struct CSRKernelAdaptiveView
         return;
      }

      this->printBlocks();
      //this->printBlocks();
      static constexpr Index THREADS_ADAPTIVE = sizeof(Index) == 8 ? 128 : 256;
      //static constexpr Index THREADS_SCALAR = 128;
      static constexpr Index THREADS_VECTOR = 128;
      //static constexpr Index THREADS_VECTOR = 128;
      static constexpr Index THREADS_LIGHT = 128;

      /* Max length of row to process one warp for CSR Light, MultiVector */
@@ -310,7 +365,7 @@ struct CSRKernelAdaptiveView
      constexpr size_t MAX_X_DIM = 2147483647;

      /* Fill blocks */
      size_t neededThreads = blocks.getSize() * warpSize; // one warp per block
      size_t neededThreads = this->blocks.getSize() * warpSize; // one warp per block
      /* Execute kernels on device */
      for (Index gridIdx = 0; neededThreads != 0; gridIdx++ )
      {
@@ -326,15 +381,16 @@ struct CSRKernelAdaptiveView
         }

         segmentsReductionCSRAdaptiveKernel<
               THREADS_ADAPTIVE,
               warpSize,
               WARPS,
               SHARED_PER_WARP,
               MAX_ELEMENTS_PER_WARP_ADAPT,
               BlocksView,
               OffsetsView,
               Index, Fetch, Reduction, ResultKeeper, Real, Args... >
            <<<blocksCount, threads>>>(
               blocks.getData(),
               blocks.getSize() - 1, // last block shouldn't be used
               this->blocks,
               gridIdx,
               offsets,
               first,
@@ -408,15 +464,15 @@ struct CSRKernelAdaptive
                    Index &sum )
   {
      sum = 0;
      for (Index current = start; current < size - 1; ++current)
      for (Index current = start; current < size - 1; current++ )
      {
         Index elements = offsets.getElement(current + 1) -
                           offsets.getElement(current);
         sum += elements;
         if( sum > SHARED_PER_WARP )
         {
            if (current - start > 0)
            { // extra row
            if( current - start > 0 ) // extra row
            {
               type = Type::STREAM;
               return current;
            }
@@ -425,7 +481,8 @@ struct CSRKernelAdaptive
               if( sum <= 2 * MAX_ELEMENTS_PER_WARP_ADAPT )
                  type = Type::VECTOR;
               else
               type = Type::LONG;
                  type = Type::VECTOR; // TODO: Put LONG back
                  //type = Type::LONG; //
               return current + 1;
            }
         }
@@ -438,7 +495,7 @@ struct CSRKernelAdaptive
    void init( const Offsets& offsets )
    {
        const Index rows = offsets.getSize();
        Index sum, start = 0, nextStart = 0;
        Index sum, start( 0 ), nextStart( 0 );

        // Fill blocks
        std::vector< Block< Index > > inBlock;
@@ -451,11 +508,15 @@ struct CSRKernelAdaptive

            if( type == Type::LONG )
            {
                Index parts = roundUpDivision(sum, this->SHARED_PER_WARP);
                for (Index index = 0; index < parts; ++index)
               inBlock.emplace_back( start, Type::LONG, 0 );
               const Index blocksCount = inBlock.size();
               const Index warpsPerCudaBlock = THREADS_ADAPTIVE / TNL::Cuda::getWarpSize();
               const Index warpsLeft = roundUpDivision( blocksCount, warpsPerCudaBlock ) * warpsPerCudaBlock - blocksCount;
               //Index parts = roundUpDivision(sum, this->SHARED_PER_WARP);
               /*for( Index index = 1; index < warpsLeft; index++ )
               {
                  inBlock.emplace_back(start, Type::LONG, index);
                }
               }*/
            }
            else
            {