gradually putting it all back together

This commit is contained in:
James McClure 2021-01-04 23:35:10 -05:00
parent 0983e63fbf
commit cf52427539
5 changed files with 101 additions and 116 deletions

View File

@ -1,7 +1,8 @@
#include "IO/MeshDatabase.h" #include "IO/MeshDatabase.h"
#include "IO/Mesh.h" #include "IO/Mesh.h"
#include "IO/PackData.h"
#include "IO/IOHelpers.h" #include "IO/IOHelpers.h"
#include "common/MPI_Helpers.h" #include "common/MPI.h"
#include "common/Utilities.h" #include "common/Utilities.h"
#include <vector> #include <vector>
@ -13,8 +14,6 @@
/****************************************************
****************************************************/
// MeshType // MeshType
template<> template<>
size_t packsize<IO::MeshType>( const IO::MeshType& rhs ) size_t packsize<IO::MeshType>( const IO::MeshType& rhs )
@ -247,80 +246,76 @@ void DatabaseEntry::read( const std::string& line )
// Gather the mesh databases from all processors // Gather the mesh databases from all processors
inline int tod( int N ) { return (N+7)/sizeof(double); } inline int tod( int N ) { return (N+7)/sizeof(double); }
std::vector<MeshDatabase> gatherAll( const std::vector<MeshDatabase>& meshes, MPI_Comm comm ) std::vector<MeshDatabase> gatherAll( const std::vector<MeshDatabase>& meshes, const Utilities::MPI& comm )
{ {
#ifdef USE_MPI if ( comm.getSize() == 1 )
PROFILE_START("gatherAll");
PROFILE_START("gatherAll-pack",2);
int size = MPI_WORLD_SIZE();
// First pack the mesh data to local buffers
int localsize = 0;
for (size_t i=0; i<meshes.size(); i++)
localsize += tod(packsize(meshes[i]));
auto localbuf = new double[localsize];
int pos = 0;
for (size_t i=0; i<meshes.size(); i++) {
pack( meshes[i], (char*) &localbuf[pos] );
pos += tod(packsize(meshes[i]));
}
PROFILE_STOP("gatherAll-pack",2);
// Get the number of bytes each processor will be sending/recieving
PROFILE_START("gatherAll-send1",2);
auto recvsize = new int[size];
MPI_Allgather(&localsize,1,MPI_INT,recvsize,1,MPI_INT,comm);
int globalsize = recvsize[0];
auto disp = new int[size];
disp[0] = 0;
for (int i=1; i<size; i++) {
disp[i] = disp[i-1] + recvsize[i];
globalsize += recvsize[i];
}
PROFILE_STOP("gatherAll-send1",2);
// Send/recv the global data
PROFILE_START("gatherAll-send2",2);
auto globalbuf = new double[globalsize];
MPI_Allgatherv(localbuf,localsize,MPI_DOUBLE,globalbuf,recvsize,disp,MPI_DOUBLE,comm);
PROFILE_STOP("gatherAll-send2",2);
// Unpack the data
PROFILE_START("gatherAll-unpack",2);
std::map<std::string,MeshDatabase> data;
pos = 0;
while ( pos < globalsize ) {
MeshDatabase tmp;
unpack(tmp,(char*)&globalbuf[pos]);
pos += tod(packsize(tmp));
std::map<std::string,MeshDatabase>::iterator it = data.find(tmp.name);
if ( it==data.end() ) {
data[tmp.name] = tmp;
} else {
for (size_t i=0; i<tmp.domains.size(); i++)
it->second.domains.push_back(tmp.domains[i]);
for (size_t i=0; i<tmp.variables.size(); i++)
it->second.variables.push_back(tmp.variables[i]);
it->second.variable_data.insert(tmp.variable_data.begin(),tmp.variable_data.end());
}
}
for (std::map<std::string,MeshDatabase>::iterator it=data.begin(); it!=data.end(); ++it) {
// Get the unique variables
std::set<VariableDatabase> data2(it->second.variables.begin(),it->second.variables.end());
it->second.variables = std::vector<VariableDatabase>(data2.begin(),data2.end());
}
// Free temporary memory
delete [] localbuf;
delete [] recvsize;
delete [] disp;
delete [] globalbuf;
// Return the results
std::vector<MeshDatabase> data2(data.size());
size_t i=0;
for (std::map<std::string,MeshDatabase>::iterator it=data.begin(); it!=data.end(); ++it, ++i)
data2[i] = it->second;
PROFILE_STOP("gatherAll-unpack",2);
PROFILE_STOP("gatherAll");
return data2;
#else
return meshes; return meshes;
#endif PROFILE_START("gatherAll");
PROFILE_START("gatherAll-pack",2);
int size = comm.getSize();
// First pack the mesh data to local buffers
int localsize = 0;
for (size_t i=0; i<meshes.size(); i++)
localsize += tod(packsize(meshes[i]));
auto localbuf = new double[localsize];
int pos = 0;
for (size_t i=0; i<meshes.size(); i++) {
pack( meshes[i], (char*) &localbuf[pos] );
pos += tod(packsize(meshes[i]));
}
PROFILE_STOP("gatherAll-pack",2);
// Get the number of bytes each processor will be sending/recieving
PROFILE_START("gatherAll-send1",2);
auto recvsize = comm.allGather( localsize );
int globalsize = recvsize[0];
auto disp = new int[size];
disp[0] = 0;
for (int i=1; i<size; i++) {
disp[i] = disp[i-1] + recvsize[i];
globalsize += recvsize[i];
}
PROFILE_STOP("gatherAll-send1",2);
// Send/recv the global data
PROFILE_START("gatherAll-send2",2);
auto globalbuf = new double[globalsize];
comm.allGather(localbuf,localsize,globalbuf,recvsize.data(),disp,true);
PROFILE_STOP("gatherAll-send2",2);
// Unpack the data
PROFILE_START("gatherAll-unpack",2);
std::map<std::string,MeshDatabase> data;
pos = 0;
while ( pos < globalsize ) {
MeshDatabase tmp;
unpack(tmp,(char*)&globalbuf[pos]);
pos += tod(packsize(tmp));
std::map<std::string,MeshDatabase>::iterator it = data.find(tmp.name);
if ( it==data.end() ) {
data[tmp.name] = tmp;
} else {
for (size_t i=0; i<tmp.domains.size(); i++)
it->second.domains.push_back(tmp.domains[i]);
for (size_t i=0; i<tmp.variables.size(); i++)
it->second.variables.push_back(tmp.variables[i]);
it->second.variable_data.insert(tmp.variable_data.begin(),tmp.variable_data.end());
}
}
for (auto it=data.begin(); it!=data.end(); ++it) {
// Get the unique variables
std::set<VariableDatabase> data2(it->second.variables.begin(),it->second.variables.end());
it->second.variables = std::vector<VariableDatabase>(data2.begin(),data2.end());
}
// Free temporary memory
delete [] localbuf;
delete [] disp;
delete [] globalbuf;
// Return the results
std::vector<MeshDatabase> data2(data.size());
size_t i=0;
for (std::map<std::string,MeshDatabase>::iterator it=data.begin(); it!=data.end(); ++it, ++i)
data2[i] = it->second;
PROFILE_STOP("gatherAll-unpack",2);
PROFILE_STOP("gatherAll");
return data2;
} }

View File

@ -2,7 +2,7 @@
#define MeshDatabase_INC #define MeshDatabase_INC
#include "IO/Mesh.h" #include "IO/Mesh.h"
#include "common/MPI_Helpers.h" #include "common/MPI.h"
#include <iostream> #include <iostream>
#include <memory> #include <memory>
@ -70,7 +70,7 @@ public:
//! Gather the mesh databases from all processors //! Gather the mesh databases from all processors
std::vector<MeshDatabase> gatherAll( const std::vector<MeshDatabase>& meshes, MPI_Comm comm ); std::vector<MeshDatabase> gatherAll( const std::vector<MeshDatabase>& meshes, const Utilities::MPI& comm );
//! Write the mesh databases to a file //! Write the mesh databases to a file

View File

@ -462,7 +462,7 @@ private:
/****************************************************************** /******************************************************************
* MPI comm wrapper for use with analysis * * MPI comm wrapper for use with analysis *
******************************************************************/ ******************************************************************/
runAnalysis::commWrapper::commWrapper( int tag_, MPI_Comm comm_, runAnalysis* analysis_ ): runAnalysis::commWrapper::commWrapper( int tag_, const Utilities::MPI& comm_, runAnalysis* analysis_ ):
comm(comm_), comm(comm_),
tag(tag_), tag(tag_),
analysis(analysis_) analysis(analysis_)
@ -479,7 +479,7 @@ runAnalysis::commWrapper::~commWrapper()
{ {
if ( tag == -1 ) if ( tag == -1 )
return; return;
MPI_Barrier( comm ); comm.barrier();
analysis->d_comm_used[tag] = false; analysis->d_comm_used[tag] = false;
} }
runAnalysis::commWrapper runAnalysis::getComm( ) runAnalysis::commWrapper runAnalysis::getComm( )
@ -496,10 +496,10 @@ runAnalysis::commWrapper runAnalysis::getComm( )
if ( tag == -1 ) if ( tag == -1 )
ERROR("Unable to get comm"); ERROR("Unable to get comm");
} }
MPI_Bcast( &tag, 1, MPI_INT, 0, d_comm ); tag = d_comm.bcast( tag, 0 );
d_comm_used[tag] = true; d_comm_used[tag] = true;
if ( d_comms[tag] == MPI_COMM_NULL ) if ( d_comms[tag].isNull() )
MPI_Comm_dup( MPI_COMM_WORLD, &d_comms[tag] ); d_comms[tag] = d_comm.dup();
return commWrapper(tag,d_comms[tag],this); return commWrapper(tag,d_comms[tag],this);
} }
@ -560,7 +560,7 @@ runAnalysis::runAnalysis( std::shared_ptr<Database> input_db,
d_restartFile = restart_file + "." + rankString; d_restartFile = restart_file + "." + rankString;
d_rank = MPI_WORLD_RANK(); d_rank = d_comm.getRank();
writeIDMap(ID_map_struct(),0,id_map_filename); writeIDMap(ID_map_struct(),0,id_map_filename);
// Initialize IO for silo // Initialize IO for silo
IO::initialize("","silo","false"); IO::initialize("","silo","false");
@ -629,11 +629,8 @@ runAnalysis::runAnalysis( std::shared_ptr<Database> input_db,
// Initialize the comms // Initialize the comms
MPI_Comm_dup(MPI_COMM_WORLD,&d_comm); for (int i=0; i<1024; i++)
for (int i=0; i<1024; i++) {
d_comms[i] = MPI_COMM_NULL;
d_comm_used[i] = false; d_comm_used[i] = false;
}
// Initialize the threads // Initialize the threads
int N_threads = db->getWithDefault<int>( "N_threads", 4 ); int N_threads = db->getWithDefault<int>( "N_threads", 4 );
auto method = db->getWithDefault<std::string>( "load_balance", "default" ); auto method = db->getWithDefault<std::string>( "load_balance", "default" );
@ -643,12 +640,6 @@ runAnalysis::~runAnalysis( )
{ {
// Finish processing analysis // Finish processing analysis
finish(); finish();
// Clear internal data
MPI_Comm_free( &d_comm );
for (int i=0; i<1024; i++) {
if ( d_comms[i] != MPI_COMM_NULL )
MPI_Comm_free(&d_comms[i]);
}
} }
void runAnalysis::finish( ) void runAnalysis::finish( )
{ {
@ -662,7 +653,7 @@ void runAnalysis::finish( )
d_wait_subphase.reset(); d_wait_subphase.reset();
d_wait_restart.reset(); d_wait_restart.reset();
// Syncronize // Syncronize
MPI_Barrier( d_comm ); d_comm.barrier();
PROFILE_STOP("finish"); PROFILE_STOP("finish");
} }
@ -915,12 +906,12 @@ void runAnalysis::run(int timestep, std::shared_ptr<Database> input_db, TwoPhase
// Spawn a thread to write the restart file // Spawn a thread to write the restart file
// if ( matches(type,AnalysisType::CreateRestart) ) { // if ( matches(type,AnalysisType::CreateRestart) ) {
if (timestep%d_restart_interval==0){ if (timestep%d_restart_interval==0){
auto Restart_db = input_db->cloneDatabase();
// Restart_db->putScalar<bool>( "Restart", true );
if (d_rank==0) { if (d_rank==0) {
// std::ofstream OutStream("Restart.db"); input_db->putScalar<bool>( "Restart", true );
// Restart_db->print(OutStream, ""); std::ofstream OutStream("Restart.db");
// OutStream.close(); input_db->print(OutStream, "");
OutStream.close();
} }
// Write the restart file (using a seperate thread) // Write the restart file (using a seperate thread)
auto work = new WriteRestartWorkItem(d_restartFile.c_str(),cDen,cfq,d_Np); auto work = new WriteRestartWorkItem(d_restartFile.c_str(),cDen,cfq,d_Np);
@ -1019,21 +1010,21 @@ void runAnalysis::basic(int timestep, std::shared_ptr<Database> input_db, SubPha
cfq = std::shared_ptr<double>(new double[19*d_Np],DeleteArray<double>); cfq = std::shared_ptr<double>(new double[19*d_Np],DeleteArray<double>);
ScaLBL_CopyToHost(cfq.get(),fq,19*d_Np*sizeof(double)); ScaLBL_CopyToHost(cfq.get(),fq,19*d_Np*sizeof(double));
ScaLBL_CopyToHost(cDen.get(),Den,2*d_Np*sizeof(double)); ScaLBL_CopyToHost(cDen.get(),Den,2*d_Np*sizeof(double));
// clone the input database to avoid modifying shared data
auto Restart_db = input_db->cloneDatabase();
auto tmp_color_db = Restart_db->getDatabase( "Color" );
tmp_color_db->putScalar<int>("timestep",timestep);
tmp_color_db->putScalar<bool>( "Restart", true );
Restart_db->putDatabase("Color", tmp_color_db);
if (d_rank==0) { if (d_rank==0) {
color_db->putScalar<int>("timestep",timestep);
color_db->putScalar<bool>( "Restart", true );
input_db->putDatabase("Color", color_db);
std::ofstream OutStream("Restart.db"); std::ofstream OutStream("Restart.db");
Restart_db->print(OutStream, ""); input_db->print(OutStream, "");
OutStream.close(); OutStream.close();
} }
// Write the restart file (using a seperate thread) // Write the restart file (using a seperate thread)
auto work1 = new WriteRestartWorkItem(d_restartFile.c_str(),cDen,cfq,d_Np); auto work1 = new WriteRestartWorkItem(d_restartFile.c_str(),cDen,cfq,d_Np);
work1->add_dependency(d_wait_restart); work1->add_dependency(d_wait_restart);
d_wait_restart = d_tpool.add_work(work1); d_wait_restart = d_tpool.add_work(work1);
} }
if (timestep%d_visualization_interval==0){ if (timestep%d_visualization_interval==0){

View File

@ -68,10 +68,10 @@ public:
class commWrapper class commWrapper
{ {
public: public:
MPI_Comm comm; Utilities::MPI comm;
int tag; int tag;
runAnalysis *analysis; runAnalysis *analysis;
commWrapper( int tag, MPI_Comm comm, runAnalysis *analysis ); commWrapper( int tag, const Utilities::MPI& comm, runAnalysis *analysis );
commWrapper( ) = delete; commWrapper( ) = delete;
commWrapper( const commWrapper &rhs ) = delete; commWrapper( const commWrapper &rhs ) = delete;
commWrapper& operator=( const commWrapper &rhs ) = delete; commWrapper& operator=( const commWrapper &rhs ) = delete;
@ -100,8 +100,8 @@ private:
BlobIDList d_last_id_map; BlobIDList d_last_id_map;
std::vector<IO::MeshDataStruct> d_meshData; std::vector<IO::MeshDataStruct> d_meshData;
std::string d_restartFile; std::string d_restartFile;
MPI_Comm d_comm; Utilities::MPI d_comm;
MPI_Comm d_comms[1024]; Utilities::MPI d_comms[1024];
volatile bool d_comm_used[1024]; volatile bool d_comm_used[1024];
std::shared_ptr<ScaLBL_Communicator> d_ScaLBL_Comm; std::shared_ptr<ScaLBL_Communicator> d_ScaLBL_Comm;

View File

@ -228,8 +228,7 @@ void filter_final( Array<char>& ID, Array<float>& Dist,
Array<float>& Mean, Array<float>& Dist1, Array<float>& Dist2 ) Array<float>& Mean, Array<float>& Dist1, Array<float>& Dist2 )
{ {
PROFILE_SCOPED(timer,"filter_final"); PROFILE_SCOPED(timer,"filter_final");
int rank; int rank = Dm.Comm.getRank();
MPI_Comm_rank(Dm.Comm,&rank);
int Nx = Dm.Nx-2; int Nx = Dm.Nx-2;
int Ny = Dm.Ny-2; int Ny = Dm.Ny-2;
int Nz = Dm.Nz-2; int Nz = Dm.Nz-2;
@ -242,7 +241,7 @@ void filter_final( Array<char>& ID, Array<float>& Dist,
float tmp = 0; float tmp = 0;
for (size_t i=0; i<Dist0.length(); i++) for (size_t i=0; i<Dist0.length(); i++)
tmp += Dist0(i)*Dist0(i); tmp += Dist0(i)*Dist0(i);
tmp = sqrt( sumReduce(Dm.Comm,tmp) / sumReduce(Dm.Comm,(float)Dist0.length()) ); tmp = sqrt( Dm.Comm.sumReduce(tmp) / Dm.Comm.sumReduce<float>(Dist0.length()) );
const float dx1 = 0.3*tmp; const float dx1 = 0.3*tmp;
const float dx2 = 1.05*dx1; const float dx2 = 1.05*dx1;
if (rank==0) if (rank==0)
@ -285,7 +284,7 @@ void filter_final( Array<char>& ID, Array<float>& Dist,
Phase.fill(1); Phase.fill(1);
ComputeGlobalBlobIDs( Nx, Ny, Nz, Dm.rank_info, Phase, SignDist, 0, 0, GlobalBlobID, Dm.Comm ); ComputeGlobalBlobIDs( Nx, Ny, Nz, Dm.rank_info, Phase, SignDist, 0, 0, GlobalBlobID, Dm.Comm );
fillInt.fill(GlobalBlobID); fillInt.fill(GlobalBlobID);
int N_blobs = maxReduce(Dm.Comm,GlobalBlobID.max()+1); int N_blobs = Dm.Comm.maxReduce(GlobalBlobID.max()+1);
std::vector<float> mean(N_blobs,0); std::vector<float> mean(N_blobs,0);
std::vector<int> count(N_blobs,0); std::vector<int> count(N_blobs,0);
for (int k=1; k<=Nz; k++) { for (int k=1; k<=Nz; k++) {
@ -321,8 +320,8 @@ void filter_final( Array<char>& ID, Array<float>& Dist,
} }
} }
} }
mean = sumReduce(Dm.Comm,mean); mean = Dm.Comm.sumReduce(mean);
count = sumReduce(Dm.Comm,count); count = Dm.Comm.sumReduce(count);
for (size_t i=0; i<mean.size(); i++) for (size_t i=0; i<mean.size(); i++)
mean[i] /= count[i]; mean[i] /= count[i];
/*if (rank==0) { /*if (rank==0) {