Updating threadpool class

This commit is contained in:
Mark Berrill
2017-07-05 12:08:21 -04:00
parent c3d26ff712
commit a04317d766
14 changed files with 5020 additions and 4113 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,12 +1,31 @@
#ifndef included_StackTrace #ifndef included_AtomicStackTrace
#define included_StackTrace #define included_AtomicStackTrace
#include <functional>
#include <iostream>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <iostream>
#include <vector> #include <vector>
#include <thread>
#include <memory>
#include <set>
// Check for and include MPI
// clang-format off
#if defined(USE_MPI) || defined(USE_EXT_MPI)
#include "mpi.h"
#elif defined(__has_include)
#if __has_include("mpi.h")
#include "mpi.h"
#else
typedef int MPI_Comm;
#endif
#else
typedef int MPI_Comm;
#endif
// clang-format on
namespace StackTrace { namespace StackTrace {
@@ -19,29 +38,179 @@ struct stack_info {
std::string filename; std::string filename;
int line; int line;
//! Default constructor //! Default constructor
stack_info(): address(NULL), address2(NULL), line(0) {} stack_info() : address( nullptr ), address2( nullptr ), line( 0 ) {}
//! Operator==
bool operator==( const stack_info& rhs ) const;
//! Operator!=
bool operator!=( const stack_info& rhs ) const;
//! Print the stack info //! Print the stack info
std::string print() const; std::string print() const;
//! Compute the number of bytes needed to store the object
size_t size() const;
//! Pack the data to a byte array, returning a pointer to the end of the data
char* pack( char* ptr ) const;
//! Unpack the data from a byte array, returning a pointer to the end of the data
const char* unpack( const char* ptr );
//! Pack a vector of data to a memory block
static std::vector<char> packArray( const std::vector<stack_info>& data );
//! Unpack a vector of data from a memory block
static std::vector<stack_info> unpackArray( const char* data );
}; };
//! Function to return the current call stack struct multi_stack_info {
int N;
stack_info stack;
std::vector<multi_stack_info> children;
//! Default constructor
multi_stack_info() : N( 0 ) {}
//! Add the given stack to the multistack
void add( size_t N, const stack_info *stack );
//! Print the stack info
std::vector<std::string> print( const std::string& prefix=std::string() ) const;
};
/*!
* @brief Get the current call stack
* @details This function returns the current call stack for the current thread
* @return Returns vector containing the stack
*/
std::vector<stack_info> getCallStack(); std::vector<stack_info> getCallStack();
/*!
* @brief Get the current call stack for a thread
* @details This function returns the current call stack for the given thread
* @param[in] id The thread id of the stack we want to return
* @return Returns vector containing the stack
*/
std::vector<stack_info> getCallStack( std::thread::native_handle_type id );
/*!
* @brief Get the current call stack for all threads
* @details This function returns the current call stack for all threads
* in the current process.
* Note: This functionality may not be available on all platforms
* @return Returns vector containing the stack
*/
multi_stack_info getAllCallStacks( );
/*!
* @brief Get the current call stack for all threads/processes
* @details This function returns the current call stack for all threads
* for all processes in the current process. This function requires
* the user to call globalCallStackInitialize() before calling this
* routine, and globalCallStackFinalize() before exiting.
* Note: This functionality may not be available on all platforms
* @return Returns vector containing the stack
*/
multi_stack_info getGlobalCallStacks( );
//! Function to return the current call stack for the current thread
std::vector<void *> backtrace();
//! Function to return the current call stack for the given thread
std::vector<void *> backtrace( std::thread::native_handle_type id );
//! Function to return the current call stack for all threads
std::vector<std::vector<void *>> backtraceAll();
//! Function to return the stack info for a given address //! Function to return the stack info for a given address
stack_info getStackInfo( void* address ); stack_info getStackInfo( void *address );
//! Function to return the stack info for a given address
std::vector<stack_info> getStackInfo( const std::vector<void *> &address );
//! Function to return the signal name
std::string signalName( int signal );
/*! /*!
* Return the symbols from the current executable (not availible for all platforms) * Return the symbols from the current executable (not availible for all platforms)
* @return Returns 0 if sucessful * @return Returns 0 if sucessful
*/ */
int getSymbols( std::vector<void*>& address, std::vector<char>& type, std::vector<std::string>& obj ); int getSymbols(
std::vector<void *> &address, std::vector<char> &type, std::vector<std::string> &obj );
/*!
* Return the name of the executable
* @return Returns the name of the executable (usually the full path)
*/
std::string getExecutable();
/*!
* Return the search path for the symbols
* @return Returns the search path for the symbols
*/
std::string getSymPaths();
//! Terminate type
enum class terminateType { signal, exception };
/*!
* Set the error handlers
* @param[in] abort Function to terminate the program: abort(msg,type)
*/
void setErrorHandlers( std::function<void( std::string, terminateType )> abort );
/*!
* Set the given signals to the handler
* @param[in] signals Signals to set
* @param[in] handler Handler to set for the given signals: handler(signal)
*/
void setSignals( const std::vector<int>& signals, void (*handler) (int) );
//! Clear a signal set by setSignals
void clearSignal( int signal );
//! Clear all signals set by setSignals
void clearSignals( );
//! Return a list of all signals that can be caught
std::vector<int> allSignalsToCatch( );
//! Return a default list of signals to catch
std::vector<int> defaultSignalsToCatch( );
//! Get a list of the active threads
std::set<std::thread::native_handle_type> activeThreads( );
//! Get a handle to this thread
std::thread::native_handle_type thisThread( );
//! Initialize globalCallStack functionality
void globalCallStackInitialize( MPI_Comm comm );
//! Clean up globalCallStack functionality
void globalCallStackFinalize( );
/*!
* @brief Call system command
* @details This function calls a system command, waits for the program
* to execute, captures and returns the output and exit code.
* @param[in] cmd Command to execute
* @param[out] exit_code Exit code returned from child process
* @return Returns string containing the output
*/
std::string exec( const std::string& cmd, int& exit_code );
} // namespace StackTrace } // namespace StackTrace
#endif #endif

View File

@@ -36,12 +36,11 @@ public:
std::shared_ptr<double> cDistEven_, std::shared_ptr<double>cDistOdd_, int N_ ): std::shared_ptr<double> cDistEven_, std::shared_ptr<double>cDistOdd_, int N_ ):
filename(filename_), cDen(cDen_), cDistEven(cDistEven_), cDistOdd(cDistOdd_), N(N_) {} filename(filename_), cDen(cDen_), cDistEven(cDistEven_), cDistOdd(cDistOdd_), N(N_) {}
virtual void run() { virtual void run() {
ThreadPool::WorkItem::d_state = 1; // Change state to in progress
PROFILE_START("Save Checkpoint",1); PROFILE_START("Save Checkpoint",1);
WriteCheckpoint(filename,cDen.get(),cDistEven.get(),cDistOdd.get(),N); WriteCheckpoint(filename,cDen.get(),cDistEven.get(),cDistOdd.get(),N);
PROFILE_STOP("Save Checkpoint",1); PROFILE_STOP("Save Checkpoint",1);
ThreadPool::WorkItem::d_state = 2; // Change state to finished
}; };
virtual bool has_result() const { return false; }
private: private:
WriteRestartWorkItem(); WriteRestartWorkItem();
const char* filename; const char* filename;
@@ -67,7 +66,6 @@ public:
} }
~BlobIdentificationWorkItem1() { MPI_Comm_free(&newcomm); } ~BlobIdentificationWorkItem1() { MPI_Comm_free(&newcomm); }
virtual void run() { virtual void run() {
ThreadPool::WorkItem::d_state = 1; // Change state to in progress
// Compute the global blob id and compare to the previous version // Compute the global blob id and compare to the previous version
PROFILE_START("Identify blobs",1); PROFILE_START("Identify blobs",1);
double vF = 0.0; double vF = 0.0;
@@ -75,8 +73,8 @@ public:
IntArray& ids = new_index->second; IntArray& ids = new_index->second;
new_index->first = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,*phase,dist,vF,vS,ids,newcomm); new_index->first = ComputeGlobalBlobIDs(Nx-2,Ny-2,Nz-2,rank_info,*phase,dist,vF,vS,ids,newcomm);
PROFILE_STOP("Identify blobs",1); PROFILE_STOP("Identify blobs",1);
ThreadPool::WorkItem::d_state = 2; // Change state to finished
} }
virtual bool has_result() const { return false; }
private: private:
BlobIdentificationWorkItem1(); BlobIdentificationWorkItem1();
int timestep; int timestep;
@@ -101,7 +99,6 @@ public:
} }
~BlobIdentificationWorkItem2() { MPI_Comm_free(&newcomm); } ~BlobIdentificationWorkItem2() { MPI_Comm_free(&newcomm); }
virtual void run() { virtual void run() {
ThreadPool::WorkItem::d_state = 1; // Change state to in progress
// Compute the global blob id and compare to the previous version // Compute the global blob id and compare to the previous version
PROFILE_START("Identify blobs maps",1); PROFILE_START("Identify blobs maps",1);
const IntArray& ids = new_index->second; const IntArray& ids = new_index->second;
@@ -123,8 +120,8 @@ public:
writeIDMap(map,timestep,id_map_filename); writeIDMap(map,timestep,id_map_filename);
} }
PROFILE_STOP("Identify blobs maps",1); PROFILE_STOP("Identify blobs maps",1);
ThreadPool::WorkItem::d_state = 2; // Change state to finished
} }
virtual bool has_result() const { return false; }
private: private:
BlobIdentificationWorkItem2(); BlobIdentificationWorkItem2();
int timestep; int timestep;
@@ -150,7 +147,6 @@ public:
} }
~WriteVisWorkItem() { MPI_Comm_free(&newcomm); } ~WriteVisWorkItem() { MPI_Comm_free(&newcomm); }
virtual void run() { virtual void run() {
ThreadPool::WorkItem::d_state = 1; // Change state to in progress
PROFILE_START("Save Vis",1); PROFILE_START("Save Vis",1);
ASSERT(visData[0].vars[0]->name=="phase"); ASSERT(visData[0].vars[0]->name=="phase");
ASSERT(visData[0].vars[1]->name=="Pressure"); ASSERT(visData[0].vars[1]->name=="Pressure");
@@ -166,8 +162,8 @@ public:
fillData.copy(Averages.Label_NWP,BlobData); fillData.copy(Averages.Label_NWP,BlobData);
IO::writeData( timestep, visData, newcomm ); IO::writeData( timestep, visData, newcomm );
PROFILE_STOP("Save Vis",1); PROFILE_STOP("Save Vis",1);
ThreadPool::WorkItem::d_state = 2; // Change state to finished
}; };
virtual bool has_result() const { return false; }
private: private:
WriteVisWorkItem(); WriteVisWorkItem();
int timestep; int timestep;
@@ -189,7 +185,6 @@ public:
blob_ids(ids), id_list(id_list_), beta(beta_) { } blob_ids(ids), id_list(id_list_), beta(beta_) { }
~AnalysisWorkItem() { } ~AnalysisWorkItem() { }
virtual void run() { virtual void run() {
ThreadPool::WorkItem::d_state = 1; // Change state to in progress
Averages.NumberComponents_NWP = blob_ids->first; Averages.NumberComponents_NWP = blob_ids->first;
Averages.Label_NWP = blob_ids->second; Averages.Label_NWP = blob_ids->second;
Averages.Label_NWP_map = *id_list; Averages.Label_NWP_map = *id_list;
@@ -215,8 +210,8 @@ public:
Averages.PrintComponents(timestep); Averages.PrintComponents(timestep);
PROFILE_STOP("Compute dist",1); PROFILE_STOP("Compute dist",1);
} }
ThreadPool::WorkItem::d_state = 2; // Change state to finished
} }
virtual bool has_result() const { return false; }
private: private:
AnalysisWorkItem(); AnalysisWorkItem();
AnalysisType type; AnalysisType type;

