diff --git a/IO/MeshDatabase.cpp b/IO/MeshDatabase.cpp index 1fad9231..2c03ddde 100644 --- a/IO/MeshDatabase.cpp +++ b/IO/MeshDatabase.cpp @@ -1,7 +1,8 @@ #include "IO/MeshDatabase.h" #include "IO/Mesh.h" +#include "IO/PackData.h" #include "IO/IOHelpers.h" -#include "common/MPI_Helpers.h" +#include "common/MPI.h" #include "common/Utilities.h" #include @@ -13,8 +14,6 @@ -/**************************************************** -****************************************************/ // MeshType template<> size_t packsize( const IO::MeshType& rhs ) @@ -247,80 +246,76 @@ void DatabaseEntry::read( const std::string& line ) // Gather the mesh databases from all processors inline int tod( int N ) { return (N+7)/sizeof(double); } -std::vector gatherAll( const std::vector& meshes, MPI_Comm comm ) +std::vector gatherAll( const std::vector& meshes, const Utilities::MPI& comm ) { - #ifdef USE_MPI - PROFILE_START("gatherAll"); - PROFILE_START("gatherAll-pack",2); - int size = MPI_WORLD_SIZE(); - // First pack the mesh data to local buffers - int localsize = 0; - for (size_t i=0; i data; - pos = 0; - while ( pos < globalsize ) { - MeshDatabase tmp; - unpack(tmp,(char*)&globalbuf[pos]); - pos += tod(packsize(tmp)); - std::map::iterator it = data.find(tmp.name); - if ( it==data.end() ) { - data[tmp.name] = tmp; - } else { - for (size_t i=0; isecond.domains.push_back(tmp.domains[i]); - for (size_t i=0; isecond.variables.push_back(tmp.variables[i]); - it->second.variable_data.insert(tmp.variable_data.begin(),tmp.variable_data.end()); - } - } - for (std::map::iterator it=data.begin(); it!=data.end(); ++it) { - // Get the unique variables - std::set data2(it->second.variables.begin(),it->second.variables.end()); - it->second.variables = std::vector(data2.begin(),data2.end()); - } - // Free temporary memory - delete [] localbuf; - delete [] recvsize; - delete [] disp; - delete [] globalbuf; - // Return the results - std::vector data2(data.size()); - size_t i=0; - for (std::map::iterator it=data.begin(); it!=data.end(); ++it, ++i) - data2[i] = it->second; - PROFILE_STOP("gatherAll-unpack",2); - PROFILE_STOP("gatherAll"); - return data2; - #else + if ( comm.getSize() == 1 ) return meshes; - #endif + PROFILE_START("gatherAll"); + PROFILE_START("gatherAll-pack",2); + int size = comm.getSize(); + // First pack the mesh data to local buffers + int localsize = 0; + for (size_t i=0; i data; + pos = 0; + while ( pos < globalsize ) { + MeshDatabase tmp; + unpack(tmp,(char*)&globalbuf[pos]); + pos += tod(packsize(tmp)); + std::map::iterator it = data.find(tmp.name); + if ( it==data.end() ) { + data[tmp.name] = tmp; + } else { + for (size_t i=0; isecond.domains.push_back(tmp.domains[i]); + for (size_t i=0; isecond.variables.push_back(tmp.variables[i]); + it->second.variable_data.insert(tmp.variable_data.begin(),tmp.variable_data.end()); + } + } + for (auto it=data.begin(); it!=data.end(); ++it) { + // Get the unique variables + std::set data2(it->second.variables.begin(),it->second.variables.end()); + it->second.variables = std::vector(data2.begin(),data2.end()); + } + // Free temporary memory + delete [] localbuf; + delete [] disp; + delete [] globalbuf; + // Return the results + std::vector data2(data.size()); + size_t i=0; + for (std::map::iterator it=data.begin(); it!=data.end(); ++it, ++i) + data2[i] = it->second; + PROFILE_STOP("gatherAll-unpack",2); + PROFILE_STOP("gatherAll"); + return data2; } diff --git a/IO/MeshDatabase.h b/IO/MeshDatabase.h index 9f544925..8e501624 100644 --- a/IO/MeshDatabase.h +++ b/IO/MeshDatabase.h @@ -2,7 +2,7 @@ #define MeshDatabase_INC #include "IO/Mesh.h" -#include "common/MPI_Helpers.h" +#include "common/MPI.h" #include #include @@ -70,7 +70,7 @@ public: //! Gather the mesh databases from all processors -std::vector gatherAll( const std::vector& meshes, MPI_Comm comm ); +std::vector gatherAll( const std::vector& meshes, const Utilities::MPI& comm ); //! Write the mesh databases to a file diff --git a/analysis/runAnalysis.cpp b/analysis/runAnalysis.cpp index 9cb85b6c..c09b71c2 100644 --- a/analysis/runAnalysis.cpp +++ b/analysis/runAnalysis.cpp @@ -462,7 +462,7 @@ private: /****************************************************************** * MPI comm wrapper for use with analysis * ******************************************************************/ -runAnalysis::commWrapper::commWrapper( int tag_, MPI_Comm comm_, runAnalysis* analysis_ ): +runAnalysis::commWrapper::commWrapper( int tag_, const Utilities::MPI& comm_, runAnalysis* analysis_ ): comm(comm_), tag(tag_), analysis(analysis_) @@ -479,7 +479,7 @@ runAnalysis::commWrapper::~commWrapper() { if ( tag == -1 ) return; - MPI_Barrier( comm ); + comm.barrier(); analysis->d_comm_used[tag] = false; } runAnalysis::commWrapper runAnalysis::getComm( ) @@ -496,10 +496,10 @@ runAnalysis::commWrapper runAnalysis::getComm( ) if ( tag == -1 ) ERROR("Unable to get comm"); } - MPI_Bcast( &tag, 1, MPI_INT, 0, d_comm ); + tag = d_comm.bcast( tag, 0 ); d_comm_used[tag] = true; - if ( d_comms[tag] == MPI_COMM_NULL ) - MPI_Comm_dup( MPI_COMM_WORLD, &d_comms[tag] ); + if ( d_comms[tag].isNull() ) + d_comms[tag] = d_comm.dup(); return commWrapper(tag,d_comms[tag],this); } @@ -560,7 +560,7 @@ runAnalysis::runAnalysis( std::shared_ptr input_db, d_restartFile = restart_file + "." + rankString; - d_rank = MPI_WORLD_RANK(); + d_rank = d_comm.getRank(); writeIDMap(ID_map_struct(),0,id_map_filename); // Initialize IO for silo IO::initialize("","silo","false"); @@ -629,11 +629,8 @@ runAnalysis::runAnalysis( std::shared_ptr input_db, // Initialize the comms - MPI_Comm_dup(MPI_COMM_WORLD,&d_comm); - for (int i=0; i<1024; i++) { - d_comms[i] = MPI_COMM_NULL; + for (int i=0; i<1024; i++) d_comm_used[i] = false; - } // Initialize the threads int N_threads = db->getWithDefault( "N_threads", 4 ); auto method = db->getWithDefault( "load_balance", "default" ); @@ -643,12 +640,6 @@ runAnalysis::~runAnalysis( ) { // Finish processing analysis finish(); - // Clear internal data - MPI_Comm_free( &d_comm ); - for (int i=0; i<1024; i++) { - if ( d_comms[i] != MPI_COMM_NULL ) - MPI_Comm_free(&d_comms[i]); - } } void runAnalysis::finish( ) { @@ -662,7 +653,7 @@ void runAnalysis::finish( ) d_wait_subphase.reset(); d_wait_restart.reset(); // Syncronize - MPI_Barrier( d_comm ); + d_comm.barrier(); PROFILE_STOP("finish"); } @@ -915,12 +906,12 @@ void runAnalysis::run(int timestep, std::shared_ptr input_db, TwoPhase // Spawn a thread to write the restart file // if ( matches(type,AnalysisType::CreateRestart) ) { if (timestep%d_restart_interval==0){ - auto Restart_db = input_db->cloneDatabase(); - // Restart_db->putScalar( "Restart", true ); + if (d_rank==0) { - // std::ofstream OutStream("Restart.db"); - // Restart_db->print(OutStream, ""); - // OutStream.close(); + input_db->putScalar( "Restart", true ); + std::ofstream OutStream("Restart.db"); + input_db->print(OutStream, ""); + OutStream.close(); } // Write the restart file (using a seperate thread) auto work = new WriteRestartWorkItem(d_restartFile.c_str(),cDen,cfq,d_Np); @@ -1019,21 +1010,21 @@ void runAnalysis::basic(int timestep, std::shared_ptr input_db, SubPha cfq = std::shared_ptr(new double[19*d_Np],DeleteArray); ScaLBL_CopyToHost(cfq.get(),fq,19*d_Np*sizeof(double)); ScaLBL_CopyToHost(cDen.get(),Den,2*d_Np*sizeof(double)); - // clone the input database to avoid modifying shared data - auto Restart_db = input_db->cloneDatabase(); - auto tmp_color_db = Restart_db->getDatabase( "Color" ); - tmp_color_db->putScalar("timestep",timestep); - tmp_color_db->putScalar( "Restart", true ); - Restart_db->putDatabase("Color", tmp_color_db); + if (d_rank==0) { + color_db->putScalar("timestep",timestep); + color_db->putScalar( "Restart", true ); + input_db->putDatabase("Color", color_db); std::ofstream OutStream("Restart.db"); - Restart_db->print(OutStream, ""); + input_db->print(OutStream, ""); OutStream.close(); + } // Write the restart file (using a seperate thread) auto work1 = new WriteRestartWorkItem(d_restartFile.c_str(),cDen,cfq,d_Np); work1->add_dependency(d_wait_restart); d_wait_restart = d_tpool.add_work(work1); + } if (timestep%d_visualization_interval==0){ diff --git a/analysis/runAnalysis.h b/analysis/runAnalysis.h index 55032d65..33adbcb0 100644 --- a/analysis/runAnalysis.h +++ b/analysis/runAnalysis.h @@ -68,10 +68,10 @@ public: class commWrapper { public: - MPI_Comm comm; + Utilities::MPI comm; int tag; runAnalysis *analysis; - commWrapper( int tag, MPI_Comm comm, runAnalysis *analysis ); + commWrapper( int tag, const Utilities::MPI& comm, runAnalysis *analysis ); commWrapper( ) = delete; commWrapper( const commWrapper &rhs ) = delete; commWrapper& operator=( const commWrapper &rhs ) = delete; @@ -100,8 +100,8 @@ private: BlobIDList d_last_id_map; std::vector d_meshData; std::string d_restartFile; - MPI_Comm d_comm; - MPI_Comm d_comms[1024]; + Utilities::MPI d_comm; + Utilities::MPI d_comms[1024]; volatile bool d_comm_used[1024]; std::shared_ptr d_ScaLBL_Comm; diff --git a/analysis/uCT.cpp b/analysis/uCT.cpp index 912f8e85..28d677c1 100644 --- a/analysis/uCT.cpp +++ b/analysis/uCT.cpp @@ -228,8 +228,7 @@ void filter_final( Array& ID, Array& Dist, Array& Mean, Array& Dist1, Array& Dist2 ) { PROFILE_SCOPED(timer,"filter_final"); - int rank; - MPI_Comm_rank(Dm.Comm,&rank); + int rank = Dm.Comm.getRank(); int Nx = Dm.Nx-2; int Ny = Dm.Ny-2; int Nz = Dm.Nz-2; @@ -242,7 +241,7 @@ void filter_final( Array& ID, Array& Dist, float tmp = 0; for (size_t i=0; i(Dist0.length()) ); const float dx1 = 0.3*tmp; const float dx2 = 1.05*dx1; if (rank==0) @@ -285,7 +284,7 @@ void filter_final( Array& ID, Array& Dist, Phase.fill(1); ComputeGlobalBlobIDs( Nx, Ny, Nz, Dm.rank_info, Phase, SignDist, 0, 0, GlobalBlobID, Dm.Comm ); fillInt.fill(GlobalBlobID); - int N_blobs = maxReduce(Dm.Comm,GlobalBlobID.max()+1); + int N_blobs = Dm.Comm.maxReduce(GlobalBlobID.max()+1); std::vector mean(N_blobs,0); std::vector count(N_blobs,0); for (int k=1; k<=Nz; k++) { @@ -321,8 +320,8 @@ void filter_final( Array& ID, Array& Dist, } } } - mean = sumReduce(Dm.Comm,mean); - count = sumReduce(Dm.Comm,count); + mean = Dm.Comm.sumReduce(mean); + count = Dm.Comm.sumReduce(count); for (size_t i=0; i