diff --git a/tests/lbpm_color_simulator.cpp b/tests/lbpm_color_simulator.cpp index dff124d2..e54ac0bc 100644 --- a/tests/lbpm_color_simulator.cpp +++ b/tests/lbpm_color_simulator.cpp @@ -101,10 +101,10 @@ inline void ZeroHalo(double *Data, int Nx, int Ny, int Nz) int main(int argc, char **argv) { // Initialize MPI - int provided_thread_support=-1; - //MPI_Init_thread(&argc,&argv,MPI_THREAD_MULTIPLE,&provided_thread_support); - MPI_Init_thread(&argc,&argv,MPI_THREAD_SINGLE,&provided_thread_support); - if ( provided_thread_support 0 ) { // Set the affinity diff --git a/tests/lbpm_color_simulator.h b/tests/lbpm_color_simulator.h index b748511b..3e8fcedb 100644 --- a/tests/lbpm_color_simulator.h +++ b/tests/lbpm_color_simulator.h @@ -41,10 +41,10 @@ private: static const std::string id_map_filename = "lbpm_id_map.txt"; typedef std::shared_ptr > BlobIDstruct; typedef std::shared_ptr > BlobIDList; -class BlobIdentificationWorkItem: public ThreadPool::WorkItem +class BlobIdentificationWorkItem1: public ThreadPool::WorkItem { public: - BlobIdentificationWorkItem( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_, + BlobIdentificationWorkItem1( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_, std::shared_ptr phase_, const DoubleArray& dist_, BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_ ): timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_), @@ -52,13 +52,41 @@ public: virtual void run() { ThreadPool::WorkItem::d_state = 1; // Change state to in progress // Compute the global blob id and compare to the previous version - PROFILE_START("Identify blobs and maps",1); + PROFILE_START("Identify blobs",1); MPI_Comm newcomm; MPI_Comm_dup(MPI_COMM_WORLD,&newcomm); double vF = 0.0; double vS = 0.0; IntArray& ids = new_index->second; new_index->first = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,*phase,dist,vF,vS,ids,newcomm); + PROFILE_STOP("Identify blobs",1); + ThreadPool::WorkItem::d_state = 2; // Change state to finished + } +private: + BlobIdentificationWorkItem1(); + int timestep; + int Nx, Ny, Nz; + const RankInfoStruct& rank_info; + std::shared_ptr phase; + const DoubleArray& dist; + BlobIDstruct last_id, new_index, new_id; + BlobIDList new_list; +}; +class BlobIdentificationWorkItem2: public ThreadPool::WorkItem +{ +public: + BlobIdentificationWorkItem2( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_, + std::shared_ptr phase_, const DoubleArray& dist_, + BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_ ): + timestep(timestep_), Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_), + phase(phase_), dist(dist_), last_id(last_id_), new_index(new_index_), new_id(new_id_), new_list(new_list_) { } + virtual void run() { + ThreadPool::WorkItem::d_state = 1; // Change state to in progress + // Compute the global blob id and compare to the previous version + PROFILE_START("Identify blobs maps",1); + MPI_Comm newcomm; + MPI_Comm_dup(MPI_COMM_WORLD,&newcomm); + const IntArray& ids = new_index->second; static int max_id = -1; new_id->first = new_index->first; new_id->second = new_index->second; @@ -77,11 +105,11 @@ public: writeIDMap(map,timestep,id_map_filename); } MPI_Comm_free(&newcomm); - PROFILE_STOP("Identify blobs and maps",1); + PROFILE_STOP("Identify blobs maps",1); ThreadPool::WorkItem::d_state = 2; // Change state to finished } private: - BlobIdentificationWorkItem(); + BlobIdentificationWorkItem2(); int timestep; int Nx, Ny, Nz; const RankInfoStruct& rank_info; @@ -157,10 +185,18 @@ void run_analysis( int timestep, int restart_interval, // Copy the phase indicator field for the earlier timestep type = static_cast( type | CopyPhaseIndicator ); } - if ( timestep%200 == 0 ) { + if ( timestep%250 == 0 ) { // Identify blobs and update global ids in time type = static_cast( type | IdentifyBlobs ); } + #ifdef USE_CUDA + if ( tpool.getQueueSize()<=3 && tpool.getNumThreads()>0 && timestep%20==0 ) { + // Keep a few blob identifications queued up to keep the processors busy, + // allowing us to track the blobs as fast as possible + // Add more detailed estimates of the update frequency required to track blobs + type = static_cast( type | IdentifyBlobs ); + } + #endif if ( timestep%1000 == 0 ) { // Copy the averages to the CPU (and identify blobs) type = static_cast( type | CopyAverages ); @@ -223,13 +259,16 @@ void run_analysis( int timestep, int restart_interval, BlobIDstruct new_index(new std::pair(0,IntArray())); BlobIDstruct new_ids(new std::pair(0,IntArray())); BlobIDList new_list(new std::vector()); - ThreadPool::WorkItem *work = new BlobIdentificationWorkItem(timestep, + ThreadPool::WorkItem *work1 = new BlobIdentificationWorkItem1(timestep, Nx,Ny,Nz,rank_info,phase,Averages.SDs,last_ids,new_index,new_ids,new_list); - work->add_dependency(wait.blobID); + ThreadPool::WorkItem *work2 = new BlobIdentificationWorkItem2(timestep, + Nx,Ny,Nz,rank_info,phase,Averages.SDs,last_ids,new_index,new_ids,new_list); + work1->add_dependency(wait.blobID); + work2->add_dependency(tpool.add_work(work1)); + wait.blobID = tpool.add_work(work2); last_index = new_index; last_ids = new_ids; last_id_map = new_list; - wait.blobID = tpool.add_work(work); } // Spawn threads to do the analysis work diff --git a/threadpool/thread_pool.h b/threadpool/thread_pool.h index e8ea30f1..f8cf46c4 100755 --- a/threadpool/thread_pool.h +++ b/threadpool/thread_pool.h @@ -363,6 +363,10 @@ public: int getNumThreads() const { return d_N_threads; } + //! Function to return the number of items in the queue (including processing items) + int getQueueSize() const { return d_queue_size+d_num_active; } + + /*! * \brief Function to set the number of threads in the thread pool * \details This function will change the number of worker threads in the ThreadPool