View File

@@ -2,25 +2,27 @@
#include <stdexcept> #include <stdexcept>
#ifdef USE_PTHREAD_ATOMIC_LOCK #ifdef USE_PTHREAD_ATOMIC_LOCK
// Print a warning if we defaulted to use pthreads for atomic operations // Print a warning if we defaulted to use pthreads for atomic operations
// This can decrease the performance of atomic operations // This can decrease the performance of atomic operations
// We print the message here so it is only printed once // We print the message here so it is only printed once
#warning using pthreads for atomic operations, this may affect performance #warning using pthreads for atomic operations, this may affect performance
#endif #endif
namespace AtomicOperations { namespace AtomicOperations {
#ifdef USE_PTHREAD_ATOMIC_LOCK #ifdef USE_PTHREAD_ATOMIC_LOCK
pthread_mutex_t atomic_pthread_lock; pthread_mutex_t atomic_pthread_lock;
static pthread_mutexattr_t threadpool_global_attr; static pthread_mutexattr_t threadpool_global_attr;
static int create_atomic_pthread_lock( ) { static int create_atomic_pthread_lock()
pthread_mutexattr_init(&threadpool_global_attr); {
int error = pthread_mutex_init(&atomic_pthread_lock,&threadpool_global_attr); pthread_mutexattr_init( &threadpool_global_attr );
if ( error != 0 ) { throw std::logic_error("Error initializing mutex:"); } int error = pthread_mutex_init( &atomic_pthread_lock, &threadpool_global_attr );
return error; if ( error != 0 )
} throw std::logic_error( "Error initializing mutex:" );
int atomic_pthread_lock_initialized = create_atomic_pthread_lock(); return error;
}
int atomic_pthread_lock_initialized = create_atomic_pthread_lock();
#endif #endif
} // AtomicOperations namespace } // AtomicOperations namespace

View File

@@ -2,47 +2,49 @@
// but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#ifndef included_ThreadPoolAtomicHelpers #ifndef included_ThreadPoolAtomicHelpers
#define included_ThreadPoolAtomicHelpers #define included_ThreadPoolAtomicHelpers
#include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <typeinfo> #include <typeinfo>
#include <stdint.h> #include <stdexcept>
// Choose the OS // Choose the OS
#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) #if defined( WIN32 ) || defined( _WIN32 ) || defined( WIN64 ) || defined( _WIN64 )
// Using windows // Using windows
#define USE_WINDOWS #define USE_WINDOWS
#define NOMINMAX #define NOMINMAX
#include <stdlib.h> #include <process.h>
#include <windows.h> #include <stdlib.h>
#include <process.h> #include <windows.h>
#elif defined(__APPLE__) #elif defined( __APPLE__ )
// Using MAC // Using MAC
#define USE_MAC #define USE_MAC
#include <libkern/OSAtomic.h> #include <libkern/OSAtomic.h>
#elif defined(__linux) || defined(__unix) || defined(__posix) #elif defined( __linux ) || defined( __unix ) || defined( __posix )
// Using Linux // Using Linux
#define USE_LINUX #define USE_LINUX
#include <unistd.h> #include <unistd.h>
#if !defined(__GNUC__) #if !defined( __GNUC__ )
#define USE_PTHREAD_ATOMIC_LOCK #define USE_PTHREAD_ATOMIC_LOCK
#include "pthread.h" #include "pthread.h"
#endif
#else
#error Unknown OS
#endif #endif
#else
#error Unknown OS
#endif
/** \namespace atomic /** \namespace atomic
* \brief Functions for atomic operations * \brief Functions for atomic operations
* \details This class provides wrapper routines to access simple atomic operations. * \details This class provides wrapper routines to access simple atomic operations.
* Since atomic operations are system dependent, these functions are necessary * Since atomic operations are system dependent, these functions are necessary
* to provide a platform independent interface. We also provide some typedef * to provide a platform independent interface. We also provide some typedef
* variables to wrap OS dependencies. Currently we have 32 and 64 bit integers: * variables to wrap OS dependencies. Currently we have 32 and 64 bit integers:
* int32_atomic and int64_atomic. In all cases the operations use the barrier * int32_atomic and int64_atomic. In all cases the operations use the barrier
* versions provided by the compiler/OS if availible. In most cases, these builtins * versions provided by the compiler/OS if availible. In most cases, these builtins
* are considered a full barrier. That is, no memory operand will be moved across * are considered a full barrier. That is, no memory operand will be moved across
* the operation, either forward or backward. Further, instructions will be issued * the operation, either forward or backward. Further, instructions will be issued
* as necessary to prevent the processor from speculating loads across the operation * as necessary to prevent the processor from speculating loads across the operation
* and from queuing stores after the operation. * and from queuing stores after the operation.
* Note: for all functions the variable being modified must be volatile to prevent * Note: for all functions the variable being modified must be volatile to prevent
* compiler optimization that may cache the value. * compiler optimization that may cache the value.
@@ -51,280 +53,456 @@ namespace AtomicOperations {
// Define int32_atomic, int64_atomic // Define int32_atomic, int64_atomic
#include <stdint.h> #if defined( USE_WINDOWS )
#if defined(USE_WINDOWS) typedef long int32_atomic;
typedef long int32_atomic; typedef __int64 int64_atomic;
typedef __int64 int64_atomic; #define NO_INST_ATTR_ATOMIC
#define NO_INST_ATTR #elif defined( USE_MAC )
#elif defined(USE_MAC) typedef int32_t int32_atomic;
typedef int32_t int32_atomic; typedef int64_t int64_atomic;
typedef int64_t int64_atomic; #define NO_INST_ATTR_ATOMIC
#define NO_INST_ATTR #elif defined( __GNUC__ )
#elif defined(__GNUC__) typedef int int32_atomic;
typedef int int32_atomic; typedef long int int64_atomic;
typedef long int int64_atomic; #define NO_INST_ATTR_ATOMIC __attribute__( ( no_instrument_function ) )
#define NO_INST_ATTR __attribute__((no_instrument_function)) #elif defined( USE_PTHREAD_ATOMIC_LOCK )
#elif defined(USE_PTHREAD_ATOMIC_LOCK) typedef int int32_atomic;
typedef int int32_atomic; typedef long int int64_atomic;
typedef long int int64_atomic; #define NO_INST_ATTR_ATOMIC
#define NO_INST_ATTR
#else #else
#error Unknown OS #error Unknown OS
#endif #endif
/** /**
* \brief Increment returning the new value * \brief Get the value
* \details Increment x and return the new value * \details Read the data in x
* \param[in] x The pointer to the value to increment * \param[in] x The pointer to the value to get
*/ */
inline int32_atomic atomic_increment( int32_atomic volatile *x ) NO_INST_ATTR; inline int32_atomic atomic_get( const int32_atomic volatile *x );
/**
* \brief Get the value
* \details Read the data in x
* \param[in] x The pointer to the value to get
*/
inline int64_atomic atomic_get( const int64_atomic volatile *x );
/**
* \brief Set the value
* \details Set the data in x to y (*x=y)
* \param[in] x The pointer to the value to set
* \param[in] y The value to set
*/
inline void atomic_set( int32_atomic volatile *x, int32_atomic y );
/**
* \brief Set the value
* \details Set the data in x to y (*x=y)
* \param[in] x The pointer to the value to set
* \param[in] y The value to set
*/
inline void atomic_set( int64_atomic volatile *x, int64_atomic y );
/** /**
* \brief Increment returning the new value * \brief Increment returning the new value
* \details Increment x and return the new value * \details Increment x and return the new value
* \param[in] x The pointer to the value to increment * \param[in] x The pointer to the value to increment
*/ */
inline int64_atomic atomic_increment( int64_atomic volatile *x ) NO_INST_ATTR; inline int32_atomic atomic_increment( int32_atomic volatile *x ) NO_INST_ATTR_ATOMIC;
/**
* \brief Increment returning the new value
* \details Increment x and return the new value
* \param[in] x The pointer to the value to increment
*/
inline int64_atomic atomic_increment( int64_atomic volatile *x ) NO_INST_ATTR_ATOMIC;
/** /**
* \brief Decrement returning the new value * \brief Decrement returning the new value
* \details Decrement x and return the new value * \details Decrement x and return the new value
* \param[in] x The pointer to the value to decrement * \param[in] x The pointer to the value to decrement
*/ */
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) NO_INST_ATTR; inline int32_atomic atomic_decrement( int32_atomic volatile *x ) NO_INST_ATTR_ATOMIC;
/** /**
* \brief Decrement returning the new value * \brief Decrement returning the new value
* \details Decrement x and return the new value * \details Decrement x and return the new value
* \param[in] x The pointer to the value to decrement * \param[in] x The pointer to the value to decrement
*/ */
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) NO_INST_ATTR; inline int64_atomic atomic_decrement( int64_atomic volatile *x ) NO_INST_ATTR_ATOMIC;
/** /**
* \brief Add returning the new value * \brief Add returning the new value
* \details Add y to x and return the new value * \details Add y to x and return the new value
* \param[in] x The pointer to the value to add to * \param[in] x The pointer to the value to add to
* \param[in] y The value to add * \param[in] y The value to add
*/ */
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) NO_INST_ATTR; inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) NO_INST_ATTR_ATOMIC;
/** /**
* \brief Add returning the new value * \brief Add returning the new value
* \details Add y to x and return the new value * \details Add y to x and return the new value
* \param[in] x The pointer to the value to add to * \param[in] x The pointer to the value to add to
* \param[in] y The value to add * \param[in] y The value to add
*/ */
inline int64_atomic atomic_add( int64_atomic volatile *x, int32_atomic y ) NO_INST_ATTR; inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) NO_INST_ATTR_ATOMIC;
/** /**
* \brief Compare the given value and swap * \brief Compare the given value and swap
* \details Compare the existing value and swap if it matches. * \details Compare the existing value and swap if it matches.
* This function returns the previous value. * \return Returns true if the swap was performed
* To return a bool indicating if the swap was performed,
* use "bool t = atomic_compare_and_swap(v,x,y)==x".
* \param[in] v The pointer to the value to check and swap * \param[in] v The pointer to the value to check and swap
* \param[in] x The value to compare * \param[in] x The value to compare
* \param[in] y The value to swap iff *v==x * \param[in] y The value to swap iff *v==x
*/ */
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ); inline bool atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y );
/** /**
* \brief Compare the given value and swap * \brief Compare the given value and swap
* \details Compare the existing value and swap if it matches. * \details Compare the existing value and swap if it matches.
* This function returns the previous value. * \return Returns true if the swap was performed
* To return a bool indicating if the swap was performed,
* use "bool t = atomic_compare_and_swap(v,x,y)==x".
* \param[in] v The pointer to the value to check and swap * \param[in] v The pointer to the value to check and swap
* \param[in] x The value to compare * \param[in] x The value to compare
* \param[in] y The value to swap iff *v==x * \param[in] y The value to swap iff *v==x
*/ */
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ); inline bool atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y );
/** /**
* \brief Compare the given value and swap * \brief Compare the given value and swap
* \details Compare the existing value and swap if it matches. * \details Compare the existing value and swap if it matches.
* This function returns the previous value. * \return Returns true if the swap was performed
* To return a bool indicating if the swap was performed,
* use "bool t = atomic_compare_and_swap(v,x,y)==x".
* \param[in] v The pointer to the value to check and swap * \param[in] v The pointer to the value to check and swap
* \param[in] x The value to compare * \param[in] x The value to compare
* \param[in] y The value to swap iff *v==x * \param[in] y The value to swap iff *v==x
*/ */
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ); inline bool atomic_compare_and_swap( void *volatile *v, void *x, void *y );
/**
* \brief Fetch the current value and "and" with given value
* \details Perform *v = (*v) & x, returning the previous value
* \return Returns the previous value before the "and" operation
* \param[in] v The pointer to the value to update
* \param[in] x The value to "and" with *v
*/
inline int32_atomic atomic_fetch_and_and( int32_atomic volatile *v, int32_atomic x );
/**
* \brief Fetch the current value and "and" with given value
* \details Perform *v = (*v) & x, returning the previous value
* \return Returns the previous value before the "and" operation
* \param[in] v The pointer to the value to update
* \param[in] x The value to "and" with *v
*/
inline int64_atomic atomic_fetch_and_and( int64_atomic volatile *v, int64_atomic x );
/**
* \brief Fetch the current value and "or" with given value
* \details Perform *v = (*v) | x, returning the previous value
* \return Returns the previous value before the "or" operation
* \param[in] v The pointer to the value to update
* \param[in] x The value to "or" with *v
*/
inline int32_atomic atomic_fetch_and_or( int32_atomic volatile *v, int32_atomic x );
/**
* \brief Fetch the current value and "or" with given value
* \details Perform *v = (*v) | x, returning the previous value
* \return Returns the previous value before the "or" operation
* \param[in] v The pointer to the value to update
* \param[in] x The value to "or" with *v
*/
inline int64_atomic atomic_fetch_and_or( int64_atomic volatile *v, int64_atomic x );
/**
 * \brief Class to store a pool of objects
 * \details This class stores a fixed number of heap-allocated objects that can be
 *    checked out with get() and returned with put().  Both operations update the
 *    slots through atomic_compare_and_swap, so objects can be added/removed in a
 *    thread-safe way.  An empty slot is represented by a null pointer.
 *    Objects that are still checked out when the pool is destroyed are not
 *    deleted by the pool.
 * \tparam TYPE   Type of the objects to store (must be default constructible)
 * \tparam N_MAX  Number of slots (and initial objects) in the pool, must be positive
 */
