Commit 3771bb31 authored by Jakub Klinkovský
Browse files

Refucktoring DistributedNDArraySynchronizer

parent 2c679a8d
Loading
Loading
Loading
Loading
+225 −59
Original line number Diff line number Diff line
@@ -13,13 +13,69 @@
#pragma once

#include <future>
// 3rd-party async library providing a thread-pool
#include <async/threadpool.h>

#include <TNL/Containers/ndarray/SynchronizerBuffers.h>
#include <TNL/MPI/Wrappers.h>
#include <TNL/Timer.h>

namespace TNL {
namespace Containers {

/**
 * \brief Flags selecting the directions in which the overlaps are synchronized.
 *
 * Every direction occupies one bit of the underlying std::uint8_t; the special
 * value \e All has all eight bits set and \e None has no bit set.
 */
enum class SyncDirection : std::uint8_t {
   // no direction selected
   None = 0,
   // sync directions like in LBM - one bit per direction
   Right = 1 << 0,
   Left = 1 << 1,
   // special - sync in all directions
   All = 0xff,

   // TODO: flags planned for 2D distribution:
   //    single bits:
   //       Right = 1 << 0, Left = 1 << 1, Top = 1 << 2, Bottom = 1 << 3
   //    two-flag combinations:
   //       TopRight = Top | Right, TopLeft = Top | Left,
   //       BottomRight = Bottom | Right, BottomLeft = Bottom | Left

   // TODO: flags planned for 3D distribution:
   //    single bits:
   //       Right = 1 << 0, Left = 1 << 1, Top = 1 << 2, Bottom = 1 << 3,
   //       Back = 1 << 4, Front = 1 << 5
   //    two-flag combinations:
   //       TopRight = Top | Right, TopLeft = Top | Left,
   //       BottomRight = Bottom | Right, BottomLeft = Bottom | Left,
   //       BackRight = Back | Right, BackLeft = Back | Left,
   //       FrontRight = Front | Right, FrontLeft = Front | Left,
   //       BackTop = Back | Top, BackBottom = Back | Bottom,
   //       FrontTop = Front | Top, FrontBottom = Front | Bottom
   //    three-flag combinations:
   //       BackTopRight = Back | Top | Right, BackTopLeft = Back | Top | Left,
   //       BackBottomRight = Back | Bottom | Right, BackBottomLeft = Back | Bottom | Left,
   //       FrontTopRight = Front | Top | Right, FrontTopLeft = Front | Top | Left,
   //       FrontBottomRight = Front | Bottom | Right, FrontBottomLeft = Front | Bottom | Left
};

/**
 * \brief Tests whether two direction masks have at least one flag in common.
 *
 * Unlike the built-in bitwise AND, the result is a plain \e bool rather than
 * a combined mask, so it can be used directly in a condition such as
 * if( mask & SyncDirection::Left ).
 *
 * \param a First mask.
 * \param b Second mask.
 * \return \e true if the bitwise AND of the underlying values is non-zero.
 */
constexpr bool
operator&( SyncDirection a, SyncDirection b ) noexcept
{
   // explicit comparison instead of relying on the implicit int -> bool conversion
   return ( static_cast< std::uint8_t >( a ) & static_cast< std::uint8_t >( b ) ) != 0;
}

template< typename DistributedNDArray,
          // This can be set to false to optimize out buffering when it is not needed
          // (e.g. for LBM with 1D distribution and specific orientation of the ndarray)
@@ -28,26 +84,68 @@ template< typename DistributedNDArray,
          bool LBM_HACK = false >
class DistributedNDArraySynchronizer
{
private:
   // NOTE: async::threadpool has alignment requirements, which causes problems:
   //  - it may become misaligned in derived classes, see e.g.
   //    https://stackoverflow.com/a/46475498
   //    solution: specify it as the first member of the base class
   //  - operator new before C++17 may not support over-aligned types, see
   //    https://stackoverflow.com/a/53485295
   //    solution: relaxed alignment requirements to not exceed the value of
   //    alignof(std::max_align_t), which is the strongest alignment supported
   //    by plain new. See https://github.com/d36u9/async/pull/2
   async::threadpool tp;

   int gpu_id = 0;

   int tag_offset = 0;

