// Run the analysis, blob identification, and write restart files #include "analysis/runAnalysis.h" #include "analysis/analysis.h" #include "common/Array.h" #include "common/Communication.h" #include "common/MPI.h" #include "common/ScaLBL.h" #include "models/ColorModel.h" #include "IO/MeshDatabase.h" #include "threadpool/thread_pool.h" #include "ProfilerApp.h" AnalysisType &operator|=( AnalysisType &lhs, AnalysisType rhs ) { lhs = static_cast( static_cast::type>( lhs ) | static_cast::type>( rhs ) ); return lhs; } bool matches( AnalysisType x, AnalysisType y ) { return ( static_cast::type>( x ) & static_cast::type>( y ) ) != 0; } // Create a shared_ptr to an array of values template static inline std::shared_ptr make_shared_array( size_t N ) { return std::shared_ptr( new TYPE[N], []( const TYPE *p ) { delete[] p; } ); } // Helper class to write the restart file from a seperate thread class WriteRestartWorkItem : public ThreadPool::WorkItemRet { public: WriteRestartWorkItem( const std::string &filename_, std::shared_ptr cDen_, std::shared_ptr cfq_, int N_ ) : filename( filename_ ), cfq( cfq_ ), cDen( cDen_ ), N( N_ ) { } virtual void run() { PROFILE_START( "Save Checkpoint", 1 ); double value; ofstream File( filename, ios::binary ); for ( int n = 0; n < N; n++ ) { // Write the two density values value = cDen.get()[n]; File.write( (char *) &value, sizeof( value ) ); value = cDen.get()[N + n]; File.write( (char *) &value, sizeof( value ) ); } for ( int n = 0; n < N; n++ ) { // Write the distributions for ( int q = 0; q < 19; q++ ) { value = cfq.get()[q * N + n]; File.write( (char *) &value, sizeof( value ) ); } } File.close(); PROFILE_STOP( "Save Checkpoint", 1 ); }; private: WriteRestartWorkItem(); const std::string filename; std::shared_ptr cfq, cDen; const int N; }; // Helper class to compute the blob ids typedef std::shared_ptr> BlobIDstruct; typedef std::shared_ptr> BlobIDList; static const std::string id_map_filename = "lbpm_id_map.txt"; class BlobIdentificationWorkItem1 : public ThreadPool::WorkItemRet { public: BlobIdentificationWorkItem1( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct &rank_info_, std::shared_ptr phase_, const DoubleArray &dist_, BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_, runAnalysis::commWrapper &&comm_ ) : timestep( timestep_ ), Nx( Nx_ ), Ny( Ny_ ), Nz( Nz_ ), rank_info( rank_info_ ), phase( phase_ ), dist( dist_ ), last_id( last_id_ ), new_index( new_index_ ), new_id( new_id_ ), new_list( new_list_ ), comm( std::move( comm_ ) ) { } ~BlobIdentificationWorkItem1() {} virtual void run() { // Compute the global blob id and compare to the previous version PROFILE_START( "Identify blobs", 1 ); double vF = 0.0; double vS = -1.0; // one voxel buffer region around solid IntArray &ids = new_index->second; new_index->first = ComputeGlobalBlobIDs( Nx - 2, Ny - 2, Nz - 2, rank_info, *phase, dist, vF, vS, ids, comm.comm ); PROFILE_STOP( "Identify blobs", 1 ); } private: BlobIdentificationWorkItem1(); int timestep; int Nx, Ny, Nz; const RankInfoStruct rank_info; std::shared_ptr phase; const DoubleArray &dist; BlobIDstruct last_id, new_index, new_id; BlobIDList new_list; runAnalysis::commWrapper comm; }; class BlobIdentificationWorkItem2 : public ThreadPool::WorkItemRet { public: BlobIdentificationWorkItem2( int timestep_, int Nx_, int Ny_, int Nz_, const RankInfoStruct &rank_info_, std::shared_ptr phase_, const DoubleArray &dist_, BlobIDstruct last_id_, BlobIDstruct new_index_, BlobIDstruct new_id_, BlobIDList new_list_, runAnalysis::commWrapper &&comm_ ) : timestep( timestep_ ), Nx( Nx_ ), Ny( Ny_ ), Nz( Nz_ ), rank_info( rank_info_ ), phase( phase_ ), dist( dist_ ), last_id( last_id_ ), new_index( new_index_ ), new_id( new_id_ ), new_list( new_list_ ), comm( std::move( comm_ ) ) { } ~BlobIdentificationWorkItem2() {} virtual void run() { // Compute the global blob id and compare to the previous version PROFILE_START( "Identify blobs maps", 1 ); const IntArray &ids = new_index->second; static int max_id = -1; new_id->first = new_index->first; new_id->second = new_index->second; if ( last_id.get() != NULL ) { // Compute the timestep-timestep map const IntArray &old_ids = last_id->second; ID_map_struct map = computeIDMap( Nx, Ny, Nz, old_ids, ids, comm.comm ); // Renumber the current timestep's ids getNewIDs( map, max_id, *new_list ); renumberIDs( *new_list, new_id->second ); writeIDMap( map, timestep, id_map_filename ); } else { max_id = -1; ID_map_struct map( new_id->first ); getNewIDs( map, max_id, *new_list ); writeIDMap( map, timestep, id_map_filename ); } PROFILE_STOP( "Identify blobs maps", 1 ); } private: BlobIdentificationWorkItem2(); int timestep; int Nx, Ny, Nz; const RankInfoStruct rank_info; std::shared_ptr phase; const DoubleArray &dist; BlobIDstruct last_id, new_index, new_id; BlobIDList new_list; runAnalysis::commWrapper comm; }; // Helper class to write the vis file from a thread class WriteVisWorkItem : public ThreadPool::WorkItemRet { public: WriteVisWorkItem( int timestep_, std::vector &visData_, TwoPhase &Avgerages_, std::array n_, RankInfoStruct rank_info_, runAnalysis::commWrapper &&comm_ ) : timestep( timestep_ ), visData( visData_ ), Averages( Avgerages_ ), n( std::move( n_ ) ), rank_info( std::move( rank_info_ ) ), comm( std::move( comm_ ) ) { } ~WriteVisWorkItem() {} virtual void run() { PROFILE_START( "Save Vis", 1 ); fillHalo fillData( comm.comm, rank_info, n, { 1, 1, 1 }, 0, 1 ); ASSERT( visData[0].vars[0]->name == "phase" ); Array &PhaseData = visData[0].vars[0]->data; fillData.copy( Averages.SDn, PhaseData ); ASSERT( visData[0].vars[5]->name == "SignDist" ); Array &SignData = visData[0].vars[5]->data; fillData.copy( Averages.SDs, SignData ); ASSERT( visData[0].vars[1]->name == "Pressure" ); Array &PressData = visData[0].vars[1]->data; fillData.copy( Averages.Press, PressData ); ASSERT( visData[0].vars[2]->name == "Velocity_x" ); ASSERT( visData[0].vars[3]->name == "Velocity_y" ); ASSERT( visData[0].vars[4]->name == "Velocity_z" ); Array &VelxData = visData[0].vars[2]->data; Array &VelyData = visData[0].vars[3]->data; Array &VelzData = visData[0].vars[4]->data; fillData.copy( Averages.Vel_x, VelxData ); fillData.copy( Averages.Vel_y, VelyData ); fillData.copy( Averages.Vel_z, VelzData ); ASSERT( visData[0].vars[6]->name == "BlobID" ); Array &BlobData = visData[0].vars[6]->data; fillData.copy( Averages.Label_NWP, BlobData ); IO::writeData( timestep, visData, comm.comm ); PROFILE_STOP( "Save Vis", 1 ); }; private: WriteVisWorkItem(); int timestep; std::array n; RankInfoStruct rank_info; std::vector &visData; TwoPhase &Averages; runAnalysis::commWrapper comm; }; // Helper class to write the vis file from a thread class IOWorkItem : public ThreadPool::WorkItemRet { public: IOWorkItem( int timestep_, std::shared_ptr input_db_, std::vector &visData_, SubPhase &Averages_, std::array n_, RankInfoStruct rank_info_, runAnalysis::commWrapper &&comm_ ) : timestep( timestep_ ), input_db( input_db_ ), visData( visData_ ), Averages( Averages_ ), n( std::move( n_ ) ), rank_info( std::move( rank_info_ ) ), comm( std::move( comm_ ) ) { } ~IOWorkItem() {} virtual void run() { PROFILE_START( "Save Vis", 1 ); auto color_db = input_db->getDatabase( "Color" ); auto vis_db = input_db->getDatabase( "Visualization" ); // int timestep = color_db->getWithDefault( "timestep", 0 ); fillHalo fillData( comm.comm, rank_info, n, { 1, 1, 1 }, 0, 1 ); if ( vis_db->getWithDefault( "save_phase_field", true ) ) { ASSERT( visData[0].vars[0]->name == "phase" ); Array &PhaseData = visData[0].vars[0]->data; fillData.copy( Averages.Phi, PhaseData ); } if ( vis_db->getWithDefault( "save_pressure", false ) ) { ASSERT( visData[0].vars[1]->name == "Pressure" ); Array &PressData = visData[0].vars[1]->data; fillData.copy( Averages.Pressure, PressData ); } if ( vis_db->getWithDefault( "save_velocity", false ) ) { ASSERT( visData[0].vars[2]->name == "Velocity_x" ); ASSERT( visData[0].vars[3]->name == "Velocity_y" ); ASSERT( visData[0].vars[4]->name == "Velocity_z" ); Array &VelxData = visData[0].vars[2]->data; Array &VelyData = visData[0].vars[3]->data; Array &VelzData = visData[0].vars[4]->data; fillData.copy( Averages.Vel_x, VelxData ); fillData.copy( Averages.Vel_y, VelyData ); fillData.copy( Averages.Vel_z, VelzData ); } if ( vis_db->getWithDefault( "save_distance", false ) ) { ASSERT( visData[0].vars[5]->name == "SignDist" ); Array &SignData = visData[0].vars[5]->data; fillData.copy( Averages.SDs, SignData ); } if ( vis_db->getWithDefault( "save_connected_components", false ) ) { ASSERT( visData[0].vars[6]->name == "BlobID" ); Array &BlobData = visData[0].vars[6]->data; fillData.copy( Averages.morph_n->label, BlobData ); } if ( vis_db->getWithDefault( "write_silo", true ) ) IO::writeData( timestep, visData, comm.comm ); if ( vis_db->getWithDefault( "save_8bit_raw", true ) ) { char CurrentIDFilename[40]; sprintf( CurrentIDFilename, "id_t%d.raw", timestep ); Averages.AggregateLabels( CurrentIDFilename ); } PROFILE_STOP( "Save Vis", 1 ); }; private: IOWorkItem(); int timestep; std::array n; RankInfoStruct rank_info; std::shared_ptr input_db; std::vector &visData; SubPhase &Averages; runAnalysis::commWrapper comm; }; // Helper class to run the analysis from within a thread // Note: Averages will be modified after the constructor is called class AnalysisWorkItem : public ThreadPool::WorkItemRet { public: AnalysisWorkItem( AnalysisType type_, int timestep_, TwoPhase &Averages_, BlobIDstruct ids, BlobIDList id_list_, double beta_ ) : type( type_ ), timestep( timestep_ ), Averages( Averages_ ), blob_ids( ids ), id_list( id_list_ ), beta( beta_ ) { } ~AnalysisWorkItem() {} virtual void run() { Averages.NumberComponents_NWP = blob_ids->first; Averages.Label_NWP = blob_ids->second; Averages.Label_NWP_map = *id_list; Averages.NumberComponents_WP = 1; Averages.Label_WP.fill( 0.0 ); if ( matches( type, AnalysisType::CopyPhaseIndicator ) ) { // Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tplus); } if ( matches( type, AnalysisType::ComputeAverages ) ) { PROFILE_START( "Compute dist", 1 ); Averages.Initialize(); Averages.ComputeDelPhi(); Averages.ColorToSignedDistance( beta, Averages.Phase, Averages.SDn ); Averages.ColorToSignedDistance( beta, Averages.Phase_tminus, Averages.Phase_tminus ); Averages.ColorToSignedDistance( beta, Averages.Phase_tplus, Averages.Phase_tplus ); Averages.UpdateMeshValues(); Averages.ComputeLocal(); Averages.Reduce(); Averages.PrintAll( timestep ); Averages.Initialize(); Averages.ComponentAverages(); Averages.SortBlobs(); Averages.PrintComponents( timestep ); PROFILE_STOP( "Compute dist", 1 ); } } private: AnalysisWorkItem(); AnalysisType type; int timestep; TwoPhase &Averages; BlobIDstruct blob_ids; BlobIDList id_list; double beta; }; class TCATWorkItem : public ThreadPool::WorkItemRet { public: TCATWorkItem( AnalysisType type_, int timestep_, TwoPhase &Averages_, BlobIDstruct ids, BlobIDList id_list_, double beta_ ) : type( type_ ), timestep( timestep_ ), Averages( Averages_ ), blob_ids( ids ), id_list( id_list_ ), beta( beta_ ) { } ~TCATWorkItem() {} virtual void run() { Averages.NumberComponents_NWP = blob_ids->first; Averages.Label_NWP = blob_ids->second; Averages.Label_NWP_map = *id_list; Averages.NumberComponents_WP = 1; Averages.Label_WP.fill( 0.0 ); if ( matches( type, AnalysisType::CopyPhaseIndicator ) ) { // Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tplus); } if ( matches( type, AnalysisType::ComputeAverages ) ) { PROFILE_START( "Compute TCAT", 1 ); Averages.Initialize(); Averages.ComputeDelPhi(); Averages.ColorToSignedDistance( beta, Averages.Phase, Averages.SDn ); Averages.ColorToSignedDistance( beta, Averages.Phase_tminus, Averages.Phase_tminus ); Averages.ColorToSignedDistance( beta, Averages.Phase_tplus, Averages.Phase_tplus ); Averages.UpdateMeshValues(); Averages.ComputeLocal(); Averages.Reduce(); Averages.PrintAll( timestep ); PROFILE_STOP( "Compute TCAT", 1 ); } } private: TCATWorkItem(); AnalysisType type; int timestep; TwoPhase &Averages; BlobIDstruct blob_ids; BlobIDList id_list; double beta; }; class GanglionTrackingWorkItem : public ThreadPool::WorkItemRet { public: GanglionTrackingWorkItem( AnalysisType type_, int timestep_, TwoPhase &Averages_, BlobIDstruct ids, BlobIDList id_list_, double beta_ ) : type( type_ ), timestep( timestep_ ), Averages( Averages_ ), blob_ids( ids ), id_list( id_list_ ), beta( beta_ ) { } ~GanglionTrackingWorkItem() {} virtual void run() { Averages.NumberComponents_NWP = blob_ids->first; Averages.Label_NWP = blob_ids->second; Averages.Label_NWP_map = *id_list; Averages.NumberComponents_WP = 1; Averages.Label_WP.fill( 0.0 ); if ( matches( type, AnalysisType::CopyPhaseIndicator ) ) { // Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tplus); } if ( matches( type, AnalysisType::ComputeAverages ) ) { PROFILE_START( "Compute ganglion", 1 ); Averages.Initialize(); Averages.ComputeDelPhi(); Averages.ColorToSignedDistance( beta, Averages.Phase, Averages.SDn ); Averages.ColorToSignedDistance( beta, Averages.Phase_tminus, Averages.Phase_tminus ); Averages.ColorToSignedDistance( beta, Averages.Phase_tplus, Averages.Phase_tplus ); Averages.UpdateMeshValues(); Averages.ComponentAverages(); Averages.SortBlobs(); Averages.PrintComponents( timestep ); PROFILE_STOP( "Compute ganglion", 1 ); } } private: GanglionTrackingWorkItem(); AnalysisType type; int timestep; TwoPhase &Averages; BlobIDstruct blob_ids; BlobIDList id_list; double beta; }; class BasicWorkItem : public ThreadPool::WorkItemRet { public: BasicWorkItem( AnalysisType type_, int timestep_, SubPhase &Averages_ ) : type( type_ ), timestep( timestep_ ), Averages( Averages_ ) { } ~BasicWorkItem() {} virtual void run() { if ( matches( type, AnalysisType::CopyPhaseIndicator ) ) { // Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tplus); } if ( matches( type, AnalysisType::ComputeAverages ) ) { PROFILE_START( "Compute basic averages", 1 ); Averages.Basic(); PROFILE_STOP( "Compute basic averages", 1 ); } } private: BasicWorkItem(); AnalysisType type; int timestep; SubPhase &Averages; double beta; }; class SubphaseWorkItem : public ThreadPool::WorkItemRet { public: SubphaseWorkItem( AnalysisType type_, int timestep_, SubPhase &Averages_ ) : type( type_ ), timestep( timestep_ ), Averages( Averages_ ) { } ~SubphaseWorkItem() {} virtual void run() { PROFILE_START( "Compute subphase", 1 ); Averages.Full(); Averages.Write( timestep ); PROFILE_STOP( "Compute subphase", 1 ); } private: SubphaseWorkItem(); AnalysisType type; int timestep; SubPhase &Averages; double beta; }; /****************************************************************** * MPI comm wrapper for use with analysis * ******************************************************************/ runAnalysis::commWrapper::commWrapper( int tag_, const Utilities::MPI &comm_, runAnalysis *analysis_ ) : comm( comm_ ), tag( tag_ ), analysis( analysis_ ) { } runAnalysis::commWrapper::commWrapper( commWrapper &&rhs ) : comm( rhs.comm ), tag( rhs.tag ), analysis( rhs.analysis ) { rhs.tag = -1; } runAnalysis::commWrapper::~commWrapper() { if ( tag == -1 ) return; comm.barrier(); analysis->d_comm_used[tag] = false; } runAnalysis::commWrapper runAnalysis::getComm() { // Get a tag from root int tag = -1; if ( d_rank == 0 ) { for ( int i = 0; i < 1024; i++ ) { if ( !d_comm_used[i] ) { tag = i; break; } } if ( tag == -1 ) ERROR( "Unable to get comm" ); } tag = d_comm.bcast( tag, 0 ); d_comm_used[tag] = true; if ( d_comms[tag].isNull() ) d_comms[tag] = d_comm.dup(); return commWrapper( tag, d_comms[tag], this ); } /****************************************************************** * Constructor/Destructors * ******************************************************************/ runAnalysis::runAnalysis( std::shared_ptr input_db, const RankInfoStruct &rank_info, std::shared_ptr ScaLBL_Comm, std::shared_ptr Dm, int Np, bool Regular, IntArray Map ) : d_Np( Np ), d_regular( Regular ), d_rank_info( rank_info ), d_Map( Map ), d_comm( Dm->Comm.dup() ), d_ScaLBL_Comm( ScaLBL_Comm ) { auto db = input_db->getDatabase( "Analysis" ); auto vis_db = input_db->getDatabase( "Visualization" ); // Ids of work items to use for dependencies ThreadPool::thread_id_t d_wait_blobID; ThreadPool::thread_id_t d_wait_analysis; ThreadPool::thread_id_t d_wait_vis; ThreadPool::thread_id_t d_wait_restart; ThreadPool::thread_id_t d_wait_subphase; char rankString[20]; sprintf( rankString, "%05d", Dm->rank() ); d_n[0] = Dm->Nx - 2; d_n[1] = Dm->Ny - 2; d_n[2] = Dm->Nz - 2; d_N[0] = Dm->Nx; d_N[1] = Dm->Ny; d_N[2] = Dm->Nz; d_restart_interval = db->getScalar( "restart_interval" ); d_analysis_interval = db->getScalar( "analysis_interval" ); d_subphase_analysis_interval = INT_MAX; d_visualization_interval = INT_MAX; d_blobid_interval = INT_MAX; if ( db->keyExists( "blobid_interval" ) ) { d_blobid_interval = db->getScalar( "blobid_interval" ); } if ( db->keyExists( "visualization_interval" ) ) { d_visualization_interval = db->getScalar( "visualization_interval" ); } if ( db->keyExists( "subphase_analysis_interval" ) ) { d_subphase_analysis_interval = db->getScalar( "subphase_analysis_interval" ); } auto restart_file = db->getScalar( "restart_file" ); d_restartFile = restart_file + "." + rankString; d_rank = d_comm.getRank(); writeIDMap( ID_map_struct(), 0, id_map_filename ); // Initialize IO for silo IO::initialize( "", "silo", "false" ); // Create the MeshDataStruct d_meshData.resize( 1 ); d_meshData[0].meshName = "domain"; d_meshData[0].mesh = std::make_shared( d_rank_info, d_n[0], d_n[1], d_n[2], Dm->Lx, Dm->Ly, Dm->Lz ); auto PhaseVar = std::make_shared(); auto PressVar = std::make_shared(); auto VxVar = std::make_shared(); auto VyVar = std::make_shared(); auto VzVar = std::make_shared(); auto SignDistVar = std::make_shared(); auto BlobIDVar = std::make_shared(); if ( vis_db->getWithDefault( "save_phase_field", true ) ) { PhaseVar->name = "phase"; PhaseVar->type = IO::VariableType::VolumeVariable; PhaseVar->dim = 1; PhaseVar->data.resize( d_n[0], d_n[1], d_n[2] ); d_meshData[0].vars.push_back( PhaseVar ); } if ( vis_db->getWithDefault( "save_pressure", false ) ) { PressVar->name = "Pressure"; PressVar->type = IO::VariableType::VolumeVariable; PressVar->dim = 1; PressVar->data.resize( d_n[0], d_n[1], d_n[2] ); d_meshData[0].vars.push_back( PressVar ); } if ( vis_db->getWithDefault( "save_velocity", false ) ) { VxVar->name = "Velocity_x"; VxVar->type = IO::VariableType::VolumeVariable; VxVar->dim = 1; VxVar->data.resize( d_n[0], d_n[1], d_n[2] ); d_meshData[0].vars.push_back( VxVar ); VyVar->name = "Velocity_y"; VyVar->type = IO::VariableType::VolumeVariable; VyVar->dim = 1; VyVar->data.resize( d_n[0], d_n[1], d_n[2] ); d_meshData[0].vars.push_back( VyVar ); VzVar->name = "Velocity_z"; VzVar->type = IO::VariableType::VolumeVariable; VzVar->dim = 1; VzVar->data.resize( d_n[0], d_n[1], d_n[2] ); d_meshData[0].vars.push_back( VzVar ); } if ( vis_db->getWithDefault( "save_distance", false ) ) { SignDistVar->name = "SignDist"; SignDistVar->type = IO::VariableType::VolumeVariable; SignDistVar->dim = 1; SignDistVar->data.resize( d_n[0], d_n[1], d_n[2] ); d_meshData[0].vars.push_back( SignDistVar ); } if ( vis_db->getWithDefault( "save_connected_components", false ) ) { BlobIDVar->name = "BlobID"; BlobIDVar->type = IO::VariableType::VolumeVariable; BlobIDVar->dim = 1; BlobIDVar->data.resize( d_n[0], d_n[1], d_n[2] ); d_meshData[0].vars.push_back( BlobIDVar ); } // Initialize the comms for ( int i = 0; i < 1024; i++ ) d_comm_used[i] = false; // Initialize the threads int N_threads = db->getWithDefault( "N_threads", 4 ); auto method = db->getWithDefault( "load_balance", "default" ); createThreads( method, N_threads ); } runAnalysis::~runAnalysis() { // Finish processing analysis finish(); } void runAnalysis::finish() { PROFILE_START( "finish" ); // Wait for the work items to finish d_tpool.wait_pool_finished(); // Clear the wait ids d_wait_blobID.reset(); d_wait_analysis.reset(); d_wait_vis.reset(); d_wait_subphase.reset(); d_wait_restart.reset(); // Syncronize d_comm.barrier(); PROFILE_STOP( "finish" ); } /****************************************************************** * Set the thread affinities * ******************************************************************/ void print( const std::vector &ids ) { if ( ids.empty() ) return; printf( "%i", ids[0] ); for ( size_t i = 1; i < ids.size(); i++ ) printf( ", %i", ids[i] ); printf( "\n" ); } void runAnalysis::createThreads( const std::string &method, int N_threads ) { // Check if we are not using analysis threads if ( method == "none" ) return; // Check if we have thread support auto thread_support = Utilities::MPI::queryThreadSupport(); if ( thread_support != Utilities::MPI::ThreadSupport::MULTIPLE && N_threads > 0 ) std::cerr << "Warning: Failed to start MPI with necessary thread support, errors may occur\n"; // Create the threads const auto cores = d_tpool.getProcessAffinity(); if ( N_threads == 0 ) { // Special case to serials the analysis for debugging d_tpool.setNumThreads( 0 ); } else if ( cores.empty() ) { // We were not able to get the cores for the process d_tpool.setNumThreads( N_threads ); } else if ( method == "default" ) { // Create the given number of threads, but let the OS manage affinities d_tpool.setNumThreads( N_threads ); } else if ( method == "independent" ) { int N = cores.size() - 1; d_tpool.setNumThreads( N ); d_tpool.setThreadAffinity( { cores[0] } ); for ( int i = 0; i < N; i++ ) d_tpool.setThreadAffinity( i, { cores[i + 1] } ); } // Print the current affinities if ( d_rank == 0 ) { printf( "Affinities - rank 0:\n" ); printf( "Main: " ); print( d_tpool.getProcessAffinity() ); for ( int i = 0; i < d_tpool.getNumThreads(); i++ ) { printf( "Thread %i: ", i + 1 ); print( d_tpool.getThreadAffinity( i ) ); } } } /****************************************************************** * Check which analysis we want to perform * ******************************************************************/ AnalysisType runAnalysis::computeAnalysisType( int timestep ) { AnalysisType type = AnalysisType::AnalyzeNone; if ( timestep % d_analysis_interval + 8 == d_analysis_interval ) { // Copy the phase indicator field for the earlier timestep // printf("Copy phase indicator,timestep=%i\n",timestep); type |= AnalysisType::CopyPhaseIndicator; } if ( timestep % d_blobid_interval == 0 ) { // Identify blobs and update global ids in time type |= AnalysisType::IdentifyBlobs; } /*#ifdef USE_CUDA if ( tpool.getQueueSize()<=3 && tpool.getNumThreads()>0 && timestep%50==0 ) { // Keep a few blob identifications queued up to keep the processors busy, // allowing us to track the blobs as fast as possible // Add more detailed estimates of the update frequency required to track blobs type |= AnalysisType::IdentifyBlobs; } #endif */ if ( timestep % d_analysis_interval + 4 == d_analysis_interval ) { // Copy the averages to the CPU (and identify blobs) // printf("Copy sim state, timestep=%i \n",timestep); type |= AnalysisType::CopySimState; type |= AnalysisType::IdentifyBlobs; } if ( timestep % d_analysis_interval == 0 ) { // Run the analysis // printf("Compute averages, timestep=%i \n",timestep); type |= AnalysisType::ComputeAverages; } if ( timestep % d_restart_interval == 0 ) { // Write the restart file type |= AnalysisType::CreateRestart; } if ( timestep % d_visualization_interval == 0 ) { // Write the visualization data type |= AnalysisType::WriteVis; type |= AnalysisType::CopySimState; type |= AnalysisType::IdentifyBlobs; } return type; } /****************************************************************** * Run the analysis * ******************************************************************/ void runAnalysis::run( int timestep, std::shared_ptr input_db, TwoPhase &Averages, const double *Phi, double *Pressure, double *Velocity, double *fq, double *Den ) { int N = d_N[0] * d_N[1] * d_N[2]; NULL_USE( N ); NULL_USE( Phi ); auto db = input_db->getDatabase( "Analysis" ); // int timestep = db->getWithDefault( "timestep", 0 ); // Check which analysis steps we need to perform auto type = computeAnalysisType( timestep ); if ( type == AnalysisType::AnalyzeNone ) return; // Check how may queued items we have if ( d_tpool.N_queued() > 20 ) { std::cerr << "Analysis queue is getting behind, waiting ...\n"; finish(); } PROFILE_START( "run" ); // Copy the appropriate variables to the host (so we can spawn new threads) ScaLBL_DeviceBarrier(); PROFILE_START( "Copy data to host", 1 ); std::shared_ptr phase; /* if ( matches(type,AnalysisType::CopyPhaseIndicator) || matches(type,AnalysisType::ComputeAverages) || matches(type,AnalysisType::CopySimState) || matches(type,AnalysisType::IdentifyBlobs) ) { phase = std::make_shared(d_N[0],d_N[1],d_N[2]); //ScaLBL_CopyToHost(phase->data(),Phi,N*sizeof(double)); // try 2 d_ScaLBL_Comm.RegulLayout(d_Map,Phi,Averages.Phase); // memcpy(Averages.Phase.data(),phase->data(),N*sizeof(double)); int Nx = d_N[0]; int Ny = d_N[1]; int Nz = d_N[2]; double *TmpDat; TmpDat = new double [d_Np]; ScaLBL_CopyToHost(&TmpDat[0],&Phi[0], d_Np*sizeof(double)); for (int k=0; kdata()[n]=value; } } } } delete [] TmpDat; } */ // if ( matches(type,AnalysisType::CopyPhaseIndicator) ) { if ( timestep % d_analysis_interval + 8 == d_analysis_interval ) { if ( d_regular ) d_ScaLBL_Comm->RegularLayout( d_Map, Phi, Averages.Phase_tplus ); else ScaLBL_CopyToHost( Averages.Phase_tplus.data(), Phi, N * sizeof( double ) ); // memcpy(Averages.Phase_tplus.data(),phase->data(),N*sizeof(double)); } if ( timestep % d_analysis_interval == 0 ) { if ( d_regular ) d_ScaLBL_Comm->RegularLayout( d_Map, Phi, Averages.Phase_tminus ); else ScaLBL_CopyToHost( Averages.Phase_tminus.data(), Phi, N * sizeof( double ) ); // memcpy(Averages.Phase_tminus.data(),phase->data(),N*sizeof(double)); } // if ( matches(type,AnalysisType::CopySimState) ) { if ( timestep % d_analysis_interval + 4 == d_analysis_interval ) { // Copy the members of Averages to the cpu (phase was copied above) PROFILE_START( "Copy-Pressure", 1 ); ScaLBL_D3Q19_Pressure( fq, Pressure, d_Np ); // ScaLBL_D3Q19_Momentum(fq,Velocity,d_Np); ScaLBL_DeviceBarrier(); PROFILE_STOP( "Copy-Pressure", 1 ); PROFILE_START( "Copy-Wait", 1 ); PROFILE_STOP( "Copy-Wait", 1 ); PROFILE_START( "Copy-State", 1 ); // memcpy(Averages.Phase.data(),phase->data(),N*sizeof(double)); if ( d_regular ) d_ScaLBL_Comm->RegularLayout( d_Map, Phi, Averages.Phase ); else ScaLBL_CopyToHost( Averages.Phase.data(), Phi, N * sizeof( double ) ); // copy other variables d_ScaLBL_Comm->RegularLayout( d_Map, Pressure, Averages.Press ); d_ScaLBL_Comm->RegularLayout( d_Map, &Velocity[0], Averages.Vel_x ); d_ScaLBL_Comm->RegularLayout( d_Map, &Velocity[d_Np], Averages.Vel_y ); d_ScaLBL_Comm->RegularLayout( d_Map, &Velocity[2 * d_Np], Averages.Vel_z ); PROFILE_STOP( "Copy-State", 1 ); } std::shared_ptr cfq, cDen; // if ( matches(type,AnalysisType::CreateRestart) ) { if ( timestep % d_restart_interval == 0 ) { // Copy restart data to the CPU cDen = make_shared_array( 2 * d_Np ); cfq = make_shared_array( 19 * d_Np ); ScaLBL_CopyToHost( cfq.get(), fq, 19 * d_Np * sizeof( double ) ); ScaLBL_CopyToHost( cDen.get(), Den, 2 * d_Np * sizeof( double ) ); } PROFILE_STOP( "Copy data to host", 1 ); // Spawn threads to do blob identification work if ( matches( type, AnalysisType::IdentifyBlobs ) ) { phase = std::make_shared( d_N[0], d_N[1], d_N[2] ); if ( d_regular ) d_ScaLBL_Comm->RegularLayout( d_Map, Phi, *phase ); else ScaLBL_CopyToHost( phase->data(), Phi, N * sizeof( double ) ); auto new_index = std::make_shared>( 0, IntArray() ); auto new_ids = std::make_shared>( 0, IntArray() ); auto new_list = std::make_shared>(); auto work1 = new BlobIdentificationWorkItem1( timestep, d_N[0], d_N[1], d_N[2], d_rank_info, phase, Averages.SDs, d_last_ids, new_index, new_ids, new_list, getComm() ); auto work2 = new BlobIdentificationWorkItem2( timestep, d_N[0], d_N[1], d_N[2], d_rank_info, phase, Averages.SDs, d_last_ids, new_index, new_ids, new_list, getComm() ); work1->add_dependency( d_wait_blobID ); work2->add_dependency( d_tpool.add_work( work1 ) ); d_wait_blobID = d_tpool.add_work( work2 ); d_last_index = new_index; d_last_ids = new_ids; d_last_id_map = new_list; } // Spawn threads to do the analysis work // if (timestep%d_restart_interval==0){ // if ( matches(type,AnalysisType::ComputeAverages) ) { if ( timestep % d_analysis_interval == 0 ) { auto work = new AnalysisWorkItem( type, timestep, Averages, d_last_index, d_last_id_map, d_beta ); work->add_dependency( d_wait_blobID ); work->add_dependency( d_wait_analysis ); work->add_dependency( d_wait_vis ); // Make sure we are done using analysis before modifying d_wait_analysis = d_tpool.add_work( work ); } // Spawn a thread to write the restart file // if ( matches(type,AnalysisType::CreateRestart) ) { if ( timestep % d_restart_interval == 0 ) { if ( d_rank == 0 ) { input_db->putScalar( "Restart", true ); std::ofstream OutStream( "Restart.db" ); input_db->print( OutStream, "" ); OutStream.close(); } // Write the restart file (using a seperate thread) auto work = new WriteRestartWorkItem( d_restartFile.c_str(), cDen, cfq, d_Np ); work->add_dependency( d_wait_restart ); d_wait_restart = d_tpool.add_work( work ); } // Save the results for visualization // if ( matches(type,AnalysisType::CreateRestart) ) { if ( timestep % d_restart_interval == 0 ) { // Write the vis files auto work = new WriteVisWorkItem( timestep, d_meshData, Averages, d_n, d_rank_info, getComm() ); work->add_dependency( d_wait_blobID ); work->add_dependency( d_wait_analysis ); work->add_dependency( d_wait_vis ); d_wait_vis = d_tpool.add_work( work ); } PROFILE_STOP( "run" ); } /****************************************************************** * Run the analysis * ******************************************************************/ void runAnalysis::basic( int timestep, std::shared_ptr input_db, SubPhase &Averages, const double *Phi, double *Pressure, double *Velocity, double *fq, double *Den ) { int Nx = d_N[0]; int Ny = d_N[1]; int Nz = d_N[2]; int N = Nx * Ny * Nz; NULL_USE( N ); // Check which analysis steps we need to perform auto color_db = input_db->getDatabase( "Color" ); auto vis_db = input_db->getDatabase( "Visualization" ); // int timestep = color_db->getWithDefault( "timestep", 0 ); auto type = computeAnalysisType( timestep ); if ( type == AnalysisType::AnalyzeNone ) return; // Check how may queued items we have if ( d_tpool.N_queued() > 20 ) { std::cerr << "Analysis queue is getting behind, waiting ...\n"; finish(); } PROFILE_START( "basic" ); // Copy the appropriate variables to the host (so we can spawn new threads) ScaLBL_DeviceBarrier(); PROFILE_START( "Copy data to host", 1 ); // if ( matches(type,AnalysisType::CopySimState) ) { if ( timestep % d_analysis_interval == 0 ) { finish(); // can't copy if threads are still working on data // Copy the members of Averages to the cpu (phase was copied above) PROFILE_START( "Copy-Pressure", 1 ); ScaLBL_D3Q19_Pressure( fq, Pressure, d_Np ); // ScaLBL_D3Q19_Momentum(fq,Velocity,d_Np); ScaLBL_DeviceBarrier(); PROFILE_STOP( "Copy-Pressure", 1 ); PROFILE_START( "Copy-Wait", 1 ); PROFILE_STOP( "Copy-Wait", 1 ); PROFILE_START( "Copy-State", 1 ); /*if (d_regular) d_ScaLBL_Comm->RegularLayout(d_Map,Phi,Averages.Phi); else */ ScaLBL_CopyToHost( Averages.Phi.data(), Phi, N * sizeof( double ) ); // copy other variables d_ScaLBL_Comm->RegularLayout( d_Map, Pressure, Averages.Pressure ); d_ScaLBL_Comm->RegularLayout( d_Map, &Den[0], Averages.Rho_n ); d_ScaLBL_Comm->RegularLayout( d_Map, &Den[d_Np], Averages.Rho_w ); d_ScaLBL_Comm->RegularLayout( d_Map, &Velocity[0], Averages.Vel_x ); d_ScaLBL_Comm->RegularLayout( d_Map, &Velocity[d_Np], Averages.Vel_y ); d_ScaLBL_Comm->RegularLayout( d_Map, &Velocity[2 * d_Np], Averages.Vel_z ); PROFILE_STOP( "Copy-State", 1 ); } PROFILE_STOP( "Copy data to host" ); // Spawn threads to do the analysis work // if (timestep%d_restart_interval==0){ // if ( matches(type,AnalysisType::ComputeAverages) ) { if ( timestep % d_analysis_interval == 0 ) { auto work = new BasicWorkItem( type, timestep, Averages ); work->add_dependency( d_wait_subphase ); // Make sure we are done using analysis before modifying work->add_dependency( d_wait_analysis ); work->add_dependency( d_wait_vis ); d_wait_analysis = d_tpool.add_work( work ); } if ( timestep % d_subphase_analysis_interval == 0 ) { auto work = new SubphaseWorkItem( type, timestep, Averages ); work->add_dependency( d_wait_subphase ); // Make sure we are done using analysis before modifying work->add_dependency( d_wait_analysis ); work->add_dependency( d_wait_vis ); d_wait_subphase = d_tpool.add_work( work ); } if ( timestep % d_restart_interval == 0 ) { std::shared_ptr cfq, cDen; // Copy restart data to the CPU cDen = make_shared_array( 2 * d_Np ); cfq = make_shared_array( 19 * d_Np ); ScaLBL_CopyToHost( cfq.get(), fq, 19 * d_Np * sizeof( double ) ); ScaLBL_CopyToHost( cDen.get(), Den, 2 * d_Np * sizeof( double ) ); if ( d_rank == 0 ) { color_db->putScalar( "timestep", timestep ); color_db->putScalar( "Restart", true ); input_db->putDatabase( "Color", color_db ); std::ofstream OutStream( "Restart.db" ); input_db->print( OutStream, "" ); OutStream.close(); } // Write the restart file (using a seperate thread) auto work1 = new WriteRestartWorkItem( d_restartFile.c_str(), cDen, cfq, d_Np ); work1->add_dependency( d_wait_restart ); d_wait_restart = d_tpool.add_work( work1 ); } if ( timestep % d_visualization_interval == 0 ) { // Write the vis files auto work = new IOWorkItem( timestep, input_db, d_meshData, Averages, d_n, d_rank_info, getComm() ); work->add_dependency( d_wait_analysis ); work->add_dependency( d_wait_subphase ); work->add_dependency( d_wait_vis ); d_wait_vis = d_tpool.add_work( work ); } PROFILE_STOP( "basic" ); } void runAnalysis::WriteVisData( int timestep, std::shared_ptr input_db, SubPhase &Averages, const double *Phi, double *Pressure, double *Velocity, double *fq, double *Den ) { auto color_db = input_db->getDatabase( "Color" ); auto vis_db = input_db->getDatabase( "Visualization" ); // int timestep = color_db->getWithDefault( "timestep", 0 ); // Check which analysis steps we need to perform auto type = computeAnalysisType( timestep ); if ( type == AnalysisType::AnalyzeNone ) return; // Check how may queued items we have if ( d_tpool.N_queued() > 20 ) { std::cerr << "Analysis queue is getting behind, waiting ...\n"; finish(); } // Copy the appropriate variables to the host (so we can spawn new threads) ScaLBL_DeviceBarrier(); PROFILE_START( "write vis", 1 ); // if (Averages.WriteVis == true){ auto work2 = new IOWorkItem( timestep, input_db, d_meshData, Averages, d_n, d_rank_info, getComm() ); work2->add_dependency( d_wait_vis ); d_wait_vis = d_tpool.add_work( work2 ); // Averages.WriteVis = false; PROFILE_STOP( "write vis" ); }