template<class TYPE, int N_MAX>
class pool
{
    // get()/put() cycle through the slots with (i+1)%N_MAX, which requires N_MAX>0
    static_assert( N_MAX > 0, "pool requires at least one slot" );

public:
    //! Create the pool and fill every slot with a default-initialized object
    pool() : d_data( new TYPE *volatile[N_MAX]() )
    {
        // The slots start as nullptr, so a failed allocation can be cleaned up safely
        try {
            for ( int i = 0; i < N_MAX; i++ )
                d_data[i] = new TYPE;
        } catch ( ... ) {
            destroy();
            throw;
        }
    }

    //! Delete the objects currently stored in the pool and the slot array
    ~pool() { destroy(); }

    //! The pool owns raw pointers and cannot be copied
    pool( const pool &rhs ) = delete;
    pool &operator=( const pool &rhs ) = delete;

    /**
     * \brief Remove an object from the pool
     * \details Cycles through the slots until an object is successfully claimed.
     *    This call spins (does not return) while the pool is empty.
     * \return Returns a non-null pointer to the object that was removed
     */
    inline TYPE *get()
    {
        int i = 0;
        while ( true ) {
            TYPE *tmp = d_data[i];
            // Only the thread whose swap succeeds takes the object in this slot
            if ( tmp != nullptr && atomic_compare_and_swap( slot( i ), tmp, nullptr ) )
                return tmp;
            i = ( i + 1 ) % N_MAX;
        }
    }

    /**
     * \brief Add an object to the pool
     * \details Cycles through the slots until an empty one is successfully filled.
     *    This call spins (does not return) while every slot is occupied.
     * \param[in] ptr  Pointer to the object to store
     */
    inline void put( TYPE *ptr )
    {
        int i = 0;
        while ( !atomic_compare_and_swap( slot( i ), nullptr, ptr ) )
            i = ( i + 1 ) % N_MAX;
    }

private:
    // View slot i as the "void* volatile*" expected by atomic_compare_and_swap
    inline void *volatile *slot( int i )
    {
        return reinterpret_cast<void *volatile *>( &d_data[i] );
    }

    // Delete every stored object (deleting nullptr is a no-op) and the slot array
    void destroy()
    {
        for ( int i = 0; i < N_MAX; i++ )
            delete d_data[i];
        delete[] d_data;
    }

