Commit 6f74d8fa authored by Jakub Klinkovský
Browse files

MPI refactoring: removed MpiCommunicator from DistributedMesh

Also from DistributedMeshSynchronizer, PVTUReader and PVTUWriter
parent ee2fd25d
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@
#include <async/threadpool.h>

#include <TNL/Containers/ArrayView.h>
#include <TNL/Communicators/MpiCommunicator.h>
#include <TNL/MPI/Wrappers.h>
#include <TNL/Timer.h>

namespace TNL {
@@ -42,7 +42,7 @@ private:

public:
   using ByteArrayView = ArrayView< std::uint8_t, Device, Index >;
   using RequestsVector = std::vector< typename Communicators::MpiCommunicator::Request >;
   using RequestsVector = std::vector< MPI_Request >;

   enum class AsyncPolicy {
      synchronous,
@@ -105,7 +105,7 @@ public:
         // immediate start, deferred synchronization (but still in the same thread)
         auto requests = synchronizeByteArrayAsyncWorker( array, bytesPerValue );
         auto worker = [requests] () mutable {
            Communicators::MpiCommunicator::WaitAll( requests.data(), requests.size() );
            MPI::Waitall( requests.data(), requests.size() );
         };
         this->async_op = std::async( std::launch::deferred, worker );
      }
+9 −11
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@
#pragma once

#include <TNL/Containers/Array.h>
#include <TNL/Communicators/MpiCommunicator.h>
#include <TNL/MPI/Wrappers.h>
#include <TNL/Meshes/DistributedMeshes/GlobalIndexStorage.h>
#include <TNL/Meshes/MeshDetails/IndexPermutationApplier.h>

@@ -34,8 +34,6 @@ public:
   using PointType          = typename Mesh::PointType;
   using RealType           = typename PointType::RealType;
   using GlobalIndexArray   = typename Mesh::GlobalIndexArray;
   using CommunicatorType   = Communicators::MpiCommunicator;
   using CommunicationGroup = typename CommunicatorType::CommunicationGroup;
   using VTKTypesArrayType  = Containers::Array< std::uint8_t, Devices::Sequential, GlobalIndexType >;

   DistributedMesh() = default;
@@ -101,12 +99,12 @@ public:
   /**
    * Methods specific to the distributed mesh
    */
   void setCommunicationGroup( CommunicationGroup group )
   void setCommunicationGroup( MPI_Comm group )
   {
      this->group = group;
   }

   CommunicationGroup getCommunicationGroup() const
   MPI_Comm getCommunicationGroup() const
   {
      return group;
   }
@@ -190,10 +188,10 @@ public:
      const GlobalIndexType verticesCount = localMesh.template getEntitiesCount< 0 >();
      const GlobalIndexType cellsCount = localMesh.template getEntitiesCount< Mesh::getMeshDimension() >();

      CommunicatorType::Barrier();
      for( int i = 0; i < CommunicatorType::GetSize(); i++ ) {
         if( i == CommunicatorType::GetRank() ) {
            str << "MPI rank:\t" << CommunicatorType::GetRank() << "\n"
      MPI::Barrier();
      for( int i = 0; i < MPI::GetSize(); i++ ) {
         if( i == MPI::GetRank() ) {
            str << "MPI rank:\t" << MPI::GetRank() << "\n"
                << "\tMesh dimension:\t" << getMeshDimension() << "\n"
                << "\tCell topology:\t" << getType( typename Cell::EntityTopology{} ) << "\n"
                << "\tCells count:\t" << cellsCount << "\n"
@@ -230,13 +228,13 @@ public:
            }
            str.flush();
         }
         CommunicatorType::Barrier();
         MPI::Barrier();
      }
   }

protected:
   MeshType localMesh;
   CommunicationGroup group = CommunicatorType::NullGroup;
   MPI_Comm group = MPI::NullGroup();
   int ghostLevels = 0;