   /**
    * \brief Reserves \e count communication tags and returns the tag offset.
    *
    * A function-local counter is advanced by \e count on every call and its
    * new value is returned. The first call therefore returns \e count, not 0;
    * the offsets do not have to start from 0. The counter is a plain
    * (non-atomic) int.
    *
    * NOTE(review): this is a static member function of a class template, so
    * each instantiation presumably has its own counter and synchronizers of
    * different instantiations may receive equal offsets - confirm that this
    * is intended.
    *
    * \param count Number of tags to reserve.
    * \return Value of the counter after \e count has been added.
    */
   static int reserve_tags(int count)
   {
      static int reserved = 0;
      reserved = reserved + count;
      return reserved;
   }

public:
   using RequestsVector = std::vector< MPI_Request >;

   /**
    * \brief Selects how synchronizeAsync carries out the synchronization.
    */
   enum class AsyncPolicy {
      // the communication is started, awaited and finished before
      // synchronizeAsync returns
      synchronous = 0,
      // the communication is started immediately in the calling thread;
      // waiting for the requests and the final copy are wrapped in a
      // std::launch::deferred task
      deferred = 1,
      // the whole synchronization is posted to the internal thread pool
      threadpool = 2,
      // the whole synchronization runs in a std::launch::async task
      async = 3,
   };

//   DistributedNDArraySynchronizer(int max_threads = std::thread::hardware_concurrency())
   /**
    * \brief Constructs the synchronizer and reserves its communication tags.
    *
    * \param max_threads Value passed to the constructor of the internal
    *                    async::threadpool (presumably the number of worker
    *                    threads - confirm against the async library).
    *
    * NOTE(review): the constructor is not marked explicit, so an int converts
    * implicitly to a synchronizer.
    */
   DistributedNDArraySynchronizer(int max_threads = 1)
   : tp(max_threads),
     tag_offset(reserve_tags(2))  // reserve 2 communication tags (for left and right)
   {}

   void synchronize( DistributedNDArray& array )
   {
      auto future = synchronizeAsync( array, std::launch::deferred );
      future.wait();
      synchronizeAsync( array, AsyncPolicy::synchronous );
   }

   // This method is not thread-safe - only the thread which created and "owns" the
   // instance of this object can call this method.
   // Also note that this method must not be called again until the previous
   // asynchronous operation has finished.
   std::shared_future<void> synchronizeAsync( DistributedNDArray& array, std::launch policy = std::launch::async )
   // Also note that if (buffered == true), this method must not be called again until
   // the previous asynchronous operation has finished.
   void synchronizeAsync( DistributedNDArray& array, AsyncPolicy policy = AsyncPolicy::synchronous, SyncDirection mask = SyncDirection::All )
   {
      // wait for any previous synchronization (multiple objects can share the
      // same synchronizer)
      wait();

      async_start_timer.start();

      // GOTCHA: https://devblogs.nvidia.com/cuda-pro-tip-always-set-current-device-avoid-multithreading-bugs/
      #ifdef HAVE_CUDA
      if( std::is_same< typename DistributedNDArray::DeviceType, Devices::Cuda >::value )
         cudaGetDevice(&this->gpu_id);
      #endif

      // NOTE: the allocation cannot be done in the worker, otherwise CUDA would crash
      // skip allocation on repeated calls - compare only sizes, not the actual data
      if( array_view.getCommunicationGroup() != array.getCommunicationGroup() ||
          array_view.getSizes() != array.getSizes() ||
@@ -55,6 +153,7 @@ public:
          array_view.getLocalEnds() != array.getLocalEnds() )
      {
         array_view.bind( array.getView() );
         this->mask = mask;

         // allocate buffers
         Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), AllocateHelper >::execHost( buffers, array_view );
@@ -62,41 +161,98 @@ public:
      else {
         // only bind to the actual data
         array_view.bind( array.getView() );
         this->mask = mask;
      }

      auto worker = [this](){ this->worker(); };
      return std::async( policy, worker );
      if( policy == AsyncPolicy::threadpool || policy == AsyncPolicy::async ) {
         // everything offloaded to a separate thread
         auto worker = [this] () {
            // GOTCHA: https://devblogs.nvidia.com/cuda-pro-tip-always-set-current-device-avoid-multithreading-bugs/
            #ifdef HAVE_CUDA
            if( std::is_same< typename DistributedNDArray::DeviceType, Devices::Cuda >::value )
               cudaSetDevice(this->gpu_id);
            #endif

            auto requests = this->worker_init();
            MPI::Waitall( requests.data(), requests.size() );
            this->worker_finish();
         };

         if( policy == AsyncPolicy::threadpool )
            async_op = tp.post( worker );
         else
            async_op = std::async( std::launch::async, worker );
      }
      else if( policy == AsyncPolicy::deferred ) {
         // immediate start, deferred synchronization (but still in the same thread)
         auto requests = worker_init();
         auto worker = [this, requests] () mutable {
            MPI::Waitall( requests.data(), requests.size() );
            this->worker_finish();
         };
         this->async_op = std::async( std::launch::deferred, worker );
      }
      else {
         // synchronous
         auto requests = this->worker_init();
         MPI::Waitall( requests.data(), requests.size() );
         this->worker_finish();
      }

      async_ops_count++;
      async_start_timer.stop();
   }