    // Array of N_MAX slots; the slot pointers (not the objects) are the volatile data
    TYPE *volatile *d_data;
};
// Define increment/decrement/add operators for int32, int64 // Define increment/decrement/add operators for int32, int64
#if defined(USE_WINDOWS) #if defined( USE_WINDOWS )
inline int32_atomic atomic_increment( int32_atomic volatile *x ) { inline int32_atomic atomic_increment( int32_atomic volatile *x )
return InterlockedIncrement(x); {
} return InterlockedIncrement( x );
inline int64_atomic atomic_increment( int64_atomic volatile *x ) { }
return InterlockedIncrement64(x); inline int64_atomic atomic_increment( int64_atomic volatile *x )
} {
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) { return InterlockedIncrement64( x );
return InterlockedDecrement(x); }
} inline int32_atomic atomic_decrement( int32_atomic volatile *x )
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) { {
return InterlockedDecrement64(x); return InterlockedDecrement( x );
} }
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) { inline int64_atomic atomic_decrement( int64_atomic volatile *x )
return InterlockedExchangeAdd(x,y)+y; {
} return InterlockedDecrement64( x );
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) { }
return InterlockedExchangeAdd64(x,y)+y; inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y )
} {
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) { return InterlockedExchangeAdd( x, y ) + y;
return InterlockedCompareExchange(v,x,y); }
} inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y )
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) { {
return InterlockedCompareExchange64(v,x,y); return InterlockedExchangeAdd64( x, y ) + y;
} }
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) { inline bool atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y )
return InterlockedCompareExchangePointer(v,x,y); {
} return InterlockedCompareExchange( v, y, x ) == x;
#elif defined(USE_MAC) }
inline int32_atomic atomic_increment( int32_atomic volatile *x ) { inline bool atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y )
return OSAtomicIncrement32Barrier(x); {
} return InterlockedCompareExchange64( v, y, x ) == x;
inline int64_atomic atomic_increment( int64_atomic volatile *x ) { }
return OSAtomicIncrement64Barrier(x); inline bool atomic_compare_and_swap( void *volatile *v, void *x, void *y )
} {
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) { return InterlockedCompareExchangePointer( v, x, y ) == x;
return OSAtomicDecrement32Barrier(x); }
} #elif defined( USE_MAC )
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) { inline int32_atomic atomic_increment( int32_atomic volatile *x )
return OSAtomicDecrement64Barrier(x); {
} return OSAtomicIncrement32Barrier( x );
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) { }
return OSAtomicAdd32Barrier(y,x); inline int64_atomic atomic_increment( int64_atomic volatile *x )
} {
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) { return OSAtomicIncrement64Barrier( x );
return OSAtomicAdd64Barrier(y,x); }
} inline int32_atomic atomic_decrement( int32_atomic volatile *x )
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) { {
return OSAtomicCompareAndSwap32Barrier(x,y,v) ? y:x; return OSAtomicDecrement32Barrier( x );
} }
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) { inline int64_atomic atomic_decrement( int64_atomic volatile *x )
return OSAtomicCompareAndSwap64Barrier(x,y,v) ? y:x; {
} return OSAtomicDecrement64Barrier( x );
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) { }
return OSAtomicCompareAndSwapPtrBarrier(x,y,v) ? y:x; int32_atomic atomic_fetch_and_or( int32_atomic volatile *v, int32_atomic x ) { return OSAtomicOr32Orig( x, (volatile uint32_t *) v ); }
} int32_atomic atomic_fetch_and_and( int32_atomic volatile *v, int32_atomic x ) { return OSAtomicAnd32Orig( x, (volatile uint32_t *) v); }
#elif defined(__GNUC__) int64_atomic atomic_fetch_and_or( int64_atomic volatile *v, int64_atomic x ) { throw std::logic_error("Not availible for this OS"); return 0; }
int32_atomic atomic_increment( int32_atomic volatile *x ) { int64_atomic atomic_fetch_and_and( int64_atomic volatile *v, int64_atomic x ) { throw std::logic_error("Not availible for this OS"); return 0; }
return __sync_add_and_fetch(x,1); inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y )
} {
int64_atomic atomic_increment( int64_atomic volatile *x ) { return OSAtomicAdd32Barrier( y, x );
return __sync_add_and_fetch(x,1); }
} inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y )
int32_atomic atomic_decrement( int32_atomic volatile *x ) { {
return __sync_sub_and_fetch(x,1); return OSAtomicAdd64Barrier( y, x );
} }
int64_atomic atomic_decrement( int64_atomic volatile *x ) { inline bool atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y )
return __sync_sub_and_fetch(x,1); {
} return OSAtomicCompareAndSwap32Barrier( x, y, v );
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) { }
return __sync_add_and_fetch(x,y); inline bool atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y )
} {
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) { return OSAtomicCompareAndSwap64Barrier( x, y, v );
return __sync_add_and_fetch(x,y); }
} inline bool atomic_compare_and_swap( void *volatile *v, void *x, void *y )
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) { {
return __sync_val_compare_and_swap(v,x,y); return OSAtomicCompareAndSwapPtrBarrier( x, y, v );
} }
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) { #elif defined( __GNUC__ )
return __sync_val_compare_and_swap(v,x,y); int32_atomic atomic_increment( int32_atomic volatile *x ) { return __sync_add_and_fetch( x, 1 ); }
} int64_atomic atomic_increment( int64_atomic volatile *x ) { return __sync_add_and_fetch( x, 1 ); }
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) { int32_atomic atomic_decrement( int32_atomic volatile *x ) { return __sync_sub_and_fetch( x, 1 ); }
return __sync_val_compare_and_swap(v,x,y); int64_atomic atomic_decrement( int64_atomic volatile *x ) { return __sync_sub_and_fetch( x, 1 ); }
} int32_atomic atomic_fetch_and_or( int32_atomic volatile *v, int32_atomic x ) { return __sync_fetch_and_or( v, x ); }
#elif defined(USE_PTHREAD_ATOMIC_LOCK) int64_atomic atomic_fetch_and_or( int64_atomic volatile *v, int64_atomic x ) { return __sync_fetch_and_or( v, x ); }
extern pthread_mutex_t atomic_pthread_lock; int32_atomic atomic_fetch_and_and( int32_atomic volatile *v, int32_atomic x ) { return __sync_fetch_and_and( v, x ); }
inline int32_atomic atomic_increment( int32_atomic volatile *x ) { int64_atomic atomic_fetch_and_and( int64_atomic volatile *v, int64_atomic x ) { return __sync_fetch_and_and( v, x ); }
pthread_mutex_lock(&atomic_pthread_lock); inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y )
int32_atomic y = ++(*x); {
pthread_mutex_unlock(&atomic_pthread_lock); return __sync_add_and_fetch( x, y );
return y; }
} inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y )
inline int64_atomic atomic_increment( int64_atomic volatile *x ) { {
pthread_mutex_lock(&atomic_pthread_lock); return __sync_add_and_fetch( x, y );
int64_atomic y = ++(*x); }
pthread_mutex_unlock(&atomic_pthread_lock); inline bool atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y )
return y; {
} return __sync_bool_compare_and_swap( v, x, y );
inline int32_atomic atomic_decrement( int32_atomic volatile *x ) { }
pthread_mutex_lock(&atomic_pthread_lock); inline bool atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y )
int32_atomic y = --(*x); {
pthread_mutex_unlock(&atomic_pthread_lock); return __sync_bool_compare_and_swap( v, x, y );
return y; }
} inline bool atomic_compare_and_swap( void *volatile *v, void *x, void *y )
inline int64_atomic atomic_decrement( int64_atomic volatile *x ) { {
pthread_mutex_lock(&atomic_pthread_lock); return __sync_bool_compare_and_swap( v, x, y );
int64_atomic y = --(*x); }
pthread_mutex_unlock(&atomic_pthread_lock); #elif defined( USE_PTHREAD_ATOMIC_LOCK )
return y; extern pthread_mutex_t atomic_pthread_lock;
} inline int32_atomic atomic_increment( int32_atomic volatile *x )
inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y ) { {
pthread_mutex_lock(&atomic_pthread_lock); pthread_mutex_lock( &atomic_pthread_lock );
*x += y; int32_atomic y = ++( *x );
int32_atomic z = *x; pthread_mutex_unlock( &atomic_pthread_lock );
pthread_mutex_unlock(&atomic_pthread_lock); return y;
return z; }
} inline int64_atomic atomic_increment( int64_atomic volatile *x )
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y ) { {
pthread_mutex_lock(&atomic_pthread_lock); pthread_mutex_lock( &atomic_pthread_lock );
*x += y; int64_atomic y = ++( *x );
int64_atomic z = *x; pthread_mutex_unlock( &atomic_pthread_lock );
pthread_mutex_unlock(&atomic_pthread_lock); return y;
return z; }
} inline int32_atomic atomic_decrement( int32_atomic volatile *x )
inline int32_atomic atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y ) { {
pthread_mutex_lock(&atomic_pthread_lock); pthread_mutex_lock( &atomic_pthread_lock );
*v = (*v==x) ? y:x; int32_atomic y = --( *x );
int32_atomic z = *v; pthread_mutex_unlock( &atomic_pthread_lock );
pthread_mutex_unlock(&atomic_pthread_lock); return y;
return z; }
} inline int64_atomic atomic_decrement( int64_atomic volatile *x )
inline int64_atomic atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y ) { {
pthread_mutex_lock(&atomic_pthread_lock); pthread_mutex_lock( &atomic_pthread_lock );
*v = (*v==x) ? y:x; int64_atomic y = --( *x );
int64_atomic z = *v; pthread_mutex_unlock( &atomic_pthread_lock );
pthread_mutex_unlock(&atomic_pthread_lock); return y;
return z; }
} inline int32_atomic atomic_add( int32_atomic volatile *x, int32_atomic y )
inline void* atomic_compare_and_swap( void* volatile *v, void* x, void* y ) { {
pthread_mutex_lock(&atomic_pthread_lock); pthread_mutex_lock( &atomic_pthread_lock );
*v = (*v==x) ? y:x; *x += y;
void* z = *v; int32_atomic z = *x;
pthread_mutex_unlock(&atomic_pthread_lock); pthread_mutex_unlock( &atomic_pthread_lock );
return z; return z;
} }
/*!
 * Add y to *x under the global pthread mutex (64-bit fallback implementation).
 * @param x  Address of the value to update
 * @param y  Amount to add
 * @return   The value stored in *x after the addition
 */
inline int64_atomic atomic_add( int64_atomic volatile *x, int64_atomic y )
{
    pthread_mutex_lock( &atomic_pthread_lock );
    // Compute the sum once, publish it, and hand the same value back to the caller
    const int64_atomic updated = *x + y;
    *x = updated;
    pthread_mutex_unlock( &atomic_pthread_lock );
    return updated;
}
/*!
 * Compare-and-swap for 32-bit values, implemented with the global pthread mutex.
 * While holding atomic_pthread_lock, *v is compared with x; y is stored only when
 * they are equal.  On a mismatch *v is left untouched (the previous code wrote x
 * into *v in that case, destroying the current value).
 * @param v  Address of the value to update
 * @param x  Expected current value
 * @param y  Value to store if *v == x
 * @return   true if the swap was performed, false otherwise
 */
inline bool atomic_compare_and_swap( int32_atomic volatile *v, int32_atomic x, int32_atomic y )
{
    pthread_mutex_lock( &atomic_pthread_lock );
    const bool test = *v == x;
    if ( test )
        *v = y;
    pthread_mutex_unlock( &atomic_pthread_lock );
    return test;
}
/*!
 * Compare-and-swap for 64-bit values, implemented with the global pthread mutex.
 * While holding atomic_pthread_lock, *v is compared with x; y is stored only when
 * they are equal.  On a mismatch *v is left untouched (the previous code wrote x
 * into *v in that case, destroying the current value).
 * @param v  Address of the value to update
 * @param x  Expected current value
 * @param y  Value to store if *v == x
 * @return   true if the swap was performed, false otherwise
 */
inline bool atomic_compare_and_swap( int64_atomic volatile *v, int64_atomic x, int64_atomic y )
{
    pthread_mutex_lock( &atomic_pthread_lock );
    const bool test = *v == x;
    if ( test )
        *v = y;
    pthread_mutex_unlock( &atomic_pthread_lock );
    return test;
}
/*!
 * Compare-and-swap for pointers, implemented with the global pthread mutex.
 * While holding atomic_pthread_lock, *v is compared with x; y is stored only when
 * they are equal.  On a mismatch *v is left untouched (the previous code wrote x
 * into *v in that case, destroying the current value).
 * @param v  Address of the pointer to update
 * @param x  Expected current pointer value
 * @param y  Pointer to store if *v == x
 * @return   true if the swap was performed, false otherwise
 */
inline bool atomic_compare_and_swap( void *volatile *v, void *x, void *y )
{
    pthread_mutex_lock( &atomic_pthread_lock );
    const bool test = *v == x;
    if ( test )
        *v = y;
    pthread_mutex_unlock( &atomic_pthread_lock );
    return test;
}
#else #else
#error Unknown OS #error Unknown OS
#endif #endif
/*!
 * Read a 32-bit value by performing an atomic add of zero.
 * @param x  Address of the value to read
 * @return   The value returned by atomic_add( x, 0 )
 */
inline int32_atomic atomic_get( const int32_atomic volatile *x )
{
    // atomic_add takes a non-const pointer; adding 0 does not change the value
    int32_atomic volatile *target = const_cast<int32_atomic volatile *>( x );
    return atomic_add( target, 0 );
}
/*!
 * Read a 64-bit value by performing an atomic add of zero.
 * @param x  Address of the value to read
 * @return   The value returned by atomic_add( x, 0 )
 */
inline int64_atomic atomic_get( const int64_atomic volatile *x )
{
    // atomic_add takes a non-const pointer; adding 0 does not change the value
    int64_atomic volatile *target = const_cast<int64_atomic volatile *>( x );
    return atomic_add( target, 0 );
}
/*!
 * Store y into *x using a compare-and-swap retry loop (32-bit).
 * @param x  Address of the value to overwrite
 * @param y  Value to store
 */
inline void atomic_set( int32_atomic volatile *x, int32_atomic y )
{
    int32_atomic observed;
    // Re-read the current value and retry until the swap from that value succeeds
    do {
        observed = *x;
    } while ( !atomic_compare_and_swap( x, observed, y ) );
}
/*!
 * Store y into *x using a compare-and-swap retry loop (64-bit).
 * @param x  Address of the value to overwrite
 * @param y  Value to store
 */
inline void atomic_set( int64_atomic volatile *x, int64_atomic y )
{
    int64_atomic observed;
    // Re-read the current value and retry until the swap from that value succeeds
    do {
        observed = *x;
    } while ( !atomic_compare_and_swap( x, observed, y ) );
}
/*!
 * Exchange *x and *y (32-bit): *y is stored into *x with a compare-and-swap
 * retry loop, and the value that was replaced is written back to *y.
 * @param x  Address of the shared value
 * @param y  In: value to store.  Out: value previously held by *x
 */