   // vtkGhostType arrays for points and cells (cached for output into VTK formats)
+26 −26
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
#include <TNL/Containers/ByteArraySynchronizer.h>
#include <TNL/Containers/Vector.h>
#include <TNL/Matrices/DenseMatrix.h>
#include <TNL/MPI/Wrappers.h>

namespace TNL {
namespace Meshes {
@@ -40,7 +41,6 @@ class DistributedMeshSynchronizer
public:
   using DeviceType = typename DistributedMesh::DeviceType;
   using GlobalIndexType = typename DistributedMesh::GlobalIndexType;
   using CommunicatorType = typename DistributedMesh::CommunicatorType;
   using ByteArrayView = typename Base::ByteArrayView;
   using RequestsVector = typename Base::RequestsVector;

@@ -61,8 +61,8 @@ public:
                     "Global indices are not allocated properly." );

      group = mesh.getCommunicationGroup();
      const int rank = CommunicatorType::GetRank( group );
      const int nproc = CommunicatorType::GetSize( group );
      const int rank = MPI::GetRank( group );
      const int nproc = MPI::GetSize( group );

      // exchange the global index offsets so that each rank can determine the
      // owner of every entity by its global index
@@ -71,7 +71,7 @@ public:
      {
         Containers::Array< GlobalIndexType, Devices::Host, int > sendbuf( nproc );
         sendbuf.setValue( ownStart );
         CommunicatorType::Alltoall( sendbuf.getData(), 1,
         MPI::Alltoall( sendbuf.getData(), 1,
                        globalOffsets.getData(), 1,
                        group );
      }
@@ -110,7 +110,7 @@ public:
         for( int j = 0; j < nproc; j++ )
         for( int i = 0; i < nproc; i++ )
            sendbuf.setElement( j, i, localGhostCounts[ i ] );
         CommunicatorType::Alltoall( &sendbuf(0, 0), nproc,
         MPI::Alltoall( &sendbuf(0, 0), nproc,
                        &ghostEntitiesCounts(0, 0), nproc,
                        group );
      }
@@ -136,7 +136,7 @@ public:
         ghostOffsets[ 0 ] = ghostOffset;
         for( int i = 0; i < nproc; i++ ) {
            if( ghostEntitiesCounts( rank, i ) > 0 ) {
               requests.push_back( CommunicatorType::ISend(
               requests.push_back( MPI::Isend(
                        mesh.template getGlobalIndices< EntityDimension >().getData() + ghostOffset,
                        ghostEntitiesCounts( rank, i ),
                        i, 0, group ) );
@@ -151,7 +151,7 @@ public:
         // receive ghost indices from the neighboring ranks
         for( int j = 0; j < nproc; j++ ) {
            if( ghostEntitiesCounts( j, rank ) > 0 ) {
               requests.push_back( CommunicatorType::IRecv(
               requests.push_back( MPI::Irecv(
                        ghostNeighbors.getData() + ghostNeighborOffsets[ j ],
                        ghostEntitiesCounts( j, rank ),
                        j, 0, group ) );
@@ -159,7 +159,7 @@ public:
         }

         // wait for all communications to finish
         CommunicatorType::WaitAll( requests.data(), requests.size() );
         MPI::Waitall( requests.data(), requests.size() );

         // convert received ghost indices from global to local
         ghostNeighbors -= ownStart;
@@ -201,7 +201,7 @@ public:
   virtual void synchronizeByteArray( ByteArrayView array, int bytesPerValue ) override
   {
      auto requests = synchronizeByteArrayAsyncWorker( array, bytesPerValue );
      CommunicatorType::WaitAll( requests.data(), requests.size() );
      MPI::Waitall( requests.data(), requests.size() );
   }

   virtual RequestsVector synchronizeByteArrayAsyncWorker( ByteArrayView array, int bytesPerValue ) override
@@ -209,8 +209,8 @@ public:
      TNL_ASSERT_EQ( array.getSize(), bytesPerValue * ghostOffsets[ ghostOffsets.getSize() - 1 ],
                     "The array does not have the expected size." );

      const int rank = CommunicatorType::GetRank( group );
      const int nproc = CommunicatorType::GetSize( group );
      const int rank = MPI::GetRank( group );
      const int nproc = MPI::GetSize( group );

      // allocate send buffers (setSize does nothing if the array size is already correct)
      sendBuffers.setSize( bytesPerValue * ghostNeighborOffsets[ nproc ] );
@@ -221,7 +221,7 @@ public:
      // issue all receive async operations
      for( int j = 0; j < nproc; j++ ) {
         if( ghostEntitiesCounts( rank, j ) > 0 ) {
            requests.push_back( CommunicatorType::IRecv(
            requests.push_back( MPI::Irecv(
                     array.getData() + bytesPerValue * ghostOffsets[ j ],
                     bytesPerValue * ghostEntitiesCounts( rank, j ),
                     j, 0, group ) );
@@ -245,7 +245,7 @@ public:
            Algorithms::ParallelFor< DeviceType >::exec( (GlobalIndexType) 0, ghostEntitiesCounts( i, rank ), copy_kernel, offset );

            // issue async send operation
            requests.push_back( CommunicatorType::ISend(
            requests.push_back( MPI::Isend(
                     sendBuffersView.getData() + bytesPerValue * ghostNeighborOffsets[ i ],
                     bytesPerValue * ghostEntitiesCounts( i, rank ),
                     i, 0, group ) );
@@ -268,8 +268,8 @@ public:
   {
      TNL_ASSERT_EQ( pattern.getRows(), ghostOffsets[ ghostOffsets.getSize() - 1 ], "invalid sparse pattern matrix" );

      const int rank = CommunicatorType::GetRank( group );
      const int nproc = CommunicatorType::GetSize( group );
      const int rank = MPI::GetRank( group );
      const int nproc = MPI::GetSize( group );

      // buffer for asynchronous communication requests
      RequestsVector requests;
@@ -306,7 +306,7 @@ public:
            // send our row sizes to the target rank
            if( ! assumeConsistentRowCapacities )
               // issue async send operation
               requests.push_back( CommunicatorType::ISend(
               requests.push_back( MPI::Isend(
                        send_rowCapacities.getData() + send_rankOffsets[ i ],
                        ghostNeighborOffsets[ i + 1 ] - ghostNeighborOffsets[ i ],
                        i, 1, group ) );
@@ -334,7 +334,7 @@ public:
            if( send_rankOffsets[ i + 1 ] == send_rankOffsets[ i ] )
               continue;
            // issue async send operation
            requests.push_back( CommunicatorType::ISend(
            requests.push_back( MPI::Isend(
                     send_columnIndices.getData() + send_rowPointers[ send_rankOffsets[ i ] ],
                     send_rowPointers[ send_rankOffsets[ i + 1 ] ] - send_rowPointers[ send_rankOffsets[ i ] ],
                     i, 0, group ) );
@@ -369,7 +369,7 @@ public:
            else {
               // receive row sizes from the sender
               // issue async recv operation
               row_lengths_requests.push_back( CommunicatorType::IRecv(
               row_lengths_requests.push_back( MPI::Irecv(
                        recv_rowPointers.getData() + recv_rankOffsets[ i ],
                        ghostOffsets[ i + 1 ] - ghostOffsets[ i ],
                        i, 1, group ) );
@@ -378,7 +378,7 @@ public:

         if( ! assumeConsistentRowCapacities ) {
            // wait for all row lengths
            CommunicatorType::WaitAll( row_lengths_requests.data(), row_lengths_requests.size() );
            MPI::Waitall( row_lengths_requests.data(), row_lengths_requests.size() );

            // scan the rowPointers array to convert
            Containers::VectorView< GlobalIndexType, Devices::Host, GlobalIndexType > rowPointersView;
@@ -393,7 +393,7 @@ public:
            if( recv_rankOffsets[ i + 1 ] == recv_rankOffsets[ i ] )
               continue;
            // issue async recv operation
            requests.push_back( CommunicatorType::IRecv(
            requests.push_back( MPI::Irecv(
                     recv_columnIndices.getData() + recv_rowPointers[ recv_rankOffsets[ i ] ],
                     recv_rowPointers[ recv_rankOffsets[ i + 1 ] ] - recv_rowPointers[ recv_rankOffsets[ i ] ],
                     i, 0, group ) );
@@ -401,7 +401,7 @@ public:
      }

      // wait for all communications to finish
      CommunicatorType::WaitAll( requests.data(), requests.size() );
      MPI::Waitall( requests.data(), requests.size() );

      return std::make_tuple( recv_rankOffsets, recv_rowPointers, recv_columnIndices );
   }
@@ -445,7 +445,7 @@ public:

protected:
   // communication group taken from the distributed mesh
   typename CommunicatorType::CommunicationGroup group;
   MPI_Comm group;

   /**
    * Global offsets: array of size nproc where the i-th value is the lowest
+32 −33
Original line number Diff line number Diff line
@@ -19,14 +19,14 @@ namespace TNL {
namespace Meshes {
namespace DistributedMeshes {

template< typename CommunicatorType, typename GlobalIndexType >
template< typename GlobalIndexType >
auto
exchangeGhostEntitySeeds( typename CommunicatorType::CommunicationGroup group,
exchangeGhostEntitySeeds( MPI_Comm group,
                          const std::vector< std::vector< GlobalIndexType > >& seeds_vertex_indices,
                          const std::vector< std::vector< GlobalIndexType > >& seeds_entity_offsets )
{
   const int rank = CommunicatorType::GetRank( group );
   const int nproc = CommunicatorType::GetSize( group );
   const int rank = MPI::GetRank( group );
   const int nproc = MPI::GetSize( group );

   // exchange sizes of the arrays
   Containers::Array< GlobalIndexType, Devices::Host, int > sizes_vertex_indices( nproc ), sizes_entity_offsets( nproc );
@@ -36,10 +36,10 @@ exchangeGhostEntitySeeds( typename CommunicatorType::CommunicationGroup group,
         sendbuf_indices[ i ] = seeds_vertex_indices[ i ].size();
         sendbuf_offsets[ i ] = seeds_entity_offsets[ i ].size();
      }
      CommunicatorType::Alltoall( sendbuf_indices.getData(), 1,
      MPI::Alltoall( sendbuf_indices.getData(), 1,
                     sizes_vertex_indices.getData(), 1,
                     group );
      CommunicatorType::Alltoall( sendbuf_offsets.getData(), 1,
      MPI::Alltoall( sendbuf_offsets.getData(), 1,
                     sizes_entity_offsets.getData(), 1,
                     group );
   }
@@ -54,17 +54,17 @@ exchangeGhostEntitySeeds( typename CommunicatorType::CommunicationGroup group,
   }

   // buffer for asynchronous communication requests
   std::vector< typename CommunicatorType::Request > requests;
   std::vector< MPI_Request > requests;

   // issue all async receive operations
   for( int j = 0; j < nproc; j++ ) {
      if( j == rank )
          continue;
      requests.push_back( CommunicatorType::IRecv(
      requests.push_back( MPI::Irecv(
               foreign_seeds_vertex_indices[ j ].data(),
               foreign_seeds_vertex_indices[ j ].size(),
               j, 0, group ) );
      requests.push_back( CommunicatorType::IRecv(
      requests.push_back( MPI::Irecv(
               foreign_seeds_entity_offsets[ j ].data(),
               foreign_seeds_entity_offsets[ j ].size(),
               j, 1, group ) );
@@ -74,30 +74,30 @@ exchangeGhostEntitySeeds( typename CommunicatorType::CommunicationGroup group,
   for( int i = 0; i < nproc; i++ ) {
      if( i == rank )
          continue;
      requests.push_back( CommunicatorType::ISend(
      requests.push_back( MPI::Isend(
               seeds_vertex_indices[ i ].data(),
               seeds_vertex_indices[ i ].size(),
               i, 0, group ) );
      requests.push_back( CommunicatorType::ISend(
      requests.push_back( MPI::Isend(
               seeds_entity_offsets[ i ].data(),
               seeds_entity_offsets[ i ].size(),
               i, 1, group ) );
   }

   // wait for all communications to finish
   CommunicatorType::WaitAll( requests.data(), requests.size() );
   MPI::Waitall( requests.data(), requests.size() );

   return std::make_tuple( foreign_seeds_vertex_indices, foreign_seeds_entity_offsets );
}

template< typename CommunicatorType, typename GlobalIndexType >
template< typename GlobalIndexType >
auto
exchangeGhostIndices( typename CommunicatorType::CommunicationGroup group,
exchangeGhostIndices( MPI_Comm group,
                      const std::vector< std::vector< GlobalIndexType > >& foreign_ghost_indices,
                      const std::vector< std::vector< GlobalIndexType > >& seeds_local_indices )
{
   const int rank = CommunicatorType::GetRank( group );
   const int nproc = CommunicatorType::GetSize( group );
   const int rank = MPI::GetRank( group );
   const int nproc = MPI::GetSize( group );

   // allocate arrays for the results
   std::vector< std::vector< GlobalIndexType > > ghost_indices;
@@ -106,13 +106,13 @@ exchangeGhostIndices( typename CommunicatorType::CommunicationGroup group,
      ghost_indices[ i ].resize( seeds_local_indices[ i ].size() );

   // buffer for asynchronous communication requests
   std::vector< typename CommunicatorType::Request > requests;
   std::vector< MPI_Request > requests;

   // issue all async receive operations
   for( int j = 0; j < nproc; j++ ) {
      if( j == rank )
          continue;
      requests.push_back( CommunicatorType::IRecv(
      requests.push_back( MPI::Irecv(
               ghost_indices[ j ].data(),
               ghost_indices[ j ].size(),
               j, 0, group ) );
@@ -122,14 +122,14 @@ exchangeGhostIndices( typename CommunicatorType::CommunicationGroup group,
   for( int i = 0; i < nproc; i++ ) {
      if( i == rank )
          continue;
      requests.push_back( CommunicatorType::ISend(
      requests.push_back( MPI::Isend(
               foreign_ghost_indices[ i ].data(),
               foreign_ghost_indices[ i ].size(),
               i, 0, group ) );
   }

   // wait for all communications to finish
   CommunicatorType::WaitAll( requests.data(), requests.size() );
   MPI::Waitall( requests.data(), requests.size() );

   return ghost_indices;
}
@@ -145,7 +145,6 @@ distributeSubentities( DistributedMesh& mesh, bool preferHighRanks = true )
   using GlobalIndexType = typename DistributedMesh::GlobalIndexType;
   using LocalIndexType = typename DistributedMesh::LocalIndexType;
   using LocalMesh = typename DistributedMesh::MeshType;
   using CommunicatorType = typename DistributedMesh::CommunicatorType;

   static_assert( ! std::is_same< DeviceType, Devices::Cuda >::value,
                  "this method can be called only for host meshes" );
@@ -154,8 +153,8 @@ distributeSubentities( DistributedMesh& mesh, bool preferHighRanks = true )
   if( mesh.getGhostLevels() <= 0 )
      throw std::logic_error( "There are no ghost levels on the distributed mesh." );

   const int rank = CommunicatorType::GetRank( mesh.getCommunicationGroup() );
   const int nproc = CommunicatorType::GetSize( mesh.getCommunicationGroup() );
   const int rank = MPI::GetRank( mesh.getCommunicationGroup() );
   const int nproc = MPI::GetSize( mesh.getCommunicationGroup() );

   // 0. exchange cell data to prepare getCellOwner for use in getEntityOwner
   DistributedMeshSynchronizer< DistributedMesh, DistributedMesh::getMeshDimension() > cell_synchronizer;
@@ -235,7 +234,7 @@ distributeSubentities( DistributedMesh& mesh, bool preferHighRanks = true )

      Containers::Array< GlobalIndexType, Devices::Host, int > sendbuf( nproc );
      sendbuf.setValue( localEntitiesCount );
      CommunicatorType::Alltoall( sendbuf.getData(), 1,
      MPI::Alltoall( sendbuf.getData(), 1,
                     globalOffsets.getData(), 1,
                     mesh.getCommunicationGroup() );
   }
@@ -288,7 +287,7 @@ distributeSubentities( DistributedMesh& mesh, bool preferHighRanks = true )
   }

   // 5. exchange seeds for ghost entities
   const auto foreign_seeds = exchangeGhostEntitySeeds< CommunicatorType >( mesh.getCommunicationGroup(), seeds_vertex_indices, seeds_entity_offsets );
   const auto foreign_seeds = exchangeGhostEntitySeeds( mesh.getCommunicationGroup(), seeds_vertex_indices, seeds_entity_offsets );
   const auto& foreign_seeds_vertex_indices = std::get< 0 >( foreign_seeds );
   const auto& foreign_seeds_entity_offsets = std::get< 1 >( foreign_seeds );

@@ -373,7 +372,7 @@ distributeSubentities( DistributedMesh& mesh, bool preferHighRanks = true )
      });

      // 6b. exchange global ghost indices
      const auto ghost_indices = exchangeGhostIndices< CommunicatorType >( mesh.getCommunicationGroup(), foreign_ghost_indices, seeds_local_indices );
      const auto ghost_indices = exchangeGhostIndices( mesh.getCommunicationGroup(), foreign_ghost_indices, seeds_local_indices );

      // 6c. set the global indices of our ghost entities
      bool done = true;
@@ -387,7 +386,7 @@ distributeSubentities( DistributedMesh& mesh, bool preferHighRanks = true )

      // 6d. check if finished
      bool all_done = false;
      CommunicatorType::Allreduce( &done, &all_done, 1, MPI_LAND, mesh.getCommunicationGroup() );
      MPI::Allreduce( &done, &all_done, 1, MPI_LAND, mesh.getCommunicationGroup() );
      if( all_done )
         break;
   }
+5 −8
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@

#include <experimental/filesystem>

#include <TNL/Communicators/MpiCommunicator.h>
#include <TNL/MPI/Wrappers.h>
#include <TNL/Meshes/Readers/VTUReader.h>
#include <TNL/Meshes/MeshDetails/layers/EntityTags/Traits.h>

@@ -67,13 +67,13 @@ class PVTUReader
         throw MeshReaderError( "PVTUReader", "the file does not contain any <Piece> element." );

      // check that the number of pieces matches the number of MPI ranks
      const int nproc = CommunicatorType::GetSize( group );
      const int nproc = MPI::GetSize( group );
      if( (int) pieceSources.size() != nproc )
         throw MeshReaderError( "PVTUReader", "the number of subdomains does not match the number of MPI ranks ("
                                              + std::to_string(pieceSources.size()) + " vs " + std::to_string(nproc) + ")." );

      // read the local piece source
      const int rank = CommunicatorType::GetRank( group );
      const int rank = MPI::GetRank( group );
      localReader.setFileName( pieceSources[ rank ] );
      localReader.detectMesh();

@@ -100,12 +100,9 @@ class PVTUReader
#endif

public:
   using CommunicatorType = Communicators::MpiCommunicator;
   using CommunicationGroup = typename CommunicatorType::CommunicationGroup;

   PVTUReader() = default;

   PVTUReader( const std::string& fileName, CommunicationGroup group = CommunicatorType::AllGroup )
   PVTUReader( const std::string& fileName, MPI_Comm group = MPI::AllGroup() )
   : XMLVTK( fileName ), group( group )
   {}

@@ -233,7 +230,7 @@ public:
   }

protected:
   CommunicationGroup group;
   MPI_Comm group;

   int ghostLevels = 0;
   int minCommonVertices = 0;
Loading