Updating threadpool, Array, StackTrace, ... classes

This commit is contained in:
Mark Berrill
2018-02-06 10:50:43 -05:00
parent 396bb07b26
commit 98d86d2f94
21 changed files with 3839 additions and 2444 deletions

View File

@@ -5,14 +5,14 @@
#include "ProfilerApp.h"
#include <algorithm>
#include <bitset>
#include <chrono>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <stdio.h>
#include <stdlib.h>
#include <typeinfo>
#include <thread>
#include <chrono>
#include <typeinfo>
#define perr std::cerr
@@ -22,6 +22,15 @@
// OS specific includes / definitions
// clang-format off
#if defined( WIN32 ) || defined( _WIN32 ) || defined( WIN64 ) || defined( _WIN64 )
#define USE_WINDOWS
#elif defined( __APPLE__ )
#define USE_MAC
#elif defined( __linux ) || defined( __unix ) || defined( __posix )
#define USE_LINUX
#else
#error Unknown OS
#endif
#if defined( USE_WINDOWS )
#include <process.h>
#include <windows.h>
@@ -54,41 +63,45 @@
// Set some macros
#if PROFILE_THREADPOOL_PERFORMANCE
#define PROFILE_THREADPOOL_START( X ) PROFILE_START( X, 3 )
#define PROFILE_THREADPOOL_START2( X ) PROFILE_START2( X, 3 )
#define PROFILE_THREADPOOL_STOP( X ) PROFILE_STOP( X, 3 )
#define PROFILE_THREADPOOL_STOP2( X ) PROFILE_STOP2( X, 3 )
#define PROFILE_THREADPOOL_START( X ) PROFILE_START( X, 3 )
#define PROFILE_THREADPOOL_START2( X ) PROFILE_START2( X, 3 )
#define PROFILE_THREADPOOL_STOP( X ) PROFILE_STOP( X, 3 )
#define PROFILE_THREADPOOL_STOP2( X ) PROFILE_STOP2( X, 3 )
#else
#define PROFILE_THREADPOOL_START( X ) \
do { \
} while ( 0 )
#define PROFILE_THREADPOOL_START2( X ) \
do { \
} while ( 0 )
#define PROFILE_THREADPOOL_STOP( X ) \
do { \
} while ( 0 )
#define PROFILE_THREADPOOL_STOP2( X ) \
do { \
} while ( 0 )
#define PROFILE_THREADPOOL_START( X ) \
do { \
} while ( 0 )
#define PROFILE_THREADPOOL_START2( X ) \
do { \
} while ( 0 )
#define PROFILE_THREADPOOL_STOP( X ) \
do { \
} while ( 0 )
#define PROFILE_THREADPOOL_STOP2( X ) \
do { \
} while ( 0 )
#endif
#if MONITOR_THREADPOOL_PERFORMANCE == 1
#define accumulate( x, t1, t2 ) AtomicOperations::atomic_add( &x, \
std::chrono::duration_cast<std::chrono::nanoseconds>(t2-t1).count() );
#define accumulate( x, t1, t2 ) \
AtomicOperations::atomic_add( \
&x, std::chrono::duration_cast<std::chrono::nanoseconds>( t2 - t1 ).count() );
#endif
#if MONITOR_THREADPOOL_PERFORMANCE == 1
static AtomicOperations::int64_atomic total_add_work_time[5] = {0,0,0,0,0};
static AtomicOperations::int64_atomic total_add_work_time[5] = { 0, 0, 0, 0, 0 };
#endif
// Helper functions
template <class T>
void quicksort( int N, T* data );
template <class T>
inline void quicksort( std::vector<T> &x ) { quicksort((int)x.size(),x.data()); }
static inline int find_id( int, const ThreadPool::thread_id_t*, const ThreadPool::thread_id_t& );
template<class T>
void quicksort( int N, T *data );
template<class T>
inline void quicksort( std::vector<T> &x )
{
quicksort( (int) x.size(), x.data() );
}
static inline int find_id( int, const ThreadPool::thread_id_t *, const ThreadPool::thread_id_t & );
// Function to generate a random size_t number (excluding 0 and ~0)
@@ -116,8 +129,8 @@ static size_t rand_size_t()
/******************************************************************
* Run some basic compile-time checks *
******************************************************************/
* Run some basic compile-time checks *
******************************************************************/
#if MAX_NUM_THREADS % 64 != 0
// We use a bit array for d_active and d_cancel
#error MAX_NUM_THREADS must be a multiple of 64
@@ -130,47 +143,52 @@ static size_t rand_size_t()
// We store the indicies to the queue list as short ints
#error MAX_QUEUED must < 65535
#endif
// Check the c++ std
#if CXX_STD == 98
#error Thread pool class requires c++11 or newer
#endif
/******************************************************************
* Get/Set a bit *
* Note: these functions are thread-safe *
******************************************************************/
* Get/Set a bit *
* Note: these functions are thread-safe *
******************************************************************/
static inline void set_bit( volatile AtomicOperations::int64_atomic *x, size_t index )
{
uint64_t mask = 0x01;
mask <<= index % 64;
size_t i = index / 64;
size_t i = index / 64;
bool test = false;
while ( !test ) {
AtomicOperations::int64_atomic y = x[i];
test = AtomicOperations::atomic_compare_and_swap( &x[i], y, (y|mask) );
test = AtomicOperations::atomic_compare_and_swap( &x[i], y, ( y | mask ) );
}
}
static inline void unset_bit( volatile AtomicOperations::int64_atomic *x, size_t index )
{
uint64_t mask = 0x01;
mask <<= index % 64;
mask = ~mask;
size_t i = index / 64;
mask = ~mask;
size_t i = index / 64;
bool test = false;
while ( !test ) {
AtomicOperations::int64_atomic y = x[i];
test = AtomicOperations::atomic_compare_and_swap( &x[i], y, (y&mask) );
test = AtomicOperations::atomic_compare_and_swap( &x[i], y, ( y & mask ) );
}
}
static inline bool get_bit( const volatile AtomicOperations::int64_atomic *x, size_t index )
{
uint64_t mask = 0x01;
mask <<= index % 64;
AtomicOperations::int64_atomic y = x[index / 64]; // This is thread-safe since we only care about a single bit
// This is thread-safe since we only care about a single bit
AtomicOperations::int64_atomic y = x[index / 64];
return ( y & mask ) != 0;
}
/******************************************************************
* Simple function to check if the parity is odd (true) or even *
******************************************************************/
* Simple function to check if the parity is odd (true) or even *
******************************************************************/
static inline bool is_odd8( size_t x )
{ // This only works for 64-bit integers
x ^= ( x >> 1 );
@@ -181,7 +199,7 @@ static inline bool is_odd8( size_t x )
x ^= ( x >> 32 );
return ( x & 0x01 ) > 0;
}
template <class int_type>
template<class int_type>
static inline int count_bits( int_type x )
{
int count = 0;
@@ -194,8 +212,18 @@ static inline int count_bits( int_type x )
/******************************************************************
* Set the bahvior of OS warnings *
******************************************************************/
* Set the global constants *
******************************************************************/
constexpr int ThreadPool::MAX_NUM_THREADS;
constexpr int ThreadPool::MAX_QUEUED;
constexpr int ThreadPool::MAX_WAIT;
constexpr bool ThreadPool::PROFILE_THREADPOOL_PERFORMANCE;
constexpr bool ThreadPool::MONITOR_THREADPOOL_PERFORMANCE;
/******************************************************************
* Set the behavior of OS warnings *
******************************************************************/
static int global_OS_behavior = 0;
std::mutex OS_warning_mutex;
void ThreadPool::set_OS_warnings( int behavior )
@@ -213,11 +241,14 @@ static void OS_warning( const std::string &message )
}
OS_warning_mutex.unlock();
}
void ThreadPool::setErrorHandler( std::function<void( const std::string & )> fun )
{
d_errorHandler = fun;
}
/******************************************************************
* Function to return the number of processors availible *
******************************************************************/
* Function to return the number of processors availible *
******************************************************************/
int ThreadPool::getNumberOfProcessors()
{
#if defined( USE_LINUX ) || defined( USE_MAC )
@@ -233,8 +264,8 @@ int ThreadPool::getNumberOfProcessors()
/******************************************************************
* Function to return the processor number of the current thread *
******************************************************************/
* Function to return the processor number of the current thread *
******************************************************************/
int ThreadPool::getCurrentProcessor()
{
#if defined( USE_LINUX )
@@ -251,8 +282,8 @@ int ThreadPool::getCurrentProcessor()
/******************************************************************
* Function to get/set the affinity of the current process *
******************************************************************/
* Function to get/set the affinity of the current process *
******************************************************************/
std::vector<int> ThreadPool::getProcessAffinity()
{
std::vector<int> procs;
@@ -325,8 +356,8 @@ void ThreadPool::setProcessAffinity( std::vector<int> procs )
/******************************************************************
* Function to get the thread affinities *
******************************************************************/
* Function to get the thread affinities *
******************************************************************/
#ifdef USE_WINDOWS
DWORD GetThreadAffinityMask( HANDLE thread )
{
@@ -387,7 +418,7 @@ std::vector<int> ThreadPool::getThreadAffinity( int thread ) const
if ( thread >= getNumThreads() )
std::logic_error( "Invalid thread number" );
std::vector<int> procs;
auto handle = const_cast<std::thread&>( d_thread[thread] ).native_handle();
auto handle = const_cast<std::thread &>( d_thread[thread] ).native_handle();
#ifdef USE_LINUX
#ifdef _GNU_SOURCE
cpu_set_t mask;
@@ -423,8 +454,8 @@ std::vector<int> ThreadPool::getThreadAffinity( int thread ) const
/******************************************************************
* Function to set the thread affinity *
******************************************************************/
* Function to set the thread affinity *
******************************************************************/
void ThreadPool::setThreadAffinity( std::vector<int> procs )
{
#ifdef USE_LINUX
@@ -458,7 +489,7 @@ void ThreadPool::setThreadAffinity( int thread, std::vector<int> procs ) const
{
if ( thread >= getNumThreads() )
std::logic_error( "Invalid thread number" );
auto handle = const_cast<std::thread&>( d_thread[thread] ).native_handle();
auto handle = const_cast<std::thread &>( d_thread[thread] ).native_handle();
#ifdef USE_LINUX
#ifdef __USE_GNU
cpu_set_t mask;
@@ -490,15 +521,15 @@ void ThreadPool::setThreadAffinity( int thread, std::vector<int> procs ) const
/******************************************************************
* Function to perform some basic checks before we start *
******************************************************************/
* Function to perform some basic checks before we start *
******************************************************************/
void ThreadPool::check_startup( size_t size0 )
{
// Check the size of the class to make sure that we don't have any
// byte alignment problems between a library implimentation and a calling pacakge
size_t size1 = sizeof( ThreadPool );
size_t size2 = ( (size_t) &d_NULL_HEAD ) - ( ( size_t ) this ) + sizeof( size_t );
size_t size3 = ( (size_t) &d_NULL_TAIL ) - ( ( size_t ) this ) + sizeof( size_t );
size_t size2 = ( (size_t) &d_NULL_HEAD ) - ( (size_t) this ) + sizeof( size_t );
size_t size3 = ( (size_t) &d_NULL_TAIL ) - ( (size_t) this ) + sizeof( size_t );
if ( size0 != size1 || size1 < size2 || size1 < size3 )
throw std::logic_error( "Internal data format problem" );
// Check the size of variables
@@ -517,7 +548,7 @@ void ThreadPool::check_startup( size_t size0 )
ThreadPool::thread_id_t id;
if ( id.getPriority() != -128 )
pass = false;
id.reset( 3, 564, NULL );
id.reset( 3, 564, nullptr );
if ( id.getPriority() != 3 || id.getLocalID() != 564 )
pass = false;
if ( count_bits( 0x0 ) != 0 || count_bits( 0x03 ) != 2 )
@@ -530,8 +561,10 @@ void ThreadPool::check_startup( size_t size0 )
if ( is_odd8( ~( (size_t) 0 ) ) || !is_odd8( thread_id_t::maxThreadID ) )
pass = false;
for ( size_t i = 0; i < 1024; i++ ) {
if ( ( count_bits( thread_id_t::maxThreadID - i ) % 2 == 1 ) != is_odd8( thread_id_t::maxThreadID - i ) ) {
printp( "%i %i %s\n", count_bits( thread_id_t::maxThreadID - i ), is_odd8( thread_id_t::maxThreadID - i ) ? 1 : 0,
if ( ( count_bits( thread_id_t::maxThreadID - i ) % 2 == 1 ) !=
is_odd8( thread_id_t::maxThreadID - i ) ) {
printp( "%i %i %s\n", count_bits( thread_id_t::maxThreadID - i ),
is_odd8( thread_id_t::maxThreadID - i ) ? 1 : 0,
std::bitset<64>( thread_id_t::maxThreadID - i ).to_string().c_str() );
pass = false;
}
@@ -550,27 +583,28 @@ void ThreadPool::check_startup( size_t size0 )
/******************************************************************
* Function to initialize the thread pool *
******************************************************************/
* Function to initialize the thread pool *
******************************************************************/
void ThreadPool::initialize( const int N, const char *affinity, int N_procs, const int *procs )
{
// Initialize the header/tail
d_NULL_HEAD = rand_size_t();
d_NULL_TAIL = d_NULL_HEAD;
// Initialize the variables to NULL values
d_id_assign = 0;
d_signal_empty = false;
d_signal_count = 0;
d_N_threads = 0;
d_num_active = 0;
d_N_added = 0;
d_N_started = 0;
d_N_finished = 0;
d_id_assign = 0;
d_signal_empty = false;
d_signal_count = 0;
d_N_threads = 0;
d_num_active = 0;
d_N_added = 0;
d_N_started = 0;
d_N_finished = 0;
d_max_wait_time = 600;
memset( (void *) d_active, 0, MAX_NUM_THREADS / 8 );
memset( (void *) d_cancel, 0, MAX_NUM_THREADS / 8 );
d_wait_last = nullptr;
for ( int i = 0; i < MAX_WAIT; i++ )
d_wait[i] = nullptr;
for ( auto &i : d_wait )
i = nullptr;
// Initialize the id
d_id_assign = thread_id_t::maxThreadID;
// Create the threads
@@ -579,14 +613,14 @@ void ThreadPool::initialize( const int N, const char *affinity, int N_procs, con
/******************************************************************
* This is the de-constructor *
******************************************************************/
* This is the de-constructor *
******************************************************************/
ThreadPool::~ThreadPool()
{
if ( !is_valid( this ) ) {
std::cerr << "Thread pool is not valid\n";
std::terminate();
}
DISABLE_WARNINGS
if ( !is_valid( this ) )
throw std::logic_error( "Thread pool is not valid" );
ENABLE_WARNINGS
// Destroy the threads
setNumThreads( 0 );
// Delete all remaining data
@@ -598,16 +632,15 @@ ThreadPool::~ThreadPool()
// Print the performance metrics
printp( "ThreadPool Performance:\n" );
printp( "add_work: %lu us, %lu us, %lu us, %lu us, %lu us\n",
total_add_work_time[0]/1000, total_add_work_time[1]/1000,
total_add_work_time[2]/1000, total_add_work_time[3]/1000,
total_add_work_time[4]/1000 );
total_add_work_time[0] / 1000, total_add_work_time[1] / 1000, total_add_work_time[2] / 1000,
total_add_work_time[3] / 1000, total_add_work_time[4] / 1000 );
#endif
}
/******************************************************************
* Check if the pointer points to a valid thread pool object *
******************************************************************/
* Check if the pointer points to a valid thread pool object *
******************************************************************/
bool ThreadPool::is_valid( const ThreadPool *tpool )
{
if ( tpool == nullptr )
@@ -621,8 +654,8 @@ bool ThreadPool::is_valid( const ThreadPool *tpool )
/******************************************************************
* This function creates the threads in the thread pool *
******************************************************************/
* This function creates the threads in the thread pool *
******************************************************************/
void ThreadPool::setNumThreads(
int num_worker_threads, const char *affinity2, int N_procs, const int *procs )
{
@@ -643,8 +676,8 @@ void ThreadPool::setNumThreads(
int d_N_threads_diff = num_worker_threads - d_N_threads;
if ( d_N_threads_diff > 0 ) {
// Check that no threads are in the process of being deleted
for ( int i = 0; i < MAX_NUM_THREADS / 64; i++ ) {
if ( d_cancel[i] != 0 )
for ( long i : d_cancel ) {
if ( i != 0 )
throw std::logic_error(
"Threads are being created and destroyed at the same time" );
}
@@ -670,11 +703,11 @@ void ThreadPool::setNumThreads(
j++;
}
// Wait for all of the threads to finish initialization
while ( 1 ) {
std::this_thread::sleep_for( std::chrono::milliseconds(25) );
while ( true ) {
std::this_thread::sleep_for( std::chrono::milliseconds( 25 ) );
bool wait = false;
for ( int i = 0; i < MAX_NUM_THREADS / 64; i++ ) {
if ( d_cancel[i] != 0 )
for ( long i : d_cancel ) {
if ( i != 0 )
wait = true;
}
if ( !wait )
@@ -684,7 +717,7 @@ void ThreadPool::setNumThreads(
#if defined( USE_LINUX ) || defined( USE_MAC )
pthread_attr_destroy( &attr );
#endif
std::this_thread::sleep_for( std::chrono::milliseconds(25) );
std::this_thread::sleep_for( std::chrono::milliseconds( 25 ) );
delete[] tmp;
} else if ( d_N_threads_diff < 0 ) {
// Reduce the number of threads
@@ -697,7 +730,7 @@ void ThreadPool::setNumThreads(
set_bit( d_cancel, d_N_threads - 1 + i );
// Wake all threads to process the shutdown
d_wait_work.notify_all();
std::this_thread::sleep_for( std::chrono::milliseconds(25) );
std::this_thread::sleep_for( std::chrono::milliseconds( 25 ) );
// Wait for the threads to close
for ( int i = 0; i > d_N_threads_diff; i-- ) {
d_thread[d_N_threads - 1 + i].join();
@@ -732,13 +765,13 @@ void ThreadPool::setNumThreads(
// We do not have a list of cpus to use, do nothing (OS not supported)
} else if ( affinity == "none" ) {
// We are using the default thread affinities (all threads get all procs of the program)
for ( int i = 0; i < d_N_threads; i++ )
for ( int i = 0; i < d_N_threads; i++ )
t_procs[i] = cpus;
} else if ( affinity == "independent" ) {
// We want to use an independent set of processors for each thread
if ( (int) cpus.size() == d_N_threads ) {
// The number of cpus matches the number of threads
for ( int i = 0; i < d_N_threads; i++ )
for ( int i = 0; i < d_N_threads; i++ )
t_procs[i] = std::vector<int>( 1, cpus[i] );
} else if ( (int) cpus.size() > d_N_threads ) {
// There are more cpus than threads, threads will use more the one processor
@@ -752,7 +785,7 @@ void ThreadPool::setNumThreads(
}
} else {
// There are fewer cpus than threads, threads will share a processor
int N_threads_proc =
auto N_threads_proc =
static_cast<int>( ( cpus.size() + d_N_threads - 1 ) / cpus.size() );
for ( int i = 0; i < d_N_threads; i++ )
t_procs[i].push_back( cpus[i / N_threads_proc] );
@@ -776,10 +809,10 @@ void ThreadPool::setNumThreads(
/******************************************************************
* This is the function that controls the individual thread and *
* allows it to do work. *
* Note: this function is lock free *
******************************************************************/
* This is the function that controls the individual thread and *
* allows it to do work. *
* Note: this function is lock free *
******************************************************************/
void ThreadPool::tpool_thread( int thread_id )
{
bool shutdown = false;
@@ -797,8 +830,8 @@ void ThreadPool::tpool_thread( int thread_id )
try {
std::vector<int> cpus = ThreadPool::getProcessAffinity();
printp( "%i cpus for current thread: ", (int) cpus.size() );
for ( size_t i = 0; i < cpus.size(); i++ )
printp( "%i ", cpus[i] );
for ( int cpu : cpus )
printp( "%i ", cpu );
printp( "\n" );
} catch ( ... ) {
printp( "Unable to get process affinity\n" );
@@ -811,24 +844,39 @@ void ThreadPool::tpool_thread( int thread_id )
// Check if there is work to do
if ( d_queue_list.size() > 0 ) {
// Get next work item to process
auto work_id = d_queue_list.remove( []( const thread_id_t& id ) { return id.ready(); } );
auto work_id =
d_queue_list.remove( []( const thread_id_t &id ) { return id.ready(); } );
if ( work_id.isNull() ) {
std::this_thread::yield();
continue;
}
WorkItem *work = work_id.work( );
WorkItem *work = work_id.work();
AtomicOperations::atomic_increment( &d_N_started );
// Start work here
PROFILE_THREADPOOL_START( "thread working" );
work->d_state = 2;
work->run();
work->d_state = 3;
work->d_state = 2;
if ( d_errorHandler ) {
try {
work->run();
} catch ( std::exception &e ) {
auto msg = Utilities::stringf(
"Error, caught exception in thread %i:\n %s\n", thread_id, e.what() );
d_errorHandler( msg );
} catch ( ... ) {
auto msg = Utilities::stringf(
"Error, caught unknown exception in thread %i\n", thread_id );
d_errorHandler( msg );
}
} else {
work->run();
}
work->d_state = 3;
PROFILE_THREADPOOL_STOP( "thread working" );
AtomicOperations::atomic_increment( &d_N_finished );
// Check if any threads are waiting on the current work item
// This can be done without blocking
for ( int i = 0; i < MAX_WAIT; i++ ) {
const wait_ids_struct *wait = const_cast<const wait_ids_struct *>(d_wait[i]);
for ( auto &i : d_wait ) {
auto wait = AtomicOperations::atomic_get( &i );
if ( wait != nullptr )
wait->id_finished( work_id );
}
@@ -849,7 +897,7 @@ void ThreadPool::tpool_thread( int thread_id )
}
// Wait for work
PROFILE_THREADPOOL_STOP2( "thread active" );
d_wait_work.wait_for(1e-3);
d_wait_work.wait_for( 1e-3 );
PROFILE_THREADPOOL_START2( "thread active" );
AtomicOperations::atomic_increment( &d_num_active );
set_bit( d_active, thread_id );
@@ -865,21 +913,22 @@ void ThreadPool::tpool_thread( int thread_id )
/******************************************************************
* This is the function that adds work to the thread pool *
* Note: this version uses a last in - first out work scheduling. *
******************************************************************/
inline void ThreadPool::add_work( const ThreadPool::thread_id_t& id )
* This is the function that adds work to the thread pool *
* Note: this version uses a last in - first out work scheduling. *
******************************************************************/
inline void ThreadPool::add_work( const ThreadPool::thread_id_t &id )
{
auto work = id.work();
auto work = id.work();
work->d_state = 1;
// Check and change priorities of dependency ids
const int priority = id.getPriority();
for (int i=0; i<work->d_N_ids; i++) {
const auto& id1 = work->d_ids[i];
if ( !id1.started() && id1<id ) {
for ( int i = 0; i < work->d_N_ids; i++ ) {
const auto &id1 = work->d_ids[i];
if ( !id1.started() && id1 < id ) {
// Remove and add the id back with a higher priority
auto id2 = d_queue_list.remove( []( const thread_id_t& a, const thread_id_t& b ) { return a==b; }, id1 );
id2.setPriority( std::max(priority,id2.getPriority()) );
auto id2 = d_queue_list.remove(
[]( const thread_id_t &a, const thread_id_t &b ) { return a == b; }, id1 );
id2.setPriority( std::max( priority, id2.getPriority() ) );
d_queue_list.insert( id2 );
}
}
@@ -894,7 +943,7 @@ void ThreadPool::add_work(
if ( N > block_size ) {
size_t i = 0;
while ( i < N ) {
add_work( std::min(N-i,block_size), &work[i], &priority[i], &ids[i] );
add_work( std::min( N - i, block_size ), &work[i], &priority[i], &ids[i] );
i += block_size;
}
return;
@@ -905,7 +954,7 @@ void ThreadPool::add_work(
#endif
// Create the thread ids (can be done without blocking)
for ( size_t i = 0; i < N; i++ )
ids[i].reset( priority[i], AtomicOperations::atomic_decrement(&d_id_assign), work[i] );
ids[i].reset( priority[i], AtomicOperations::atomic_decrement( &d_id_assign ), work[i] );
#if MONITOR_THREADPOOL_PERFORMANCE
auto t2 = std::chrono::high_resolution_clock::now();
accumulate( total_add_work_time[0], t1, t2 );
@@ -913,23 +962,23 @@ void ThreadPool::add_work(
// If there are no threads, perform the work immediately
if ( d_N_threads < 1 ) {
for ( size_t i = 0; i < N; i++ ) {
work[i]->d_state = 2;
work[i]->d_state = 2;
work[i]->run();
work[i]->d_state = 3;
work[i]->d_state = 3;
}
#if MONITOR_THREADPOOL_PERFORMANCE
auto t5 = std::chrono::high_resolution_clock::now();
accumulate( total_add_work_time[4], t2, t5 );
#endif
#if MONITOR_THREADPOOL_PERFORMANCE
auto t5 = std::chrono::high_resolution_clock::now();
accumulate( total_add_work_time[4], t2, t5 );
#endif
PROFILE_THREADPOOL_STOP2( "add_work" );
return;
}
// Wait for enough room in the queue (doesn't need blocking since it isn't that precise)
if ( N > static_cast<size_t>( MAX_QUEUED - d_queue_list.size() ) ) {
int N_wait = static_cast<int>( N - ( MAX_QUEUED - d_queue_list.size() ) );
auto N_wait = static_cast<int>( N - ( MAX_QUEUED - d_queue_list.size() ) );
while ( N_wait > 0 ) {
d_signal_count = static_cast<unsigned char>( std::min( N_wait, 255 ) );
d_wait_finished.wait_for(1e-4);
d_wait_finished.wait_for( 1e-4 );
N_wait = static_cast<int>( N - ( MAX_QUEUED - d_queue_list.size() ) );
}
}
@@ -965,19 +1014,8 @@ void ThreadPool::add_work(
/******************************************************************
* This function removes a finished work item *
******************************************************************/
ThreadPool::WorkItem *ThreadPool::getFinishedWorkItem( ThreadPool::thread_id_t id ) const
{
if ( id.finished() )
return id.work();
return nullptr;
}
/******************************************************************
* This function waits for a some of the work items to finish *
******************************************************************/
* This function waits for a some of the work items to finish *
******************************************************************/
static inline void check_finished(
size_t N_work, const ThreadPool::thread_id_t *ids, size_t &N_finished, bool *finished )
{
@@ -1004,8 +1042,8 @@ int ThreadPool::wait_some(
N_finished++;
}
size_t local_id = ids[k].getLocalID();
bool test = local_id == 0 || local_id > thread_id_t::maxThreadID || local_id <= next_id;
test = test && !finished[k];
bool test = local_id == 0 || local_id > thread_id_t::maxThreadID || local_id <= next_id;
test = test && !finished[k];
if ( test )
throw std::logic_error( "Invalid ids for wait" );
}
@@ -1018,7 +1056,7 @@ int ThreadPool::wait_some(
auto tmp = new wait_ids_struct( N_work, ids, N_wait, d_cond_pool, MAX_WAIT, d_wait );
// Wait for the ids
auto t1 = std::chrono::high_resolution_clock::now();
while ( !tmp->wait_for(0.01) ) {
while ( !tmp->wait_for( 0.01 ) ) {
check_wait_time( t1 );
}
// Update the ids that have finished
@@ -1027,33 +1065,35 @@ int ThreadPool::wait_some(
throw std::logic_error( "Internal error: failed to wait" );
// Delete the wait event struct
// Note: we want to maintain the reference in case a thread is still using it
// Note: technically this should be atomic
std::swap(d_wait_last,tmp);
// Note: technically this should be atomic, but it really isn't necessary here
std::swap( d_wait_last, tmp );
delete tmp;
return N_finished;
}
/******************************************************************
* This function waits for all of the threads to finish their work *
******************************************************************/
void ThreadPool::check_wait_time( std::chrono::time_point<std::chrono::high_resolution_clock>& t1 ) const
* This function waits for all of the threads to finish their work *
******************************************************************/
void ThreadPool::check_wait_time(
std::chrono::time_point<std::chrono::high_resolution_clock> &t1 ) const
{
auto t2 = std::chrono::high_resolution_clock::now();
if ( std::chrono::duration_cast<std::chrono::seconds>(t2-t1).count() > MAX_WAIT_TIME_DEBUG ) {
std::cout << "Warning: Maximum wait time in ThreadPool exceeded, threads may be hung\n";
std::cout << "N_active: " << d_num_active << std::endl;
std::cout << "N_queued: " << d_queue_list.size() << std::endl;
std::cout << "N_added: " << d_N_added << std::endl;
std::cout << "N_started: " << d_N_started << std::endl;
std::cout << "N_finished: " << d_N_finished << std::endl;
std::cout << "queue.insert(): " << d_queue_list.N_insert() << std::endl;
std::cout << "queue.remove(): " << d_queue_list.N_remove() << std::endl;
std::cout << "Stack Trace:\n";
auto call_stack = StackTrace::getAllCallStacks( );
if ( std::chrono::duration_cast<std::chrono::seconds>( t2 - t1 ).count() > d_max_wait_time ) {
pout << "Warning: Maximum wait time in ThreadPool exceeded, threads may be hung\n";
pout << "N_active: " << d_num_active << std::endl;
pout << "N_queued: " << d_queue_list.size() << std::endl;
pout << "N_added: " << d_N_added << std::endl;
pout << "N_started: " << d_N_started << std::endl;
pout << "N_finished: " << d_N_finished << std::endl;
pout << "queue.insert(): " << d_queue_list.N_insert() << std::endl;
pout << "queue.remove(): " << d_queue_list.N_remove() << std::endl;
pout << "Stack Trace:\n";
auto call_stack = StackTrace::getAllCallStacks();
StackTrace::cleanupStackTrace( call_stack );
auto text = call_stack.print( " " );
for ( auto& line : text )
std::cout << line << std::endl;
for ( auto &line : text )
pout << line << std::endl;
t1 = std::chrono::high_resolution_clock::now();
}
}
@@ -1068,82 +1108,91 @@ void ThreadPool::wait_pool_finished() const
while ( d_num_active > 0 || d_queue_list.size() > 0 ) {
check_wait_time( t1 );
d_signal_empty = true;
d_wait_finished.wait_for(10e-6);
d_wait_finished.wait_for( 10e-6 );
}
d_signal_empty = false;
}
/******************************************************************
* Member functions of wait_ids_struct *
******************************************************************/
ThreadPool::wait_ids_struct::wait_ids_struct( size_t N, const ThreadPool::thread_id_t *ids, size_t N_wait,
AtomicOperations::pool<condition_variable,128>& cv_pool, int N_wait_list, volatile wait_ids_struct **list ):
d_wait( N_wait ),
d_N(0),
d_cv_pool( cv_pool ),
d_wait_event( cv_pool.get() )
* Member functions of wait_ids_struct *
******************************************************************/
ThreadPool::wait_ids_struct::wait_ids_struct( size_t N, const ThreadPool::thread_id_t *ids,
size_t N_wait, AtomicOperations::pool<condition_variable, 128> &cv_pool, int N_wait_list,
volatile wait_ids_struct **list )
: d_wait( N_wait ), d_N( 0 ), d_cv_pool( cv_pool ), d_wait_event( cv_pool.get() )
{
d_ids = new ThreadPool::thread_id_t[N];
for ( size_t i = 0; i < N; i++ ) {
if ( ids[i].finished() )
d_wait = std::max(d_wait-1,0);
d_wait = std::max( d_wait - 1, 0 );
else
d_ids[d_N++] = ids[i];
}
quicksort( d_N, d_ids );
d_finished = new bool[d_N];
memset((void*)d_finished,0,d_N);
memset( (void *) d_finished, 0, d_N );
int i = 0;
while ( !AtomicOperations::atomic_compare_and_swap( (void *volatile *) &list[i], nullptr, this ) ) { i = (i+1)%N_wait_list; }
while (
!AtomicOperations::atomic_compare_and_swap( (void *volatile *) &list[i], nullptr, this ) ) {
i = ( i + 1 ) % N_wait_list;
}
d_ptr = &list[i];
}
void ThreadPool::wait_ids_struct::id_finished( const ThreadPool::thread_id_t& id ) const
ThreadPool::wait_ids_struct::~wait_ids_struct()
{
d_cv_pool.put( d_wait_event );
delete[] d_finished;
delete[] d_ids;
}
void ThreadPool::wait_ids_struct::id_finished( const ThreadPool::thread_id_t &id ) const
{
int index = find_id( d_N, d_ids, id );
if ( index >= 0 ) {
d_finished[index] = true;
int N_finished = 0;
for (int i=0; i<d_N; i++)
N_finished += d_finished[i] ? 1:0;
int N_finished = 0;
for ( int i = 0; i < d_N; i++ )
N_finished += d_finished[i] ? 1 : 0;
if ( N_finished >= d_wait ) {
*d_ptr = nullptr;
d_N = 0;
d_wait = 0;
d_N = 0;
AtomicOperations::atomic_compare_and_swap(
(void *volatile *) d_ptr, (void *) *d_ptr, nullptr );
d_wait_event->notify_all();
}
}
}
bool ThreadPool::wait_ids_struct::wait_for( double seconds )
{
for (int i=0; i<d_N; i++) {
for ( int i = 0; i < d_N; i++ ) {
if ( d_ids[i].finished() )
d_finished[i] = true;
}
auto t1 = std::chrono::high_resolution_clock::now();
while ( true ) {
int N_finished = 0;
for (int i=0; i<d_N; i++)
N_finished += d_finished[i] ? 1:0;
if ( N_finished>=d_wait || d_N==0 ) {
for ( int i = 0; i < d_N; i++ )
N_finished += d_finished[i] ? 1 : 0;
if ( N_finished >= d_wait || d_N == 0 ) {
*d_ptr = nullptr;
d_wait = 0;
d_N = 0;
d_N = 0;
break;
}
auto t2 = std::chrono::high_resolution_clock::now();
if ( 1e-6*std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count() > seconds )
if ( 1e-6 * std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count() >
seconds )
return false;
d_wait_event->wait_for(1e-5);
d_wait_event->wait_for( 1e-5 );
}
return true;
}
/******************************************************************
* templated quicksort routine *
******************************************************************/
template <class T>
* templated quicksort routine *
******************************************************************/
template<class T>
void quicksort( int n, T *arr )
{
if ( n <= 1 )
@@ -1154,7 +1203,7 @@ void quicksort( int n, T *arr )
jstack = 0;
l = 0;
ir = n - 1;
while ( 1 ) {
while ( true ) {
if ( ir - l < 7 ) { // Insertion sort when subarray small enough.
for ( j = l + 1; j <= ir; j++ ) {
a = arr[j];
@@ -1231,8 +1280,8 @@ void quicksort( int n, T *arr )
/************************************************************************
* Function to find the id in a sorted vector *
************************************************************************/
* Function to find the id in a sorted vector *
************************************************************************/
inline int find_id( int n, const ThreadPool::thread_id_t *x, const ThreadPool::thread_id_t &id )
{
if ( n == 0 )
@@ -1243,7 +1292,7 @@ inline int find_id( int n, const ThreadPool::thread_id_t *x, const ThreadPool::t
if ( id < x[0] )
return -1;
if ( id == x[n - 1] )
return n-1;
return n - 1;
if ( id > x[n - 1] )
return -1;
// Perform the search
@@ -1264,13 +1313,13 @@ inline int find_id( int n, const ThreadPool::thread_id_t *x, const ThreadPool::t
/************************************************************************
* Function to add dependencies to the work item *
* Note: when expanding the size of d_ids, we need to allocate space for *
* one extra entry for a spinlock. *
************************************************************************/
* Function to add dependencies to the work item *
* Note: when expanding the size of d_ids, we need to allocate space for *
* one extra entry for a spinlock. *
************************************************************************/
void ThreadPool::WorkItem::add_dependencies( size_t N, const ThreadPool::thread_id_t *ids )
{
if ( d_state!=0 ) {
if ( d_state != 0 ) {
// The item has already been added to the threadpool,
// we are not allowed to add dependencies
throw std::logic_error(
@@ -1291,9 +1340,9 @@ void ThreadPool::WorkItem::add_dependencies( size_t N, const ThreadPool::thread_
for ( size_t i = 0; i < d_N_ids; i++ )
const_cast<thread_id_t &>( ids[i] ).swap( tmp[i] );
delete[] tmp;
d_size = N2;
int* lock = reinterpret_cast<int*>(&d_ids[d_size-1]);
*lock = 0;
d_size = N2;
auto *lock = reinterpret_cast<int *>( &d_ids[d_size - 1] );
*lock = 0;
}
const ThreadPool::thread_id_t id0;
for ( size_t i = 0; i < N; i++ ) {