inline void atomic_swap( int32_atomic volatile *x, int32_atomic *y )
{
    int32_atomic previous;
    // Retry until the value we observed is the one actually replaced
    do {
        previous = *x;
    } while ( !atomic_compare_and_swap( x, previous, *y ) );
    *y = previous;
}
/*!
 * Exchange *x and *y (64-bit): *y is stored into *x with a compare-and-swap
 * retry loop, and the value that was replaced is written back to *y.
 * @param x  Address of the shared value
 * @param y  In: value to store.  Out: value previously held by *x
 */
inline void atomic_swap( int64_atomic volatile *x, int64_atomic *y )
{
    int64_atomic previous;
    // Retry until the value we observed is the one actually replaced
    do {
        previous = *x;
    } while ( !atomic_compare_and_swap( x, previous, *y ) );
    *y = previous;
}
// Define an atomic counter // Define an atomic counter
struct counter_t { struct counter_t {
public: public:
// Constructor // Constructor
inline counter_t(): count(0) {} inline counter_t() : count( 0 ) {}
// Destructor // Destructor
inline ~counter_t() {} // Destructor inline ~counter_t() {} // Destructor
// Increment returning the new value // Increment returning the new value
inline int increment() { return atomic_increment(&count);} inline int increment() { return atomic_increment( &count ); }
// Decrement returning the new value // Decrement returning the new value
inline int decrement() { return atomic_decrement(&count);} inline int decrement() { return atomic_decrement( &count ); }
// Set the current value of the count // Set the current value of the count
inline void setCount(int val) { count = val;} inline void setCount( int val ) { count = val; }
// Get the current value of the count // Get the current value of the count
inline int getCount() const { return count;} inline int getCount() const { return count; }
private: private:
counter_t( const counter_t& ); counter_t( const counter_t & );
counter_t& operator=( const counter_t& ); counter_t &operator=( const counter_t & );
volatile int32_atomic count; volatile int32_atomic count;
}; };
@@ -332,4 +510,3 @@ struct counter_t {
#endif #endif

185
threadpool/atomic_list.h Normal file
View File

@@ -0,0 +1,185 @@
#ifndef included_AtomicModelAtomicList
#define included_AtomicModelAtomicList
#include <functional>
#include <csignal>
#include <atomic>
#include "threadpool/atomic_helpers.h"
/** \class AtomicList
*
* \brief Maintain a sorted list of entries
* \details This class implements a basic sorted list that is thread-safe and lock-free.
* Entries are stored smallest to largest according to the compare operator.
* The list holds at most MAX_SIZE entries in fixed-size member arrays; threads
* take temporary ownership of individual links by spinning on the entries of d_next
* (see lock() / unlock() below).
*/
template< class TYPE, int MAX_SIZE, class COMPARE = std::less<TYPE> >
class AtomicList final
{
public:
/*!
 * \brief Default constructor
 * @param default_value Value returned by the remove functions when nothing is removed;
 *                      also used to fill empty storage slots
 * @param comp          Comparison object used by insert() to order the entries
 */
AtomicList( const TYPE& default_value=TYPE(), const COMPARE& comp=COMPARE() );
/*!
* \brief Remove an item from the list
* \details Find and remove first entry that meets the given criteria
* @return Return the item that matches the criteria, or the default item if no item matches
* @param compare Selection function object which returns true if the
* given value meets the selection criteria.
* The signature of the selection function should be equivalent to:
* bool cmp( const TYPE& value, ... );
* @param args Extra arguments forwarded (by value) to compare after the list entry
*/
template<class Compare, class ... Args>
inline TYPE remove( Compare compare, Args... args );
//! Remove the first entry from the list (returns the default item if the list is empty)
inline TYPE remove_first( );
/*!
* \brief Insert an item
* \details Insert an item into the list.  The position is chosen with the COMPARE
* object given to the constructor: the item is stored before the first entry
* for which compare( x, entry ) returns true, or at the end if there is none.
* Throws std::logic_error if the list already holds MAX_SIZE entries.
* @param x Item to insert
*/
inline void insert( TYPE x );
/*!
* \brief Return the size of the list
* \details Return the number of items in the list
*/
inline int size( ) const { return AtomicOperations::atomic_get(&d_N); }
/*!
* \brief Check if the list is empty
* \details Return true if the list is empty
*/
inline bool empty( ) const { return AtomicOperations::atomic_get(&d_N)==0; }
/*!
* \brief Return the capacity of the list
* \details Return the maximum number of items the list can hold
*/
inline int capacity( ) const { return MAX_SIZE; }
/*!
* \brief Check the list
* \details Perform a series of checks to verify the list is in a stable state.
* Note: This function is only partially thread-safe: it will block all other
* operations on the list, but check may fail if we caught a thread modifying the list.
* It is intended for debugging purposes only!
* @return This function returns true if the list is in a good working state
*/
inline bool check( );
//! Return the total number of inserts since object creation
inline int64_t N_insert() const { return AtomicOperations::atomic_get(&d_N_insert); }
//! Return the total number of removals since object creation
inline int64_t N_remove() const { return AtomicOperations::atomic_get(&d_N_remove); }
private:
// Data members
// Comparison object used by insert() to order entries
COMPARE d_compare;
// Value returned when nothing is removed and stored in empty slots
volatile TYPE d_default;
// Storage for the entries; d_objects[k] is paired with the link d_next[k+1]
volatile TYPE d_objects[MAX_SIZE];
// Current number of entries
volatile AtomicOperations::int32_atomic d_N;
// Links: d_next[0] is the head of the list, d_next[k+1] belongs to d_objects[k].
//    >0 : index (into d_next) of the following entry
//    -1 : end of the list
//     0 : link is currently held by a thread (see lock())
//    -3 : slot has been taken out of the list / unused chain but is not linked yet
//   <-3 : slot is unused; -(value+4)+1 is the index of the next unused slot
volatile AtomicOperations::int32_atomic d_next[MAX_SIZE+1];
// Index of the first unused slot (0 while a thread is updating the unused chain)
volatile AtomicOperations::int32_atomic d_unused;
// Total number of inserts / removals since construction
volatile AtomicOperations::int64_atomic d_N_insert;
volatile AtomicOperations::int64_atomic d_N_remove;
private:
// Take ownership of link i: spin, atomically replacing d_next[i] with 0, until the
// previous value was nonzero, then return that previous value.  d_next[i] is left at 0
// until unlock() is called.  i == -1 is a no-op that returns -1.
inline int lock( int i )
{
if ( i == -1 )
return -1;
int tmp = 0;
while ( tmp == 0 )
tmp = AtomicOperations::atomic_fetch_and_and( &d_next[i], 0 );
return tmp;
}
// Release link i by OR-ing value into d_next[i].  Intended to be called on a link
// previously zeroed by lock(), in which case the OR stores value.  i == -1 is a no-op.
inline void unlock( int i, int value )
{
if ( i != -1 )
AtomicOperations::atomic_fetch_and_or( &d_next[i], value );
}
// Pop the first slot from the unused chain and return its index.
inline int get_unused( )
{
int i = 0;
// Take the head of the unused chain (leaves d_unused at 0 so other threads spin)
while ( i == 0 )
i = AtomicOperations::atomic_fetch_and_and( &d_unused, 0 );
// Publish the next unused slot, decoded from the link stored in slot i
AtomicOperations::atomic_fetch_and_or( &d_unused, -(d_next[i]+4)+1 );
d_next[i] = -3;
return i;
}
// Push slot i onto the front of the unused chain.
inline void put_unused( int i )
{
int j = 0;
// Swap 0 into d_unused until we obtain the (nonzero) current head in j
while ( j == 0 )
AtomicOperations::atomic_swap( &d_unused, &j );
// Encode the old head j in slot i (decoded by get_unused), then publish i as the new head
d_next[i] = -3-j;
AtomicOperations::atomic_fetch_and_or( &d_unused, i );
}
private:
// Copying is disabled (declared private, not defined)
AtomicList( const AtomicList& );
AtomicList& operator=( const AtomicList& );
};
/** \class MemoryPool
*
* \brief Pool allocator
* \details This class implements a basic fast pool allocator that is thread-safe.
* The pool is a single block of `size` objects of TYPE; free slots are chained
* together by storing an INT_TYPE index inside each free slot.
*/
template< class TYPE, class INT_TYPE=int >
class MemoryPool final
{
public:
/*!
 * \brief Constructor
 * @param size Number of objects of TYPE the pool can hold
 */
explicit MemoryPool( size_t size );
//! destructor
~MemoryPool( );
/*!
* \brief Allocate an object
* \details Allocates a new object from the pool (the object is default-constructed in place)
* @return Return the new pointer, or nullptr if there is no more room in the pool
*/
inline TYPE* allocate( );
/*!
* \brief Free an object
* \details Destroy the object and return its slot to the pool
* @param ptr The pointer to free (must have been returned by allocate() on this pool)
*/
inline void free( TYPE* ptr );
private:
// Data members
// Start of the storage block holding the objects
volatile TYPE *d_objects;
// 1-based index of the first free slot; -1 when no slot is free;
// 0 while a thread is updating the free chain
volatile AtomicOperations::int32_atomic d_next;
private:
// Copying is disabled (declared private, not defined)
MemoryPool( const MemoryPool& );
MemoryPool& operator=( const MemoryPool& );
};
#include "threadpool/atomic_list.hpp"
#endif

242
threadpool/atomic_list.hpp Normal file
View File

@@ -0,0 +1,242 @@
#ifndef included_AtomicList_hpp
#define included_AtomicList_hpp
#include <stdexcept>
#include <iostream>
#include <thread>
/******************************************************************
* Constructor *
******************************************************************/
// Build an empty list: the head link marks the end of the list, every storage slot
// holds the default value, and all slots are chained together as unused.
template<class TYPE,int MAX_SIZE,class COMPARE>
AtomicList<TYPE,MAX_SIZE,COMPARE>::AtomicList( const TYPE& default_value, const COMPARE& comp ):
    d_compare(comp),
    d_default(default_value),
    d_N(0),
    d_unused(1),
    d_N_insert(0),
    d_N_remove(0)
{
    // Link 0 is the head; -1 means "no entries"
    d_next[0] = -1;
    // Slot k (1-based) starts unused and encodes slot k+1 as the next unused slot
    for (int slot=1; slot<=MAX_SIZE; slot++) {
        d_objects[slot-1] = d_default;
        d_next[slot] = -4-slot;
    }
}
/******************************************************************
* Remove an item *
******************************************************************/
// Remove and return the first entry for which compare( entry, args... ) is true.
// The list is walked from the head holding at most two links at a time: the link of
// the current position (pos) and the link of the candidate entry (next).
// Returns a copy of d_default when no entry matches.
// Throws std::logic_error( "Internal error" ) if a link holds an unexpected negative value.
template<class TYPE,int MAX_SIZE,class COMPARE>
template<class Compare, class ... Args>
inline TYPE AtomicList<TYPE,MAX_SIZE,COMPARE>::remove( Compare compare, Args... args )
{
// Acquire temporary ownership of the head link; next is the first entry (or -1)
int pos = 0;
auto next = lock( 0 );
while ( true ) {
if ( next == -1 ) {
// We have no more entries to search: restore the end marker and report "not found"
unlock( pos, -1 );
pos = -1;
break;
}
// NOTE(review): the link at pos is still held (zero) when this throws
if ( next < 0 )
throw std::logic_error( "Internal error" );
// Acquire ownership of the next item; next2 is the entry that follows it
int next2 = lock( next );
// Test to see if the object passes compare
bool test = compare( const_cast<TYPE&>(d_objects[next-1]), args... );
if ( test ) {
// We want to return this object: mark its link as removed (-3) and make pos skip over it
unlock( next, -3 );
unlock( pos, next2 );
pos = next;
break;
}
// Release the ownership of pos and move on; the link of next stays held as the new pos
unlock( pos, next );
pos = next;
next = next2;
}
TYPE rtn(d_default);
if ( pos != -1 ) {
// Move the value out (leaving the default in the slot), recycle the slot, update counters
std::swap( rtn, const_cast<TYPE&>( d_objects[pos-1] ) );
put_unused( pos );
AtomicOperations::atomic_decrement( &d_N );
AtomicOperations::atomic_increment( &d_N_remove );
}
return rtn;
}
// Remove and return the entry at the front of the list.
// Returns a copy of d_default when the head link reports an empty list (-1).
template<class TYPE,int MAX_SIZE,class COMPARE>
inline TYPE AtomicList<TYPE,MAX_SIZE,COMPARE>::remove_first( )
{
TYPE rtn(d_default);
// Take ownership of the head link; next is the first entry (or -1)
auto next = lock( 0 );
if ( next != -1 ) {
// Take the first entry's link, mark it removed (-3) and point the head at its successor
int next2 = lock( next );
unlock( next, -3 );
unlock( 0, next2 );
// Move the value out (leaving the default in the slot), recycle the slot, update counters
std::swap( rtn, const_cast<TYPE&>( d_objects[next-1] ) );
put_unused( next );
AtomicOperations::atomic_decrement( &d_N );
AtomicOperations::atomic_increment( &d_N_remove );
} else {
// Empty list: restore the end marker in the head link
unlock( 0, next );
}
return rtn;
}
/******************************************************************
* Insert an item *
******************************************************************/
// Insert x into the list, keeping the order defined by d_compare.
// Throws std::logic_error( "No room in list" ) when the list already holds MAX_SIZE
// entries, and std::logic_error( "Internal error" ) if no valid unused slot is obtained.
template<class TYPE,int MAX_SIZE,class COMPARE>
inline void AtomicList<TYPE,MAX_SIZE,COMPARE>::insert( TYPE x )
{
// Reserve room first: bump the count and back out if that exceeds the capacity
int N_used = AtomicOperations::atomic_increment( &d_N );
if ( N_used > MAX_SIZE ) {
AtomicOperations::atomic_decrement( &d_N );
throw std::logic_error( "No room in list" );
}
// Get an index to store the entry
auto index = get_unused();
if ( index<1 )
throw std::logic_error( "Internal error" );
// Store the object in d_objects; its link starts as "end of list"
AtomicOperations::atomic_increment( &d_N_insert );
d_objects[index-1] = x;
d_next[index] = -1;
// Find the position to store and update the next entries
int pos = 0;
auto next = lock( pos );
while ( true ) {
// At this point we own the link at pos, and next is the entry that follows it
if ( next == -1 ) {
// We have no more entries to search, store here
unlock( pos, index );
break;
}
// Test to see if the object is < the value being compared
bool test = d_compare.operator()( x, const_cast<TYPE&>(d_objects[next-1]) );
if ( test ) {
// We want to store this object before next
d_next[index] = next;
unlock( pos, index );
break;
}
// Take ownership of the next link before releasing the current one, then move on
int last = pos;
pos = next;
next = lock( next );
unlock( last, pos );
}
}
/******************************************************************
* Check the internal structures of the list *
* This is mostly thread-safe, but blocks all threads *
******************************************************************/
// Verify the internal consistency of the list (debugging aid).
// The head link is held for the duration of the check.  The function counts the
// non-default storage slots, classifies every link, and then walks the list from
// the head.  The walk is validated and bounded: a held (0), removed (-3), unused
// (< -3) or out-of-range link, or more than MAX_SIZE hops (a cycle), makes the check
// fail instead of indexing outside d_next or looping forever while holding the head.
// @return true if all counts agree with d_N and the walk reaches the end marker
template<class TYPE,int MAX_SIZE,class COMPARE>
inline bool AtomicList<TYPE,MAX_SIZE,COMPARE>::check( )
{
    // Get the lock and give any thread that is mid-operation a moment to finish
    auto start = lock( 0 );
    std::this_thread::sleep_for( std::chrono::microseconds(100) );
    // Perform the checks on the list
    bool pass = true;
    int N1 = 0;         // Storage slots that do not hold the default value
    int N2 = 0;         // Links that point at another entry
    int N_unused = 0;   // Links that belong to the unused chain
    int N_tail = 0;     // Links that mark the end of the list
    for (int i=0; i<MAX_SIZE; i++) {
        if ( d_objects[i] != d_default )
            N1++;
    }
    for (int i=0; i<MAX_SIZE+1; i++) {
        // The head link is held by us (it reads 0), so use the value lock() returned
        int next = i==0 ? start:d_next[i];
        if ( next > 0 ) {
            N2++;
        } else if ( next < -3 ) {
            N_unused++;
        } else if ( next == -1 ) {
            N_tail++;
        } else {
            // 0 (held by another thread), -2 or -3 (in transit): not a stable state
            pass = false;
        }
    }
    pass = pass && N_tail==1 && N1==d_N && N2==d_N && N_unused+d_N==MAX_SIZE;
    // Walk the list from the head and count the entries
    int it = 0;
    int pos = 0;
    while ( true ) {
        int next = pos==0 ? start:d_next[pos];
        if ( next == -1 )
            break;
        if ( next < 1 || next > MAX_SIZE || it >= MAX_SIZE ) {
            // Invalid link or more hops than the list can hold: stop before going out of bounds
            pass = false;
            break;
        }
        pos = next;
        it++;
    }
    pass = pass && it==d_N;
    // Unlock the list and return the results
    unlock( 0, start );
    return pass;
}
/******************************************************************
* MemoryPool *
******************************************************************/
// Allocate storage for `size` objects and chain all slots together as free.
// Each free slot stores the 0-based index of the next free slot; allocate() turns
// that into the 1-based head by adding 1.  The last slot therefore stores -2 so the
// head becomes -1 ("pool exhausted") once it is handed out.  This matches what
// free() stores when the pool is exhausted.  (Storing -1 would make the head 0,
// which is the "held" value the spin loops wait on, and the pool would hang.)
// A pool of size 0 is created already exhausted.
// Throws std::runtime_error if the storage cannot be allocated.
template<class TYPE,class INT_TYPE>
MemoryPool<TYPE,INT_TYPE>::MemoryPool( size_t size )
{
    static_assert( sizeof(TYPE) >= sizeof(int),
        "sizeof(TYPE) must be >= sizeof(int) to ensure proper operation" );
    static_assert( sizeof(TYPE) >= sizeof(INT_TYPE),
        "sizeof(TYPE) must be >= sizeof(INT_TYPE) to ensure proper operation" );
    d_objects = reinterpret_cast<TYPE*>( malloc(sizeof(TYPE)*size) );
    if ( size == 0 ) {
        // Nothing to chain; mark the pool as having no free slot
        d_next = -1;
        return;
    }
    if ( d_objects == nullptr )
        throw std::runtime_error( "MemoryPool: unable to allocate memory" );
    d_next = 1;
    for (size_t i=0; i+1<size; i++)
        reinterpret_cast<volatile INT_TYPE&>(d_objects[i]) = static_cast<INT_TYPE>( i+1 );
    reinterpret_cast<volatile INT_TYPE&>(d_objects[size-1]) = -2;
}
// Release the storage block.
// The call must be qualified: inside the class an unqualified free(...) resolves to
// the member MemoryPool::free( TYPE* ), which would run a destructor on the first
// slot and push it onto the free chain instead of releasing the memory.
template<class TYPE,class INT_TYPE>
MemoryPool<TYPE,INT_TYPE>::~MemoryPool()
{
    ::free( const_cast<TYPE*>( d_objects ) );
    d_objects = nullptr;
}
// Take the first free slot from the pool, default-construct a TYPE in it and return it.
// Returns nullptr when the head of the free chain is -1 (no free slot).
template<class TYPE,class INT_TYPE>
inline TYPE* MemoryPool<TYPE,INT_TYPE>::allocate()
{
// Take ownership of the free-chain head: swap 0 into d_next until a nonzero value comes back
AtomicOperations::int32_atomic i = 0;
while ( i == 0 )
AtomicOperations::atomic_swap( &d_next, &i );
TYPE *ptr = nullptr;
if ( i!=-1 ) {
// i is the 1-based index of the slot; the slot itself stores the 0-based index of the next free slot
INT_TYPE j = reinterpret_cast<volatile INT_TYPE&>(d_objects[i-1]);
ptr = const_cast<TYPE*>( &d_objects[i-1] );
new(ptr) TYPE();
// New head (1-based).  NOTE(review): if the stored link is -1 this becomes 0, and the OR
// below then leaves d_next at 0, the value the spin loops treat as "held"
i = j+1;
}
// Publish the new head (d_next is 0 here, so the OR stores i)
AtomicOperations::atomic_fetch_and_or( &d_next, i );
return ptr;
}
// Destroy the object at ptr and push its slot onto the front of the free chain.
// ptr is presumably a pointer previously returned by allocate() on this pool;
// the code does not verify that.
template<class TYPE,class INT_TYPE>
inline void MemoryPool<TYPE,INT_TYPE>::free( TYPE* ptr )
{
ptr->~TYPE();
// Take ownership of the free-chain head: swap 0 into d_next until a nonzero value comes back
AtomicOperations::int32_atomic i = 0;
while ( i == 0 )
AtomicOperations::atomic_swap( &d_next, &i );
// Store the old head in the slot as a 0-based index (-2 when the old head was -1)
reinterpret_cast<INT_TYPE&>(*ptr) = i-1;
// The freed slot becomes the new head (1-based index); d_next is 0 here, so the OR stores i
i = ptr - d_objects + 1;
AtomicOperations::atomic_fetch_and_or( &d_next, i );
}
#endif

View File

@@ -2,6 +2,7 @@ include ( macros )
# Add thread pool tests # Add thread pool tests
ADD_LBPM_TEST( test_atomic ) ADD_LBPM_TEST( test_atomic )
ADD_LBPM_TEST( test_atomic_list )
SET_TESTS_PROPERTIES ( test_atomic PROPERTIES FAIL_REGULAR_EXPRESSION ".*FAILED.*" PROCESSORS 64 ) SET_TESTS_PROPERTIES ( test_atomic PROPERTIES FAIL_REGULAR_EXPRESSION ".*FAILED.*" PROCESSORS 64 )
ADD_LBPM_TEST_THREAD_MPI( test_thread_pool 1 4 ) ADD_LBPM_TEST_THREAD_MPI( test_thread_pool 1 4 )
ADD_LBPM_TEST_THREAD_MPI( test_thread_pool 2 4 ) ADD_LBPM_TEST_THREAD_MPI( test_thread_pool 2 4 )

View File

@@ -1,66 +1,25 @@
#include <stdlib.h> #include "threadpool/atomic_helpers.h"
#include <stdio.h> #include "common/UnitTest.h"
#include "common/Utilities.h"
#include <iostream> #include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include "threadpool/atomic_helpers.h" #include <thread>
#include "common/Utilities.h" #include <chrono>
#include "common/UnitTest.h" #include <functional>
#include <atomic>
#define perr std::cerr #define perr std::cerr
#define pout std::cout #define pout std::cout
#define printp printf #define printp printf
#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
// Using windows
#define USE_WINDOWS
#define NOMINMAX
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#elif defined(__APPLE__)
// Using MAC
#define USE_MAC
#include <unistd.h>
#include <mach/mach_init.h>
#include <mach/thread_policy.h>
#elif defined(__linux) || defined(__unix) || defined(__posix)
// Using Linux
#define USE_LINUX
#include <pthread.h>
#include <unistd.h>
#else
#error Unknown OS
#endif
#ifdef USE_WINDOWS
#include <windows.h>
#define TIME_TYPE LARGE_INTEGER
#define get_time(x) QueryPerformanceCounter(x)
#define get_diff(start,end,f) (((double)(end.QuadPart-start.QuadPart))/((double)f.QuadPart))
#define get_frequency(f) QueryPerformanceFrequency(f)
#define sleep(x) Sleep(x*1000)
#elif defined(USE_LINUX) || defined(USE_MAC)
#include <sys/time.h>
#define TIME_TYPE timeval
#define get_time(x) gettimeofday(x,NULL);
#define get_diff(start,end,f) (((double)end.tv_sec-start.tv_sec)+1e-6*((double)end.tv_usec-start.tv_usec))
#define get_frequency(f) (*f=timeval())
#else
#error Unknown OS
#endif
// Function to increment/decrement a counter N times // Function to increment/decrement a counter N times
struct counter_data { static void modify_counter( int N, AtomicOperations::counter_t &counter )
AtomicOperations::counter_t *counter; {
int N;
};
void modify_counter( counter_data *data ) {
int N = data->N;
AtomicOperations::counter_t &counter = *(data->counter);
if ( N > 0 ) { if ( N > 0 ) {
for (int i=0; i<N; i++) for (int i=0; i<N; i++)
counter.increment(); counter.increment();
@@ -71,78 +30,35 @@ void modify_counter( counter_data *data ) {
} }
// Define the thread handle type
#ifdef USE_WINDOWS
typedef HANDLE thread_handle;
#elif defined(USE_LINUX) || defined(USE_MAC)
typedef pthread_t* thread_handle;
#else
#error Unknown OS
#endif
// Create a thread
#ifdef USE_WINDOWS
static thread_handle create_thread( void (*routine)(void*), void* data ) {
return (HANDLE)_beginthread( routine, 0, data);
}
#elif defined(USE_LINUX) || defined(USE_MAC)
static thread_handle create_thread( void (*routine)(void*), void* data ) {
pthread_t *id = new pthread_t;
pthread_create( id, NULL, (void*(*)(void*)) routine, data );
return id;
}
#else
#error Unknown OS
#endif
// Destroy a thread
#ifdef USE_WINDOWS
static void destroy_thread( thread_handle id ) {
WaitForMultipleObjects( 1, &id, 1, 10000 );
}
#elif defined(USE_LINUX) || defined(USE_MAC)
static void destroy_thread( thread_handle id ) {
pthread_join(*id,NULL);
delete id;
}
#else
#error Unknown OS
#endif
/****************************************************************** /******************************************************************
* The main program * * The main program *
******************************************************************/ ******************************************************************/
#ifdef USE_WINDOWS #ifdef USE_WINDOWS
int __cdecl main(int, char **) { int __cdecl main( int, char ** )
#elif defined(USE_LINUX) || defined(USE_MAC) {
int main(int, char*[]) { #elif defined( USE_LINUX ) || defined( USE_MAC )
int main( int, char *[] )
{
#else #else
#error Unknown OS #error Unknown OS
#endif #endif
UnitTest ut; UnitTest ut;
int N_threads = 64; // Number of threads int N_threads = 64; // Number of threads
int N_count = 1000000; // Number of work items int N_count = 1000000; // Number of work items
TIME_TYPE start, end, f; // Ensure we are using all processors
get_frequency(&f); #ifdef __USE_GNU
int N_procs = sysconf( _SC_NPROCESSORS_ONLN );
// Ensure we are using all processors cpu_set_t mask;
#ifdef __USE_GNU CPU_ZERO( &mask );
int N_procs = sysconf( _SC_NPROCESSORS_ONLN ); for ( int i = 0; i < N_procs; i++ )
cpu_set_t mask; CPU_SET( i, &mask );
CPU_ZERO(&mask); sched_setaffinity( getpid(), sizeof( cpu_set_t ), &mask );
for (int i=0; i<N_procs; i++) #endif
CPU_SET(i,&mask);
sched_setaffinity(getpid(), sizeof(cpu_set_t), &mask );
#endif
// Create the counter we want to test // Create the counter we want to test
AtomicOperations::counter_t count; AtomicOperations::counter_t count;
counter_data data;
data.counter = &count;
data.N = 0;
if ( count.increment() == 1 ) if ( count.increment() == 1 )
ut.passes("increment count"); ut.passes("increment count");
else else
@@ -159,88 +75,78 @@ void modify_counter( counter_data *data ) {
count.setCount(0); count.setCount(0);
// Increment the counter in serial // Increment the counter in serial
data.N = N_count; auto start = std::chrono::high_resolution_clock::now();
get_time(&start); modify_counter( N_count, count );
modify_counter( &data ); auto stop = std::chrono::high_resolution_clock::now();
get_time(&end); double time_inc_serial = std::chrono::duration<double>(stop-start).count() / N_count;
double time_inc_serial = get_diff(start,end,f)/N_count; int val = count.getCount();
int val = count.getCount();
if ( val != N_count ) { if ( val != N_count ) {
char tmp[100]; char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,N_count); sprintf( tmp, "Count of %i did not match expected count of %i", val, N_count );
ut.failure(tmp); ut.failure( tmp );
} }
printp("Time to increment (serial) = %0.1f ns\n",1e9*time_inc_serial); printp( "Time to increment (serial) = %0.1f ns\n", 1e9 * time_inc_serial );
// Decrement the counter in serial // Decrement the counter in serial
data.N = -N_count; start = std::chrono::high_resolution_clock::now();
get_time(&start); modify_counter( -N_count, count );
modify_counter( &data ); stop = std::chrono::high_resolution_clock::now();
get_time(&end); double time_dec_serial = std::chrono::duration<double>(stop-start).count() / N_count;
double time_dec_serial = get_diff(start,end,f)/N_count; val = count.getCount();
val = count.getCount();
if ( val != 0 ) { if ( val != 0 ) {
char tmp[100]; char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,0); sprintf( tmp, "Count of %i did not match expected count of %i", val, 0 );
ut.failure(tmp); ut.failure( tmp );
} }
printp("Time to decrement (serial) = %0.1f ns\n",1e9*time_dec_serial); printp( "Time to decrement (serial) = %0.1f ns\n", 1e9 * time_dec_serial );
// Increment the counter in parallel // Increment the counter in parallel
data.N = N_count; std::vector<std::thread> threads( N_threads );
std::vector<thread_handle> thread_ids(N_threads); start = std::chrono::high_resolution_clock::now();
get_time(&start); for ( int i = 0; i < N_threads; i++ )
for (int i=0; i<N_threads; i++) { threads[i] = std::thread( modify_counter, N_count, std::ref(count) );
thread_ids[i] = create_thread( (void (*)(void*)) modify_counter, (void*) &data ); for ( int i = 0; i < N_threads; i++ )
} threads[i].join();
for (int i=0; i<N_threads; i++) { stop = std::chrono::high_resolution_clock::now();
destroy_thread( thread_ids[i] ); double time_inc_parallel = std::chrono::duration<double>(stop-start).count() / ( N_count * N_threads );
} val = count.getCount();
get_time(&end); if ( val != N_count * N_threads ) {
double time_inc_parallel = get_diff(start,end,f)/(N_count*N_threads);
val = count.getCount();
if ( val != N_count*N_threads ) {
char tmp[100]; char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,N_count*N_threads); sprintf( tmp, "Count of %i did not match expected count of %i", val, N_count * N_threads );
ut.failure(tmp); ut.failure( tmp );
} }
printp("Time to increment (parallel) = %0.1f ns\n",1e9*time_inc_parallel); printp( "Time to increment (parallel) = %0.1f ns\n", 1e9 * time_inc_parallel );
// Decrement the counter in parallel // Decrement the counter in parallel
data.N = -N_count; start = std::chrono::high_resolution_clock::now();
get_time(&start); for ( int i = 0; i < N_threads; i++ )
for (int i=0; i<N_threads; i++) { threads[i] = std::thread( modify_counter, -N_count, std::ref(count) );
thread_ids[i] = create_thread( (void (*)(void*)) modify_counter, (void*) &data ); for ( int i = 0; i < N_threads; i++ )
} threads[i].join();
for (int i=0; i<N_threads; i++) { stop = std::chrono::high_resolution_clock::now();
destroy_thread( thread_ids[i] ); double time_dec_parallel = std::chrono::duration<double>(stop-start).count() / ( N_count * N_threads );
} val = count.getCount();
get_time(&end);
double time_dec_parallel = get_diff(start,end,f)/(N_count*N_threads);
val = count.getCount();
if ( val != 0 ) { if ( val != 0 ) {
char tmp[100]; char tmp[100];
sprintf(tmp,"Count of %i did not match expected count of %i",val,0); sprintf( tmp, "Count of %i did not match expected count of %i", val, 0 );
ut.failure(tmp); ut.failure( tmp );
} }
printp("Time to decrement (parallel) = %0.1f ns\n",1e9*time_dec_parallel); printp( "Time to decrement (parallel) = %0.1f ns\n", 1e9 * time_dec_parallel );
// Check the time to increment/decrement // Check the time to increment/decrement
if ( time_inc_serial>100e-9 || time_dec_serial>100e-9 || time_inc_parallel>100e-9 || time_dec_serial>100e-9 ) { if ( time_inc_serial > 100e-9 || time_dec_serial > 100e-9 || time_inc_parallel > 100e-9 ||
#if USE_GCOV time_dec_serial > 100e-9 ) {
ut.expected_failure("Time to increment/decrement count is too expensive"); #if USE_GCOV
#else ut.expected_failure( "Time to increment/decrement count is too expensive" );
ut.failure("Time to increment/decrement count is too expensive"); #else
#endif ut.failure( "Time to increment/decrement count is too expensive" );
#endif
} else { } else {
ut.passes("Time to increment/decrement passed"); ut.passes( "Time to increment/decrement passed" );
} }
// Finished // Finished
ut.report(); ut.report();
int N_errors = ut.NumFailGlobal(); int N_errors = static_cast<int>( ut.NumFailGlobal() );
return N_errors; return N_errors;
} }

View File

@@ -0,0 +1,210 @@
#include "threadpool/atomic_list.h"
#include "common/UnitTest.h"
#include "common/Utilities.h"
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <vector>
#include <thread>
#include <chrono>
#include <functional>
#include <atomic>
#include <algorithm>
// Stress helper: repeatedly pull up to five entries out of the list and put
// back every entry that was actually obtained.  The value -1 is treated as
// "nothing was removed" and is never re-inserted.
// Used both serially and from several threads at once by main().
static void modify_list( AtomicList<int,1024>& list )
{
    // Re-insert a value unless it is the -1 "nothing removed" marker
    auto put_back = [&list]( int value ) {
        if ( value != -1 )
            list.insert( value );
    };
    const int num_passes = 50000;
    int pass = 0;
    while ( pass < num_passes ) {
        // Removal phase: one unconditional head removal, one accept-anything
        // removal, then three removals with a random threshold (rand() is
        // re-evaluated on every predicate call)
        auto first   = list.remove_first();
        auto any     = list.remove( []( int ) { return true; } );
        auto above_8 = list.remove( []( int v ) { return v >= ( rand() / 8 ); } );
        auto above_4 = list.remove( []( int v ) { return v >= ( rand() / 4 ); } );
        auto above_2 = list.remove( []( int v ) { return v >= ( rand() / 2 ); } );
        // Re-insert phase, in the same order the values were taken
        put_back( first );
        put_back( any );
        put_back( above_8 );
        put_back( above_4 );
        put_back( above_2 );
        ++pass;
    }
}
// Verify that the list holds exactly the values in x, then reset the list so
// that it contains x again.
//
// @param x     Expected contents, in the order the list is expected to hand
//              them out.  NOTE(review): callers in this file pass a sorted
//              vector, so this presumably relies on the list returning values
//              in ascending order - confirm against AtomicList.
// @param list  List to verify; it is emptied and refilled with x on return.
// @return      true if list.check() succeeds, the sizes match and every
//              removed value equals the corresponding entry of x.
static bool check_list( const std::vector<int>& x, AtomicList<int,1024>& list )
{
    bool pass = list.check();
    pass = pass && static_cast<int>( x.size() ) == list.size();
    if ( pass ) {
        // The && short-circuits: after the first mismatch no further entries
        // are removed, so the list may still be partially filled here
        for ( size_t i = 0; i < x.size(); i++ )
            pass = pass && x[i] == list.remove( []( int ) { return true; } );
    }
    // Restore the list.  Drain until empty: a counted loop bounded by
    // list.size() stops half way because the size shrinks on every removal,
    // which would leave stale entries behind whenever the check failed.
    while ( !list.empty() )
        list.remove_first();
    for ( const int value : x )
        list.insert( value );
    return pass;
}
// Remove every entry from the list.
// Drains until empty() reports true; a counted loop bounded by list.size()
// would stop after roughly half the entries because the size shrinks on
// every removal.
static inline void clear_list( AtomicList<int,1024>& list )
{
    while ( !list.empty() )
        list.remove_first();
}
/******************************************************************
* The main program *
******************************************************************/
// Test driver for AtomicList<int,1024>.
// Runs, in order: an initialization check, a basic insert/remove sanity test,
// timing loops for insert and three flavours of remove, a serial and a
// multi-threaded modify_list() run verified with check_list(), and finally an
// overflow test.  Returns the number of failures reported by UnitTest.
int main( int, char *[] )
{
    UnitTest ut;
    int N_threads = 8; // Number of threads

    // Create the list.  It is constructed with -1, which the checks below
    // treat as the "nothing removed" result of remove().
    AtomicList<int,1024> list( -1 );
    if ( list.size() == 0 && list.check() )
        ut.passes( "Initialize" );
    else
        ut.failure( "Initialize" );

    // Initialize the list with some values
    for ( int i = 0; i < 80; i++ )
        list.insert( rand() );
    list.insert( 2 );
    list.insert( 1 );
    list.insert( rand() );

    // Try to pull off a couple of values
    int v1 = list.remove( []( int a ) { return a == 1; } ); // Find the entry with 1
    int v2 = list.remove( []( int ) { return true; } );     // Get the first entry
    int v3 = list.remove( []( int ) { return false; } );    // Fail to get an entry
    // 83 values were inserted and two were removed, hence the expected size of 81
    if ( v1 == 1 && v2 == 2 && v3 == -1 && list.size() == 81 && list.check() )
        ut.passes( "Basic sanity test" );
    else
        ut.failure( "Basic sanity test" );

    // Clear the list
    while ( list.remove( []( int ) { return true; } ) != -1 ) {}

    // Create a list of known values: data0 in insertion order, data sorted
    // std::vector<int> data0(512);
    std::vector<int> data0( 5 * N_threads );
    for ( size_t i = 0; i < data0.size(); i++ )
        data0[i] = rand();
    auto data = data0;
    std::sort( data.begin(), data.end() );

    // Test the cost to insert
    int N_it = 20;
    // Make sure the list is empty before timing.  Drain until empty: a loop
    // counting up to list.size() would stop half way as the size shrinks.
    while ( !list.empty() )
        list.remove( []( int ) { return true; } );
    std::chrono::duration<double> time;
    std::chrono::time_point<std::chrono::high_resolution_clock> start, stop;
    time = time.zero();
    for ( int it = 0; it < N_it; it++ ) {
        clear_list( list );
        start = std::chrono::high_resolution_clock::now();
        for ( size_t i = 0; i < data0.size(); i++ )
            list.insert( data0[i] );
        stop = std::chrono::high_resolution_clock::now();
        time += ( stop - start );
    }
    printf("insert time/item = %0.0f ns\n",1e9*time.count()/(N_it*data0.size()));

    // Test the cost to remove (first)
    // In the timing loops below check_list() is called for its side effect of
    // refilling the list with the known values; its result is not inspected.
    time = time.zero();
    for ( int it = 0; it < N_it; it++ ) {
        check_list( data, list );
        start = std::chrono::high_resolution_clock::now();
        for ( size_t i = 0; i < data0.size(); i++ )
            list.remove_first();
        stop = std::chrono::high_resolution_clock::now();
        time += ( stop - start );
    }
    printf("remove (first) time/item = %0.0f ns\n",1e9*time.count()/(N_it*data0.size()));

    // Test the cost to remove (in order)
    time = time.zero();
    for ( int it = 0; it < N_it; it++ ) {
        check_list( data, list );
        start = std::chrono::high_resolution_clock::now();
        for ( size_t i = 0; i < data0.size(); i++ )
            list.remove( []( int ) { return true; } );
        stop = std::chrono::high_resolution_clock::now();
        time += ( stop - start );
    }
    printf("remove (ordered) time/item = %0.0f ns\n",1e9*time.count()/(N_it*data0.size()));

    // Test the cost to remove (out of order): look up each value in its
    // original, unsorted insertion order
    time = time.zero();
    for ( int it = 0; it < N_it; it++ ) {
        check_list( data, list );
        start = std::chrono::high_resolution_clock::now();
        for ( size_t i = 0; i < data0.size(); i++ ) {
            int tmp = data0[i];
            list.remove( [tmp]( int v ) { return v == tmp; } );
        }
        stop = std::chrono::high_resolution_clock::now();
        time += ( stop - start );
    }
    printf("remove (unordered) time/item = %0.0f ns\n",1e9*time.count()/(N_it*data0.size()));

    // Read/write to the list and check the results
    // N_remove() is sampled before and after so the time can be normalized by
    // the number of removals performed in between
    int64_t N0 = list.N_remove();
    check_list( data, list );
    start = std::chrono::high_resolution_clock::now();
    modify_list( list );
    stop = std::chrono::high_resolution_clock::now();
    double time_serial = std::chrono::duration<double>( stop - start ).count();
    int64_t N1 = list.N_remove();
    bool pass = check_list( data, list );
    if ( pass )
        ut.passes( "Serial get/insert" );
    else
        ut.failure( "Serial get/insert" );
    printf("serial time = %0.5f s\n",time_serial);
    printf("serial time/item = %0.0f ns\n",1e9*time_serial/(N1-N0));

    // Have multiple threads reading/writing to the list simultaneously
    std::vector<std::thread> threads( N_threads );
    start = std::chrono::high_resolution_clock::now();
    for ( int i = 0; i < N_threads; i++ )
        threads[i] = std::thread( modify_list, std::ref( list ) );
    for ( int i = 0; i < N_threads; i++ )
        threads[i].join();
    stop = std::chrono::high_resolution_clock::now();
    double time_parallel = std::chrono::duration<double>( stop - start ).count();
    int64_t N2 = list.N_remove();
    pass = check_list( data, list );
    if ( pass )
        ut.passes( "Parallel get/insert" );
    else
        ut.failure( "Parallel get/insert" );
    printf("parallel time = %0.5f s\n",time_parallel);
    printf("parallel time/item = %0.0f ns\n",1e9*time_parallel/(N2-N1));

    // Try to over-fill the list: fill it to capacity, then insert one more
    // value and expect that insert to throw
    while ( !list.empty() )
        list.remove_first();
    for ( int i = 1; i <= list.capacity(); i++ )
        list.insert( i );
    try {
        list.insert( list.capacity() + 1 );
        ut.failure( "List overflow" );
    } catch ( const std::exception& ) {
        ut.passes( "List overflow" );
    } catch ( ... ) {
        ut.failure( "List overflow (unknown exception)" );
    }

    // Finished
    ut.report();
    int N_errors = static_cast<int>( ut.NumFailGlobal() );
    return N_errors;
}

1296
threadpool/test/test_thread_pool.cpp Executable file → Normal file

File diff suppressed because it is too large Load Diff

2244
threadpool/thread_pool.cpp Executable file → Normal file

File diff suppressed because it is too large Load Diff

768
threadpool/thread_pool.h Executable file → Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff