Modifying analysis in lbpm_color_simulator to run in separate threads

Mark Berrill 2015-08-22 11:03:46 -04:00
parent 4f6bceffc8
commit 3f771d9374
22 changed files with 5640 additions and 140 deletions
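
The diffs below move the per-timestep analysis (blob identification, phase averaging, and restart writing) out of the main iteration loop and into ThreadPool work items that run on separate threads. For orientation only, here is a minimal sketch of that pattern using the ThreadPool API added by this commit; the class ExampleAnalysisItem and function example_loop are hypothetical names used for illustration and do not appear in the commit:

#include "threadpool/thread_pool.h"

// Hypothetical work item; the commit defines BlobIdentificationWorkItem,
// AnalysisWorkItem, and WriteRestartWorkItem for the real analysis tasks.
class ExampleAnalysisItem: public ThreadPool::WorkItem {
public:
    explicit ExampleAnalysisItem( int timestep_ ): timestep(timestep_) {}
    virtual void run() {
        // ... analyze the data that was copied off the device for this timestep ...
    }
private:
    int timestep;
};

void example_loop( ThreadPool& tpool, int timestepMax ) {
    ThreadPool::thread_id_t last_work;       // id of the previously queued item
    for (int timestep=0; timestep<timestepMax; timestep++) {
        // ... advance the simulation (kernels, MPI communication) ...
        ThreadPool::WorkItem *work = new ExampleAnalysisItem(timestep);
        work->add_dependency(last_work);     // keep the analysis ordered across timesteps
        last_work = tpool.add_work(work);    // returns immediately; runs on a pool thread
    }
    tpool.wait_pool_finished();              // block until all queued work has completed
}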


@ -62,7 +62,7 @@ ADD_CUSTOM_TARGET( latex_docs )
ADD_CUSTOM_TARGET( build-test )
ADD_CUSTOM_TARGET( build-examples )
ADD_CUSTOM_TARGET( check COMMAND make test )
ADD_DISTCLEAN( analysis null_timer tests liblbpm-wia.* cpu gpu example common visit IO )
ADD_DISTCLEAN( analysis null_timer tests liblbpm-wia.* cpu gpu example common visit IO threadpool )
# Check the compile mode and compile flags
@ -133,6 +133,7 @@ IF ( NOT ONLY_BUILD_DOCS )
ADD_PACKAGE_SUBDIRECTORY( common )
ADD_PACKAGE_SUBDIRECTORY( analysis )
ADD_PACKAGE_SUBDIRECTORY( IO )
ADD_PACKAGE_SUBDIRECTORY( threadpool )
IF ( USE_CUDA )
ADD_PACKAGE_SUBDIRECTORY( gpu )
ELSE()
@ -140,6 +141,7 @@ IF ( NOT ONLY_BUILD_DOCS )
ENDIF()
INSTALL_LBPM_TARGET( lbpm-wia )
ADD_SUBDIRECTORY( tests )
ADD_SUBDIRECTORY( threadpool/test )
ADD_SUBDIRECTORY( example )
ENDIF()


@ -1149,7 +1149,7 @@ inline void WriteLocalSolidDistance(char *FILENAME, double *Distance, int N)
}
inline void WriteCheckpoint(char *FILENAME, double *cDen, double *cDistEven, double *cDistOdd, int N)
inline void WriteCheckpoint(const char *FILENAME, const double *cDen, const double *cDistEven, const double *cDistOdd, int N)
{
int q,n;
double value;


@ -13,9 +13,9 @@ extern "C" void AllocateDeviceMemory(void** address, size_t size);
//extern "C" void FreeDeviceMemory(void** address);
extern "C" void CopyToDevice(void* dest, void* source, size_t size);
extern "C" void CopyToDevice(void* dest, const void* source, size_t size);
extern "C" void CopyToHost(void* dest, void* source, size_t size);
extern "C" void CopyToHost(void* dest, const void* source, size_t size);
extern "C" void DeviceBarrier();
@ -47,8 +47,8 @@ extern "C" void MRT(char *ID, double *f_even, double *f_odd, double rlxA, double
extern "C" void ComputeVelocityD3Q19(char *ID, double *disteven, double *distodd, double *vel,
int Nx, int Ny, int Nz);
extern "C" void ComputePressureD3Q19(char *ID, double *disteven, double *distodd, double *Pressure,
int Nx, int Ny, int Nz);
extern "C" void ComputePressureD3Q19(const char *ID, const double *disteven, const double *distodd,
double *Pressure, int Nx, int Ny, int Nz);
extern "C" void PressureBC_inlet(double *disteven, double *distodd, double din,
int Nx, int Ny, int Nz);


@ -388,8 +388,8 @@ extern "C" void ComputeVelocityD3Q19(char *ID, double *disteven, double *distodd
}
}
extern "C" void ComputePressureD3Q19(char *ID, double *disteven, double *distodd, double *Pressure,
int Nx, int Ny, int Nz)
extern "C" void ComputePressureD3Q19(const char *ID, const double *disteven, const double *distodd,
double *Pressure, int Nx, int Ny, int Nz)
{
int n,N;
// distributions


@ -12,13 +12,13 @@ extern "C" void AllocateDeviceMemory(void** address, size_t size){
}
}
extern "C" void CopyToDevice(void* dest, void* source, size_t size){
extern "C" void CopyToDevice(void* dest, const void* source, size_t size){
// cudaMemcpy(dest,source,size,cudaMemcpyHostToDevice);
memcpy(dest, source, size);
}
extern "C" void CopyToHost(void* dest, void* source, size_t size){
extern "C" void CopyToHost(void* dest, const void* source, size_t size){
// cudaMemcpy(dest,source,size,cudaMemcpyDeviceToHost);
memcpy(dest, source, size);
}


@ -300,8 +300,8 @@ __global__ void dvc_ComputeVelocityD3Q19(char *ID, double *disteven, double *di
}
}
__global__ void dvc_ComputePressureD3Q19(char *ID, double *disteven, double *distodd, double *Pressure,
int Nx, int Ny, int Nz)
__global__ void dvc_ComputePressureD3Q19(const char *ID, const double *disteven, const double *distodd,
double *Pressure, int Nx, int Ny, int Nz)
{
int n,N;
// distributions


@ -10,7 +10,7 @@ extern "C" void AllocateDeviceMemory(void** address, size_t size){
}
}
extern "C" void CopyToDevice(void* dest, void* source, size_t size){
extern "C" void CopyToDevice(void* dest, const void* source, size_t size){
cudaMemcpy(dest,source,size,cudaMemcpyHostToDevice);
cudaError_t err = cudaGetLastError();
if (cudaSuccess != err){
@ -19,7 +19,7 @@ extern "C" void CopyToDevice(void* dest, void* source, size_t size){
}
extern "C" void CopyToHost(void* dest, void* source, size_t size){
extern "C" void CopyToHost(void* dest, const void* source, size_t size){
cudaMemcpy(dest,source,size,cudaMemcpyDeviceToHost);
cudaError_t err = cudaGetLastError();
if (cudaSuccess != err){


@ -27,6 +27,7 @@ extern void GlobalFlipInitD3Q19(double *dist_even, double *dist_odd, int Nx, int
X = Nx*nprocx;
Y = Ny*nprocy;
Z = Nz*nprocz;
NULL_USE(Z);
N = (Nx+2)*(Ny+2)*(Nz+2); // size of the array including halo
for (k=0; k<Nz; k++){
for (j=0; j<Ny; j++){
@ -88,6 +89,7 @@ extern int GlobalCheckDebugDist(double *dist_even, double *dist_odd, int Nx, int
X = Nx*nprocx;
Y = Ny*nprocy;
Z = Nz*nprocz;
NULL_USE(Z);
N = (Nx+2)*(Ny+2)*(Nz+2); // size of the array including halo
for (k=0; k<Nz; k++){
for (j=0; j<Ny; j++){
@ -161,7 +163,6 @@ int main(int argc, char **argv)
// parallel domain size (# of sub-domains)
int nprocx,nprocy,nprocz;
int iproc,jproc,kproc;
int sendtag,recvtag;
//*****************************************
// MPI ranks for all 18 neighbors
//**********************************
@ -417,7 +418,6 @@ int main(int argc, char **argv)
starttime = MPI_Wtime();
//.........................................
sendtag = recvtag = 5;
//************ MAIN ITERATION LOOP (timing communications)***************************************/
while (timestep < 100){


@ -229,7 +229,7 @@ int main(int argc, char **argv)
// Variables that specify the computational domain
string FILENAME;
unsigned int nBlocks, nthreads;
//unsigned int nBlocks, nthreads;
int Nx,Ny,Nz; // local sub-domain size
int nspheres; // number of spheres in the packing
double Lx,Ly,Lz; // Domain length
@ -248,8 +248,8 @@ int main(int argc, char **argv)
double fluid_isovalue,solid_isovalue;
fluid_isovalue = 0.0;
solid_isovalue = 0.0;
nBlocks = 32;
nthreads = 128;
//nBlocks = 32;
//nthreads = 128;
int RESTART_INTERVAL=20000;


@ -55,9 +55,6 @@ int main(int argc, char **argv)
int i,j,k,n;
// pmmc threshold values
double fluid_isovalue,solid_isovalue;
fluid_isovalue = 0.0;
solid_isovalue = 0.0;
if (rank==0){
//.......................................................................


@ -12,6 +12,9 @@
#include "common/MPI_Helpers.h"
#include "ProfilerApp.h"
#include "threadpool/thread_pool.h"
#include "lbpm_color_simulator.h"
//#define WRITE_SURFACES
@ -108,7 +111,6 @@ int main(int argc, char **argv)
// parallel domain size (# of sub-domains)
int nprocx,nprocy,nprocz;
int iproc,jproc,kproc;
int sendtag,recvtag;
//*****************************************
// MPI ranks for all 18 neighbors
//**********************************
@ -152,9 +154,9 @@ int main(int argc, char **argv)
int i,j,k;
// pmmc threshold values
double fluid_isovalue,solid_isovalue;
fluid_isovalue = 0.0;
solid_isovalue = 0.0;
//double fluid_isovalue,solid_isovalue;
//fluid_isovalue = 0.0;
//solid_isovalue = 0.0;
int RESTART_INTERVAL=20000;
@ -255,8 +257,7 @@ int main(int argc, char **argv)
double Ps = -(das-dbs)/(das+dbs);
double rlxA = 1.f/tau;
double rlxB = 8.f*(2.f-rlxA)/(8.f-rlxA);
double xIntPos;
xIntPos = log((1.0+phi_s)/(1.0-phi_s))/(2.0*beta);
//double xIntPos = log((1.0+phi_s)/(1.0-phi_s))/(2.0*beta);
// Set the density values inside the solid based on the input value phi_s
das = (phi_s+1.0)*0.5;
@ -299,6 +300,7 @@ int main(int argc, char **argv)
bool Restart;
if (InitialCondition==1) Restart=true;
else Restart=false;
NULL_USE(pBC); NULL_USE(velBC);
Domain Dm(Nx,Ny,Nz,rank,nprocx,nprocy,nprocz,Lx,Ly,Lz,BoundaryCondition);
TwoPhase Averages(Dm);
@ -523,11 +525,6 @@ int main(int argc, char **argv)
AllocateDeviceMemory((void **) &Den, 2*dist_mem_size);
AllocateDeviceMemory((void **) &Velocity, 3*dist_mem_size);
AllocateDeviceMemory((void **) &ColorGrad, 3*dist_mem_size);
//copies of data needed to perform checkpointing from cpu
double *cDen, *cDistEven, *cDistOdd;
cDen = new double[2*N];
cDistEven = new double[10*N];
cDistOdd = new double[9*N];
//...........................................................................
// Copy signed distance for device initialization
@ -552,12 +549,18 @@ int main(int argc, char **argv)
if (Restart == true){
if (rank==0) printf("Reading restart file! \n");
// Read in the restart file to CPU buffers
double *cDen = new double[2*N];
double *cDistEven = new double[10*N];
double *cDistOdd = new double[9*N];
ReadCheckpoint(LocalRestartFile, cDen, cDistEven, cDistOdd, N);
// Copy the restart data to the GPU
CopyToDevice(f_even,cDistEven,10*N*sizeof(double));
CopyToDevice(f_odd,cDistOdd,9*N*sizeof(double));
CopyToDevice(Den,cDen,2*N*sizeof(double));
DeviceBarrier();
delete [] cDen;
delete [] cDistEven;
delete [] cDistOdd;
MPI_Barrier(MPI_COMM_WORLD);
}
@ -651,7 +654,6 @@ int main(int argc, char **argv)
CopyToHost(Averages.Vel_z.get(),&Velocity[2*N],N*sizeof(double));
//...........................................................................
int timestep = -1;
if (rank==0) printf("********************************************************\n");
if (rank==0) printf("No. of timesteps: %i \n", timestepMax);
@ -662,15 +664,21 @@ int main(int argc, char **argv)
starttime = MPI_Wtime();
//.........................................
sendtag = recvtag = 5;
// Copy the data to the CPU
err = 1.0;
double sat_w_previous = 1.01; // slightly impossible value!
if (rank==0) printf("Begin timesteps: error tolerance is %f \n", tol);
//************ MAIN ITERATION LOOP ***************************************/
PROFILE_START("Loop");
IntArray GlobalBlobID, GlobalBlobID2;
while (timestep < timestepMax && err > tol ){
int N_procs = ThreadPool::getNumberOfProcessors();
std::vector<int> procs(N_procs);
for (int i=0; i<N_procs; i++)
procs[i] = i;
ThreadPool::setProcessAffinity(procs);
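// Set up a small thread pool and the bookkeeping used to run the analysis,
// blob identification, and restart writes in separate threads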
int timestep = -1;
AnalysisWaitIdStruct work_ids;
ThreadPool tpool(2);
BlobIDstruct last_ids;
while (timestep < timestepMax && err > tol ) {
PROFILE_START("Update");
//*************************************************************************
@ -773,98 +781,16 @@ int main(int argc, char **argv)
// Timestep completed!
timestep++;
//...................................................................
if (timestep%1000 == 995){
//...........................................................................
// Copy the phase indicator field for the earlier timestep
DeviceBarrier();
CopyToHost(Averages.Phase_tplus.get(),Phi,N*sizeof(double));
// Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tplus);
//...........................................................................
}
if (timestep%1000 == 0){
//...........................................................................
// Copy the data for for the analysis timestep
//...........................................................................
// Copy the phase from the GPU -> CPU
//...........................................................................
PROFILE_START("Copy phase");
DeviceBarrier();
ComputePressureD3Q19(ID,f_even,f_odd,Pressure,Nx,Ny,Nz);
CopyToHost(Averages.Phase.get(),Phi,N*sizeof(double));
CopyToHost(Averages.Press.get(),Pressure,N*sizeof(double));
CopyToHost(Averages.Vel_x.get(),&Velocity[0],N*sizeof(double));
CopyToHost(Averages.Vel_y.get(),&Velocity[N],N*sizeof(double));
CopyToHost(Averages.Vel_z.get(),&Velocity[2*N],N*sizeof(double));
MPI_Barrier(MPI_COMM_WORLD);
PROFILE_STOP("Copy phase");
}
if ( timestep%1000 == 0){
// Compute the global blob id and compare to the previous version
PROFILE_START("Identify blobs and maps");
DeviceBarrier();
CopyToHost(Averages.Phase.get(),Phi,N*sizeof(double));
double vF = 0.0;
double vS = 0.0;
int nblobs2 = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,
Averages.Phase,Averages.SDs,vF,vS,GlobalBlobID2);
if ( !GlobalBlobID.empty() ) {
// Compute the timestep-timestep map
ID_map_struct map = computeIDMap(GlobalBlobID,GlobalBlobID2);
// Renumber the current timestep's ids
}
Averages.NumberComponents_NWP = nblobs2;
Averages.Label_NWP.swap(GlobalBlobID2);
Averages.NumberComponents_WP = 1;
Averages.Label_WP.fill(0.0);
GlobalBlobID.swap(GlobalBlobID2);
PROFILE_STOP("Identify blobs and maps");
}
if (timestep%1000 == 5){
PROFILE_START("Compute dist");
//...........................................................................
// Copy the phase indicator field for the later timestep
DeviceBarrier();
CopyToHost(Averages.Phase_tminus.get(),Phi,N*sizeof(double));
// Averages.ColorToSignedDistance(beta,Averages.Phase_tminus,Averages.Phase_tminus);
//....................................................................
Averages.Initialize();
Averages.ComputeDelPhi();
Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.SDn);
Averages.UpdateMeshValues();
Averages.ComputeLocal();
Averages.Reduce();
Averages.PrintAll(timestep);
Averages.Initialize();
Averages.ComponentAverages();
Averages.SortBlobs();
Averages.PrintComponents(timestep);
//....................................................................
PROFILE_STOP("Compute dist");
}
if (timestep%RESTART_INTERVAL == 0){
PROFILE_START("Save Checkpoint");
if (pBC){
//err = fabs(sat_w - sat_w_previous);
//sat_w_previous = sat_w;
if (rank==0) printf("Timestep %i: change in saturation since last checkpoint is %f \n", timestep, err);
}
else{
// Not clear yet
}
// Copy the data to the CPU
CopyToHost(cDistEven,f_even,10*N*sizeof(double));
CopyToHost(cDistOdd,f_odd,9*N*sizeof(double));
CopyToHost(cDen,Den,2*N*sizeof(double));
// Read in the restart file to CPU buffers
WriteCheckpoint(LocalRestartFile, cDen, cDistEven, cDistOdd, N);
PROFILE_STOP("Save Checkpoint");
PROFILE_SAVE("lbpm_color_simulator",1);
}
// Run the analysis, blob identification, and write restart files
run_analysis(timestep,RESTART_INTERVAL,rank_info,Averages,last_ids,
Nx,Ny,Nz,pBC,beta,err,Phi,Pressure,Velocity,ID,f_even,f_odd,Den,
LocalRestartFile,tpool,work_ids);
}
tpool.wait_pool_finished();
PROFILE_STOP("Loop");
//************************************************************************/
//************************************************************************
DeviceBarrier();
MPI_Barrier(MPI_COMM_WORLD);
stoptime = MPI_Wtime();
@ -881,14 +807,14 @@ int main(int argc, char **argv)
if (rank==0) printf("Lattice update rate (total)= %f MLUPS \n", MLUPS);
if (rank==0) printf("********************************************************\n");
//************************************************************************/
// ************************************************************************
/* // Perform component averaging and write tcat averages
Averages.Initialize();
Averages.ComponentAverages();
Averages.SortBlobs();
Averages.PrintComponents(timestep);
//************************************************************************/
/*
// ************************************************************************
int NumberComponents_NWP = ComputeGlobalPhaseComponent(Dm.Nx-2,Dm.Ny-2,Dm.Nz-2,Dm.rank_info,Averages.PhaseID,1,Averages.Label_NWP);
printf("Number of non-wetting phase components: %i \n ",NumberComponents_NWP);
DeviceBarrier();
@ -936,7 +862,7 @@ int main(int argc, char **argv)
PHASE = fopen(LocalRankFilename,"wb");
fwrite(Averages.Phase.get(),8,N,PHASE);
fclose(PHASE);
*/
/* sprintf(LocalRankFilename,"%s%s","Pressure.",LocalRankString);
FILE *PRESS;
PRESS = fopen(LocalRankFilename,"wb");
@ -960,3 +886,5 @@ int main(int argc, char **argv)
MPI_Finalize();
// ****************************************************
}


@ -0,0 +1,220 @@
// Run the analysis, blob identification, and write restart files
enum AnalysisType{ AnalyzeNone=0, IdentifyBlobs=0x01, CopyPhaseIndicator=0x02,
CopyAverages=0x04, CalcDist=0x08, CreateRestart=0x10 };
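// Ids of the most recently queued work item of each kind; new work items add
// these as dependencies so the asynchronous analysis stays ordered between timesteps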
struct AnalysisWaitIdStruct {
ThreadPool::thread_id_t blobID;
ThreadPool::thread_id_t analysis;
ThreadPool::thread_id_t restart;
};
// Helper class to write the restart file from a separate thread
class WriteRestartWorkItem: public ThreadPool::WorkItem
{
public:
WriteRestartWorkItem( const char* filename_, std::shared_ptr<double> cDen_,
std::shared_ptr<double> cDistEven_, std::shared_ptr<double>cDistOdd_, int N_ ):
filename(filename_), cDen(cDen_), cDistEven(cDistEven_), cDistOdd(cDistOdd_), N(N_) {}
virtual void run() {
PROFILE_START("Save Checkpoint",1);
WriteCheckpoint(filename,cDen.get(),cDistEven.get(),cDistOdd.get(),N);
PROFILE_STOP("Save Checkpoint",1);
PROFILE_SAVE("lbpm_color_simulator",1);
};
private:
WriteRestartWorkItem();
const char* filename;
std::shared_ptr<double> cDen, cDistEven, cDistOdd;
const int N;
};
// Helper class to compute the blob ids
typedef std::shared_ptr<std::pair<int,IntArray> > BlobIDstruct;
class BlobIdentificationWorkItem: public ThreadPool::WorkItem
{
public:
BlobIdentificationWorkItem( int Nx_, int Ny_, int Nz_, const RankInfoStruct& rank_info_,
std::shared_ptr<const double> phase_, const DoubleArray& dist_,
BlobIDstruct last_id_, BlobIDstruct new_id_ ):
Nx(Nx_), Ny(Ny_), Nz(Nz_), rank_info(rank_info_), phase(phase_),
dist(dist_), last_id(last_id_), new_id(new_id_) { }
virtual void run() {
// Compute the global blob id and compare to the previous version
PROFILE_START("Identify blobs and maps",1);
double vF = 0.0;
double vS = 0.0;
new_id->first = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,
*phase,dist,vF,vS,new_id->second);
if ( last_id!=NULL ) {
// Compute the timestep-timestep map
ID_map_struct map = computeIDMap(last_id->second,new_id->second);
// Renumber the current timestep's ids
}
PROFILE_STOP("Identify blobs and maps",1);
}
private:
BlobIdentificationWorkItem();
int Nx, Ny, Nz;
const RankInfoStruct& rank_info;
std::shared_ptr<const double> phase;
const DoubleArray& dist;
BlobIDstruct last_id, new_id;
};
// Helper class to run the analysis from within a thread
// Note: Averages will be modified after the constructor is called
class AnalysisWorkItem: public ThreadPool::WorkItem
{
public:
AnalysisWorkItem( AnalysisType type_, int timestep_, TwoPhase& Averages_, BlobIDstruct ids, double beta_ ):
type(type_), timestep(timestep_), Averages(Averages_), blob_ids(ids), beta(beta_) { }
virtual void run() {
Averages.NumberComponents_NWP = blob_ids->first;
Averages.Label_NWP = blob_ids->second;
Averages.NumberComponents_WP = 1;
Averages.Label_WP.fill(0.0);
if ( (type&CopyPhaseIndicator) != 0 ) {
// Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.Phase_tplus);
}
if ( (type&CalcDist) != 0 ) {
PROFILE_START("Compute dist",1);
// Averages.ColorToSignedDistance(beta,Averages.Phase_tminus,Averages.Phase_tminus);
Averages.Initialize();
Averages.ComputeDelPhi();
Averages.ColorToSignedDistance(beta,Averages.Phase,Averages.SDn);
Averages.UpdateMeshValues();
Averages.ComputeLocal();
Averages.Reduce();
Averages.PrintAll(timestep);
Averages.Initialize();
Averages.ComponentAverages();
Averages.SortBlobs();
Averages.PrintComponents(timestep);
PROFILE_STOP("Compute dist",1);
}
}
private:
AnalysisWorkItem();
AnalysisType type;
int timestep;
TwoPhase& Averages;
BlobIDstruct blob_ids;
double beta;
};
// Function to start the analysis
void run_analysis( int timestep, int restart_interval,
const RankInfoStruct& rank_info, TwoPhase& Averages, BlobIDstruct& last_ids,
int Nx, int Ny, int Nz, bool pBC, double beta, double err,
const double *Phi, double *Pressure, const double *Velocity,
const char *ID, const double *f_even, const double *f_odd, const double *Den,
const char *LocalRestartFile, ThreadPool& tpool, AnalysisWaitIdStruct& wait )
{
int N = Nx*Ny*Nz;
// Determine the analysis we want to perform
AnalysisType type = AnalyzeNone;
if ( timestep%1000 == 995 ) {
// Copy the phase indicator field for the earlier timestep
type = static_cast<AnalysisType>( type | CopyPhaseIndicator );
}
if ( timestep%1000 == 0 ) {
type = static_cast<AnalysisType>( type | CopyAverages );
type = static_cast<AnalysisType>( type | IdentifyBlobs );
}
if ( timestep%1000 == 5 ) {
type = static_cast<AnalysisType>( type | CalcDist );
}
if (timestep%restart_interval == 0) {
type = static_cast<AnalysisType>( type | CreateRestart );
}
// Return if we are not doing anything
if ( type == AnalyzeNone )
return;
PROFILE_START("start_analysis");
// Copy the appropriate variables to the host (so we can spawn new threads)
DeviceBarrier();
PROFILE_START("Copy data to host",1);
std::shared_ptr<double> phase;
if ( (type&CopyPhaseIndicator)!=0 || (type&CalcDist)!=0 ||
(type&CopyAverages)!=0 || (type&IdentifyBlobs)!=0 )
{
phase = std::shared_ptr<double>(new double[N],DeleteArray<double>);
CopyToHost(phase.get(),Phi,N*sizeof(double));
}
if ( (type&CopyPhaseIndicator)!=0 ) {
memcpy(Averages.Phase_tplus.get(),phase.get(),N*sizeof(double));
}
if ( (type&CalcDist)!=0 ) {
memcpy(Averages.Phase_tminus.get(),phase.get(),N*sizeof(double));
}
if ( (type&CopyAverages) != 0 ) {
// Copy the members of Averages to the cpu (phase was copied above)
ComputePressureD3Q19(ID,f_even,f_odd,Pressure,Nx,Ny,Nz);
memcpy(Averages.Phase.get(),phase.get(),N*sizeof(double));
DeviceBarrier();
CopyToHost(Averages.Press.get(),Pressure,N*sizeof(double));
CopyToHost(Averages.Vel_x.get(),&Velocity[0],N*sizeof(double));
CopyToHost(Averages.Vel_y.get(),&Velocity[N],N*sizeof(double));
CopyToHost(Averages.Vel_z.get(),&Velocity[2*N],N*sizeof(double));
}
std::shared_ptr<double> cDen, cDistEven, cDistOdd;
if ( (type&CreateRestart) != 0 ) {
// Copy restart data to the CPU
cDen = std::shared_ptr<double>(new double[2*N],DeleteArray<double>);
cDistEven = std::shared_ptr<double>(new double[10*N],DeleteArray<double>);
cDistOdd = std::shared_ptr<double>(new double[9*N],DeleteArray<double>);
CopyToHost(cDistEven.get(),f_even,10*N*sizeof(double));
CopyToHost(cDistOdd.get(),f_odd,9*N*sizeof(double));
CopyToHost(cDen.get(),Den,2*N*sizeof(double));
}
PROFILE_STOP("Copy data to host",1);
// Spawn threads to do blob identification work
if ( (type&IdentifyBlobs)!=0 ) {
BlobIDstruct new_ids(new std::pair<int,IntArray>());
ThreadPool::WorkItem *work = new BlobIdentificationWorkItem(
Nx,Ny,Nz,rank_info,phase,Averages.SDs,last_ids,new_ids);
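// Depend on the previous identification task so global blob ids are computed in timestep order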
work->add_dependency(wait.blobID);
last_ids = new_ids;
wait.blobID = tpool.add_work(work);
}
// Spawn threads to do the analysis work
if ( (type&CalcDist) != 0 ) {
ThreadPool::WorkItem *work = new AnalysisWorkItem(type,timestep,Averages,last_ids,beta);
work->add_dependency(wait.blobID);
work->add_dependency(wait.analysis);
wait.analysis = tpool.add_work(work);
}
// Spawn a thread to write the restart file
if ( (type&CreateRestart) != 0 ) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
if (pBC) {
//err = fabs(sat_w - sat_w_previous);
//sat_w_previous = sat_w;
if (rank==0) printf("Timestep %i: change in saturation since last checkpoint is %f \n",timestep,err);
} else {
// Not clear yet
}
// Write the restart file (using a separate thread)
WriteRestartWorkItem *work = new WriteRestartWorkItem(LocalRestartFile,cDen,cDistEven,cDistOdd,N);
work->add_dependency(wait.restart);
wait.restart = tpool.add_work(work);
}
PROFILE_STOP("start_analysis");
}


@ -31,7 +31,7 @@ inline void ReadDiscPacking(int ndiscs, double *List_cx, double *List_cy, double
char * line = new char[100];
// We will read until a blank line or end-of-file is reached
int count = 0;
while ( !feof(fid) && fgets(line,100,fid)>0 ) {
while ( !feof(fid) && fgets(line,100,fid)!=NULL ) {
char* line2 = line;
List_cx[count] = strtod(line2,&line2);
List_cy[count] = strtod(line2,&line2);
@ -71,6 +71,7 @@ inline void SignedDistanceDiscPack(double *Distance, int ndiscs, double *List_cx
min_x = double(iproc*(Nx-2)-1)*hx;
min_y = double(jproc*(Ny-2)-1)*hy;
min_z = double(kproc*(Nz-2)-1)*hz;
NULL_USE(min_x);
//............................................
//............................................


@ -74,7 +74,6 @@ int main (int argc, char **argv)
DTMutableList<Point> local_nws_pts(20);
int n_local_nws_pts;
int n_nw_tris_beg, n_ns_tris_beg, n_ws_tris_beg;
int c;
int newton_steps = 0;
//...........................................................................
@ -153,10 +152,6 @@ int main (int argc, char **argv)
n_nw_pts=0,n_ns_pts=0,n_ws_pts=0,n_nws_pts=0, map=0;
n_nw_tris=0, n_ns_tris=0, n_ws_tris=0, n_nws_seg=0;
n_nw_tris_beg = 0;// n_nw_tris;
n_ns_tris_beg = 0;//n_ns_tris;
n_ws_tris_beg = 0;//n_ws_tris;
// if there is a solid phase interface in the grid cell
if (Interface(SignDist,solid_isovalue,i,j,k) == 1){
@ -387,9 +382,6 @@ int main (int argc, char **argv)
// Reset the triangle counts to zero
n_nw_pts=0,n_ns_pts=0,n_ws_pts=0,n_nws_pts=0, map=0;
n_nw_tris=0, n_ns_tris=0, n_ws_tris=0, n_nws_seg=0;
n_nw_tris_beg = 0;// n_nw_tris;
// n_ns_tris_beg = 0;//n_ns_tris;
// n_ws_tris_beg = 0;//n_ws_tris;
// n_nws_seg_beg = n_nws_seg;
//*******************************************************************
}


@ -0,0 +1,28 @@
#include "threadpool/atomic_helpers.h"
#include <stdexcept>
#ifdef USE_PTHREAD_ATOMIC_LOCK
// Print a warning if we defaulted to use pthreads for atomic operations
// This can decrease the performance of atomic operations
// We print the message here so it is only printed once
#warning using pthreads for atomic operations, this may affect performance
#endif
namespace AtomicOperations {
#ifdef USE_PTHREAD_ATOMIC_LOCK
pthread_mutex_t atomic_pthread_lock;
static pthread_mutexattr_t threadpool_global_attr;
static int create_atomic_pthread_lock( ) {
pthread_mutexattr_init(&threadpool_global_attr);
int error = pthread_mutex_init(&atomic_pthread_lock,&threadpool_global_attr);
if ( error != 0 ) { throw std::logic_error("Error initializing mutex:"); }
return error;
}
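// Create the lock during static initialization so it is ready before any threads start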
int atomic_pthread_lock_initialized = create_atomic_pthread_lock();
#endif
} // AtomicOperations namespace

threadpool/atomic_helpers.h Normal file

@ -0,0 +1,335 @@
// Copyright © 2004 Mark Berrill. All Rights Reserved. This work is distributed with permission,
// but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#ifndef included_ThreadPoolAtomicHelpers
#define included_ThreadPoolAtomicHelpers
#include <stdio.h>
#include <typeinfo>
#include <stdint.h>
// Choose the OS
#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
// Using windows
#define USE_WINDOWS
#define NOMINMAX
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#elif defined(__APPLE__)
// Using MAC
#define USE_MAC
#include <libkern/OSAtomic.h>
#elif defined(__linux) || defined(__unix) || defined(__posix)
// Using Linux
#define USE_LINUX
#include <unistd.h>
#if !defined(__GNUC__)
#define USE_PTHREAD_ATOMIC_LOCK
#include "pthread.h"
#endif
#else
#error Unknown OS
#endif
/** \namespace AtomicOperations
* \brief Functions for atomic operations
* \details This namespace provides wrapper routines to access simple atomic operations.
* Since atomic operations are system dependent, these functions are necessary
* to provide a platform independent interface. We also provide some typedef
* variables to wrap OS dependencies. Currently we have 32 and 64 bit integers:
* int32_atomic and int64_atomic. In all cases the operations use the barrier
* versions provided by the compiler/OS if available. In most cases, these builtins
* are considered a full barrier. That is, no memory operand will be moved across
* the operation, either forward or backward. Further, instructions will be issued
* as necessary to prevent the processor from speculating loads across the operation
* and from queuing stores after the operation.
* Note: for all functions the variable being modified must be volatile to prevent
* compiler optimization that may cache the value.
*/
namespace AtomicOperations {
// Define int32_atomic, int64_atomic
#include <stdint.h>
#if defined(USE_WINDOWS)
typedef long int32_atomic;
typedef __int64 int64_atomic;
#define NO_INST_ATTR
#elif defined(USE_MAC)
typedef int32_t int32_atomic;
typedef int64_t int64_atomic;
#define NO_INST_ATTR
#elif defined(__GNUC__)
typedef int int32_atomic;
typedef long int int64_atomic;
#define NO_INST_ATTR __attribute__((no_instrument_function))
#elif defined(USE_PTHREAD_ATOMIC_LOCK)
typedef int int32_atomic;
typedef long int int64_atomic;
#define NO_INST_ATTR
#else
#error Unknown OS
#endif
/**
* \brief Increment returning the new value
* \details Increment x and return the new value
* \param[in] x The pointer to the value to increment
*/
inline int32_atomic atomic_increment( int32_atomic volatile *x ) NO_INST_ATTR;
/**
* \brief Increment returning the new value
* \details Increment x and return the new value
* \param[in] x The pointer to the value to increment
*/
inline int64_atomic atomic_increment( int64_atomic volatile *x ) NO_INST_ATTR;
/**
* \brief Decrement returning the new value
* \details Decrement x and return the new value
* \param[in] x The pointer to the value to decrement
*/
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) NO_INST_ATTR;
/**
* \brief Decrement returning the new value
* \details Decrement x and return the new value
* \param[in] x The pointer to the value to decrement
*/
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) NO_INST_ATTR;
/**
* \brief Add returning the new value
* \details Add y to x and return the new value
* \param[in] x The pointer to the value to add to
* \param[in] y The value to add
*/
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) NO_INST_ATTR;
/**
* \brief Add returning the new value
* \details Add y to x and return the new value
* \param[in] x The pointer to the value to add to
* \param[in] y The value to add
*/
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) NO_INST_ATTR;
/**
* \brief Compare the given value and swap
* \details Compare the existing value and swap if it matches.
* This function returns the previous value.
* To return a bool indicating if the swap was performed,
* use "bool t = atomic_compare_and_swap(v,x,y)==x".
* \param[in] v The pointer to the value to check and swap
* \param[in] x The value to compare
* \param[in] y The value to swap iff *v==x
*/
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y );
/**
* \brief Compare the given value and swap
* \details Compare the existing value and swap if it matches.
* This function returns the previous value.
* To return a bool indicating if the swap was performed,
* use "bool t = atomic_compare_and_swap(v,x,y)==x".
* \param[in] v The pointer to the value to check and swap
* \param[in] x The value to compare
* \param[in] y The value to swap iff *v==x
*/
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y );
/**
* \brief Compare the given value and swap
* \details Compare the existing value and swap if it matches.
* This function returns the previous value.
* To return a bool indicating if the swap was performed,
* use "bool t = atomic_compare_and_swap(v,x,y)==x".
* \param[in] v The pointer to the value to check and swap
* \param[in] x The value to compare
* \param[in] y The value to swap iff *v==x
*/
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y );
// Define increment/decrement/add operators for int32, int64
#if defined(USE_WINDOWS)
inline int32_atomic atomic_increment( int32_atomic volatile *x ) {
return InterlockedIncrement(x);
}
inline int64_atomic atomic_increment( int64_atomic volatile *x ) {
return InterlockedIncrement64(x);
}
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) {
return InterlockedDecrement(x);
}
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) {
return InterlockedDecrement64(x);
}
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) {
return InterlockedExchangeAdd(x,y)+y;
}
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) {
return InterlockedExchangeAdd64(x,y)+y;
}
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) {
// InterlockedCompareExchange takes (destination, exchange, comparand)
return InterlockedCompareExchange(v,y,x);
}
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) {
return InterlockedCompareExchange64(v,y,x);
}
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) {
return InterlockedCompareExchangePointer(v,y,x);
}
#elif defined(USE_MAC)
inline int32_atomic atomic_increment( int32_atomic volatile *x ) {
return OSAtomicIncrement32Barrier(x);
}
inline int64_atomic atomic_increment( int64_atomic volatile *x ) {
return OSAtomicIncrement64Barrier(x);
}
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) {
return OSAtomicDecrement32Barrier(x);
}
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) {
return OSAtomicDecrement64Barrier(x);
}
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) {
return OSAtomicAdd32Barrier(y,x);
}
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) {
return OSAtomicAdd64Barrier(y,x);
}
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) {
// return the previous value: x if the swap succeeded, otherwise the current contents
return OSAtomicCompareAndSwap32Barrier(x,y,v) ? x : *v;
}
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) {
return OSAtomicCompareAndSwap64Barrier(x,y,v) ? x : *v;
}
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) {
return OSAtomicCompareAndSwapPtrBarrier(x,y,v) ? x : *v;
}
#elif defined(__GNUC__)
int32_atomic atomic_increment( int32_atomic volatile *x ) {
return __sync_add_and_fetch(x,1);
}
int64_atomic atomic_increment( int64_atomic volatile *x ) {
return __sync_add_and_fetch(x,1);
}
int32_atomic atomic_decrement( int32_atomic volatile *x ) {
return __sync_sub_and_fetch(x,1);
}
int64_atomic atomic_decrement( int64_atomic volatile *x ) {
return __sync_sub_and_fetch(x,1);
}
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) {
return __sync_add_and_fetch(x,y);
}
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) {
return __sync_add_and_fetch(x,y);
}
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) {
return __sync_val_compare_and_swap(v,x,y);
}
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) {
return __sync_val_compare_and_swap(v,x,y);
}
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) {
return __sync_val_compare_and_swap(v,x,y);
}
#elif defined(USE_PTHREAD_ATOMIC_LOCK)
extern pthread_mutex_t atomic_pthread_lock;
inline int32_atomic atomic_increment( int32_atomic volatile *x ) {
pthread_mutex_lock(&atomic_pthread_lock);
int32_atomic y = ++(*x);
pthread_mutex_unlock(&atomic_pthread_lock);
return y;
}
inline int64_atomic atomic_increment( int64_atomic volatile *x ) {
pthread_mutex_lock(&atomic_pthread_lock);
int64_atomic y = ++(*x);
pthread_mutex_unlock(&atomic_pthread_lock);
return y;
}
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) {
pthread_mutex_lock(&atomic_pthread_lock);
int32_atomic y = --(*x);
pthread_mutex_unlock(&atomic_pthread_lock);
return y;
}
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) {
pthread_mutex_lock(&atomic_pthread_lock);
int64_atomic y = --(*x);
pthread_mutex_unlock(&atomic_pthread_lock);
return y;
}
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) {
pthread_mutex_lock(&atomic_pthread_lock);
*x += y;
int32_atomic z = *x;
pthread_mutex_unlock(&atomic_pthread_lock);
return z;
}
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) {
pthread_mutex_lock(&atomic_pthread_lock);
*x += y;
int64_atomic z = *x;
pthread_mutex_unlock(&atomic_pthread_lock);
return z;
}
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) {
pthread_mutex_lock(&atomic_pthread_lock);
int32_atomic z = *v;        // previous value
if ( z == x ) { *v = y; }   // swap only if the stored value matches x
pthread_mutex_unlock(&atomic_pthread_lock);
return z;
}
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) {
pthread_mutex_lock(&atomic_pthread_lock);
int64_atomic z = *v;
if ( z == x ) { *v = y; }
pthread_mutex_unlock(&atomic_pthread_lock);
return z;
}
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) {
pthread_mutex_lock(&atomic_pthread_lock);
void* z = *v;
if ( z == x ) { *v = y; }
pthread_mutex_unlock(&atomic_pthread_lock);
return z;
}
#else
#error Unknown OS
#endif
// Define an atomic counter
struct counter_t {
public:
// Constructor
inline counter_t(): count(0) {}
// Destructor
inline ~counter_t() {} // Destructor
// Increment returning the new value
inline int increment() { return atomic_increment(&count);}
// Decrement returning the new value
inline int decrement() { return atomic_decrement(&count);}
// Set the current value of the count
inline void setCount(int val) { count = val;}
// Get the current value of the count
inline int getCount() const { return count;}
private:
counter_t( const counter_t& );
counter_t& operator=( const counter_t& );
volatile int32_atomic count;
};
} // namespace AtomicOperations
#endif
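
For orientation, a minimal usage sketch (not part of this commit) of the atomic counter and free functions declared above:

#include "threadpool/atomic_helpers.h"
#include <stdio.h>

int main() {
    AtomicOperations::counter_t count;            // starts at 0
    count.increment();                            // atomically -> 1 (returns the new value)
    count.increment();                            // atomically -> 2
    count.decrement();                            // atomically -> 1
    printf("count = %i\n", count.getCount());     // prints "count = 1"
    // The free functions operate directly on volatile integers:
    volatile AtomicOperations::int32_atomic x = 0;
    AtomicOperations::atomic_add(&x, 5);          // x == 5; returns the new value
    return 0;
}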


@ -0,0 +1,17 @@
include ( macros )
# Add thread pool tests
ADD_LBPM_TEST( test_atomic )
SET_TESTS_PROPERTIES ( test_atomic PROPERTIES FAIL_REGULAR_EXPRESSION ".*FAILED.*" PROCESSORS 64 )
ADD_LBPM_TEST_THREAD_MPI( test_thread_pool 1 4 )
ADD_LBPM_TEST_THREAD_MPI( test_thread_pool 2 4 )
ADD_LBPM_TEST_THREAD_MPI( test_thread_pool 4 4 )
SET_PROPERTY( TEST test_thread_pool_1procs_4threads APPEND PROPERTY RUN_SERIAL 1 )
IF ( USE_MPI )
SET_PROPERTY( TEST test_thread_pool_2procs_4threads APPEND PROPERTY RUN_SERIAL 1 )
SET_PROPERTY( TEST test_thread_pool_4procs_4threads APPEND PROPERTY RUN_SERIAL 1 )
ENDIF()


@ -0,0 +1,246 @@
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <string>
#include <vector>
#include "threadpool/atomic_helpers.h"
#include "common/Utilities.h"
#include "common/UnitTest.h"
#define perr std::cerr
#define pout std::cout
#define printp printf
#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
// Using windows
#define USE_WINDOWS
#define NOMINMAX
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#elif defined(__APPLE__)
// Using MAC
#define USE_MAC
#include <unistd.h>
#include <mach/mach_init.h>
#include <mach/thread_policy.h>
#elif defined(__linux) || defined(__unix) || defined(__posix)
// Using Linux
#define USE_LINUX
#include <pthread.h>
#include <unistd.h>
#else
#error Unknown OS
#endif
#ifdef USE_WINDOWS
#include <windows.h>
#define TIME_TYPE LARGE_INTEGER
#define get_time(x) QueryPerformanceCounter(x)
#define get_diff(start,end,f) (((double)(end.QuadPart-start.QuadPart))/((double)f.QuadPart))
#define get_frequency(f) QueryPerformanceFrequency(f)
#define sleep(x) Sleep(x*1000)
#elif defined(USE_LINUX) || defined(USE_MAC)
#include <sys/time.h>
#define TIME_TYPE timeval
#define get_time(x) gettimeofday(x,NULL);
#define get_diff(start,end,f) (((double)end.tv_sec-start.tv_sec)+1e-6*((double)end.tv_usec-start.tv_usec))
#define get_frequency(f) (*f=timeval())
#else
#error Unknown OS
#endif
// Function to increment/decrement a counter N times
struct counter_data {
AtomicOperations::counter_t *counter;
int N;
};
void modify_counter( counter_data *data ) {
int N = data->N;
AtomicOperations::counter_t &counter = *(data->counter);
if ( N > 0 ) {
for (int i=0; i<N; i++)
counter.increment();
} else if ( N < 0 ) {
for (int i=0; i<-N; i++)
counter.decrement();
}
}
// Define the thread handle type
#ifdef USE_WINDOWS
typedef HANDLE thread_handle;
#elif defined(USE_LINUX) || defined(USE_MAC)
typedef pthread_t* thread_handle;
#else
#error Unknown OS
#endif
// Create a thread
#ifdef USE_WINDOWS
static thread_handle create_thread( void (*routine)(void*), void* data ) {
return (HANDLE)_beginthread( routine, 0, data);
}
#elif defined(USE_LINUX) || defined(USE_MAC)
static thread_handle create_thread( void (*routine)(void*), void* data ) {
pthread_t *id = new pthread_t;
pthread_create( id, NULL, (void*(*)(void*)) routine, data );
return id;
}
#else
#error Unknown OS
#endif
// Destroy a thread
#ifdef USE_WINDOWS
static void destroy_thread( thread_handle id ) {
WaitForMultipleObjects( 1, &id, 1, 10000 );
}
#elif defined(USE_LINUX) || defined(USE_MAC)
static void destroy_thread( thread_handle id ) {
pthread_join(*id,NULL);
delete id;
}
#else
#error Unknown OS
#endif
/******************************************************************
* The main program *
******************************************************************/
#ifdef USE_WINDOWS
int __cdecl main(int, char **) {
#elif defined(USE_LINUX) || defined(USE_MAC)
int main(int, char*[]) {
#else
#error Unknown OS
#endif
UnitTest ut;
int N_threads = 64; // Number of threads
int N_count = 1000000; // Number of work items
TIME_TYPE start, end, f;
get_frequency(&f);
// Ensure we are using all processors
#ifdef __USE_GNU
int N_procs = sysconf( _SC_NPROCESSORS_ONLN );
cpu_set_t mask;
CPU_ZERO(&mask);
for (int i=0; i<N_procs; i++)
CPU_SET(i,&mask);
sched_setaffinity(getpid(), sizeof(cpu_set_t), &mask );
#endif
// Create the counter we want to test
AtomicOperations::counter_t count;
counter_data data;
data.counter = &count;
data.N = 0;
if ( count.increment() == 1 )
ut.passes("increment count");
else
ut.failure("increment count");
if ( count.decrement() == 0 )
ut.passes("decrement count");
else
ut.failure("decrement count");
count.setCount(3);
if ( count.getCount() == 3 )
ut.passes("set count");
else
ut.failure("set count");
count.setCount(0);
// Increment the counter in serial
data.N = N_count;
get_time(&start);
modify_counter( &data );
get_time(&end);
double time_inc_serial = get_diff(start,end,f)/N_count;
int val = count.getCount();
if ( val != N_count ) {
char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,N_count);
ut.failure(tmp);
}
printp("Time to increment (serial) = %0.1f ns\n",1e9*time_inc_serial);
// Decrement the counter in serial
data.N = -N_count;
get_time(&start);
modify_counter( &data );
get_time(&end);
double time_dec_serial = get_diff(start,end,f)/N_count;
val = count.getCount();
if ( val != 0 ) {
char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,0);
ut.failure(tmp);
}
printp("Time to decrement (serial) = %0.1f ns\n",1e9*time_dec_serial);
// Increment the counter in parallel
data.N = N_count;
std::vector<thread_handle> thread_ids(N_threads);
get_time(&start);
for (int i=0; i<N_threads; i++) {
thread_ids[i] = create_thread( (void (*)(void*)) modify_counter, (void*) &data );
}
for (int i=0; i<N_threads; i++) {
destroy_thread( thread_ids[i] );
}
get_time(&end);
double time_inc_parallel = get_diff(start,end,f)/(N_count*N_threads);
val = count.getCount();
if ( val != N_count*N_threads ) {
char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,N_count*N_threads);
ut.failure(tmp);
}
printp("Time to increment (parallel) = %0.1f ns\n",1e9*time_inc_parallel);
// Decrement the counter in parallel
data.N = -N_count;
get_time(&start);
for (int i=0; i<N_threads; i++) {
thread_ids[i] = create_thread( (void (*)(void*)) modify_counter, (void*) &data );
}
for (int i=0; i<N_threads; i++) {
destroy_thread( thread_ids[i] );
}
get_time(&end);
double time_dec_parallel = get_diff(start,end,f)/(N_count*N_threads);
val = count.getCount();
if ( val != 0 ) {
char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,0);
ut.failure(tmp);
}
printp("Time to decrement (parallel) = %0.1f ns\n",1e9*time_dec_parallel);
// Check the time to increment/decrement
if ( time_inc_serial>100e-9 || time_dec_serial>100e-9 || time_inc_parallel>100e-9 || time_dec_parallel>100e-9 ) {
#if USE_GCOV
ut.expected_failure("Time to increment/decrement count is too expensive");
#else
ut.failure("Time to increment/decrement count is too expensive");
#endif
} else {
ut.passes("Time to increment/decrement passed");
}
// Finished
ut.report();
int N_errors = ut.NumFailGlobal();
return N_errors;
}


@ -0,0 +1,947 @@
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <string>
#include <vector>
#include <algorithm>
#include <stdexcept>
#include "threadpool/thread_pool.h"
#include "common/UnitTest.h"
#include "common/Utilities.h"
#include "ProfilerApp.h"
#include "math.h"
#define perr std::cerr
#define pout std::cout
#define printp printf
#define MAX(x,y) ((x) > (y) ? (x) : (y))
#ifdef USE_WINDOWS
#include <windows.h>
#define TIME_TYPE LARGE_INTEGER
#define get_time(x) QueryPerformanceCounter(x)
#define get_diff(start,end,f) (((double)(end.QuadPart-start.QuadPart))/((double)f.QuadPart))
#define get_frequency(f) QueryPerformanceFrequency(f)
#define sleep(x) Sleep(x*1000)
#define sleepMs(x) Sleep(x)
#elif defined(USE_LINUX) || defined(USE_MAC)
#include <sys/time.h>
#define TIME_TYPE timeval
#define get_time(x) gettimeofday(x,NULL);
#define get_diff(start,end,f) (((double)end.tv_sec-start.tv_sec)+1e-6*((double)end.tv_usec-start.tv_usec))
#define get_frequency(f) (*f=timeval())
#define sleepMs(x) usleep(1000*x)
#else
#error Unknown OS
#endif
#ifdef USE_MPI
#include "mpi.h"
#endif
// Wrapper function for mpi barrier
static inline void barrier() {
#ifdef USE_MPI
MPI_Barrier( MPI_COMM_WORLD );
#endif
}
// Function to waste CPU cycles
void waste_cpu(int N) {
if ( N > 10000 ) { PROFILE_START("waste_cpu",2); }
double pi = 3.141592653589793;
double x = 1.0;
N = std::max(10,N);
{ for (int i=0; i<N; i++) x = sqrt(x*exp(pi/x)); } // style to limit gcov hits
if ( fabs(x-2.926064057273157) > 1e-12 ) { abort(); }
if ( N > 10000 ) { PROFILE_STOP("waste_cpu",2); }
}
// Function to sleep for N seconds then increment a global count
static volatile int global_sleep_count = 0;
void sleep_inc(int N) {
PROFILE_START("sleep_inc");
sleep(N);
++global_sleep_count;
PROFILE_STOP("sleep_inc");
}
void sleep_inc2(double x) {
sleepMs(static_cast<int>(round(x*1000)));
++global_sleep_count;
}
void sleep_msg( double x, std::string msg ) {
PROFILE_START(msg);
sleepMs(static_cast<int>(round(x*1000)));
PROFILE_STOP(msg);
}
bool check_inc(int N) {
return global_sleep_count==N;
}
// Function to return the processor for the given thread
void print_processor( ThreadPool* tpool )
{
int rank = 0;
#ifdef USE_MPI
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
#endif
int thread = tpool->getThreadNumber();
int processor = ThreadPool::getCurrentProcessor();
char tmp[100];
sprintf(tmp,"%i: Thread,proc = %i,%i\n",rank,thread,processor);
std::cout << tmp;
sleepMs(100);
}
// Function to test how a member thread interacts with the thread pool
int test_member_thread( ThreadPool *tpool ) {
int N_errors = 0;
// Member threads are not allowed to wait for the pool to finish
try {
tpool->wait_pool_finished();
N_errors++;
} catch (...) {
}
// Member threads are not allowed to change the size of the pool
try {
tpool->wait_pool_finished();
N_errors++;
} catch (...) {
}
return N_errors;
}
// Function to test creating and locking a mutex
int test_mutex(bool recursive) {
int N_errors = 0;
Mutex lock(recursive); // Create a lock
Mutex lock2 = lock; // Copy the lock
// Test getting and releasing the lock
lock.lock();
lock.unlock();
lock2.lock();
lock2.unlock();
bool own1 = lock.ownLock();
bool own2 = lock2.ownLock();
lock.lock();
bool own3 = lock.ownLock();
bool own4 = lock2.ownLock();
lock.unlock();
bool own5 = lock.ownLock();
if ( own1 || own2 || !own3 || !own4 || own5 )
return 1;
if ( recursive ) {
// Test the behavior of a recursive lock
lock.lock();
if ( !lock.tryLock() )
return 1;
lock.unlock();
lock.lock();
lock.unlock();
} else {
// Test the behavior of a non-recursive lock
lock.lock();
if ( lock.tryLock() )
return 1;
lock.unlock();
try {
lock.unlock();
N_errors++;
} catch (...) {
}
try {
lock.lock();
lock.lock();
N_errors++;
} catch (...) {
lock.unlock();
}
try {
lock.lock();
lock2.lock();
N_errors++;
lock.unlock();
lock2.unlock();
} catch (...) {
lock.unlock();
}
}
return N_errors;
}
// Functions to test the templates
int myfun0() { return 0; }
int myfun1(int) { return 1; }
int myfun2(int,float) { return 2; }
int myfun3(int,float,double) { return 3; }
int myfun4(int,float,double,char) { return 4; }
int myfun5(int,float,double,char,std::string) { return 5; }
int myfun6(int,float,double,char,std::string,int) { return 6; }
int myfun7(int,float,double,char,std::string,int,int) { return 7; }
// Function to test instantiation of functions with different number of arguments
void vfunarg00() { }
void vfunarg01(int) { }
void vfunarg02(int,char) { }
void vfunarg03(int,char,double) { }
void vfunarg04(int,char,double,int) { }
void vfunarg05(int,char,double,int,char) { }
void vfunarg06(int,char,double,int,char,double) { }
void vfunarg07(int,char,double,int,char,double,int) { }
void vfunarg08(int,char,double,int,char,double,int,char) { }
void vfunarg09(int,char,double,int,char,double,int,char,double) { }
void vfunarg10(int,char,double,int,char,double,int,char,double,int) { }
void vfunarg11(int,char,double,int,char,double,int,char,double,int,char) { }
void vfunarg12(int,char,double,int,char,double,int,char,double,int,char,double) { }
void vfunarg13(int,char,double,int,char,double,int,char,double,int,char,double,int) { }
void vfunarg14(int,char,double,int,char,double,int,char,double,int,char,double,int,char) { }
void vfunarg15(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { }
void vfunarg16(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int) { }
void vfunarg17(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char) { }
void vfunarg18(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { }
void vfunarg19(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int) { }
void vfunarg20(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char) { }
void vfunarg21(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { }
void vfunarg22(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int) { }
void vfunarg23(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char) { }
void vfunarg24(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { }
int funarg00() { return 0; }
int funarg01(int) { return 1; }
int funarg02(int,char) { return 2; }
int funarg03(int,char,double) { return 3; }
int funarg04(int,char,double,int) { return 4; }
int funarg05(int,char,double,int,char) { return 5; }
int funarg06(int,char,double,int,char,double) { return 6; }
int funarg07(int,char,double,int,char,double,int) { return 7; }
int funarg08(int,char,double,int,char,double,int,char) { return 8; }
int funarg09(int,char,double,int,char,double,int,char,double) { return 9; }
int funarg10(int,char,double,int,char,double,int,char,double,int) { return 10; }
int funarg11(int,char,double,int,char,double,int,char,double,int,char) { return 11; }
int funarg12(int,char,double,int,char,double,int,char,double,int,char,double) { return 12; }
int funarg13(int,char,double,int,char,double,int,char,double,int,char,double,int) { return 13; }
int funarg14(int,char,double,int,char,double,int,char,double,int,char,double,int,char) { return 14; }
int funarg15(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { return 15; }
int funarg16(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int) { return 16; }
int funarg17(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char) { return 17; }
int funarg18(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { return 18; }
int funarg19(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int) { return 19; }
int funarg20(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char) { return 20; }
int funarg21(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { return 21; }
int funarg22(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int) { return 22; }
int funarg23(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char) { return 23; }
int funarg24(int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double,int,char,double) { return 24; }
int test_function_arguements( ThreadPool* tpool ) {
int N_errors = 0;
// Test some basic types of instantiations
ThreadPool::thread_id_t id0 = TPOOL_ADD_WORK( tpool, myfun0, (NULL) );
ThreadPool::thread_id_t id1 = TPOOL_ADD_WORK( tpool, myfun1, ( (int) 1 ) );
ThreadPool::thread_id_t id2 = TPOOL_ADD_WORK( tpool, myfun2, ( (int) 1, (float) 2 ) );
ThreadPool::thread_id_t id3 = TPOOL_ADD_WORK( tpool, myfun3, ( (int) 1, (float) 2, (double) 3 ) );
ThreadPool::thread_id_t id4 = TPOOL_ADD_WORK( tpool, myfun4, ( (int) 1, (float) 2, (double) 3, (char) 4 ) );
ThreadPool::thread_id_t id5 = TPOOL_ADD_WORK( tpool, myfun5, ( (int) 1, (float) 2, (double) 3, (char) 4, std::string("test") ) );
ThreadPool::thread_id_t id52= TPOOL_ADD_WORK( tpool, myfun5, ( (int) 1, (float) 2, (double) 3, (char) 4, std::string("test") ), -1 );
ThreadPool::thread_id_t id6 = TPOOL_ADD_WORK( tpool, myfun6, ( (int) 1, (float) 2, (double) 3, (char) 4, std::string("test"), (int) 1 ) );
ThreadPool::thread_id_t id7 = TPOOL_ADD_WORK( tpool, myfun7, ( (int) 1, (float) 2, (double) 3, (char) 4, std::string("test"), (int) 1, (int) 1 ) );
tpool->wait_pool_finished();
if ( !tpool->isFinished(id0) ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id0) != 0 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id1) != 1 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id2) != 2 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id3) != 3 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id4) != 4 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id5) != 5 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id52)!= 5 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id6) != 6 ) { N_errors++; }
if ( tpool->getFunctionRet<int>(id7) != 7 ) { N_errors++; }
// Test all the different numbers of arguments allowed
TPOOL_ADD_WORK( tpool, vfunarg00, (NULL) );
TPOOL_ADD_WORK( tpool, vfunarg01, ( 1) );
TPOOL_ADD_WORK( tpool, vfunarg02, ( 1, 'a' ) );
TPOOL_ADD_WORK( tpool, vfunarg03, ( 1, 'a', 3.0 ) );
TPOOL_ADD_WORK( tpool, vfunarg04, ( 1, 'a', 3.0, 4 ) );
TPOOL_ADD_WORK( tpool, vfunarg05, ( 1, 'a', 3.0, 4, 'e' ) );
TPOOL_ADD_WORK( tpool, vfunarg06, ( 1, 'a', 3.0, 4, 'e', 6.0 ) );
TPOOL_ADD_WORK( tpool, vfunarg07, ( 1, 'a', 3.0, 4, 'e', 6.0, 7 ) );
TPOOL_ADD_WORK( tpool, vfunarg08, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h' ) );
TPOOL_ADD_WORK( tpool, vfunarg09, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0 ) );
TPOOL_ADD_WORK( tpool, vfunarg10, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10 ) );
TPOOL_ADD_WORK( tpool, vfunarg11, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k' ) );
TPOOL_ADD_WORK( tpool, vfunarg12, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0 ) );
TPOOL_ADD_WORK( tpool, vfunarg13, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13 ) );
TPOOL_ADD_WORK( tpool, vfunarg14, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n' ) );
TPOOL_ADD_WORK( tpool, vfunarg15, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0 ) );
TPOOL_ADD_WORK( tpool, vfunarg16, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16 ) );
TPOOL_ADD_WORK( tpool, vfunarg17, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q' ) );
TPOOL_ADD_WORK( tpool, vfunarg18, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0 ) );
TPOOL_ADD_WORK( tpool, vfunarg19, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19 ) );
TPOOL_ADD_WORK( tpool, vfunarg20, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't' ) );
TPOOL_ADD_WORK( tpool, vfunarg21, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0 ) );
TPOOL_ADD_WORK( tpool, vfunarg22, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0, 22 ) );
TPOOL_ADD_WORK( tpool, vfunarg23, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0, 22, 'w' ) );
TPOOL_ADD_WORK( tpool, vfunarg24, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0, 22, 'w', 24.0 ) );
std::vector<ThreadPool::thread_id_t> ids(25);
ids[0] = TPOOL_ADD_WORK( tpool, funarg00, (NULL) );
ids[1] = TPOOL_ADD_WORK( tpool, funarg01, ( 1 ) );
ids[2] = TPOOL_ADD_WORK( tpool, funarg02, ( 1, 'a' ) );
ids[3] = TPOOL_ADD_WORK( tpool, funarg03, ( 1, 'a', 3.0 ) );
ids[4] = TPOOL_ADD_WORK( tpool, funarg04, ( 1, 'a', 3.0, 4 ) );
ids[5] = TPOOL_ADD_WORK( tpool, funarg05, ( 1, 'a', 3.0, 4, 'e' ) );
ids[6] = TPOOL_ADD_WORK( tpool, funarg06, ( 1, 'a', 3.0, 4, 'e', 6.0 ) );
ids[7] = TPOOL_ADD_WORK( tpool, funarg07, ( 1, 'a', 3.0, 4, 'e', 6.0, 7) );
ids[8] = TPOOL_ADD_WORK( tpool, funarg08, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h' ) );
ids[9] = TPOOL_ADD_WORK( tpool, funarg09, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0 ) );
ids[10] = TPOOL_ADD_WORK( tpool, funarg10, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10 ) );
ids[11] = TPOOL_ADD_WORK( tpool, funarg11, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k' ) );
ids[12] = TPOOL_ADD_WORK( tpool, funarg12, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0 ) );
ids[13] = TPOOL_ADD_WORK( tpool, funarg13, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13 ) );
ids[14] = TPOOL_ADD_WORK( tpool, funarg14, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'h' ) );
ids[15] = TPOOL_ADD_WORK( tpool, funarg15, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'h', 15.0 ) );
ids[16] = TPOOL_ADD_WORK( tpool, funarg16, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16 ) );
ids[17] = TPOOL_ADD_WORK( tpool, funarg17, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q' ) );
ids[18] = TPOOL_ADD_WORK( tpool, funarg18, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0 ) );
ids[19] = TPOOL_ADD_WORK( tpool, funarg19, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19 ) );
ids[20] = TPOOL_ADD_WORK( tpool, funarg20, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't' ) );
ids[21] = TPOOL_ADD_WORK( tpool, funarg21, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0 ) );
ids[22] = TPOOL_ADD_WORK( tpool, funarg22, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0, 22 ) );
ids[23] = TPOOL_ADD_WORK( tpool, funarg23, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0, 22, 'w' ) );
ids[24] = TPOOL_ADD_WORK( tpool, funarg24, ( 1, 'a', 3.0, 4, 'e', 6.0, 7, 'h', 9.0, 10, 'k', 12.0, 13, 'n', 15.0, 16, 'q', 18.0, 19, 't', 21.0, 22, 'w', 24.0 ) );
tpool->wait_all( ids );
for (size_t i=0; i<ids.size(); i++) {
if ( tpool->getFunctionRet<int>(ids[i]) != static_cast<int>(i) )
N_errors++;
}
return N_errors;
}
/******************************************************************
* Examples to derive a user work item *
******************************************************************/
class UserWorkItemVoid: public ThreadPool::WorkItem {
public:
// User defined constructor (does not need to match any interfaces)
UserWorkItemVoid( int dummy )
{
// User initialized variables
NULL_USE(dummy);
// Set class variables
ThreadPool::WorkItem::d_has_result = false;
ThreadPool::WorkItem::d_state = 0;
}
// User defined run (can do anything)
void run()
{
// Set the state (always do this first)
ThreadPool::WorkItem::d_state = 1;
// Perform the tasks
printf("Hello work from UserWorkItem (void)");
// Set the state (always do this last)
ThreadPool::WorkItem::d_state = 2;
}
// User defined destructor
virtual ~UserWorkItemVoid()
{
}
};
class UserWorkItemInt: public ThreadPool::WorkItemRet<int> {
public:
// User defined constructor (does not need to match any interfaces)
UserWorkItemInt( int dummy )
{
// User initialized variables
NULL_USE(dummy);
// Set class variables
ThreadPool::WorkItem::d_has_result = true;
ThreadPool::WorkItem::d_state = 0;
}
// User defined run (can do anything)
void run()
{
// Set the state (always do this first)
ThreadPool::WorkItem::d_state = 1;
// Perform the tasks
printf("Hello work from UserWorkItem (int)");
// Store the results (its type will match the template)
ThreadPool::WorkItemRet<int>::d_result = 1;
// Set the state (always do this last)
ThreadPool::WorkItem::d_state = 2;
}
// User defined destructor
virtual ~UserWorkItemInt()
{
}
};
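// The sketch below shows how such a user-defined work item could be submitted
// (illustrative only; it assumes a ThreadPool *tpool as used in the tests below):
//    ThreadPool::WorkItem *work = new UserWorkItemInt( 0 );
//    ThreadPool::thread_id_t id = tpool->add_work( work );  // the pool deletes the item when finished
//    tpool->wait( id );
//    int result = tpool->getFunctionRet<int>( id );         // returns 1 (set in run())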
/******************************************************************
* test the time to run N tasks in parallel *
******************************************************************/
inline double run_parallel( ThreadPool *tpool, int N_tasks, int N_work )
{
// Make sure the thread pool is empty
tpool->wait_pool_finished();
// Add the work
TIME_TYPE start, end, f;
get_frequency(&f);
std::vector<ThreadPool::thread_id_t> ids;
ids.reserve(N_tasks);
get_time(&start);
for (int i=0; i<N_tasks; i++)
ids.push_back( TPOOL_ADD_WORK( tpool, waste_cpu, (N_work) ) );
// Wait for the thread pool to finish
tpool->wait_pool_finished();
// Compute the time spent running the tasks
get_time(&end);
return get_diff(start,end,f);
}
/******************************************************************
* The main program *
******************************************************************/
#ifdef USE_WINDOWS
int __cdecl main(int argc, char **argv) {
#elif defined(USE_LINUX) || defined(USE_MAC)
int main(int argc, char* argv[]) {
#else
#error Unknown OS
#endif
int N_threads = 4; // Number of threads
int N_work = 2000; // Number of work items
int N_it = 10; // Number of cycles to run
int N_problem = 4; // Problem size
PROFILE_ENABLE(3);
PROFILE_ENABLE_TRACE();
UnitTest ut;
// Initialize MPI and set the error handlers
int rank = 0;
int size = 1;
#ifdef USE_MPI
int provided_thread_support=-1;
MPI_Init_thread(&argc,&argv,MPI_THREAD_MULTIPLE,&provided_thread_support);
MPI_Comm_size( MPI_COMM_WORLD, &size );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
Utilities::setErrorHandlers();
#endif
NULL_USE(size);
// Disable OS specific warnings for all non-root ranks
if ( rank > 0 )
ThreadPool::set_OS_warnings(1);
// Initialize the data
std::vector<int> data1(N_work,0);
std::vector<int> priority(N_work,0);
for (int i=0; i<N_work; i++) {
data1[i] = N_problem;
priority[i] = i%128;
}
TIME_TYPE start, end, f, start2, end2;
get_frequency(&f);
// Print the size of the thread pool class
printp("Size of ThreadPool = %i\n",(int)sizeof(ThreadPool));
// Create and test a mutex
barrier();
printp("Testing mutex\n");
int N_errors_mutex = test_mutex(false);
N_errors_mutex += test_mutex(true);
if ( N_errors_mutex == 0 )
ut.passes("test mutex");
else
ut.failure("Errors found testing mutex");
// Get the number of processors available
barrier();
int N_procs = 0;
try {
N_procs = ThreadPool::getNumberOfProcessors();
} catch (...) {
}
if ( N_procs>0 )
ut.passes("getNumberOfProcessors");
else
ut.failure("getNumberOfProcessors");
printp("%i processors availible\n",N_procs);
// Get the processor affinities for the process
barrier();
std::vector<int> cpus;
try {
cpus = ThreadPool::getProcessAffinity();
printp("%i cpus for current process: ",(int)cpus.size());
for (size_t i=0; i<cpus.size(); i++)
printp("%i ",cpus[i]);
printp("\n");
} catch (...) {
}
if ( !cpus.empty() ) {
ut.passes("getProcessAffinity");
} else {
#ifdef __APPLE__
ut.expected_failure("getProcessAffinity");
#else
ut.failure("getProcessAffinity");
#endif
}
// Test setting the process affinities
barrier();
bool pass = false;
if ( !cpus.empty() && N_procs>0 ) {
if ( cpus.size()==1 ) {
cpus.resize(N_procs);
for (int i=0; i<N_procs; i++)
cpus[i] = i;
try {
ThreadPool::setProcessAffinity( cpus );
} catch (...) {
}
cpus = ThreadPool::getProcessAffinity();
printp("%i cpus for current process (updated): ",(int)cpus.size());
for (size_t i=0; i<cpus.size(); i++)
printp("%i ",cpus[i]);
printp("\n");
pass = cpus.size() > 1;
} else {
std::vector<int> cpus_orig = cpus;
std::vector<int> cpus_tmp(1,cpus[0]);
try {
ThreadPool::setProcessAffinity( cpus_tmp );
} catch (...) {
}
cpus = ThreadPool::getProcessAffinity();
if ( cpus.size() == 1 )
pass = true;
try {
ThreadPool::setProcessAffinity( cpus_orig );
} catch (...) {
}
cpus = ThreadPool::getProcessAffinity();
if ( cpus.size() != cpus_orig.size() )
pass = false;
}
}
if ( pass ) {
ut.passes("setProcessAffinity");
} else {
#ifdef __APPLE__
ut.expected_failure("setProcessAffinity");
#else
ut.failure("setProcessAffinity");
#endif
}
int N_procs_used = std::min<int>(N_procs,N_threads);
printp("%i processors used\n",N_procs_used);
// Create the thread pool
barrier();
printp("Creating thread pool\n");
ThreadPool tpool0;
ThreadPool tpool;
ThreadPool::thread_id_t id;
id = TPOOL_ADD_WORK( &tpool, waste_cpu, ( data1[0] ) );
if ( id==ThreadPool::thread_id_t() || !tpool.isValid(id) )
ut.failure("Errors with id");
tpool.setNumThreads(N_threads);
if ( tpool.getNumThreads()==N_threads )
ut.passes("Created thread pool");
else
ut.failure("Failed to create tpool with desired number of threads");
// Test creating/destroying a thread pool using new
barrier();
pass = true;
try {
ThreadPool *tpool2 = new ThreadPool(MAX_NUM_THREADS-1);
if ( tpool2->getNumThreads() != MAX_NUM_THREADS-1 )
pass = false;
if ( !ThreadPool::is_valid(tpool2) )
pass = false;
delete tpool2;
// Check that tpool2 is invalid
// Note: valgrind will report this as an invalid memory read, but we want to keep the test
if ( ThreadPool::is_valid(tpool2) )
pass = false;
} catch(...) {
pass = false;
}
if ( pass )
ut.passes("Created/destroyed thread pool with new");
else
ut.failure("Created/destroyed thread pool with new");
// Test setting the thread affinities
barrier();
if ( cpus.size()>1 ) {
sleepMs(50);
// First make sure we can get the thread affinities
std::vector<int> procs = ThreadPool::getThreadAffinity( );
if ( procs == cpus ) {
ut.passes("getThreadAffinity() matches procs");
} else {
char msg[100];
sprintf(msg,"getThreadAffinity() does not match procs (%i,%i)",
static_cast<int>(procs.size()), static_cast<int>(cpus.size()));
ut.failure(msg);
}
pass = true;
for (int i=0; i<N_threads; i++) {
std::vector<int> procs_thread = tpool.getThreadAffinity( i );
if ( procs_thread != procs ) {
printp("%i: Initial thread affinity: ",rank);
for (size_t i=0; i<procs_thread.size(); i++)
printp("%i ",procs_thread[i]);
printp("\n");
pass = false;
}
}
if ( pass )
ut.passes("getThreadAffinity(thread) matches procs");
else
ut.failure("getThreadAffinity(thread) does not match procs");
// Try to set the thread affinities
pass = true;
if ( !procs.empty() ) {
int N_procs_thread = std::max<int>((int)cpus.size()/N_threads,1);
for (int i=0; i<N_threads; i++) {
std::vector<int> procs_thread(N_procs_thread,-1);
for (int j=0; j<N_procs_thread; j++)
procs_thread[j] = procs[(i*N_procs_thread+j)%procs.size()];
tpool.setThreadAffinity( i, procs_thread );
sleepMs(10); // Give time for OS to update thread affinities
std::vector<int> procs_thread2 = tpool.getThreadAffinity( i );
if ( procs_thread2 != procs_thread ) {
printp("%i: Final thread affinity: ",rank);
for (size_t i=0; i<procs_thread.size(); i++)
printp("%i ",procs_thread[i]);
printp("\n");
pass = false;
}
}
}
if ( pass )
ut.passes("setThreadAffinity passes");
else
ut.failure("setThreadAffinity failed to change affinity");
}
// Reset the thread affinities
barrier();
tpool.setNumThreads(tpool.getNumThreads(),"none");
//tpool.setNumThreads(tpool.getNumThreads(),"independent");
for (int i=0; i<N_threads; i++) {
std::vector<int> procs_thread = tpool.getThreadAffinity( i );
printp("Thread affinity: ");
for (size_t i=0; i<procs_thread.size(); i++)
printp("%i ",procs_thread[i]);
printp("\n");
}
// Print the current processors by thread id
barrier();
print_processor(&tpool);
for (int i=0; i<N_threads; i++)
TPOOL_ADD_WORK( &tpool, print_processor, ( &tpool ) );
tpool.wait_pool_finished();
// Run some basic tests
barrier();
get_time(&start);
for (int n=0; n<N_it; n++) {
for (int i=0; i<N_work; i++)
waste_cpu(data1[i]);
}
get_time(&end);
double time = get_diff(start,end,f);
printp("Time for serial cycle = %0.0f us\n",1e6*time/N_it);
printp("Time for serial item = %0.0f ns\n",1e9*time/(N_it*N_work));
id = TPOOL_ADD_WORK( &tpool, waste_cpu, ( data1[0] ) );
tpool.wait(id);
std::vector<ThreadPool::thread_id_t> ids2;
ids2.push_back( TPOOL_ADD_WORK( &tpool, waste_cpu, ( data1[0] ) ) );
tpool.wait(ids2[0]);
// Test calling functions with different number of arguments
barrier();
printp("Testing arguments:\n");
int N_errors_args = test_function_arguements( &tpool );
if ( N_errors_args == 0 )
ut.passes("Calling function with default arguments");
else
ut.failure("Error calling function with default arguments");
// Check that the threads can sleep in parallel (this does not depend on the number of processors)
barrier();
tpool.wait_pool_finished();
get_time(&start);
sleep_inc(1);
get_time(&end);
double sleep_serial = get_diff(start,end,f);
ids2.clear();
get_time(&start);
for (int i=0; i<N_threads; i++)
ids2.push_back( TPOOL_ADD_WORK( &tpool,sleep_inc, (1) ) );
tpool.wait_all(N_procs_used,&ids2[0]);
ids2.clear();
get_time(&end);
double sleep_parallel = get_diff(start,end,f);
double sleep_speedup = N_procs_used*sleep_serial/sleep_parallel;
printf("%i: Speedup on %i sleeping threads: %0.3f\n",rank,N_procs_used,sleep_speedup);
printf("%i: ts = %0.3f, tp = %0.3f\n",rank,sleep_serial,sleep_parallel);
if ( fabs(sleep_serial-1.0)<0.05 && fabs(sleep_parallel-1.0)<0.075 )
ut.passes("Passed thread sleep");
else
ut.failure("Failed thread sleep");
// Check that the threads are actually working in parallel
barrier();
if ( N_procs_used>1 ) {
#ifdef USE_MPI
// Use a non-blocking serialization of the MPI processes
// if we do not have a sufficient number of processors
bool serialize_mpi = N_procs < N_threads*size;
int buf;
MPI_Request request;
MPI_Status status;
if ( serialize_mpi && rank>0 ) {
MPI_Irecv( &buf, 1, MPI_INT, rank-1, 0, MPI_COMM_WORLD, &request );
int flag = false;
while ( !flag ) {
MPI_Test( &request, &flag, &status );
sleep(1);
}
}
#endif
int N = 20000000; // Enough work to keep the processor busy for ~ 1 s
// Run in serial
get_time(&start);
waste_cpu(N);
get_time(&end);
double time_serial = get_diff(start,end,f);
// Run in parallel
double time_parallel2 = run_parallel( &tpool, N_procs_used, N/1000 );
double time_parallel = run_parallel( &tpool, N_procs_used, N );
double speedup = N_procs_used*time_serial/time_parallel;
printf("%i: Speedup on %i procs: %0.3f\n",rank,N_procs_used,speedup);
printf("%i: ts = %0.3f, tp = %0.3f, tp2 = %0.3f\n",rank,time_serial,time_parallel,time_parallel2);
if ( speedup > 1.4 ) {
ut.passes("Passed speedup test");
} else {
#ifdef USE_GCOV
ut.expected_failure("Times do not indicate tests are running in parallel (gcov)");
#else
ut.failure("Times do not indicate tests are running in parallel");
#endif
}
#ifdef USE_MPI
if ( serialize_mpi ) {
if ( rank<size-1 )
MPI_Send( &N, 1, MPI_INT, rank+1, 0, MPI_COMM_WORLD );
if ( rank==size-1 ) {
for (int i=0; i<size-1; i++)
MPI_Send( &N, 1, MPI_INT, i, 1, MPI_COMM_WORLD );
} else {
MPI_Irecv( &buf, 1, MPI_INT, size-1, 1, MPI_COMM_WORLD, &request );
int flag = false;
MPI_Status status;
while ( !flag ) {
MPI_Test( &request, &flag, &status );
sleep(1);
}
}
}
#endif
} else {
ut.expected_failure("Testing thread performance with less than 1 processor");
}
// Test adding a work item with a dependency
barrier();
{
// Test that we successfully wait on the work items
std::vector<ThreadPool::thread_id_t> ids;
ids.reserve(5);
global_sleep_count = 0; // Reset the count before this test
ThreadPool::thread_id_t id1 = TPOOL_ADD_WORK( &tpool, sleep_inc, ( 1 ) );
ThreadPool::thread_id_t id2 = TPOOL_ADD_WORK( &tpool, sleep_inc, ( 2 ) );
ThreadPool::WorkItem *wait1 = new WorkItemFull<bool,int>( check_inc, 1 );
ThreadPool::WorkItem *wait2 = new WorkItemFull<bool,int>( check_inc, 2 );
wait1->add_dependency(id1);
wait2->add_dependency(id1); wait2->add_dependency(id2);
ids.clear();
ids.push_back( tpool.add_work(wait1) );
ids.push_back( tpool.add_work(wait2) );
tpool.wait_all(ids.size(),&ids[0]);
if ( !tpool.getFunctionRet<bool>(ids[0]) || !tpool.getFunctionRet<bool>(ids[1]) )
ut.failure("Failed to wait on required dependency");
else
ut.passes("Dependencies");
tpool.wait_pool_finished();
// Check that we can handle more complex dependencies
id1 = TPOOL_ADD_WORK( &tpool, sleep_inc2, ( 0.5 ) );
for (int i=0; i<10; i++) {
wait1 = new WorkItemFull<bool,int>( check_inc, 1 );
wait1->add_dependency(id1);
tpool.add_work(wait1);
}
tpool.wait_pool_finished();
ids.clear();
for (int i=0; i<5; i++)
ids.push_back( TPOOL_ADD_WORK( &tpool, sleep_inc2, (0.5) ) );
sleep_inc2(0.002);
ThreadPool::WorkItem *work = new WorkItemFull<void,int>( waste_cpu, 100 );
work->add_dependencies(ids);
id = tpool.add_work(work,10);
tpool.wait(id);
}
// Test the timing adding a single item
barrier();
for (int it=0; it<2; it++) {
ThreadPool *tpool_ptr = NULL;
if ( it==0 ) {
printp("Testing timmings (adding a single item to empty tpool):\n");
tpool_ptr = &tpool0;
} else if ( it==1 ) {
printp("Testing timmings (adding a single item):\n");
tpool_ptr = &tpool;
}
std::vector<ThreadPool::thread_id_t> ids(N_work);
double time_add = 0.0;
double time_wait = 0.0;
get_time(&start);
for (int n=0; n<N_it; n++) {
get_time(&start2);
for (int i=0; i<N_work; i++)
ids[i] = TPOOL_ADD_WORK( tpool_ptr, waste_cpu, ( data1[i] ), priority[i] );
get_time(&end2);
time_add += get_diff(start2,end2,f);
get_time(&start2);
tpool_ptr->wait_all(N_work,&ids[0]);
//tpool_ptr->wait_pool_finished();
get_time(&end2);
time_wait += get_diff(start2,end2,f);
if ( (n+1)%100 == 0 )
printp("Cycle %i of %i finished\n",n+1,N_it);
}
get_time(&end);
time = get_diff(start,end,f);
printp(" time = %0.0f ms\n",1e3*time);
printp(" time / cycle = %0.0f us\n",1e6*time/N_it);
printp(" average time / item = %0.0f ns\n",1e9*time/(N_it*N_work));
printp(" create and add = %0.0f ns\n",1e9*time_add/(N_it*N_work));
printp(" wait = %0.0f us\n",1e9*time_wait/(N_it*N_work));
}
// Test the timing pre-creating the work items and adding multiple at a time
barrier();
for (int it=0; it<2; it++) {
ThreadPool *tpool_ptr = NULL;
if ( it==0 ) {
printp("Testing timmings (adding a block of items to empty tpool):\n");
tpool_ptr = &tpool0;
} else if ( it==1 ) {
printp("Testing timmings (adding a block of items):\n");
tpool_ptr = &tpool;
}
double time_create_work = 0.0;
double time_add_work = 0.0;
double time_wait_work = 0.0;
std::vector<ThreadPool::WorkItem*> work(N_work);
get_time(&start);
for (int n=0; n<N_it; n++) {
get_time(&start2);
for (int i=0; i<N_work; i++)
work[i] = new WorkItemFull<void,int>( waste_cpu, data1[i] );
get_time(&end2);
time_create_work += get_diff(start2,end2,f);
get_time(&start2);
std::vector<ThreadPool::thread_id_t> ids = tpool_ptr->add_work( work, priority );
get_time(&end2);
time_add_work += get_diff(start2,end2,f);
get_time(&start2);
tpool_ptr->wait_all(ids);
get_time(&end2);
time_wait_work += get_diff(start2,end2,f);
if ( (n+1)%100 == 0 )
printp("Cycle %i of %i finished\n",n+1,N_it);
}
get_time(&end);
time = get_diff(start,end,f);
printp(" time = %0.0f ms\n",1e3*time);
printp(" time / cycle = %0.0f us\n",1e6*time/N_it);
printp(" average time / item = %0.0f ns\n",1e9*time/(N_it*N_work));
printp(" create = %0.0f ns\n",1e9*time_create_work/(N_it*N_work));
printp(" add = %0.0f ns\n",1e9*time_add_work/(N_it*N_work));
printp(" wait = %0.0f ns\n",1e9*time_wait_work/(N_it*N_work));
}
// Run a dependency test that tests a simple case that should keep the thread pool busy
// Note: Checking the results requires looking at the trace data
tpool.wait_pool_finished();
PROFILE_START("Dependency test");
for (int i=0; i<10; i++) {
char msg[3][100];
sprintf(msg[0],"Item %i-%i",i,0);
sprintf(msg[1],"Item %i-%i",i,1);
sprintf(msg[2],"Item %i-%i",i,2);
ThreadPool::WorkItem *work = new WorkItemFull<void,double,std::string>(sleep_msg,0.5,msg[0]);
ThreadPool::WorkItem *work1 = new WorkItemFull<void,double,std::string>(sleep_msg,0.1,msg[1]);
ThreadPool::WorkItem *work2 = new WorkItemFull<void,double,std::string>(sleep_msg,0.1,msg[2]);
ThreadPool::thread_id_t id = tpool.add_work(work);
work1->add_dependency(id);
work2->add_dependency(id);
tpool.add_work(work1);
tpool.add_work(work2);
}
tpool.wait_pool_finished();
PROFILE_STOP("Dependency test");
tpool.wait_pool_finished();
barrier();
ut.report();
int N_errors = static_cast<int>(ut.NumFailGlobal());
// Shutdown MPI
PROFILE_SAVE("test_thread_pool");
pout << "Shutting down\n";
barrier();
#ifdef USE_MPI
MPI_Finalize( );
sleepMs(10);
#endif
return N_errors;
}

1915
threadpool/thread_pool.cpp Executable file

File diff suppressed because it is too large

705
threadpool/thread_pool.h Executable file
View File

@ -0,0 +1,705 @@
// Copyright © 2004 Mark Berrill. All Rights Reserved. This work is distributed with permission,
// but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#ifndef included_ThreadPool
#define included_ThreadPool
#include <stdio.h>
#include <typeinfo>
#include <iostream>
#include <stdarg.h>
#include <string.h>
#include <vector>
#include <stdexcept>
#include <map>
#include "threadpool/atomic_helpers.h"
// Choose the OS
#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
// Using windows
#define USE_WINDOWS
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#define NOMINMAX
// Disable warning: the inline specifier cannot be used when a friend
// declaration refers to a specialization of a function template
#pragma warning(disable:4396)
#elif defined(__APPLE__)
// Using MAC
// https://developer.apple.com/library/mac/#releasenotes/Performance/RN-AffinityAPI
// http://plugins.svn.wordpress.org/wp-xhprof-profiler/trunk/facebook-xhprof/extension/xhprof..c
#define USE_MAC
#include <unistd.h>
#include <mach/mach_init.h>
#include <mach/thread_policy.h>
#define cpu_set_t thread_affinity_policy_data_t
#define CPU_SET(cpu_id, new_mask) \
(*(new_mask)).affinity_tag = (cpu_id + 1)
#define CPU_ZERO(new_mask) \
(*(new_mask)).affinity_tag = THREAD_AFFINITY_TAG_NULL
#define sched_setaffinity(pid, size, mask) \
thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, mask, \
THREAD_AFFINITY_POLICY_COUNT)
#define sched_getaffinity(pid, size, mask) \
thread_policy_get(mach_thread_self(), THREAD_AFFINITY_POLICY, mask, \
THREAD_AFFINITY_POLICY_COUNT)
/*
#define CPU_ZERO(new_mask) \
*new_mask.affinity_tag == THREAD_AFFINITY_TAG_NULL
#define SET_AFFINITY(pid, size, mask) \
thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, mask, THREAD_AFFINITY_POLICY_COUNT)
#define GET_AFFINITY(pid, size, mask) \
thread_policy_get(mach_thread_self(), THREAD_AFFINITY_POLICY, mask, THREAD_AFFINITY_POLICY_COUNT)
*/
#elif defined(__linux) || defined(__unix) || defined(__posix)
// Using Linux
#define USE_LINUX
#include <pthread.h>
#include <unistd.h>
#else
#error Unknown OS
#endif
// Set some definitions
#define MAX_NUM_THREADS 128 // The maximum number of threads (must be a multiple of 64)
#define MAX_QUEUED 1024 // The maximum number of items in the work queue at any moment
#define MAX_WAIT 128 // The maximum number of active waits at any given time
/** \class Mutex
* \brief Functions for locking/unlocking a mutex
* \details This class provides basic routines for creating,
* locking, and unlocking a mutex <BR>
* The lock may be recursive, meaning that the same thread
* may lock and unlock the lock multiple times before releasing it.
* In this case unlock must be called the same number of times before
* another thread may lock the mutex.
*/
class Mutex {
public:
//! Empty constructor (equivalent to Mutex(false))
Mutex();
/** Constructor
* \param recursive If set to true a thread may repeatedly lock a mutex.
* If set to false an attempt to repeatedly lock will throw an error.*/
Mutex(bool recursive);
//! Destructor
~Mutex();
//! Copy constructor
Mutex(const Mutex &);
//! Assignment operator
Mutex& operator=(const Mutex&);
//! Lock the mutex
void lock() const;
//! Unlock the mutex
void unlock() const;
//! Try to lock the mutex and return true if successful
bool tryLock() const;
//! Return true if we already own the lock
bool ownLock() const;
private:
bool d_recursive; // Is the lock recursive (this attribute cannot be changed)
volatile int* d_count; // Number of copies of the mutex
volatile int* d_lock_count; // Number of times a thread has locked the mutex
volatile size_t* d_thread; // Pointer to the thread id that owns the lock
#ifdef USE_WINDOWS
CRITICAL_SECTION *d_lock;
#elif defined(USE_LINUX) || defined(USE_MAC)
pthread_mutex_t *d_lock;
#else
#error Unknown OS
#endif
friend class ThreadPool;
};
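// Example usage (an illustrative sketch, not part of the interface):
//    Mutex mutex(true);   // recursive mutex
//    mutex.lock();
//    mutex.lock();        // the owning thread may lock again
//    mutex.unlock();
//    mutex.unlock();      // unlock must be called once per lock
//    if ( mutex.tryLock() ) { mutex.unlock(); }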
/** \class ThreadPool
*
* \brief This is a concrete class that provides for a basic thread pool.
* \details This class implements a basic thread pool that can be used for a wide variety of applications.
* Example usage is provided below. The ability to return a value is provided. Note that there
* is a small overhead to using this functionality. <BR>
* <pre>Example: <BR>
* Existing function call:
* double x = myfun_1(a,b);
* double y = myfun_2(c,d); <BR>
* Threaded call (processing in parallel):
* thread_id_t ids[2];
* ids[0] = TPOOL_ADD_WORK( tpool, myfun_1, (a,b) );
* ids[1] = TPOOL_ADD_WORK( tpool, myfun_2, (c,d) );
* int error = tpool->wait_all(2,ids);
* double x = tpool->getFunctionRet<double>(ids[0]);
* double y = tpool->getFunctionRet<double>(ids[1]); <BR>
* </pre>
*/
class ThreadPool {
public:
//! Convenience typedef
typedef unsigned long long int uint64;
//! Function to get a unique id for the current thread
static inline size_t getThreadId();
public:
///// Member classes
/** \class thread_id_t
*
* \brief This is a class to hold the work item id
* \details This class holds the id of the work item that is being processed by the thread pool.
* It is created when a work item is added to the thread pool and is used by various routines within the thread pool.
*/
class thread_id_t {
public:
//! Empty constructor
inline thread_id_t( );
//! Destructor
inline ~thread_id_t( );
//! Copy constructors
inline thread_id_t( const thread_id_t& rhs );
inline thread_id_t& operator=( const thread_id_t& rhs ) volatile;
#ifndef USE_WINDOWS
inline thread_id_t( const volatile thread_id_t& rhs );
inline thread_id_t& operator=( const thread_id_t& rhs );
inline thread_id_t& operator=( const volatile thread_id_t& rhs );
inline thread_id_t& operator=( const volatile thread_id_t& rhs ) volatile;
#endif
// Overload key operators
inline bool operator==(const thread_id_t& rhs ) const { return d_id==rhs.d_id; }
inline bool operator!=(const thread_id_t& rhs ) const { return d_id!=rhs.d_id; }
inline bool operator>=(const thread_id_t& rhs ) const { return d_id>=rhs.d_id; }
inline bool operator<=(const thread_id_t& rhs ) const { return d_id<=rhs.d_id; }
inline bool operator> (const thread_id_t& rhs ) const { return d_id>rhs.d_id; }
inline bool operator< (const thread_id_t& rhs ) const { return d_id<rhs.d_id; }
inline bool operator==(const volatile thread_id_t& rhs ) const volatile { return d_id==rhs.d_id; }
inline bool operator!=(const volatile thread_id_t& rhs ) const volatile { return d_id!=rhs.d_id; }
inline bool operator>=(const volatile thread_id_t& rhs ) const volatile { return d_id>=rhs.d_id; }
inline bool operator<=(const volatile thread_id_t& rhs ) const volatile { return d_id<=rhs.d_id; }
inline bool operator> (const volatile thread_id_t& rhs ) const volatile { return d_id>rhs.d_id; }
inline bool operator< (const volatile thread_id_t& rhs ) const volatile { return d_id<rhs.d_id; }
//! Reset the id back to a NULL id
inline void reset() volatile;
inline void reset();
//! Check if the work has finished
inline bool finished( ) const;
private:
// Default constructor
inline void reset( int priority, size_t local_id, void* work );
// Get the local id
inline size_t getLocalID() const;
// Get the priority
inline int getPriority() const;
// Check if the id is initialized
inline bool initialized() const volatile { return d_id!=0x0FFFFFFFFFFFFFFF; }
// Friends
friend class ThreadPool;
template<typename T> friend void std::swap(T&, T&);
// Data
uint64 d_id; // 64-bit data to store id
AtomicOperations::int32_atomic* volatile d_count; // Reference count
void* d_work; // Pointer to the work item
};
//! Base class for the work item (users should derive from WorkItemRet)
class WorkItem {
public:
//! Function to run the routine
virtual void run()=0;
//! Will the routine return a result
bool has_result() const { return d_has_result; }
//! Destructor
virtual ~WorkItem() { delete [] d_ids; d_ids=NULL; d_N_ids=0; d_size=0; }
//! Get the number of work ids that this work item depends on
inline size_t get_N_dependencies() const { return d_N_ids; }
//! Return the list of work ids that we depend on
std::vector<ThreadPool::thread_id_t> get_dependencies() const;
/*!
* \brief Add a work item to the list of dependencies
* \param id Id of the work item to add
*/
void add_dependency( const ThreadPool::thread_id_t& id ) { add_dependencies(1,&id); }
/*!
* \brief Add a list of work items to the list of dependencies
* \param ids Ids of the work items to add
*/
inline void add_dependencies( const std::vector<ThreadPool::thread_id_t>& ids ) {
if ( !ids.empty() ) { add_dependencies(ids.size(),&ids[0]); }
}
/*!
* \brief Add a list of work items to the list of dependencies
* \param N Number of items to add
* \param ids Ids of the work items to add
*/
void add_dependencies( size_t N, const ThreadPool::thread_id_t* ids);
protected:
friend class ThreadPool;
inline WorkItem(): d_has_result(false), d_state(0), d_tpool_index(-1), d_N_ids(0), d_size(0), d_ids(NULL) {}
bool d_has_result; // Derived classes must set the result flag (true: has a result)
volatile char d_state; // Derived classes must set the state (0: not scheduled, -1: scheduled, 1: started, 2: finished)
short int d_tpool_index; // Index of the item in the thread pool (-1: not added)
private:
WorkItem(const WorkItem&); // Private copy constructor
WorkItem& operator=(const WorkItem&); // Private assignment operator
short unsigned int d_N_ids; // Number of dependencies
short unsigned int d_size; // Size of d_ids
thread_id_t* d_ids; // Pointer to id list
};
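// Example of expressing a dependency between work items (a sketch; it assumes an
// existing ThreadPool *tpool, a hypothetical function myfun(int), and the
// WorkItemFull helper used by the unit tests):
//    ThreadPool::thread_id_t id = TPOOL_ADD_WORK( tpool, myfun, (1) );
//    ThreadPool::WorkItem *item = new WorkItemFull<void,int>( myfun, 2 );
//    item->add_dependency( id );   // item will not start until id has finished
//    tpool->add_work( item );      // the pool deletes the item when finished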
/*!
* \brief Class to define a work item returning a variable
* \details This is the class that defines a work item to be processed. Users may derive their own
* class and add work using the add_work routine, or can use the TPOOL_ADD_WORK macro.
* Note: this class is templated on the return argument type and may be a void type.
*/
template <typename return_type>
class WorkItemRet: public ThreadPool::WorkItem {
public:
//! Run the work item
virtual void run()=0;
//! Return the results
return_type get_results() const { return d_result; }
//! Virtual destructor
virtual ~WorkItemRet() {}
protected:
return_type d_result;
inline WorkItemRet(): WorkItem() { d_has_result = true; }
private:
WorkItemRet(const WorkItemRet&); // Private copy constructor
WorkItemRet& operator=(const WorkItemRet&); // Private assignment operator
};
public:
///// Member functions
//! Empty constructor
ThreadPool()
{
// Note: we need the constructor in the header to ensure that check_startup
// is able to check for changes in the byte alignment
check_startup(sizeof(ThreadPool));
initialize(0,"none",0,NULL);
if ( !is_valid(this) )
throw std::logic_error("Thread pool is not valid");
}
/*!
* Constructor that initializes the thread pool with N threads
* @param N The desired number of worker threads
* @param affinity The affinity scheduler to use:
* none - Let the OS handle the affinities (default)
* independent - Give each thread an independent set of processors
* @param procs The processors to use (defaults to the process affinity list)
*/
ThreadPool( const int N, const std::string& affinity="none", const std::vector<int>& procs=std::vector<int>() )
{
// Note: we need the constructor in the header to ensure that check_startup
// is able to check for changes in the byte alignment
check_startup(sizeof(ThreadPool));
const int* procs2 = procs.empty() ? NULL:(&procs[0]);
initialize(N,affinity.c_str(),procs.size(),procs2);
if ( !is_valid(this) )
throw std::logic_error("Thread pool is not valid");
}
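// Example (a sketch): create a pool with 4 worker threads and let the OS manage
// thread affinities:
//    ThreadPool tpool( 4 );
// or pin 2 threads to an explicit (hypothetical) processor list:
//    std::vector<int> procs; procs.push_back(0); procs.push_back(1);
//    ThreadPool tpool2( 2, "independent", procs );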
//! Destructor
~ThreadPool();
//! Function to return the number of processors available
static int getNumberOfProcessors();
//! Function to return the processor number that the current thread is running on
static int getCurrentProcessor();
//! Function to return the affinity of the current process
static std::vector<int> getProcessAffinity();
//! Function to set the affinity of the current process
static void setProcessAffinity( std::vector<int> procs );
//! Function to return the affinity of the current thread
static std::vector<int> getThreadAffinity();
/*!
* Function to return the affinity of the given thread
* @param thread The index of the thread
*/
std::vector<int> getThreadAffinity( int thread ) const;
/*!
* Function to set the affinity of the current thread
* @param procs The processors to use
*/
static void setThreadAffinity( std::vector<int> procs );
/*!
* Set the given thread to have the specified affinity
* @param thread The index of the thread
* @param procs The processors to use
*/
void setThreadAffinity( int thread, std::vector<int> procs ) const;
//! Function to return the number of threads in the thread pool
int getNumThreads() const { return d_N_threads; }
/*!
* \brief Function to set the number of threads in the thread pool
* \details This function will change the number of worker threads in the ThreadPool
* to the number specified. This function will immediately change the number of threads
* in the ThreadPool without checking the existing work unless the desired number of
* threads is 0. In this case, the function will wait for all work items to finish
* before deleting the existing work threads.
* Member threads may not call this function.
* @param N The desired number of worker threads
* @param affinity The affinity scheduler to use:
* none - Let the OS handle the affinities (default)
* independent - Give each thread an independent set of processors
* @param procs The processors to use (defaults to the process affinity list)
*/
inline void setNumThreads( const int N, const std::string& affinity="none",
const std::vector<int>& procs=std::vector<int>() )
{
const int* procs2 = procs.empty() ? NULL:(&procs[0]);
setNumThreads(N,affinity.c_str(),procs.size(),procs2);
}
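// Example (a sketch): resize an existing pool to 8 threads, giving each thread an
// independent set of processors from the process affinity list:
//    tpool.setNumThreads( 8, "independent" );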
/*!
* \brief Function to return the current thread number
* \details This function will return the thread number of the current active thread.
* If the thread is not a member of the thread pool, this function will return 0.
*/
int getThreadNumber() const;
//! Function to check if the work item is valid
/*!
* This function checks if the work item has a valid id.
* Note: this function does not require blocking and will return immediately.
* @param id The id of the work item
*/
inline bool isValid(const thread_id_t& id) const;
/*!
* \brief Function to check if the work item has finished processing
* \details This function checks if the work item has finished processing.
* @param id The id of the work item
*/
bool isFinished(thread_id_t id) const;
/*!
* \brief Function to get the returned function value
* \details This function returns the value that was returned from the work function.
* If the work item has not finished or was not found it will return 0.
* @param id The id of the work item
*/
template <class return_type>
inline return_type getFunctionRet(const thread_id_t& id) const;
/*!
* \brief Function to add a work item
* \details This function adds a work item to the queue
* Note: any thread may call this routine.
* @param work Pointer to the work item to add
* Note that the threadpool will automatically destroy the item when finished
* @param priority A value indicating the priority of the work item (0-default)
*/
inline thread_id_t add_work( ThreadPool::WorkItem* work, int priority=0);
/*!
* \brief Function to add multiple work items
* \details This function adds multiple work items to the queue
* Note: any thread may call this routine.
* @param work Vector of pointers to the work items to add
* Note that the threadpool will automatically destroy the item when finished
* @param priority Vector of values indicating the priority of the work items
*/
inline std::vector<thread_id_t> add_work( const std::vector<ThreadPool::WorkItem*>& work,
const std::vector<int>& priority=std::vector<int>() );
/*!
* \brief Function to wait until a specific work item has finished
* \details This function waits for a specific work item to finish. It returns 0 if successful.
* Note: any thread may call this routine, but they will block until finished.
* For worker threads this may eventually lead to a deadlock.
* @param id The work item to wait for
*/
inline int wait(thread_id_t id) const;
/*!
* \brief Function to wait until any of the given work items have finished their work
* \details This function waits for any of the given work items to finish.
* If successful it returns the index of a finished work item (the index in the array ids).
* If unsuccessful it will return -1.
* Note: any thread may call this routine, but they will block until finished.
* For worker threads this may eventually lead to a deadlock.
* @param N_work The number of work items
* @param ids Array of work items to wait for
*/
inline int wait_any(size_t N_work, const thread_id_t *ids);
/*!
* \brief Function to wait until any of the given work items have finished their work
* \details This function waits for any of the given work items to finish.
* If successful it returns the index of a finished work item (the index in the array ids).
* If unsuccessful it will return -1.
* Note: any thread may call this routine, but they will block until finished.
* For worker threads this may eventually lead to a deadlock.
* @param ids Vector of work items to wait for
*/
inline int wait_any(const std::vector<thread_id_t>& ids) const;
/*!
* \brief Function to wait until all of the given work items have finished their work
* \details This function waits for all of the given work items to finish. It returns 0 if successful.
* Note: any thread may call this routine, but they will block until finished.
* For worker threads this may eventually lead to a deadlock.
* @param N_work The number of work items
* @param ids Array of work items to wait for
*/
inline int wait_all(size_t N_work, const thread_id_t *ids) const;
/*!
* \brief Function to wait until all of the given work items have finished their work
* \details This function waits for all of the given work items to finish. It returns 0 if successful.
* Note: any thread may call this routine, but they will block until finished.
* For worker threads this may eventually lead to a deadlock.
* @param ids Vector of work items to wait for
*/
inline int wait_all(const std::vector<thread_id_t>& ids) const;
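// Example (a sketch): given a vector 'ids' of ids returned by add_work, wait for
// one item and then for all of them:
//    int index = tpool.wait_any( ids );   // index of a finished item, or -1
//    int error = tpool.wait_all( ids );   // 0 once all items have finished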
/*!
* \brief Function to wait until all work items in the thread pool have finished their work
* \details This function will wait until all work has finished.
* Note: member threads may not call this function.
* Only one non-member thread should call this routine at a time.
*/
void wait_pool_finished() const;
/*!
* \brief Function to check if the thread pool is valid
* \details Sometimes it is necessary to work with raw pointers for the thread pool.
* If the thread pool is invalid and used, the program will likely fail catastrophically.
* This function checks if the thread pool is valid in a relatively safe manner.
* If the thread pool is pointing to an invalid memory address, because it has been
* freed, never allocated, or otherwise corrupted, this function will return false.
* @param tpool Pointer to the ThreadPool to check
*/
static bool is_valid( const ThreadPool* tpool );
/*!
* \brief Function to enable/disable OS warning messages
* \details Some of the functions such as setting/getting the thread affinities
* are not supported on all platforms. This function controls the behavior
* of these functions on systems where they are not supported. The default
* behavior is to print a warning message. Other options include ignoring
* the messages (the functions will return empty sets), or throwing an exception.
* Note: this is a global property and will affect all thread pools in an application.
* @param behavior The behavior of OS specific messages/errors
* 0: Print a warning message
* 1: Ignore the messages
* 2: Throw an error
*/
static void set_OS_warnings( int behavior=0 );
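// Example (a sketch): silence the OS-specific warnings, e.g. on platforms without
// affinity support (as done for non-root MPI ranks in the unit tests):
//    ThreadPool::set_OS_warnings( 1 );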
private:
friend class ThreadPoolData;
// Convenience typedefs
#ifdef USE_WINDOWS
typedef HANDLE wait_type;
#elif defined(USE_LINUX) || defined(USE_MAC)
typedef pthread_cond_t* wait_type;
#else
#error Unknown OS
#endif
private:
///// Member data structures
// Structure to store properties for each work item (linked list)
struct queue_list_struct {
short int position; // Position of the work item in the list
short int prev; // Previous item in the list
short int next; // Next item in the list
queue_list_struct(): position(-1), prev(-1), next(-1) {}
inline void reset() volatile { prev=-1; next=-1; }
inline void reset() { prev=-1; next=-1; }
private:
queue_list_struct( const queue_list_struct& );
queue_list_struct& operator=( const queue_list_struct& );
};
// Structure to store a pool of wait events (thread safe)
struct wait_pool_struct {
wait_pool_struct( );
~wait_pool_struct( );
void push( wait_type event );
wait_type pop();
private:
volatile unsigned int d_count;
volatile unsigned int d_size;
volatile wait_type *d_pool;
#ifdef USE_WINDOWS
CRITICAL_SECTION *d_lock;
#elif defined(USE_LINUX) || defined(USE_MAC)
pthread_mutex_t *d_lock;
#else
#error Unknown OS
#endif
wait_pool_struct& operator=( const wait_pool_struct& );
wait_pool_struct( const wait_pool_struct& );
};
// Structure to store wait events (note: both the constructor and destructor are NOT thread safe and must be blocked)
struct wait_event_struct {
int count; // The number of work items that must finish before we alert the thread
size_t ThreadId; // Id of the waiting thread
std::vector<thread_id_t> ids; // The ids we are waiting on
wait_type wait_event; // Handle to a wait event
wait_event_struct( wait_pool_struct* wait_pool );
~wait_event_struct( );
private:
wait_pool_struct* d_wait_pool;
wait_event_struct( );
wait_event_struct( const wait_event_struct& );
};
private:
///// Member functions
// Copy constructors ( we do not want the user to be able to copy the thread pool)
ThreadPool(const ThreadPool&);
ThreadPool& operator=(const ThreadPool&);
// Function to initialize the thread pool
void setNumThreads( int N, const char* affinity, int N_procs, const int* procs );
void initialize(int N, const char* affinity, int N_procs, const int* procs);
void check_startup(size_t size0);
// Function to add an array of work items
void add_work(size_t N, ThreadPool::WorkItem* work[], const int* priority, ThreadPool::thread_id_t* id);
// Function to get a work item that has finished
WorkItem* getFinishedWorkItem(ThreadPool::thread_id_t id) const;
// This function provides a wrapper (needed for the threads)
static void create_new_thread(void *arglist) {
void **tmp = (void **) arglist;
ThreadPool *call = reinterpret_cast<ThreadPool*>(tmp[0]);
int id = static_cast<int>(reinterpret_cast<size_t>(tmp[1]));
call->tpool_thread(id);
}
/* This is the function that controls the individual thread and allows it to do work.
* Note: this version uses last in - first out work scheduling.
* param id - The index of the thread within the thread pool */
void tpool_thread( int id );
// Some functions/variables used to get/test the unique work ids
inline void initialize_id(); // A simple function to initialize the id (should only be called once)
inline size_t advance_id(); // A simple function to return the current id and advance it (thread-safe)
// Function to check if the current thread is a member of the thread pool
inline bool isMemberThread() const;
// Function to wait for some work items to finish
int wait_some(size_t N_work, const thread_id_t *ids, size_t N_wait, bool *finished) const;
// Helper functions to get the next available item in the work queue
inline short int get_work_item( );
static inline short int check_dependecies( const ThreadPool::queue_list_struct *list,
const thread_id_t *ids, short int index );
private:
///// Member data
// Note: We want to store the variables in a certain order to optimize storage
// and ensure consistent packing / object size
size_t d_NULL_HEAD; // Null data buffer to check memory bounds
volatile AtomicOperations::int64_atomic d_id_assign; // An internal variable used to store the current id to assign
volatile mutable bool d_signal_empty; // Do we want to send a signal when the queue is empty
volatile mutable unsigned char d_signal_count; // Do we want to send a signal when the count drops to zero
short int d_N_threads; // Number of threads
volatile short int d_num_active; // Number of threads that are currently active
volatile short int d_queue_head; // Index to work queue head
volatile short int d_queue_free; // Index to free queue item
volatile int d_queue_size; // Number of items in the work queue
volatile mutable int d_N_wait; // The number of threads waiting
size_t d_ThreadId[MAX_NUM_THREADS]; // Unique id for each thread
volatile uint64 d_active[MAX_NUM_THREADS/64]; // Which threads are currently active
volatile uint64 d_cancel[MAX_NUM_THREADS/64]; // Which threads should be deleted
thread_id_t volatile d_queue_ids[MAX_QUEUED]; // List of ids in the work queue
queue_list_struct volatile d_queue_list[MAX_QUEUED]; // Work queue list
volatile mutable wait_event_struct* d_wait[MAX_WAIT]; // The wait events to check
wait_type d_wait_finished; // Handle to a wait event that indicates all threads have finished work
mutable wait_pool_struct wait_pool; // Pool of wait events that we can use
#ifdef USE_WINDOWS
CRITICAL_SECTION *d_lock_queue; // Mutex lock for changing the queue
HANDLE d_hThread[MAX_NUM_THREADS]; // Handles to the threads
#elif defined(USE_LINUX) || defined(USE_MAC)
pthread_mutex_t *d_lock_queue; // Mutex lock for changing the queue
pthread_t d_hThread[MAX_NUM_THREADS]; // Handles to the threads
wait_type d_queue_not_empty; // Event condition
#else
#error Unknown OS
#endif
size_t d_NULL_TAIL; // Null data buffer to check memory bounds
};
// Swap the contents of the two ids
namespace std {
template<> inline void swap<ThreadPool::thread_id_t>(
ThreadPool::thread_id_t& a, ThreadPool::thread_id_t& b )
{
std::swap(a.d_id,b.d_id);
std::swap(a.d_count,b.d_count);
std::swap(a.d_work,b.d_work);
}
}
#include "thread_pool.hpp"
#endif

1167
threadpool/thread_pool.hpp Normal file

File diff suppressed because it is too large