diff --git a/src/TNL/Containers/DistributedNDArraySynchronizer.h b/src/TNL/Containers/DistributedNDArraySynchronizer.h
index cea40bc21c3ec0c71ad891aafcfe620c9582754b..8dc7af454206ba317c01b17a360df5eb460e27c8 100644
--- a/src/TNL/Containers/DistributedNDArraySynchronizer.h
+++ b/src/TNL/Containers/DistributedNDArraySynchronizer.h
@@ -13,13 +13,69 @@
 #pragma once
 
 #include <future>
+// 3rd-party async library providing a thread-pool
+#include <async/threadpool.h>
 
 #include <TNL/Containers/ndarray/SynchronizerBuffers.h>
 #include <TNL/MPI/Wrappers.h>
+#include <TNL/Timer.h>
 
 namespace TNL {
 namespace Containers {
 
+enum class SyncDirection : std::uint8_t {
+   // special - sync in all directions
+   All = 0xff,
+   // sync directions like in LBM
+   None = 0,
+   Right = 1 << 0,
+   Left = 1 << 1,
+
+   // TODO: for 2D distribution:
+   // Right = 1 << 0,
+   // Left = 1 << 1,
+   // Top = 1 << 2,
+   // Bottom = 1 << 3,
+   // TopRight = Top | Right,
+   // TopLeft = Top | Left,
+   // BottomRight = Bottom | Right,
+   // BottomLeft = Bottom | Left
+
+   // TODO: for 3D distribution:
+   // Right = 1 << 0,
+   // Left = 1 << 1,
+   // Top = 1 << 2,
+   // Bottom = 1 << 3,
+   // Back = 1 << 4,
+   // Front = 1 << 5,
+   // TopRight = Top | Right,
+   // TopLeft = Top | Left,
+   // BottomRight = Bottom | Right,
+   // BottomLeft = Bottom | Left
+   // BackRight = Back | Right,
+   // BackLeft = Back | Left,
+   // FrontRight = Front | Right,
+   // FrontLeft = Front | Left,
+   // BackTop = Back | Top,
+   // BackBottom = Back | Bottom,
+   // FrontTop = Front | Top,
+   // FrontBottom = Front | Bottom,
+   // BackTopRight = Back | Top | Right,
+   // BackTopLeft = Back | Top | Left,
+   // BackBottomRight = Back | Bottom | Right,
+   // BackBottomLeft = Back | Bottom | Left,
+   // FrontTopRight = Front | Top | Right,
+   // FrontTopLeft = Front | Top | Left,
+   // FrontBottomRight = Front | Bottom | Right,
+   // FrontBottomLeft = Front | Bottom | Left,
+};
+
+inline bool
+operator&( SyncDirection a, SyncDirection b )
+{
+   return std::uint8_t(a) & std::uint8_t(b);
+}
+
 template< typename DistributedNDArray,
           // This can be set to false to optimize out buffering when it is not needed
           // (e.g. for LBM with 1D distribution and specific orientation of the ndarray)
@@ -28,26 +84,68 @@ template< typename DistributedNDArray,
           bool LBM_HACK = false >
 class DistributedNDArraySynchronizer
 {
+private:
+   // NOTE: async::threadpool has alignment requirements, which causes problems:
+   //    - it may become misaligned in derived classes, see e.g.
+   //      https://stackoverflow.com/a/46475498
+   //      solution: specify it as the first member of the base class
+   //    - operator new before C++17 may not support over-aligned types, see
+   //      https://stackoverflow.com/a/53485295
+   //      solution: relaxed alignment requirements to not exceed the value of
+   //      alignof(std::max_align_t), which is the strongest alignment supported
+   //      by plain new. See https://github.com/d36u9/async/pull/2
+   async::threadpool tp;
+
+   int gpu_id = 0;
+
+   int tag_offset = 0;
+
+   static int reserve_tags(int count)
+   {
+      static int offset = 0;
+      // we could use a post-increment, but we don't have to start from 0 either...
+      return offset += count;
+   }
+
 public:
+   using RequestsVector = std::vector< MPI_Request >;
+
+   enum class AsyncPolicy {
+      synchronous,
+      deferred,
+      threadpool,
+      async,
+   };
+
+//   DistributedNDArraySynchronizer(int max_threads = std::thread::hardware_concurrency())
+   DistributedNDArraySynchronizer(int max_threads = 1)
+   : tp(max_threads),
+     tag_offset(reserve_tags(2))  // reserve 2 communication tags (for left and right)
+   {}
+
    void synchronize( DistributedNDArray& array )
    {
-      auto future = synchronizeAsync( array, std::launch::deferred );
-      future.wait();
+      synchronizeAsync( array, AsyncPolicy::synchronous );
    }
 
    // This method is not thread-safe - only the thread which created and "owns" the
    // instance of this object can call this method.
-   // Also note that this method must not be called again until the previous
-   // asynchronous operation has finished.
-   std::shared_future<void> synchronizeAsync( DistributedNDArray& array, std::launch policy = std::launch::async )
+   // Also note that if (buffered == true), this method must not be called again until
+   // the previous asynchronous operation has finished.
+   void synchronizeAsync( DistributedNDArray& array, AsyncPolicy policy = AsyncPolicy::synchronous, SyncDirection mask = SyncDirection::All )
    {
+      // wait for any previous synchronization (multiple objects can share the
+      // same synchronizer)
+      wait();
+
+      async_start_timer.start();
+
       // GOTCHA: https://devblogs.nvidia.com/cuda-pro-tip-always-set-current-device-avoid-multithreading-bugs/
       #ifdef HAVE_CUDA
       if( std::is_same< typename DistributedNDArray::DeviceType, Devices::Cuda >::value )
         cudaGetDevice(&this->gpu_id);
       #endif
 
-      // NOTE: the allocation cannot be done in the worker, otherwise CUDA would crash
       // skip allocation on repeated calls - compare only sizes, not the actual data
       if( array_view.getCommunicationGroup() != array.getCommunicationGroup() ||
           array_view.getSizes() != array.getSizes() ||
@@ -55,6 +153,7 @@ public:
           array_view.getLocalEnds() != array.getLocalEnds() )
       {
          array_view.bind( array.getView() );
+         this->mask = mask;
 
         // allocate buffers
         Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), AllocateHelper >::execHost( buffers, array_view );
@@ -62,41 +161,98 @@ public:
       else {
          // only bind to the actual data
         array_view.bind( array.getView() );
+         this->mask = mask;
+      }
+
+      if( policy == AsyncPolicy::threadpool || policy == AsyncPolicy::async ) {
+         // everything offloaded to a separate thread
+         auto worker = [this] () {
+            // GOTCHA: https://devblogs.nvidia.com/cuda-pro-tip-always-set-current-device-avoid-multithreading-bugs/
+            #ifdef HAVE_CUDA
+            if( std::is_same< typename DistributedNDArray::DeviceType, Devices::Cuda >::value )
+               cudaSetDevice(this->gpu_id);
+            #endif
+
+            auto requests = this->worker_init();
+            MPI::Waitall( requests.data(), requests.size() );
+            this->worker_finish();
+         };
+
+         if( policy == AsyncPolicy::threadpool )
+            async_op = tp.post( worker );
+         else
+            async_op = std::async( std::launch::async, worker );
+      }
+      else if( policy == AsyncPolicy::deferred ) {
+         // immediate start, deferred synchronization (but still in the same thread)
+         auto requests = worker_init();
+         auto worker = [this, requests] () mutable {
+            MPI::Waitall( requests.data(), requests.size() );
+            this->worker_finish();
+         };
+         this->async_op = std::async( std::launch::deferred, worker );
+      }
+      else {
+         // synchronous
+         auto requests = this->worker_init();
+         MPI::Waitall( requests.data(), requests.size() );
+         this->worker_finish();
       }
 
-      auto worker = [this](){ this->worker(); };
-      return std::async( policy, worker );
+      async_ops_count++;
+      async_start_timer.stop();
+   }
+
+   void wait()
+   {
+      if( async_op.valid() ) {
+         async_wait_timer.start();
+         async_op.wait();
+         async_wait_timer.stop();
+      }
    }
 
+   ~DistributedNDArraySynchronizer()
+   {
+      if( this->async_op.valid() )
+         this->async_op.wait();
+   }
+
+   /**
+    * \brief Can be used for checking if a synchronization started
+    * asynchronously has been finished.
+    */
+   std::future< void > async_op;
+
+   // attributes for profiling
+   Timer async_start_timer, async_wait_timer;
+   std::size_t async_ops_count = 0;
+
 protected:
    using DistributedNDArrayView = typename DistributedNDArray::ViewType;
    using Buffers = __ndarray_impl::SynchronizerBuffers< DistributedNDArray >;
 
    DistributedNDArrayView array_view;
+   SyncDirection mask = SyncDirection::All;
    Buffers buffers;
-   int gpu_id = 0;
 
-   void worker()
+   RequestsVector worker_init()
    {
-      // GOTCHA: https://devblogs.nvidia.com/cuda-pro-tip-always-set-current-device-avoid-multithreading-bugs/
-      #ifdef HAVE_CUDA
-      if( std::is_same< typename DistributedNDArray::DeviceType, Devices::Cuda >::value )
-         cudaSetDevice(gpu_id);
-      #endif
-
       // fill send buffers
-      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, true );
+      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, true, mask );
 
       // issue all send and receive async operations
-      std::vector< MPI_Request > requests;
+      RequestsVector requests;
       const MPI_Comm group = array_view.getCommunicationGroup();
-      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), SendHelper >::execHost( buffers, requests, group );
+      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), SendHelper >::execHost( buffers, requests, group, tag_offset, mask );
 
-      // wait until send is done
-      MPI::Waitall( requests.data(), requests.size() );
+      return requests;
+   }
 
+   void worker_finish()
+   {
       // copy data from receive buffers
-      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, false );
+      Algorithms::TemplateStaticFor< std::size_t, 0, DistributedNDArray::getDimension(), CopyHelper >::execHost( buffers, array_view, false, mask );
    }
 
    template< std::size_t dim >
@@ -106,9 +262,7 @@ protected:
      {
         auto& dim_buffers = buffers.template getDimBuffers< dim >();
 
-        constexpr std::size_t overlap = __ndarray_impl::get< dim >( typename DistributedNDArray::OverlapsType{} );
-        // TODO
-//        constexpr std::size_t overlap = array_view.template getOverlap< dim >();
+        constexpr std::size_t overlap = DistributedNDArrayView::LocalIndexerType::template getOverlap< dim >();
        if( overlap == 0 ) {
           dim_buffers.reset();
           return;
@@ -163,10 +317,10 @@ protected:
    template< std::size_t dim >
    struct CopyHelper
    {
-      static void exec( Buffers& buffers, DistributedNDArrayView& array_view, bool to_buffer )
+      static void exec( Buffers& buffers, DistributedNDArrayView& array_view, bool to_buffer, SyncDirection mask )
      {
         // skip if there are no overlaps
-        const std::size_t overlap = __ndarray_impl::get< dim >( typename DistributedNDArray::OverlapsType{} );
+        constexpr std::size_t overlap = DistributedNDArrayView::LocalIndexerType::template getOverlap< dim >();
        if( overlap == 0 )
           return;
 
@@ -179,22 +333,30 @@ protected:
            copy_kernel.to_buffer = to_buffer;
 
            if( to_buffer ) {
-              copy_kernel.buffer_view.bind( dim_buffers.left_send_view );
-              copy_kernel.array_offsets = dim_buffers.left_send_offsets;
-              dim_buffers.left_send_view.forAll( copy_kernel );
-
-              copy_kernel.buffer_view.bind( dim_buffers.right_send_view );
-              copy_kernel.array_offsets = dim_buffers.right_send_offsets;
-              dim_buffers.right_send_view.forAll( copy_kernel );
+              if( mask & SyncDirection::Left ) {
+                 copy_kernel.buffer_view.bind( dim_buffers.left_send_view );
+                 copy_kernel.array_offsets = dim_buffers.left_send_offsets;
+                 dim_buffers.left_send_view.forAll( copy_kernel );
+              }
+
+              if( mask & SyncDirection::Right ) {
+                 copy_kernel.buffer_view.bind( dim_buffers.right_send_view );
+                 copy_kernel.array_offsets = dim_buffers.right_send_offsets;
+                 dim_buffers.right_send_view.forAll( copy_kernel );
+              }
            }
            else {
-              copy_kernel.buffer_view.bind( dim_buffers.left_recv_view );
-              copy_kernel.array_offsets = dim_buffers.left_recv_offsets;
-              dim_buffers.left_recv_view.forAll( copy_kernel );
-
-              copy_kernel.buffer_view.bind( dim_buffers.right_recv_view );
-              copy_kernel.array_offsets = dim_buffers.right_recv_offsets;
-              dim_buffers.right_recv_view.forAll( copy_kernel );
+              if( mask & SyncDirection::Right ) {
+                 copy_kernel.buffer_view.bind( dim_buffers.left_recv_view );
+                 copy_kernel.array_offsets = dim_buffers.left_recv_offsets;
+                 dim_buffers.left_recv_view.forAll( copy_kernel );
+              }
+
+              if( mask & SyncDirection::Left ) {
+                 copy_kernel.buffer_view.bind( dim_buffers.right_recv_view );
+                 copy_kernel.array_offsets = dim_buffers.right_recv_offsets;
+                 dim_buffers.right_recv_view.forAll( copy_kernel );
+              }
            }
         }
        else {
@@ -212,41 +374,45 @@
    struct SendHelper
    {
       template< typename Requests, typename Group >
-      static void exec( Buffers& buffers, Requests& requests, Group group )
+      static void exec( Buffers& buffers, Requests& requests, Group group, int tag_offset, SyncDirection mask )
      {
-        const std::size_t overlap = __ndarray_impl::get< dim >( typename DistributedNDArray::OverlapsType{} );
+        constexpr std::size_t overlap = DistributedNDArrayView::LocalIndexerType::template getOverlap< dim >();
        if( overlap == 0 )
           return;
 
        auto& dim_buffers = buffers.template getDimBuffers< dim >();
 
        if( LBM_HACK == false ) {
-           requests.push_back( MPI::Isend( dim_buffers.left_send_view.getData(),
-                                           dim_buffers.left_send_view.getStorageSize(),
-                                           dim_buffers.left_neighbor, 0, group ) );
-           requests.push_back( MPI::Irecv( dim_buffers.left_recv_view.getData(),
-                                           dim_buffers.left_recv_view.getStorageSize(),
-                                           dim_buffers.left_neighbor, 1, group ) );
-           requests.push_back( MPI::Isend( dim_buffers.right_send_view.getData(),
-                                           dim_buffers.right_send_view.getStorageSize(),
-                                           dim_buffers.right_neighbor, 1, group ) );
-           requests.push_back( MPI::Irecv( dim_buffers.right_recv_view.getData(),
-                                           dim_buffers.right_recv_view.getStorageSize(),
-                                           dim_buffers.right_neighbor, 0, group ) );
+           if( mask & SyncDirection::Left ) {
+              requests.push_back( MPI::Isend( dim_buffers.left_send_view.getData(),
+                                              dim_buffers.left_send_view.getStorageSize(),
+                                              dim_buffers.left_neighbor, tag_offset + 0, group ) );
+              requests.push_back( MPI::Irecv( dim_buffers.right_recv_view.getData(),
+                                              dim_buffers.right_recv_view.getStorageSize(),
+                                              dim_buffers.right_neighbor, tag_offset + 0, group ) );
+           }
+           if( mask & SyncDirection::Right ) {
+              requests.push_back( MPI::Isend( dim_buffers.right_send_view.getData(),
+                                              dim_buffers.right_send_view.getStorageSize(),
+                                              dim_buffers.right_neighbor, tag_offset + 1, group ) );
+              requests.push_back( MPI::Irecv( dim_buffers.left_recv_view.getData(),
+                                              dim_buffers.left_recv_view.getStorageSize(),
+                                              dim_buffers.left_neighbor, tag_offset + 1, group ) );
+           }
        }
        else {
           requests.push_back( MPI::Isend( dim_buffers.left_send_view.getData() + 0,
                                           dim_buffers.left_send_view.getStorageSize() / 27 * 9,
-                                          dim_buffers.left_neighbor, 0, group ) );
+                                          dim_buffers.left_neighbor, tag_offset + 0, group ) );
           requests.push_back( MPI::Irecv( dim_buffers.left_recv_view.getData() + dim_buffers.left_recv_view.getStorageSize() / 27 * 18,
                                           dim_buffers.left_recv_view.getStorageSize() / 27 * 9,
-                                          dim_buffers.left_neighbor, 1, group ) );
+                                          dim_buffers.left_neighbor, tag_offset + 1, group ) );
           requests.push_back( MPI::Isend( dim_buffers.right_send_view.getData() + dim_buffers.left_recv_view.getStorageSize() / 27 * 18,
                                           dim_buffers.right_send_view.getStorageSize() / 27 * 9,
-                                          dim_buffers.right_neighbor, 1, group ) );
+                                          dim_buffers.right_neighbor, tag_offset + 1, group ) );
           requests.push_back( MPI::Irecv( dim_buffers.right_recv_view.getData() + 0,
                                           dim_buffers.right_recv_view.getStorageSize() / 27 * 9,
-                                          dim_buffers.right_neighbor, 0, group ) );
+                                          dim_buffers.right_neighbor, tag_offset + 0, group ) );
        }
      }
   };
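
Usage note (not part of the patch): a minimal sketch of how the interface added above might be driven from application code. Array stands for some concrete TNL::Containers::DistributedNDArray instantiation and the array variable is assumed to be already distributed and initialized; only the synchronizer calls come from this header, the rest is illustrative.

    using Synchronizer = TNL::Containers::DistributedNDArraySynchronizer< Array >;

    Synchronizer sync;   // default constructor: one thread in the pool, reserves 2 MPI tags

    // start exchanging the overlap regions; with AsyncPolicy::deferred the Isend/Irecv
    // operations are issued immediately and only the Waitall is postponed until wait()
    sync.synchronizeAsync( array, Synchronizer::AsyncPolicy::deferred );

    // ... compute on the interior of the local block while the messages are in flight ...

    sync.wait();   // completes the exchange and unpacks the receive buffers

    // exchanging in a single direction only (e.g. for an LBM-style update) would pass a mask:
    // sync.synchronizeAsync( array, Synchronizer::AsyncPolicy::deferred,
    //                        TNL::Containers::SyncDirection::Right );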