   /**
    * \brief Blocks until the operation stored in async_op has finished.
    *
    * Does nothing when async_op is not valid. The waiting is measured with
    * async_wait_timer.
    */
   void wait()
   {
      // nothing to wait for
      if( ! async_op.valid() )
         return;

      async_wait_timer.start();
      async_op.wait();
      async_wait_timer.stop();
   }

   // Waits for a pending operation stored in async_op before the members are
   // destroyed: the asynchronous workers capture this, and array_view and
   // buffers are declared after async_op. Unlike wait(), async_wait_timer is
   // not used here.
   ~DistributedNDArraySynchronizer()
   {
      if( ! async_op.valid() )
         return;

      async_op.wait();
   }

   /**
    * \brief Can be used for checking whether a synchronization that was
    * started asynchronously has finished.
    */
   std::future< void > async_op;

   // attributes for profiling
   Timer async_start_timer, async_wait_timer;
   std::size_t async_ops_count = 0;

protected:
   using DistributedNDArrayView = typename DistributedNDArray::ViewType;
   using Buffers = __ndarray_impl::SynchronizerBuffers< DistributedNDArray >;

   DistributedNDArrayView array_view;
   SyncDirection mask = SyncDirection::All;
   Buffers buffers;
   int gpu_id = 0;

   void worker()
   RequestsVector worker_init()
   {
      // GOTCHA: https://devblogs.nvidia.com/cuda-pro-tip-always-set-current-device-avoid-multithreading-bugs/
      #ifdef HAVE_CUDA
      if( std::is_same< typename DistributedNDArray::DeviceType, Devices::Cuda >::value )
         cudaSetDevice(gpu_id);
      #endif

      // fill send buffers
      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, true );
      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, true, mask );

      // issue all send and receive async operations
      std::vector< MPI_Request > requests;
      RequestsVector requests;
      const MPI_Comm group = array_view.getCommunicationGroup();
      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), SendHelper >::execHost( buffers, requests, group );
      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), SendHelper >::execHost( buffers, requests, group, tag_offset, mask );

      // wait until send is done
      MPI::Waitall( requests.data(), requests.size() );
      return requests;
   }

   void worker_finish()
   {
      // copy data from receive buffers
      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, false );
      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, false, mask );
   }

   template< std::size_t dim >
@@ -106,9 +262,7 @@ protected:
      {
         auto& dim_buffers = buffers.template getDimBuffers< dim >();

         constexpr std::size_t overlap = __ndarray_impl::get< dim >( typename DistributedNDArray::OverlapsType{} );
         // TODO
//         constexpr std::size_t overlap = array_view.template getOverlap< dim >();
         constexpr std::size_t overlap = DistributedNDArrayView::LocalIndexerType::template getOverlap< dim >();
         if( overlap == 0 ) {
            dim_buffers.reset();
            return;
@@ -163,10 +317,10 @@ protected:
   template< std::size_t dim >
   struct CopyHelper
   {
      static void exec( Buffers& buffers, DistributedNDArrayView& array_view, bool to_buffer )
      static void exec( Buffers& buffers, DistributedNDArrayView& array_view, bool to_buffer, SyncDirection mask )
      {
         // skip if there are no overlaps
         const std::size_t overlap = __ndarray_impl::get< dim >( typename DistributedNDArray::OverlapsType{} );
         constexpr std::size_t overlap = DistributedNDArrayView::LocalIndexerType::template getOverlap< dim >();
         if( overlap == 0 )
            return;

@@ -179,24 +333,32 @@ protected:
            copy_kernel.to_buffer = to_buffer;

            if( to_buffer ) {
               if( mask & SyncDirection::Left ) {
                  copy_kernel.buffer_view.bind( dim_buffers.left_send_view );
                  copy_kernel.array_offsets = dim_buffers.left_send_offsets;
                  dim_buffers.left_send_view.forAll( copy_kernel );
               }

               if( mask & SyncDirection::Right ) {
                  copy_kernel.buffer_view.bind( dim_buffers.right_send_view );
                  copy_kernel.array_offsets = dim_buffers.right_send_offsets;
                  dim_buffers.right_send_view.forAll( copy_kernel );
               }
            }
            else {
               if( mask & SyncDirection::Right ) {
                  copy_kernel.buffer_view.bind( dim_buffers.left_recv_view );
                  copy_kernel.array_offsets = dim_buffers.left_recv_offsets;
                  dim_buffers.left_recv_view.forAll( copy_kernel );
               }

               if( mask & SyncDirection::Left ) {
                  copy_kernel.buffer_view.bind( dim_buffers.right_recv_view );
                  copy_kernel.array_offsets = dim_buffers.right_recv_offsets;
                  dim_buffers.right_recv_view.forAll( copy_kernel );
               }
            }
         }
         else {
            // avoid buffering - bind buffer views directly to the array
            dim_buffers.left_send_view.bind( &call_with_offsets( dim_buffers.left_send_offsets, array_view ) );
@@ -212,41 +374,45 @@ protected:
   struct SendHelper
   {
      template< typename Requests, typename Group >
      static void exec( Buffers& buffers, Requests& requests, Group group )
      static void exec( Buffers& buffers, Requests& requests, Group group, int tag_offset, SyncDirection mask )
      {
         const std::size_t overlap = __ndarray_impl::get< dim >( typename DistributedNDArray::OverlapsType{} );
         constexpr std::size_t overlap = DistributedNDArrayView::LocalIndexerType::template getOverlap< dim >();
         if( overlap == 0 )
            return;

         auto& dim_buffers = buffers.template getDimBuffers< dim >();

         if( LBM_HACK == false ) {
            if( mask & SyncDirection::Left ) {
               requests.push_back( MPI::Isend( dim_buffers.left_send_view.getData(),
                                               dim_buffers.left_send_view.getStorageSize(),
                                            dim_buffers.left_neighbor, 0, group ) );
            requests.push_back( MPI::Irecv( dim_buffers.left_recv_view.getData(),
                                            dim_buffers.left_recv_view.getStorageSize(),
                                            dim_buffers.left_neighbor, 1, group ) );
            requests.push_back( MPI::Isend( dim_buffers.right_send_view.getData(),
                                            dim_buffers.right_send_view.getStorageSize(),
                                            dim_buffers.right_neighbor, 1, group ) );
                                               dim_buffers.left_neighbor, tag_offset + 0, group ) );
               requests.push_back( MPI::Irecv( dim_buffers.right_recv_view.getData(),
                                               dim_buffers.right_recv_view.getStorageSize(),
                                            dim_buffers.right_neighbor, 0, group ) );
                                               dim_buffers.right_neighbor, tag_offset + 0, group ) );
            }
            if( mask & SyncDirection::Right ) {
               requests.push_back( MPI::Isend( dim_buffers.right_send_view.getData(),
                                               dim_buffers.right_send_view.getStorageSize(),
                                               dim_buffers.right_neighbor, tag_offset + 1, group ) );
               requests.push_back( MPI::Irecv( dim_buffers.left_recv_view.getData(),
                                               dim_buffers.left_recv_view.getStorageSize(),
                                               dim_buffers.left_neighbor, tag_offset + 1, group ) );
            }
         }
         else {
            requests.push_back( MPI::Isend( dim_buffers.left_send_view.getData() + 0,
                                            dim_buffers.left_send_view.getStorageSize() / 27 * 9,
                                            dim_buffers.left_neighbor, 0, group ) );
                                            dim_buffers.left_neighbor, tag_offset + 0, group ) );
            requests.push_back( MPI::Irecv( dim_buffers.left_recv_view.getData() + dim_buffers.left_recv_view.getStorageSize() / 27 * 18,
                                            dim_buffers.left_recv_view.getStorageSize() / 27 * 9,
                                            dim_buffers.left_neighbor, 1, group ) );
                                            dim_buffers.left_neighbor, tag_offset + 1, group ) );
            requests.push_back( MPI::Isend( dim_buffers.right_send_view.getData() + dim_buffers.left_recv_view.getStorageSize() / 27 * 18,
                                            dim_buffers.right_send_view.getStorageSize() / 27 * 9,
                                            dim_buffers.right_neighbor, 1, group ) );
                                            dim_buffers.right_neighbor, tag_offset + 1, group ) );
            requests.push_back( MPI::Irecv( dim_buffers.right_recv_view.getData() + 0,
                                            dim_buffers.right_recv_view.getStorageSize() / 27 * 9,
                                            dim_buffers.right_neighbor, 0, group ) );
                                            dim_buffers.right_neighbor, tag_offset + 0, group ) );
         }
      }
   };