Working on more frequent blob id updates (for GPUs)
This commit is contained in:
@@ -101,10 +101,10 @@ inline void ZeroHalo(double *Data, int Nx, int Ny, int Nz)
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
// Initialize MPI
|
||||
int provided_thread_support=-1;
|
||||
//MPI_Init_thread(&argc,&argv,MPI_THREAD_MULTIPLE,&provided_thread_support);
|
||||
MPI_Init_thread(&argc,&argv,MPI_THREAD_SINGLE,&provided_thread_support);
|
||||
if ( provided_thread_support<MPI_THREAD_MULTIPLE )
|
||||
int provided_thread_support = -1;
|
||||
int required_thread_support = MPI_THREAD_MULTIPLE; // MPI_THREAD_SINGLE, MPI_THREAD_MULTIPLE
|
||||
MPI_Init_thread(&argc,&argv,MPI_THREAD_MULTIPLE,&required_thread_support);
|
||||
if ( provided_thread_support < required_thread_support )
|
||||
std::cerr << "Warning: Failed to start MPI with necessary thread support, thread support will be disabled" << std::endl;
|
||||
MPI_Comm comm;
|
||||
MPI_Comm_dup(MPI_COMM_WORLD,&comm);
|
||||
@@ -674,7 +674,7 @@ int main(int argc, char **argv)
|
||||
if (rank==0) printf("Begin timesteps: error tolerance is %f \n", tol);
|
||||
// Create the thread pool
|
||||
int N_threads = 0;
|
||||
if ( provided_thread_support<MPI_THREAD_MULTIPLE )
|
||||
if ( provided_thread_support < required_thread_support )
|
||||
N_threads = 0;
|
||||
if ( N_threads > 0 ) {
|
||||
// Set the affinity
|
||||
|
||||
@@ -41,10 +41,10 @@ private:
|
||||
static const std::string id_map_filename = "lbpm_id_map.txt";
|
||||
typedef std::shared_ptr<std::pair<int,IntArray> > BlobIDstruct;
|
||||
typedef std::shared_ptr<std::vector<BlobIDType> > BlobIDList;
|
||||
class BlobIdentificationWorkItem: public ThreadPool::WorkItem
|
||||
class BlobIdentificationWorkItem1: public ThreadPool::WorkItem
|
||||
{
|
||||
public:
|
||||
BlobIdentificationWorkItem( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_,
|
||||
BlobIdentificationWorkItem1( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_,
|
||||
std::shared_ptr<const DoubleArray> phase_, const DoubleArray& dist_,
|
||||
BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_ ):
|
||||
timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_),
|
||||
@@ -52,13 +52,41 @@ public:
|
||||
virtual void run() {
|
||||
ThreadPool::WorkItem::d_state = 1; // Change state to in progress
|
||||
// Compute the global blob id and compare to the previous version
|
||||
PROFILE_START("Identify blobs and maps",1);
|
||||
PROFILE_START("Identify blobs",1);
|
||||
MPI_Comm newcomm;
|
||||
MPI_Comm_dup(MPI_COMM_WORLD,&newcomm);
|
||||
double vF = 0.0;
|
||||
double vS = 0.0;
|
||||
IntArray& ids = new_index->second;
|
||||
new_index->first = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,*phase,dist,vF,vS,ids,newcomm);
|
||||
PROFILE_STOP("Identify blobs",1);
|
||||
ThreadPool::WorkItem::d_state = 2; // Change state to finished
|
||||
}
|
||||
private:
|
||||
BlobIdentificationWorkItem1();
|
||||
int timestep;
|
||||
int Nx, Ny, Nz;
|
||||
const RankInfoStruct& rank_info;
|
||||
std::shared_ptr<const DoubleArray> phase;
|
||||
const DoubleArray& dist;
|
||||
BlobIDstruct last_id, new_index, new_id;
|
||||
BlobIDList new_list;
|
||||
};
|
||||
class BlobIdentificationWorkItem2: public ThreadPool::WorkItem
|
||||
{
|
||||
public:
|
||||
BlobIdentificationWorkItem2( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_,
|
||||
std::shared_ptr<const DoubleArray> phase_, const DoubleArray& dist_,
|
||||
BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_ ):
|
||||
timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_),
|
||||
phase(phase_), dist(dist_), last_id(last_id_), new_index(new_index_), new_id(new_id_), new_list(new_list_) { }
|
||||
virtual void run() {
|
||||
ThreadPool::WorkItem::d_state = 1; // Change state to in progress
|
||||
// Compute the global blob id and compare to the previous version
|
||||
PROFILE_START("Identify blobs maps",1);
|
||||
MPI_Comm newcomm;
|
||||
MPI_Comm_dup(MPI_COMM_WORLD,&newcomm);
|
||||
const IntArray& ids = new_index->second;
|
||||
static int max_id = -1;
|
||||
new_id->first = new_index->first;
|
||||
new_id->second = new_index->second;
|
||||
@@ -77,11 +105,11 @@ public:
|
||||
writeIDMap(map,timestep,id_map_filename);
|
||||
}
|
||||
MPI_Comm_free(&newcomm);
|
||||
PROFILE_STOP("Identify blobs and maps",1);
|
||||
PROFILE_STOP("Identify blobs maps",1);
|
||||
ThreadPool::WorkItem::d_state = 2; // Change state to finished
|
||||
}
|
||||
private:
|
||||
BlobIdentificationWorkItem();
|
||||
BlobIdentificationWorkItem2();
|
||||
int timestep;
|
||||
int Nx, Ny, Nz;
|
||||
const RankInfoStruct& rank_info;
|
||||
@@ -157,10 +185,18 @@ void run_analysis( int timestep, int restart_interval,
|
||||
// Copy the phase indicator field for the earlier timestep
|
||||
type = static_cast<AnalysisType>( type | CopyPhaseIndicator );
|
||||
}
|
||||
if ( timestep%200 == 0 ) {
|
||||
if ( timestep%250 == 0 ) {
|
||||
// Identify blobs and update global ids in time
|
||||
type = static_cast<AnalysisType>( type | IdentifyBlobs );
|
||||
}
|
||||
#ifdef USE_CUDA
|
||||
if ( tpool.getQueueSize()<=3 && tpool.getNumThreads()>0 && timestep%20==0 ) {
|
||||
// Keep a few blob identifications queued up to keep the processors busy,
|
||||
// allowing us to track the blobs as fast as possible
|
||||
// Add more detailed estimates of the update frequency required to track blobs
|
||||
type = static_cast<AnalysisType>( type | IdentifyBlobs );
|
||||
}
|
||||
#endif
|
||||
if ( timestep%1000 == 0 ) {
|
||||
// Copy the averages to the CPU (and identify blobs)
|
||||
type = static_cast<AnalysisType>( type | CopyAverages );
|
||||
@@ -223,13 +259,16 @@ void run_analysis( int timestep, int restart_interval,
|
||||
BlobIDstruct new_index(new std::pair<int,IntArray>(0,IntArray()));
|
||||
BlobIDstruct new_ids(new std::pair<int,IntArray>(0,IntArray()));
|
||||
BlobIDList new_list(new std::vector<BlobIDType>());
|
||||
ThreadPool::WorkItem *work = new BlobIdentificationWorkItem(timestep,
|
||||
ThreadPool::WorkItem *work1 = new BlobIdentificationWorkItem1(timestep,
|
||||
Nx,Ny,Nz,rank_info,phase,Averages.SDs,last_ids,new_index,new_ids,new_list);
|
||||
work->add_dependency(wait.blobID);
|
||||
ThreadPool::WorkItem *work2 = new BlobIdentificationWorkItem2(timestep,
|
||||
Nx,Ny,Nz,rank_info,phase,Averages.SDs,last_ids,new_index,new_ids,new_list);
|
||||
work1->add_dependency(wait.blobID);
|
||||
work2->add_dependency(tpool.add_work(work1));
|
||||
wait.blobID = tpool.add_work(work2);
|
||||
last_index = new_index;
|
||||
last_ids = new_ids;
|
||||
last_id_map = new_list;
|
||||
wait.blobID = tpool.add_work(work);
|
||||
}
|
||||
|
||||
// Spawn threads to do the analysis work
|
||||
|
||||
@@ -363,6 +363,10 @@ public:
|
||||
int getNumThreads() const { return d_N_threads; }
|
||||
|
||||
|
||||
//! Function to return the number of items in the queue (including processing items)
|
||||
int getQueueSize() const { return d_queue_size+d_num_active; }
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Function to set the number of threads in the thread pool
|
||||
* \details This function will change the number of worker threads in the ThreadPool
|
||||
|
||||
Reference in New Issue
Block a user