Updating interface for insitu analysis

This commit is contained in:
Mark Berrill 2018-03-08 13:03:22 -05:00
parent c472622e8e
commit dd21dd3985
5 changed files with 4448 additions and 4225 deletions

View File

@ -1,22 +1,22 @@
// Run the analysis, blob identification, and write restart files // Run the analysis, blob identification, and write restart files
#include "analysis/runAnalysis.h"
#include "analysis/analysis.h"
#include "common/Array.h" #include "common/Array.h"
#include "common/Communication.h" #include "common/Communication.h"
#include "common/MPI_Helpers.h" #include "common/MPI_Helpers.h"
#include "common/ScaLBL.h" #include "common/ScaLBL.h"
#include "IO/MeshDatabase.h" #include "IO/MeshDatabase.h"
#include "threadpool/thread_pool.h"
#include "ProfilerApp.h"
//#define ANALYSIS_INTERVAL 6
//#define ANALYSIS_INTERVAL 1000
//#define BLOBID_INTERVAL 1000
enum class AnalysisType : uint64_t { AnalyzeNone=0, IdentifyBlobs=0x01, CopyPhaseIndicator=0x02,
CopySimState=0x04, ComputeAverages=0x08, CreateRestart=0x10, WriteVis=0x20 };
AnalysisType& operator |=(AnalysisType &lhs, AnalysisType rhs) AnalysisType& operator |=(AnalysisType &lhs, AnalysisType rhs)
{ {
lhs = static_cast<AnalysisType> ( lhs = static_cast<AnalysisType>(
static_cast<std::underlying_type<AnalysisType>::type>(lhs) | static_cast<std::underlying_type<AnalysisType>::type>(lhs) |
static_cast<std::underlying_type<AnalysisType>::type>(rhs) static_cast<std::underlying_type<AnalysisType>::type>(rhs)
); );
return lhs; return lhs;
} }
@ -34,15 +34,6 @@ void DeleteArray( const TYPE *p )
} }
// Structure used to store ids
struct AnalysisWaitIdStruct {
ThreadPool::thread_id_t blobID;
ThreadPool::thread_id_t analysis;
ThreadPool::thread_id_t vis;
ThreadPool::thread_id_t restart;
};
// Helper class to write the restart file from a seperate thread // Helper class to write the restart file from a seperate thread
class WriteRestartWorkItem: public ThreadPool::WorkItemRet<void> class WriteRestartWorkItem: public ThreadPool::WorkItemRet<void>
{ {
@ -65,27 +56,24 @@ private:
// Helper class to compute the blob ids // Helper class to compute the blob ids
static const std::string id_map_filename = "lbpm_id_map.txt"; static const std::string id_map_filename = "lbpm_id_map.txt";
typedef std::shared_ptr<std::pair<int,IntArray> > BlobIDstruct;
typedef std::shared_ptr<std::vector<BlobIDType> > BlobIDList;
class BlobIdentificationWorkItem1: public ThreadPool::WorkItemRet<void> class BlobIdentificationWorkItem1: public ThreadPool::WorkItemRet<void>
{ {
public: public:
BlobIdentificationWorkItem1( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_, BlobIdentificationWorkItem1( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_,
std::shared_ptr<const DoubleArray> phase_, const DoubleArray& dist_, std::shared_ptr<const DoubleArray> phase_, const DoubleArray& dist_,
BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_ ): BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_, runAnalysis::commWrapper&& comm_ ):
timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_), timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_),
phase(phase_), dist(dist_), last_id(last_id_), new_index(new_index_), new_id(new_id_), new_list(new_list_) phase(phase_), dist(dist_), last_id(last_id_), new_index(new_index_), new_id(new_id_), new_list(new_list_), comm(std::move(comm_))
{ {
MPI_Comm_dup(MPI_COMM_WORLD,&newcomm);
} }
~BlobIdentificationWorkItem1() { MPI_Comm_free(&newcomm); } ~BlobIdentificationWorkItem1() { }
virtual void run() { virtual void run() {
// Compute the global blob id and compare to the previous version // Compute the global blob id and compare to the previous version
PROFILE_START("Identify blobs",1); PROFILE_START("Identify blobs",1);
double vF = 0.0; double vF = 0.0;
double vS = -1.0; // one voxel buffer region around solid double vS = -1.0; // one voxel buffer region around solid
IntArray& ids = new_index->second; IntArray& ids = new_index->second;
new_index->first = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,*phase,dist,vF,vS,ids,newcomm); new_index->first = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,*phase,dist,vF,vS,ids,comm.comm);
PROFILE_STOP("Identify blobs",1); PROFILE_STOP("Identify blobs",1);
} }
private: private:
@ -97,20 +85,19 @@ private:
const DoubleArray& dist; const DoubleArray& dist;
BlobIDstruct last_id, new_index, new_id; BlobIDstruct last_id, new_index, new_id;
BlobIDList new_list; BlobIDList new_list;
MPI_Comm newcomm; runAnalysis::commWrapper comm;
}; };
class BlobIdentificationWorkItem2: public ThreadPool::WorkItemRet<void> class BlobIdentificationWorkItem2: public ThreadPool::WorkItemRet<void>
{ {
public: public:
BlobIdentificationWorkItem2( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_, BlobIdentificationWorkItem2( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_,
std::shared_ptr<const DoubleArray> phase_, const DoubleArray& dist_, std::shared_ptr<const DoubleArray> phase_, const DoubleArray& dist_,
BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_ ): BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_ , runAnalysis::commWrapper&& comm_ ):
timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_), timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_),
phase(phase_), dist(dist_), last_id(last_id_), new_index(new_index_), new_id(new_id_), new_list(new_list_) phase(phase_), dist(dist_), last_id(last_id_), new_index(new_index_), new_id(new_id_), new_list(new_list_), comm(std::move(comm_))
{ {
MPI_Comm_dup(MPI_COMM_WORLD,&newcomm);
} }
~BlobIdentificationWorkItem2() { MPI_Comm_free(&newcomm); } ~BlobIdentificationWorkItem2() { }
virtual void run() { virtual void run() {
// Compute the global blob id and compare to the previous version // Compute the global blob id and compare to the previous version
PROFILE_START("Identify blobs maps",1); PROFILE_START("Identify blobs maps",1);
@ -121,7 +108,7 @@ public:
if ( last_id.get()!=NULL ) { if ( last_id.get()!=NULL ) {
// Compute the timestep-timestep map // Compute the timestep-timestep map
const IntArray& old_ids = last_id->second; const IntArray& old_ids = last_id->second;
ID_map_struct map = computeIDMap(Nx,Ny,Nz,old_ids,ids,newcomm); ID_map_struct map = computeIDMap(Nx,Ny,Nz,old_ids,ids,comm.comm);
// Renumber the current timestep's ids // Renumber the current timestep's ids
getNewIDs(map,max_id,*new_list); getNewIDs(map,max_id,*new_list);
renumberIDs(*new_list,new_id->second); renumberIDs(*new_list,new_id->second);
@ -143,7 +130,7 @@ private:
const DoubleArray& dist; const DoubleArray& dist;
BlobIDstruct last_id, new_index, new_id; BlobIDstruct last_id, new_index, new_id;
BlobIDList new_list; BlobIDList new_list;
MPI_Comm newcomm; runAnalysis::commWrapper comm;
}; };
@ -152,12 +139,11 @@ class WriteVisWorkItem: public ThreadPool::WorkItemRet<void>
{ {
public: public:
WriteVisWorkItem( int timestep_, std::vector<IO::MeshDataStruct>& visData_, WriteVisWorkItem( int timestep_, std::vector<IO::MeshDataStruct>& visData_,
TwoPhase& Avgerages_, fillHalo<double>& fillData_ ): TwoPhase& Avgerages_, fillHalo<double>& fillData_, runAnalysis::commWrapper&& comm_ ):
timestep(timestep_), visData(visData_), Averages(Avgerages_), fillData(fillData_) timestep(timestep_), visData(visData_), Averages(Avgerages_), fillData(fillData_), comm(std::move(comm_))
{ {
MPI_Comm_dup(MPI_COMM_WORLD,&newcomm);
} }
~WriteVisWorkItem() { MPI_Comm_free(&newcomm); } ~WriteVisWorkItem() { }
virtual void run() { virtual void run() {
PROFILE_START("Save Vis",1); PROFILE_START("Save Vis",1);
ASSERT(visData[0].vars[0]->name=="phase"); ASSERT(visData[0].vars[0]->name=="phase");
@ -172,7 +158,7 @@ public:
fillData.copy(Averages.Press,PressData); fillData.copy(Averages.Press,PressData);
fillData.copy(Averages.SDs,SignData); fillData.copy(Averages.SDs,SignData);
fillData.copy(Averages.Label_NWP,BlobData); fillData.copy(Averages.Label_NWP,BlobData);
IO::writeData( timestep, visData, newcomm ); IO::writeData( timestep, visData, comm.comm );
PROFILE_STOP("Save Vis",1); PROFILE_STOP("Save Vis",1);
}; };
private: private:
@ -181,7 +167,7 @@ private:
std::vector<IO::MeshDataStruct>& visData; std::vector<IO::MeshDataStruct>& visData;
TwoPhase& Averages; TwoPhase& Averages;
fillHalo<double>& fillData; fillHalo<double>& fillData;
MPI_Comm newcomm; runAnalysis::commWrapper comm;
}; };
@ -233,25 +219,177 @@ private:
}; };
// Function to start the analysis /******************************************************************
void run_analysis( int timestep, int restart_interval, int ANALYSIS_INTERVAL, int BLOBID_INTERVAL, * MPI comm wrapper for use with analysis *
const RankInfoStruct& rank_info, ScaLBL_Communicator &ScaLBL_Comm, TwoPhase& Averages, ******************************************************************/
BlobIDstruct& last_ids, BlobIDstruct& last_index, BlobIDList& last_id_map, runAnalysis::commWrapper::commWrapper( int tag_, MPI_Comm comm_, runAnalysis* analysis_ ):
int Np, int Nx, int Ny, int Nz, bool pBC, double beta, double err, tag(tag_),
const double *Phi, double *Pressure, double *Velocity, comm(comm_),
IntArray Map, double *fq, double *Den, analysis(analysis_)
const char *LocalRestartFile, std::vector<IO::MeshDataStruct>& visData, fillHalo<double>& fillData,
ThreadPool& tpool, AnalysisWaitIdStruct& wait )
{ {
int N = Nx*Ny*Nz; }
runAnalysis::commWrapper::commWrapper( commWrapper &&rhs ):
tag(rhs.tag),
comm(rhs.comm),
analysis(rhs.analysis)
{
rhs.tag = -1;
}
runAnalysis::commWrapper::~commWrapper()
{
if ( tag == -1 )
return;
MPI_Barrier( comm );
analysis->d_comm_used[tag] = false;
}
runAnalysis::commWrapper runAnalysis::getComm( )
{
// Get a tag from root
int tag = -1;
if ( d_rank == 0 ) {
for (int i=0; i<1024; i++) {
if ( !d_comm_used[i] ) {
tag = i;
break;
}
}
if ( tag == -1 )
ERROR("Unable to get comm");
}
MPI_Bcast( &tag, 1, MPI_INT, 0, d_comm );
d_comm_used[tag] = true;
if ( d_comms[tag] == MPI_COMM_NULL )
MPI_Comm_dup( MPI_COMM_WORLD, &d_comms[tag] );
return commWrapper(tag,d_comms[tag],this);
}
// Determin the analysis we want to perform
/******************************************************************
* Constructor/Destructors *
******************************************************************/
runAnalysis::runAnalysis( int N_threads, int restart_interval, int analysis_interval,
int blobid_interval, const RankInfoStruct& rank_info, const ScaLBL_Communicator &ScaLBL_Comm, const Domain& Dm,
int Np, int Nx, int Ny, int Nz, double Lx, double Ly, double Lz, bool pBC, double beta, double err,
IntArray Map, const std::string& LocalRestartFile ):
d_Np( Np ),
d_restart_interval( restart_interval ),
d_analysis_interval( analysis_interval ),
d_blobid_interval( blobid_interval ),
d_beta( beta ),
d_tpool( N_threads ),
d_ScaLBL_Comm( ScaLBL_Comm ),
d_rank_info( rank_info ),
d_Map( Map ),
d_fillData(Dm.Comm,Dm.rank_info,Nx-2,Ny-2,Nz-2,1,1,1,0,1),
d_restartFile( LocalRestartFile )
{
d_N[0] = Nx;
d_N[1] = Ny;
d_N[2] = Nz;
d_rank = MPI_WORLD_RANK();
writeIDMap(ID_map_struct(),0,id_map_filename);
// Create the MeshDataStruct
d_meshData.resize(1);
d_meshData[0].meshName = "domain";
d_meshData[0].mesh = std::make_shared<IO::DomainMesh>( Dm.rank_info,Nx-2,Ny-2,Nz-2,Lx,Ly,Lz );
auto PhaseVar = std::make_shared<IO::Variable>();
auto PressVar = std::make_shared<IO::Variable>();
auto SignDistVar = std::make_shared<IO::Variable>();
auto BlobIDVar = std::make_shared<IO::Variable>();
PhaseVar->name = "phase";
PhaseVar->type = IO::VariableType::VolumeVariable;
PhaseVar->dim = 1;
PhaseVar->data.resize(Nx-2,Ny-2,Nz-2);
d_meshData[0].vars.push_back(PhaseVar);
PressVar->name = "Pressure";
PressVar->type = IO::VariableType::VolumeVariable;
PressVar->dim = 1;
PressVar->data.resize(Nx-2,Ny-2,Nz-2);
d_meshData[0].vars.push_back(PressVar);
SignDistVar->name = "SignDist";
SignDistVar->type = IO::VariableType::VolumeVariable;
SignDistVar->dim = 1;
SignDistVar->data.resize(Nx-2,Ny-2,Nz-2);
d_meshData[0].vars.push_back(SignDistVar);
BlobIDVar->name = "BlobID";
BlobIDVar->type = IO::VariableType::VolumeVariable;
BlobIDVar->dim = 1;
BlobIDVar->data.resize(Nx-2,Ny-2,Nz-2);
d_meshData[0].vars.push_back(BlobIDVar);
// Initialize the comms
MPI_Comm_dup(MPI_COMM_WORLD,&d_comm);
for (int i=0; i<1024; i++) {
d_comms[i] = MPI_COMM_NULL;
d_comm_used[i] = false;
}
}
runAnalysis::~runAnalysis( )
{
// Finish processing analysis
finish();
// Clear internal data
MPI_Comm_free( &d_comm );
for (int i=0; i<1024; i++) {
if ( d_comms[i] != MPI_COMM_NULL )
MPI_Comm_free(&d_comms[i]);
}
}
void runAnalysis::finish( )
{
// Wait for the work items to finish
d_tpool.wait_pool_finished();
// Clear the wait ids
d_wait_blobID.reset();
d_wait_analysis.reset();
d_wait_vis.reset();
d_wait_restart.reset();
// Syncronize
MPI_Barrier( d_comm );
}
/******************************************************************
* Set the thread affinities *
******************************************************************/
void print( const std::vector<int>& ids )
{
if ( ids.empty() )
return;
printf("%i",ids[0]);
for (size_t i=1; i<ids.size(); i++)
printf(", %i",ids[i]);
printf("\n");
}
void runAnalysis::setAffinities( const std::string& method )
{
int N_procs = d_tpool.getNumberOfProcessors();
int proc = d_tpool.getCurrentProcessor();
auto affinity = d_tpool.getProcessAffinity();
// Print the current affinities
if ( d_rank == 0 ) {
printf("Affinities - rank 0:\n");
printf("Main: ");
print(d_tpool.getProcessAffinity());
for (int i=0; i<d_tpool.getNumThreads(); i++) {
printf("Thread %i: ",i+1);
print(d_tpool.getThreadAffinity(i));
}
}
}
/******************************************************************
* Check which analysis we want to perform *
******************************************************************/
AnalysisType runAnalysis::computeAnalysisType( int timestep )
{
AnalysisType type = AnalysisType::AnalyzeNone; AnalysisType type = AnalysisType::AnalyzeNone;
if ( timestep%ANALYSIS_INTERVAL + 8 == ANALYSIS_INTERVAL ) { if ( timestep%d_analysis_interval + 8 == d_analysis_interval ) {
// Copy the phase indicator field for the earlier timestep // Copy the phase indicator field for the earlier timestep
type |= AnalysisType::CopyPhaseIndicator; type |= AnalysisType::CopyPhaseIndicator;
} }
if ( timestep%BLOBID_INTERVAL == 0 ) { if ( timestep%d_blobid_interval == 0 ) {
// Identify blobs and update global ids in time // Identify blobs and update global ids in time
type |= AnalysisType::IdentifyBlobs; type |= AnalysisType::IdentifyBlobs;
} }
@ -263,31 +401,50 @@ void run_analysis( int timestep, int restart_interval, int ANALYSIS_INTERVAL, in
type |= AnalysisType::IdentifyBlobs; type |= AnalysisType::IdentifyBlobs;
} }
#endif */ #endif */
if ( timestep%ANALYSIS_INTERVAL + 4 == 0 ) { if ( timestep%d_analysis_interval + 4 == 0 ) {
// Copy the averages to the CPU (and identify blobs) // Copy the averages to the CPU (and identify blobs)
type |= AnalysisType::CopySimState; type |= AnalysisType::CopySimState;
type |= AnalysisType::IdentifyBlobs; type |= AnalysisType::IdentifyBlobs;
} }
if ( timestep%ANALYSIS_INTERVAL == 0 ) { if ( timestep%d_analysis_interval == 0 ) {
// Run the analysis // Run the analysis
type |= AnalysisType::ComputeAverages; type |= AnalysisType::ComputeAverages;
} }
if (timestep%restart_interval == 0) { if (timestep%d_restart_interval == 0) {
// Write the restart file // Write the restart file
type |= AnalysisType::CreateRestart; type |= AnalysisType::CreateRestart;
} }
if (timestep%restart_interval == 0) { if (timestep%d_restart_interval == 0) {
// Write the visualization data // Write the visualization data
type |= AnalysisType::WriteVis; type |= AnalysisType::WriteVis;
type |= AnalysisType::CopySimState; type |= AnalysisType::CopySimState;
type |= AnalysisType::IdentifyBlobs; type |= AnalysisType::IdentifyBlobs;
} }
return type;
// Return if we are not doing anything }
/******************************************************************
* Run the analysis *
******************************************************************/
void runAnalysis::run( int timestep, TwoPhase& Averages, const double *Phi,
double *Pressure, double *Velocity, double *fq, double *Den )
{
int N = d_N[0]*d_N[1]*d_N[2];
// Check which analysis steps we need to perform
auto type = computeAnalysisType( timestep );
if ( type == AnalysisType::AnalyzeNone ) if ( type == AnalysisType::AnalyzeNone )
return; return;
PROFILE_START("start_analysis"); // Check how may queued items we have
if ( d_tpool.N_queued() > 20 ) {
std::cerr << "Analysis queue is getting behind, waiting ...\n";
finish();
}
PROFILE_START("run");
// Copy the appropriate variables to the host (so we can spawn new threads) // Copy the appropriate variables to the host (so we can spawn new threads)
ScaLBL_DeviceBarrier(); ScaLBL_DeviceBarrier();
@ -298,44 +455,41 @@ void run_analysis( int timestep, int restart_interval, int ANALYSIS_INTERVAL, in
matches(type,AnalysisType::CopySimState) || matches(type,AnalysisType::CopySimState) ||
matches(type,AnalysisType::IdentifyBlobs) ) matches(type,AnalysisType::IdentifyBlobs) )
{ {
phase = std::shared_ptr<DoubleArray>(new DoubleArray(Nx,Ny,Nz)); phase = std::shared_ptr<DoubleArray>(new DoubleArray(d_N[0],d_N[1],d_N[2]));
ScaLBL_CopyToHost(phase->data(),Phi,N*sizeof(double)); ScaLBL_CopyToHost(phase->data(),Phi,N*sizeof(double));
} }
if ( matches(type,AnalysisType::CopyPhaseIndicator) ) { if ( matches(type,AnalysisType::CopyPhaseIndicator) ) {
memcpy(Averages.Phase_tplus.data(),phase->data(),N*sizeof(double)); memcpy(Averages.Phase_tplus.data(),phase->data(),N*sizeof(double));
//Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tplus); //Averages.ColorToSignedDistance(d_beta,Averages.Phase,Averages.Phase_tplus);
} }
if ( matches(type,AnalysisType::ComputeAverages) ) { if ( matches(type,AnalysisType::ComputeAverages) ) {
memcpy(Averages.Phase_tminus.data(),phase->data(),N*sizeof(double)); memcpy(Averages.Phase_tminus.data(),phase->data(),N*sizeof(double));
//Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tminus); //Averages.ColorToSignedDistance(d_beta,Averages.Phase,Averages.Phase_tminus);
} }
if ( matches(type,AnalysisType::CopySimState) ) { if ( matches(type,AnalysisType::CopySimState) ) {
// Copy the members of Averages to the cpu (phase was copied above) // Copy the members of Averages to the cpu (phase was copied above)
// Wait
PROFILE_START("Copy-Pressure",1); PROFILE_START("Copy-Pressure",1);
ScaLBL_D3Q19_Pressure(fq,Pressure,Np); ScaLBL_D3Q19_Pressure(fq,Pressure,d_Np);
ScaLBL_D3Q19_Momentum(fq,Velocity,Np); ScaLBL_D3Q19_Momentum(fq,Velocity,d_Np);
ScaLBL_DeviceBarrier(); ScaLBL_DeviceBarrier();
PROFILE_STOP("Copy-Pressure",1); PROFILE_STOP("Copy-Pressure",1);
PROFILE_START("Copy-Wait",1); PROFILE_START("Copy-Wait",1);
tpool.wait(wait.analysis);
tpool.wait(wait.vis); // Make sure we are done using analysis before modifying
PROFILE_STOP("Copy-Wait",1); PROFILE_STOP("Copy-Wait",1);
PROFILE_START("Copy-State",1); PROFILE_START("Copy-State",1);
memcpy(Averages.Phase.data(),phase->data(),N*sizeof(double)); memcpy(Averages.Phase.data(),phase->data(),N*sizeof(double));
ScaLBL_Comm.RegularLayout(Map,Pressure,Averages.Press); d_ScaLBL_Comm.RegularLayout(d_Map,Pressure,Averages.Press);
ScaLBL_Comm.RegularLayout(Map,&Velocity[0],Averages.Vel_x); d_ScaLBL_Comm.RegularLayout(d_Map,&Velocity[0],Averages.Vel_x);
ScaLBL_Comm.RegularLayout(Map,&Velocity[Np],Averages.Vel_y); d_ScaLBL_Comm.RegularLayout(d_Map,&Velocity[d_Np],Averages.Vel_y);
ScaLBL_Comm.RegularLayout(Map,&Velocity[2*Np],Averages.Vel_z); d_ScaLBL_Comm.RegularLayout(d_Map,&Velocity[2*d_Np],Averages.Vel_z);
PROFILE_STOP("Copy-State",1); PROFILE_STOP("Copy-State",1);
} }
std::shared_ptr<double> cDen, cfq; std::shared_ptr<double> cDen, cfq;
if ( matches(type,AnalysisType::CreateRestart) ) { if ( matches(type,AnalysisType::CreateRestart) ) {
// Copy restart data to the CPU // Copy restart data to the CPU
cDen = std::shared_ptr<double>(new double[2*Np],DeleteArray<double>); cDen = std::shared_ptr<double>(new double[2*d_Np],DeleteArray<double>);
cfq = std::shared_ptr<double>(new double[19*Np],DeleteArray<double>); cfq = std::shared_ptr<double>(new double[19*d_Np],DeleteArray<double>);
ScaLBL_CopyToHost(cfq.get(),fq,19*Np*sizeof(double)); ScaLBL_CopyToHost(cfq.get(),fq,19*d_Np*sizeof(double));
ScaLBL_CopyToHost(cDen.get(),Den,2*Np*sizeof(double)); ScaLBL_CopyToHost(cDen.get(),Den,2*d_Np*sizeof(double));
} }
PROFILE_STOP("Copy data to host",1); PROFILE_STOP("Copy data to host",1);
@ -344,30 +498,29 @@ void run_analysis( int timestep, int restart_interval, int ANALYSIS_INTERVAL, in
BlobIDstruct new_index(new std::pair<int,IntArray>(0,IntArray())); BlobIDstruct new_index(new std::pair<int,IntArray>(0,IntArray()));
BlobIDstruct new_ids(new std::pair<int,IntArray>(0,IntArray())); BlobIDstruct new_ids(new std::pair<int,IntArray>(0,IntArray()));
BlobIDList new_list(new std::vector<BlobIDType>()); BlobIDList new_list(new std::vector<BlobIDType>());
auto work1 = new BlobIdentificationWorkItem1(timestep,Nx,Ny,Nz,rank_info, auto work1 = new BlobIdentificationWorkItem1(timestep,d_N[0],d_N[1],d_N[2],d_rank_info,
phase,Averages.SDs,last_ids,new_index,new_ids,new_list); phase,Averages.SDs,d_last_ids,new_index,new_ids,new_list,getComm());
auto work2 = new BlobIdentificationWorkItem2(timestep,Nx,Ny,Nz,rank_info, auto work2 = new BlobIdentificationWorkItem2(timestep,d_N[0],d_N[1],d_N[2],d_rank_info,
phase,Averages.SDs,last_ids,new_index,new_ids,new_list); phase,Averages.SDs,d_last_ids,new_index,new_ids,new_list,getComm());
work1->add_dependency(wait.blobID); work1->add_dependency(d_wait_blobID);
work2->add_dependency(tpool.add_work(work1)); work2->add_dependency(d_tpool.add_work(work1));
wait.blobID = tpool.add_work(work2); d_wait_blobID = d_tpool.add_work(work2);
last_index = new_index; d_last_index = new_index;
last_ids = new_ids; d_last_ids = new_ids;
last_id_map = new_list; d_last_id_map = new_list;
} }
// Spawn threads to do the analysis work // Spawn threads to do the analysis work
if ( matches(type,AnalysisType::ComputeAverages) ) { if ( matches(type,AnalysisType::ComputeAverages) ) {
auto work = new AnalysisWorkItem(type,timestep,Averages,last_index,last_id_map,beta); auto work = new AnalysisWorkItem(type,timestep,Averages,d_last_index,d_last_id_map,d_beta);
work->add_dependency(wait.blobID); work->add_dependency(d_wait_blobID);
work->add_dependency(wait.analysis); work->add_dependency(d_wait_analysis);
work->add_dependency(wait.vis); // Make sure we are done using analysis before modifying work->add_dependency(d_wait_vis); // Make sure we are done using analysis before modifying
wait.analysis = tpool.add_work(work); d_wait_analysis = d_tpool.add_work(work);
} }
// Spawn a thread to write the restart file // Spawn a thread to write the restart file
if ( matches(type,AnalysisType::CreateRestart) ) { if ( matches(type,AnalysisType::CreateRestart) ) {
int rank = MPI_WORLD_RANK();
/* if (pBC) { /* if (pBC) {
err = fabs(sat_w - sat_w_previous); err = fabs(sat_w - sat_w_previous);
sat_w_previous = sat_w; sat_w_previous = sat_w;
@ -375,32 +528,28 @@ void run_analysis( int timestep, int restart_interval, int ANALYSIS_INTERVAL, in
printf("Timestep %i: change in saturation since last checkpoint is %f \n",timestep,err); printf("Timestep %i: change in saturation since last checkpoint is %f \n",timestep,err);
} }
} */ } */
// Wait for previous restart files to finish writing (not necessary, but helps to ensure memory usage is limited)
tpool.wait(wait.restart);
// Retain the timestep associated with the restart files // Retain the timestep associated with the restart files
if (rank==0) { if (d_rank==0) {
FILE *Rst = fopen("Restart.txt","w"); FILE *Rst = fopen("Restart.txt","w");
fprintf(Rst,"%i\n",timestep+4); fprintf(Rst,"%i\n",timestep+4);
fclose(Rst); fclose(Rst);
} }
// Write the restart file (using a seperate thread) // Write the restart file (using a seperate thread)
auto work = new WriteRestartWorkItem(LocalRestartFile,cDen,cfq,Np); auto work = new WriteRestartWorkItem(d_restartFile.c_str(),cDen,cfq,d_Np);
work->add_dependency(wait.restart); work->add_dependency(d_wait_restart);
wait.restart = tpool.add_work(work); d_wait_restart = d_tpool.add_work(work);
} }
// Save the results for visualization // Save the results for visualization
if ( matches(type,AnalysisType::CreateRestart) ) { if ( matches(type,AnalysisType::CreateRestart) ) {
// Wait for previous restart files to finish writing (not necessary, but helps to ensure memory usage is limited)
tpool.wait(wait.vis);
// Write the vis files // Write the vis files
auto work = new WriteVisWorkItem( timestep, visData, Averages, fillData ); auto work = new WriteVisWorkItem( timestep, d_meshData, Averages, d_fillData, getComm() );
work->add_dependency(wait.blobID); work->add_dependency(d_wait_blobID);
work->add_dependency(wait.analysis); work->add_dependency(d_wait_analysis);
work->add_dependency(wait.vis); work->add_dependency(d_wait_vis);
wait.vis = tpool.add_work(work); d_wait_vis = d_tpool.add_work(work);
} }
PROFILE_STOP("start_analysis"); PROFILE_STOP("run");
} }

109
analysis/runAnalysis.h Normal file
View File

@ -0,0 +1,109 @@
#ifndef RunAnalysis_H_INC
#define RunAnalysis_H_INC
#include "analysis/analysis.h"
#include "analysis/TwoPhase.h"
#include "common/Communication.h"
#include "common/ScaLBL.h"
#include "threadpool/thread_pool.h"
typedef std::shared_ptr<std::pair<int,IntArray>> BlobIDstruct;
typedef std::shared_ptr<std::vector<BlobIDType>> BlobIDList;
// Types of analysis
enum class AnalysisType : uint64_t { AnalyzeNone=0, IdentifyBlobs=0x01, CopyPhaseIndicator=0x02,
CopySimState=0x04, ComputeAverages=0x08, CreateRestart=0x10, WriteVis=0x20 };
//! Class to run the analysis in multiple threads
class runAnalysis
{
public:
//! Constructor
runAnalysis( int N_threads, int restart_interval, int analysis_interval, int blobid_interval,
const RankInfoStruct& rank_info, const ScaLBL_Communicator &ScaLBL_Comm, const Domain& dm,
int Np, int Nx, int Ny, int Nz, double Lx, double Ly, double Lz, bool pBC, double beta, double err,
IntArray Map, const std::string& LocalRestartFile );
//! Destructor
~runAnalysis();
//! Run the next analysis
void run( int timestep, TwoPhase& Averages, const double *Phi,
double *Pressure, double *Velocity, double *fq, double *Den );
//! Finish all active analysis
void finish();
/*!
* \brief Set the affinities
* \details This function will set the affinity of this thread and all analysis threads
* @param[in] method Method used to control the affinities:
* none - Don't do anything
*/
void setAffinities( const std::string& method = "none" );
private:
runAnalysis();
// Determine the analysis to perform
AnalysisType computeAnalysisType( int timestep );
public:
class commWrapper
{
public:
MPI_Comm comm;
int tag;
runAnalysis *analysis;
commWrapper( int tag, MPI_Comm comm, runAnalysis *analysis );
commWrapper( ) = delete;
commWrapper( const commWrapper &rhs ) = delete;
commWrapper& operator=( const commWrapper &rhs ) = delete;
commWrapper( commWrapper &&rhs );
~commWrapper();
};
// Get a comm (not thread safe)
commWrapper getComm( );
private:
int d_N[3];
int d_Np;
int d_rank;
int d_restart_interval, d_analysis_interval, d_blobid_interval;
double d_beta;
ThreadPool d_tpool;
ScaLBL_Communicator d_ScaLBL_Comm;
RankInfoStruct d_rank_info;
IntArray d_Map;
BlobIDstruct d_last_ids;
BlobIDstruct d_last_index;
BlobIDList d_last_id_map;
std::vector<IO::MeshDataStruct> d_meshData;
fillHalo<double> d_fillData;
std::string d_restartFile;
MPI_Comm d_comm;
MPI_Comm d_comms[1024];
volatile bool d_comm_used[1024];
// Ids of work items to use for dependencies
ThreadPool::thread_id_t d_wait_blobID;
ThreadPool::thread_id_t d_wait_analysis;
ThreadPool::thread_id_t d_wait_vis;
ThreadPool::thread_id_t d_wait_restart;
// Friends
friend commWrapper::~commWrapper();
};
#endif

4077
common/ScaLBL.cpp Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -8,10 +8,10 @@
#include "common/Communication.h" #include "common/Communication.h"
#include "analysis/TwoPhase.h" #include "analysis/TwoPhase.h"
#include "analysis/runAnalysis.h"
#include "common/MPI_Helpers.h" #include "common/MPI_Helpers.h"
#include "ProfilerApp.h" #include "ProfilerApp.h"
#include "threadpool/thread_pool.h" #include "threadpool/thread_pool.h"
#include "lbpm_color_simulator.h"
/* /*
* Simulator for two-phase flow in porous media * Simulator for two-phase flow in porous media
@ -20,6 +20,7 @@
using namespace std; using namespace std;
//************************************************************************* //*************************************************************************
// Implementation of Two-Phase Immiscible LBM // Implementation of Two-Phase Immiscible LBM
//************************************************************************* //*************************************************************************
@ -527,53 +528,12 @@ int main(int argc, char **argv)
int N_threads = 4; int N_threads = 4;
if ( provided_thread_support < MPI_THREAD_MULTIPLE ) if ( provided_thread_support < MPI_THREAD_MULTIPLE )
N_threads = 0; N_threads = 0;
/*if ( N_threads > 0 ) {
// Set the affinity
int N_procs = ThreadPool::getNumberOfProcessors();
std::vector<int> procs(N_procs);
for (int i=0; i<N_procs; i++)
procs[i] = i;
ThreadPool::setProcessAffinity(procs);
}
*/
ThreadPool tpool(N_threads);
// Create the MeshDataStruct
fillHalo<double> fillData(Dm.Comm,Dm.rank_info,Nx-2,Ny-2,Nz-2,1,1,1,0,1);
std::vector<IO::MeshDataStruct> meshData(1);
meshData[0].meshName = "domain";
meshData[0].mesh = std::shared_ptr<IO::DomainMesh>( new IO::DomainMesh(Dm.rank_info,Nx-2,Ny-2,Nz-2,Lx,Ly,Lz) );
std::shared_ptr<IO::Variable> PhaseVar( new IO::Variable() );
std::shared_ptr<IO::Variable> PressVar( new IO::Variable() );
std::shared_ptr<IO::Variable> SignDistVar( new IO::Variable() );
std::shared_ptr<IO::Variable> BlobIDVar( new IO::Variable() );
PhaseVar->name = "phase";
PhaseVar->type = IO::VariableType::VolumeVariable;
PhaseVar->dim = 1;
PhaseVar->data.resize(Nx-2,Ny-2,Nz-2);
meshData[0].vars.push_back(PhaseVar);
PressVar->name = "Pressure";
PressVar->type = IO::VariableType::VolumeVariable;
PressVar->dim = 1;
PressVar->data.resize(Nx-2,Ny-2,Nz-2);
meshData[0].vars.push_back(PressVar);
SignDistVar->name = "SignDist";
SignDistVar->type = IO::VariableType::VolumeVariable;
SignDistVar->dim = 1;
SignDistVar->data.resize(Nx-2,Ny-2,Nz-2);
meshData[0].vars.push_back(SignDistVar);
BlobIDVar->name = "BlobID";
BlobIDVar->type = IO::VariableType::VolumeVariable;
BlobIDVar->dim = 1;
BlobIDVar->data.resize(Nx-2,Ny-2,Nz-2);
meshData[0].vars.push_back(BlobIDVar);
//************ MAIN ITERATION LOOP ***************************************/ //************ MAIN ITERATION LOOP ***************************************/
PROFILE_START("Loop"); PROFILE_START("Loop");
BlobIDstruct last_ids, last_index; runAnalysis analysis( N_threads, RESTART_INTERVAL,ANALYSIS_INTERVAL,BLOBID_INTERVAL,
BlobIDList last_id_map; rank_info, ScaLBL_Comm, Dm, Np, Nx, Ny, Nz, Lx, Ly, Lz, pBC, beta, err, Map, LocalRestartFile );
writeIDMap(ID_map_struct(),0,id_map_filename); analysis.setAffinities( "none" );
AnalysisWaitIdStruct work_ids;
while (timestep < timestepMax && err > tol ) { while (timestep < timestepMax && err > tol ) {
//if ( rank==0 ) { printf("Running timestep %i (%i MB)\n",timestep+1,(int)(Utilities::getMemoryUsage()/1048576)); } //if ( rank==0 ) { printf("Running timestep %i (%i MB)\n",timestep+1,(int)(Utilities::getMemoryUsage()/1048576)); }
PROFILE_START("Update"); PROFILE_START("Update");
@ -651,15 +611,13 @@ int main(int argc, char **argv)
PROFILE_STOP("Update"); PROFILE_STOP("Update");
// Run the analysis // Run the analysis
run_analysis(timestep,RESTART_INTERVAL,ANALYSIS_INTERVAL,BLOBID_INTERVAL,rank_info,ScaLBL_Comm,*Averages,last_ids,last_index,last_id_map, analysis.run( timestep, *Averages, Phi, Pressure, Velocity, fq, Den );
Np,Nx,Ny,Nz,pBC,beta,err,Phi,Pressure,Velocity,Map,fq,Den,
LocalRestartFile,meshData,fillData,tpool,work_ids);
// Save the timers // Save the timers
if ( timestep%50==0 ) if ( timestep%50==0 )
PROFILE_SAVE("lbpm_color_simulator",1); PROFILE_SAVE("lbpm_color_simulator",1);
} }
tpool.wait_pool_finished(); analysis.finish();
PROFILE_STOP("Loop"); PROFILE_STOP("Loop");
//************************************************************************ //************************************************************************
ScaLBL_DeviceBarrier(); ScaLBL_DeviceBarrier();