#define _CRT_NONSTDC_NO_DEPRECATE
#include "thread_pool.h"
// Standard library headers used by this file (reconstructed from usage)
#include <algorithm>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>
#include "ProfilerApp.h"
#include "common/Utilities.h"

#define perr std::cerr
#define pout std::cout
#define printp printf

#define MONITOR_THREADPOOL_PERFORMANCE 0

#if 0
    #define PROFILE_THREAD_START(X)  PROFILE_START(X,3)
    #define PROFILE_THREAD_START2(X) PROFILE_START2(X,3)
    #define PROFILE_THREAD_STOP(X)   PROFILE_STOP(X,3)
    #define PROFILE_THREAD_STOP2(X)  PROFILE_STOP2(X,3)
#else
    #define PROFILE_THREAD_START(X)  do {} while(0)
    #define PROFILE_THREAD_START2(X) do {} while(0)
    #define PROFILE_THREAD_STOP(X)   do {} while(0)
    #define PROFILE_THREAD_STOP2(X)  do {} while(0)
#endif

// Include system dependent headers and define some functions
#ifdef __WORDSIZE
    #define ARCH_SIZE __WORDSIZE
#elif defined(_WIN64)
    #define ARCH_SIZE 64
#elif defined(_WIN32)   // Note: WIN64 also defines WIN32
    #define ARCH_SIZE 32
#endif
#ifdef USE_WINDOWS
    #include <windows.h>
    #define get_time(x)      QueryPerformanceCounter(x)
    #define get_frequency(f) QueryPerformanceFrequency(f)
    #define get_diff(start,end,f) \
        static_cast<double>(end.QuadPart-start.QuadPart)/static_cast<double>(f.QuadPart)
    #define TIME_TYPE LARGE_INTEGER
#elif defined(USE_LINUX)
    #include <unistd.h>
    #include <sys/time.h>
    #define Sleep(x)         usleep(1000*x)
    #define get_time(x)      gettimeofday(x,NULL);
    #define get_frequency(f) (*f=timeval())
    #define get_diff(start,end,f) 1e-6*static_cast<double>( \
        0xF4240*(static_cast<double>(end.tv_sec)-static_cast<double>(start.tv_sec)) + \
        (static_cast<double>(end.tv_usec)-static_cast<double>(start.tv_usec)) )
    #define TIME_TYPE timeval
#elif defined(USE_MAC)
    #include <unistd.h>
    #include <sys/time.h>
    #define Sleep(x)         usleep(1000*x)
    #define get_time(x)      gettimeofday(x,NULL);
    #define get_frequency(f) (*f=timeval())
    #define get_diff(start,end,f) 1e-6*static_cast<double>( \
        0xF4240*(static_cast<double>(end.tv_sec)-static_cast<double>(start.tv_sec)) + \
        (static_cast<double>(end.tv_usec)-static_cast<double>(start.tv_usec)) )
    #define TIME_TYPE timeval
    #ifndef ARCH_SIZE
        #ifdef __LP64__
            #define ARCH_SIZE 64
        #else
            #define ARCH_SIZE 32
        #endif
    #endif
#else
    #error Unknown OS
#endif

// Check the ARCH_SIZE and set macros
// Note: ARCH_SIZE must match the number of bits in size_t
#if ARCH_SIZE == 64
    // 64-bit macros
#elif ARCH_SIZE == 32
    // 32-bit macros
#else
    #error Cannot identify 32 vs 64-bit
#endif

#define MAX(a,b) (((a) > (b)) ? (a) : (b))
#define MIN(a,b) (((a) < (b)) ? (a) : (b))
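// Illustrative usage of the timing macros above (this is the pattern used later
// in this file when MONITOR_THREADPOOL_PERFORMANCE is enabled):
//
//     TIME_TYPE start, stop, freq;
//     get_frequency( &freq );      // clock frequency (only meaningful on Windows)
//     get_time( &start );
//     // ... do some work ...
//     get_time( &stop );
//     double elapsed = get_diff( start, stop, freq );   // elapsed time in seconds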
#if MONITOR_THREADPOOL_PERFORMANCE==1
    static TIME_TYPE frequency;     // Clock frequency (only used for Windows)
    static double total_add_work_time[3] = {0,0,0};
#endif

// Helper functions
template<class T> void quicksort(std::vector<T> &x);
static inline bool find_id(const std::vector<ThreadPool::thread_id_t> &x_in,
    const ThreadPool::thread_id_t &id );

// Function to generate a random size_t number (excluding 0 and ~0)
static size_t rand_size_t() {
    size_t key = 0;
    double tmp = 1;
    if ( sizeof(size_t)==4 ) {
        while ( tmp < 4e9 ) {
            key ^= rand()*0x9E3779B9;           // 2^32*0.5*(sqrt(5)-1)
            tmp *= RAND_MAX;
        }
    } else if ( sizeof(size_t)==8 ) {
        while ( tmp < 1.8e19 ) {
            key ^= rand()*0x9E3779B97F4A7C15;   // 2^64*0.5*(sqrt(5)-1)
            tmp *= RAND_MAX;
        }
    } else {
        throw std::logic_error("Unhandled case");
    }
    if ( key==0 || (~key)==0 )
        key = rand_size_t();
    return key;
}


/******************************************************************
* Run some basic compile-time checks                              *
******************************************************************/
#if MAX_NUM_THREADS%64 != 0
    // We use a bit array for d_active and d_cancel
    #error MAX_NUM_THREADS must be a multiple of 64
#endif
#if MAX_NUM_THREADS >= 65535
    // We store N_threads as a short int
    #error MAX_NUM_THREADS must be < 65535
#endif
#if MAX_QUEUED >= 65535
    // We store the indices to the queue list as short ints
    #error MAX_QUEUED must be < 65535
#endif


/******************************************************************
* Convert a number to a binary string                             *
******************************************************************/
template<class T>
static inline std::string convert_binary(T x) {
    char buffer[65];
    T mask = ((size_t)1)<<(8*sizeof(T)-1);
    for (size_t i=0; i<8*sizeof(T); i++) {
        if ( ( x & mask ) == 0 )
            buffer[i] = '0';
        else
            buffer[i] = '1';
        mask >>= 1;
    }
    buffer[8*sizeof(T)] = 0;
    return std::string(buffer);
}


/******************************************************************
* Get/Set a bit                                                   *
******************************************************************/
static inline void set_bit( volatile ThreadPool::uint64* x, size_t index, bool val ) {
    ThreadPool::uint64 mask = 0x01;
    mask <<= index%64;
    if ( val )
        x[index/64] |= mask;
    else
        x[index/64] &= ~mask;
}
static inline bool get_bit( const volatile ThreadPool::uint64* x, size_t index ) {
    ThreadPool::uint64 mask = 0x01;
    mask <<= index%64;
    return (x[index/64]&mask)!=0;
}
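// Illustrative use of the bit helpers above: thread status flags such as
// d_active and d_cancel are stored as packed arrays of 64-bit words
// (MAX_NUM_THREADS/64 entries), e.g.
//
//     ThreadPool::uint64 active[MAX_NUM_THREADS/64] = {0};
//     set_bit( active, 5, true );          // mark thread 5 as active
//     bool busy = get_bit( active, 5 );    // true
//     set_bit( active, 5, false );         // clear the flag again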
/******************************************************************
* Some mutex helper functions                                     *
******************************************************************/
#if defined(USE_LINUX) || defined(USE_MAC)
    // Store a set of global attributes for the thread pool
    static pthread_mutexattr_t threadpool_global_attr;
    static int initialize_threadpool_global_attr() {
        pthread_mutexattr_init(&threadpool_global_attr);
        #ifdef __USE_UNIX98
            pthread_mutexattr_settype( &threadpool_global_attr, PTHREAD_MUTEX_ERRORCHECK );
        #endif
        return 1;
    }
    static int threadpool_global_attr_dummy = 0;
    static inline void throw_pthread_error( std::string msg, int value ) {
        std::string code;
        if ( value==0 ) { code = "SUCCESS"; }
        else if ( value==EINVAL ) { code = "EINVAL"; }
        else if ( value==EBUSY ) { code = "EBUSY"; }
        else if ( value==EAGAIN ) { code = "EAGAIN"; }
        else if ( value==EDEADLK ) { code = "EDEADLK"; }
        else if ( value==EPERM ) { code = "EPERM"; }
        else {
            char tmp[100];
            sprintf(tmp,"Unknown (%i)",value);
            code = std::string(tmp);
        }
        throw std::logic_error(msg+code);
    }
#endif
#ifdef USE_WINDOWS
    static inline void lock_mutex( CRITICAL_SECTION *lock ) {
        EnterCriticalSection(lock);
    }
    static inline void unlock_mutex( CRITICAL_SECTION *lock ) {
        LeaveCriticalSection(lock);
    }
    static CRITICAL_SECTION* create_mutex( ) {
        CRITICAL_SECTION *lock = new CRITICAL_SECTION;
        if ( !InitializeCriticalSectionAndSpinCount(lock,0x00000400) )
            throw std::exception();
        return lock;
    }
    static void destroy_mutex( CRITICAL_SECTION *lock ) {
        DeleteCriticalSection(lock);
        delete lock;
    }
#elif defined(USE_LINUX) || defined(USE_MAC)
    static inline void lock_mutex( pthread_mutex_t *lock ) {
        int retval = pthread_mutex_lock(lock);
        if ( retval != 0 )
            throw_pthread_error("Error locking mutex: ",retval);
    }
    static inline void unlock_mutex( pthread_mutex_t *lock ) {
        int retval = pthread_mutex_unlock(lock);
        if ( retval != 0 )
            throw_pthread_error("Error unlocking mutex: ",retval);
    }
    static pthread_mutex_t* create_mutex( ) {
        pthread_mutex_t* lock = NULL;
        #if defined(USE_LINUX) || defined(USE_MAC)
            if ( threadpool_global_attr_dummy!=1 )
                threadpool_global_attr_dummy = initialize_threadpool_global_attr();
        #endif
        // We are creating a new mutex
        lock = new pthread_mutex_t;
        int error = pthread_mutex_init(lock,&threadpool_global_attr);
        if ( error != 0 )
            throw_pthread_error("Error initializing mutex: ",error);
        return lock;
    }
    static void destroy_mutex( pthread_mutex_t* lock ) {
        pthread_mutex_destroy(lock);
        delete lock;
    }
#else
    #error Unknown OS
#endif


/******************************************************************
* Mutex class                                                     *
******************************************************************/
Mutex::Mutex() {
    d_lock = create_mutex();
    d_recursive = false;
    d_count = new int;
    d_lock_count = new int;
    d_thread = new size_t;
    *d_count = 1;
    *d_lock_count = 0;
    *d_thread = 0;
}
Mutex::Mutex(bool recursive) {
    d_lock = create_mutex();
    d_recursive = recursive;
    d_count = new int;
    d_lock_count = new int;
    d_thread = new size_t;
    *d_count = 1;
    *d_lock_count = 0;
    *d_thread = 0;
}
Mutex::Mutex(const Mutex& rhs) {
    rhs.lock();
    d_lock = rhs.d_lock;
    d_count = rhs.d_count;
    d_recursive = rhs.d_recursive;
    d_lock_count = rhs.d_lock_count;
    d_thread = rhs.d_thread;
    ++(*d_count);
    rhs.unlock();
}
Mutex& Mutex::operator=(const Mutex& rhs) {
    if (this == &rhs)   // protect against invalid self-assignment
        return *this;
    rhs.lock();
    this->d_lock = rhs.d_lock;
    this->d_count = rhs.d_count;
    this->d_recursive = rhs.d_recursive;
    this->d_lock_count = rhs.d_lock_count;
    this->d_thread = rhs.d_thread;
    ++(*this->d_count);
    rhs.unlock();
    return *this;
}
Mutex::~Mutex() {
    lock();
    bool destroy = (*d_count)==1;
    (*d_count)--;
    unlock();
    if ( destroy ) {
        delete d_count;
        delete d_lock_count;
        delete d_thread;
        destroy_mutex(d_lock);
    }
}
void Mutex::lock() const {
    // Check if we already own the lock
    size_t id = ThreadPool::getThreadId();
    if ( *d_lock_count>0 && *d_thread==id ) {
        if ( !d_recursive )
            throw std::logic_error("Lock is already locked and non-recursive");
        // Increment the lock count and return
        ++(*d_lock_count);
        return;
    }
    // Acquire the lock
    lock_mutex(d_lock);
    if ( *d_lock_count != 0 )   // If we are getting the lock, the count must be 0
        throw std::logic_error("Internal error");
    *d_lock_count = 1;          // Change lock count after acquiring mutex
    *d_thread = id;
}
bool Mutex::tryLock() const {
    // Check if we already own the lock
    size_t id = ThreadPool::getThreadId();
    if ( *d_lock_count>0 && *d_thread==id ) {
        if ( !d_recursive )
            return false;
        // Increment the lock count and return
        ++(*d_lock_count);
        return true;
    }
    // Try and acquire the lock
    #ifdef USE_WINDOWS
        bool success = TryEnterCriticalSection(d_lock)!=0;
    #elif defined(USE_LINUX) || defined(USE_MAC)
        bool success = pthread_mutex_trylock(const_cast<pthread_mutex_t*>(d_lock))==0;
    #else
        #error Unknown OS
    #endif
    if ( success ) {
        if ( *d_lock_count != 0 )   // If we are getting the lock, the count must be 0
            throw std::logic_error("Internal error");
        *d_lock_count = 1;          // Change lock count after acquiring mutex
        *d_thread = id;
    }
    return success;
}
void Mutex::unlock() const {
    // Check if we already own the lock
    size_t id = ThreadPool::getThreadId();
    if ( *d_lock_count <= 0 )
        throw std::logic_error("Trying to release a lock that has not been locked");
    if ( *d_thread != id )
        throw std::logic_error("Thread that does not own lock is attempting to release");
    // Release the lock
    --(*d_lock_count);      // Change lock count before releasing mutex
    if ( *d_lock_count == 0 ) {
        *d_thread = 0;
        unlock_mutex(d_lock);
    }
}
bool Mutex::ownLock() const {
    size_t id = ThreadPool::getThreadId();
    if ( *d_lock_count>0 && *d_thread==id )
        return true;
    return false;
}
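// Illustrative use of the Mutex class above: a non-recursive Mutex may only be
// locked once per thread, while a recursive Mutex counts nested lock/unlock pairs.
//
//     Mutex mutex(true);              // recursive mutex
//     mutex.lock();
//     mutex.lock();                   // ok: recursive, increments the lock count
//     bool owned = mutex.ownLock();   // true for the thread holding the lock
//     mutex.unlock();
//     mutex.unlock();                 // count reaches 0, underlying mutex is released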
/******************************************************************
* Functions to deal with the signaling                            *
******************************************************************/
#ifdef USE_WINDOWS
    static inline bool SIGNAL_EVENT(HANDLE event) {
        SetEvent(event);
        return false;
    }
#elif defined(USE_LINUX) || defined(USE_MAC)
    static inline bool SIGNAL_EVENT(pthread_cond_t *event) {
        int retval = pthread_cond_signal(event);
        if ( retval != 0 ) {    // pthread_cond_signal returns 0 on success, an error number otherwise
            perr << "Error signaling event\n";
            return true;
        }
        return false;
    }
#else
    #error Not programmed
#endif


/******************************************************************
* Simple function to check if the parity is odd (true) or even    *
******************************************************************/
static inline bool is_odd8(size_t x) {
    // This only works for 64-bit integers
    x ^= (x >> 1);
    x ^= (x >> 2);
    x ^= (x >> 4);
    x ^= (x >> 8);
    x ^= (x >> 16);
    x ^= (x >> 32);
    return (x & 0x01) > 0;
}
template<class int_type>
static inline int count_bits(int_type x) {
    int count = 0;
    for (size_t i=0; i<8*sizeof(int_type); i++) {
        if ( (x>>i)&0x01 )
            ++count;
    }
    return count;
}


/******************************************************************
* Set the behavior of OS warnings                                 *
******************************************************************/
static int global_OS_behavior = 0;
void ThreadPool::set_OS_warnings( int behavior ) {
    ASSERT(behavior>=0&&behavior<=2);
    global_OS_behavior = behavior;
}
static void OS_warning( const std::string& message ) {
    if ( global_OS_behavior==0 ) {
        pout << "Warning: " << message << std::endl;
    } else if ( global_OS_behavior==2 ) {
        perr << "Error: " << message << std::endl;
    }
}


/******************************************************************
* Function to return the number of processors available           *
******************************************************************/
int ThreadPool::getNumberOfProcessors() {
    #if defined(USE_LINUX) || defined(USE_MAC)
        return sysconf( _SC_NPROCESSORS_ONLN );
    #elif defined(USE_WINDOWS)
        SYSTEM_INFO sysinfo;
        GetSystemInfo( &sysinfo );
        return static_cast<int>(sysinfo.dwNumberOfProcessors);
    #else
        #error Unknown OS
    #endif
}


/******************************************************************
* Function to return the processor number of the current thread   *
******************************************************************/
int ThreadPool::getCurrentProcessor() {
    #if defined(USE_LINUX)
        return sched_getcpu()+1;
    #elif defined(USE_MAC)
        OS_warning("MAC does not support getCurrentProcessor");
        return 0;
    #elif defined(USE_WINDOWS)
        return GetCurrentProcessorNumber()+1;
    #else
        #error Unknown OS
    #endif
}


/******************************************************************
* Function to get/set the affinity of the
current process * ******************************************************************/ std::vector ThreadPool::getProcessAffinity() { std::vector procs; #ifdef USE_LINUX #ifdef _GNU_SOURCE cpu_set_t mask; int error = sched_getaffinity(getpid(), sizeof(cpu_set_t), &mask ); if ( error!=0 ) throw std::logic_error("Error getting process affinity"); for (int i=0; i<(int)sizeof(cpu_set_t)*CHAR_BIT; i++) { if ( CPU_ISSET(i,&mask) ) procs.push_back(i); } #else #warning sched_getaffinity is not supported for this compiler/OS OS_warning("sched_getaffinity is not supported for this compiler/OS"); procs.clear(); #endif #elif defined(USE_MAC) // MAC does not support getting or setting the affinity OS_warning("MAC does not support getting the process affinity"); procs.clear(); #elif defined(USE_WINDOWS) HANDLE hProc = GetCurrentProcess(); size_t procMask; size_t sysMask; PDWORD_PTR procMaskPtr = reinterpret_cast(&procMask); PDWORD_PTR sysMaskPtr = reinterpret_cast(&sysMask); GetProcessAffinityMask(hProc,procMaskPtr,sysMaskPtr); for (int i=0; i<(int)sizeof(size_t)*CHAR_BIT; i++) { if ( (procMask&0x1) != 0 ) procs.push_back(i); procMask >>= 1; } #else #error Unknown OS #endif return procs; } void ThreadPool::setProcessAffinity( std::vector procs ) { #ifdef USE_LINUX #ifdef _GNU_SOURCE cpu_set_t mask; CPU_ZERO(&mask); for (size_t i=0; i ThreadPool::getThreadAffinity() { std::vector procs; #ifdef USE_LINUX #ifdef _GNU_SOURCE cpu_set_t mask; int error = pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask ); if ( error!=0 ) throw std::logic_error("Error getting thread affinity"); for (int i=0; i<(int)sizeof(cpu_set_t)*CHAR_BIT; i++) { if ( CPU_ISSET(i,&mask) ) procs.push_back(i); } #else #warning pthread_getaffinity_np is not supported OS_warning("pthread does not support pthread_getaffinity_np"); procs.clear(); #endif #elif defined(USE_MAC) // MAC does not support getting or setting the affinity OS_warning("MAC does not support getting the thread affinity"); procs.clear(); #elif defined(USE_WINDOWS) size_t procMask = GetThreadAffinityMask(GetCurrentThread()); for (int i=0; i<(int)sizeof(size_t)*CHAR_BIT; i++) { if ( (procMask&0x1) != 0 ) procs.push_back(i); procMask >>= 1; } #else #error Unknown OS #endif return procs; } std::vector ThreadPool::getThreadAffinity( int thread ) const { if ( thread >= getNumThreads() ) std::logic_error("Invalid thread number"); std::vector procs; #ifdef USE_LINUX #ifdef _GNU_SOURCE cpu_set_t mask; int error = pthread_getaffinity_np(d_hThread[thread], sizeof(cpu_set_t), &mask ); if ( error!=0 ) throw std::logic_error("Error getting thread affinity"); for (int i=0; i<(int)sizeof(cpu_set_t)*CHAR_BIT; i++) { if ( CPU_ISSET(i,&mask) ) procs.push_back(i); } #else #warning pthread_getaffinity_np is not supported OS_warning("pthread does not support pthread_getaffinity_np"); procs.clear(); #endif #elif defined(USE_MAC) // MAC does not support getting or setting the affinity OS_warning("MAC does not support getting the thread affinity"); procs.clear(); #elif defined(USE_WINDOWS) size_t procMask = GetThreadAffinityMask(d_hThread[thread]); for (int i=0; i<(int)sizeof(size_t)*CHAR_BIT; i++) { if ( (procMask&0x1) != 0 ) procs.push_back(i); procMask >>= 1; } #else #error Unknown OS #endif return procs; } /****************************************************************** * Function to set the thread affinity * ******************************************************************/ void ThreadPool::setThreadAffinity( std::vector procs ) { #ifdef USE_LINUX #ifdef _GNU_SOURCE cpu_set_t 
mask; CPU_ZERO(&mask); for (size_t i=0; i procs ) const { if ( thread >= getNumThreads() ) std::logic_error("Invalid thread number"); #ifdef USE_LINUX #ifdef __USE_GNU cpu_set_t mask; CPU_ZERO(&mask); for (size_t i=0; i(MAXID64-i).c_str()); pass = false; } } } initialize_id(); advance_id(); advance_id(); ThreadPool::thread_id_t id2; id2.reset(3,d_id_assign,NULL); if ( isValid(id) || !isValid(id2) ) pass = false; if ( !pass ) { throw std::logic_error("Thread pool failed to initialize"); } } /****************************************************************** * Function to initialize the thread pool * ******************************************************************/ void ThreadPool::initialize( const int N, const char* affinity, int N_procs, const int* procs ) { // Get the clock frequency #if MONITOR_THREADPOOL_PERFORMANCE==1 get_frequency( &frequency ); #endif // Initialize the header/tail d_NULL_HEAD = rand_size_t(); d_NULL_TAIL = d_NULL_HEAD; for (int i=0; id_N_threads<0 || tpool->d_N_threads>MAX_NUM_THREADS ) return false; if ( tpool->d_NULL_HEAD==0 || tpool->d_NULL_HEAD!=tpool->d_NULL_TAIL ) return false; return true; } /****************************************************************** * This function creates the threads in the thread pool * ******************************************************************/ void ThreadPool::setNumThreads( int num_worker_threads, const char* affinity2, int N_procs, const int* procs ) { // Check if we are a member thread if ( isMemberThread() ) throw std::logic_error("Member threads are not allowed to change the number of threads in the pool"); // Determing the number of threads we need to create or destroy if ( num_worker_threads > MAX_NUM_THREADS ) { printp("Warning: Maximum Number of Threads is %i\n",MAX_NUM_THREADS); printp(" Only that number will be created\n"); num_worker_threads = MAX_NUM_THREADS; } else if ( num_worker_threads < 0 ) { printp("Error: cannot have a negitive number of threads\n"); printp(" Setting the number of threads to 0\n"); num_worker_threads = 0; } int d_N_threads_diff = num_worker_threads-d_N_threads; if ( d_N_threads_diff > 0 ) { // Create new threads lock_mutex(d_lock_queue); // Check that no threads are in the process of being deleted for (int i=0; id_N_threads_diff; i--) set_bit(d_cancel,d_N_threads-1+i,true); #ifdef USE_WINDOWS // Release the lock unlock_mutex(d_lock_queue); // Wake all threads to process the shutdown (Doesn't require blocking) for (int i=0; id_N_threads_diff; i--) { int rtn = pthread_join(d_hThread[d_N_threads-1+i],NULL); if ( rtn != 0 ) { perr << "error\n"; perr << "Error joining threads"; } } #else #error Not programmed #endif for (int i=0; i>d_N_threads_diff; i--) { set_bit(d_cancel,d_N_threads-1+i,false); d_hThread[d_N_threads-1+i] = 0; d_ThreadId[d_N_threads-1+i] = ~((size_t)0); } d_N_threads += d_N_threads_diff; } if ( d_N_threads == 0 ) return; // Get the default thread affinity to use std::vector cpus; int tmp = global_OS_behavior; global_OS_behavior = 1; OS_warning("Dummy message (should not print)"); try { cpus = ThreadPool::getProcessAffinity(); } catch(...) 
{ pout << "Warning: Unable to get default cpus for thread affinities\n"; } if ( !cpus.empty() && N_procs>0 ) { cpus.resize(N_procs); for (int i=0; i > t_procs(d_N_threads); std::string affinity(affinity2); if ( cpus.empty() ) { // We do not have a list of cpus to use, do nothing (OS not supported) } else if ( affinity=="none" ) { // We are using the default thread affinities (all threads get all procs of the program) for (int i=0; i(1,cpus[i]); } else if ( (int) cpus.size() > d_N_threads ) { // There are more cpus than threads, threads will use more the one processor int N_procs_thread = (cpus.size()+d_N_threads-1)/d_N_threads; size_t k = 0; for (int i=0; i cpus2 = getThreadAffinity( i ); if ( cpus2 != t_procs[i] ) pout << "Warning: error setting affinities (failed to set)\n"; } } catch (...) { pout << "Warning: error setting affinities (exception)\n"; } global_OS_behavior = tmp; } /****************************************************************** * Get an item in the work queue that is ready to be processed * ******************************************************************/ int ThreadPool::getThreadNumber() const { size_t id = getThreadId(); int index = 0; for (int i=0; i(d_queue_ids); const queue_list_struct *list = const_cast(d_queue_list); short int index = d_queue_head; short int index2 = check_dependecies(list,ids,index); while ( index2==-1 && index!=-1 ) { index = d_queue_list[index].next; index2 = index==-1 ? -1:check_dependecies(list,ids,index); } return index2; } inline short int ThreadPool::check_dependecies( const ThreadPool::queue_list_struct *list, const thread_id_t *queue, short int index ) { if ( index==-1 ) return -1; WorkItem* work = reinterpret_cast(queue[index].d_work); // Loop through the dependencies, removing any that have finished, // and search for any that have not started (keeping the one with the fewest dependencies) size_t N_active = 0; thread_id_t* ids = work->d_ids; short int index2 = index; int N_dependencies = static_cast(work->d_N_ids); for (int i=N_dependencies-1; i>=0; i--) { WorkItem* work2 = reinterpret_cast(ids[i].d_work); char state = work2->d_state; if ( state==0 ) { // We found a new potential item to process index2 = work2->d_tpool_index; index2 = check_dependecies(list,queue,index2); if ( index2 != -1 ) break; } else if ( state==1 || state==-1 ) { // We found an item that is processing N_active++; } else if ( state==2 ) { // The item has finished ids[i].reset(); std::swap(ids[i],ids[work->d_N_ids-1]); work->d_N_ids--; continue; } } if ( N_active>0 ) { // Some dependencies are working, choose a different work item index2 = -1; } return index2; } /****************************************************************** * This is the function that controls the individual thread and * * allows it to do work. 
* ******************************************************************/ void ThreadPool::tpool_thread(int thread_id) { if ( getThreadId()==0 ) throw std::logic_error("Invalid thread id"); bool shutdown = false; bool printInfo = false; d_ThreadId[thread_id] = getThreadId(); // Acquire mutex lock_mutex(d_lock_queue); if ( get_bit(d_active,thread_id) ) throw std::logic_error("Thread cannot already be active"); d_num_active++; set_bit(d_active,thread_id,true); set_bit(d_cancel,thread_id,false); if ( printInfo ) { // Print the pid printp("pid = %i\n",(int)getpid()); // Print the processor affinities for the process try { std::vector cpus = ThreadPool::getProcessAffinity(); printp("%i cpus for current thread: ",(int)cpus.size()); for (size_t i=0; i0 ) { // Get next work item to process short int work_index = ThreadPool::get_work_item(); if ( work_index==-1 ) { unlock_mutex(d_lock_queue); Sleep(0); lock_mutex(d_lock_queue); continue; } // Remove the work item from the queue #ifdef D_DEBUG short int cur = d_queue_list[work_index].position; #endif short int next = d_queue_list[work_index].next; short int prev = d_queue_list[work_index].prev; if ( prev==-1 ) { d_queue_head = next; } else { d_queue_list[prev].next = next; } if ( next!=-1 ) { d_queue_list[next].prev = prev; } --d_queue_size; #ifdef D_DEBUG if ( cur!=work_index || ( d_queue_size>0 && d_queue_head==-1 ) ) throw std::logic_error("Internal error with threadpool"); #endif thread_id_t work_id = const_cast(d_queue_ids[work_index]); d_queue_ids[work_index].reset(); d_queue_list[work_index].reset(); d_queue_list[work_index].next = d_queue_free; d_queue_free = work_index; WorkItem* work = reinterpret_cast(work_id.d_work); work->d_state = -1; // Release mutex unlock_mutex(d_lock_queue); // Start work here PROFILE_THREAD_START("thread working"); work->run(); if ( work->d_state!=2 ) { throw std::logic_error("Work item is not changing state"); } PROFILE_THREAD_STOP("thread working"); // Work finished, acquire mutex and remove it from the active list lock_mutex(d_lock_queue); // Check if any threads are waiting on the current work item for (int i=0; i(d_wait[i]); bool found = false; if ( wait->ids.empty() ) { // Special case where we just want to wait for any work items to finish found = true; } else { found = find_id( wait->ids, work_id ); } if ( found ) { wait_type event = 0; volatile int* count = &(wait->count); if ( *count == 1 ) event = const_cast(wait->wait_event); --(*count); if ( event != 0 ) SIGNAL_EVENT(event); } } // Check the signal count and signal if desired if ( d_signal_count > 0 ) { --d_signal_count; if ( d_signal_count == 0 ) SIGNAL_EVENT(d_wait_finished); } } else { int N_active = --d_num_active; set_bit(d_active,thread_id,false); // Alert main thread that a thread finished processing if ( N_active==0 ) { if ( d_signal_empty ) { SIGNAL_EVENT(d_wait_finished); d_signal_empty = false; } } // Wait for work PROFILE_THREAD_STOP2("thread active"); #ifdef USE_WINDOWS unlock_mutex(d_lock_queue); SuspendThread(d_hThread[thread_id]); lock_mutex(d_lock_queue); #elif defined(USE_LINUX) || defined(USE_MAC) pthread_cond_wait(d_queue_not_empty,d_lock_queue); #endif PROFILE_THREAD_START2("thread active"); ++d_num_active; set_bit(d_active,thread_id,true); } // Check if there is a shutdown requested shutdown = get_bit(d_cancel,thread_id); } PROFILE_THREAD_STOP("thread active"); d_num_active--; set_bit(d_active,thread_id,false); // Release mutex unlock_mutex(d_lock_queue); return; } /****************************************************************** * 
This is the function that adds work to the thread pool * * Note: this version uses a last in - first out work scheduling. * ******************************************************************/ void ThreadPool::add_work( size_t N, ThreadPool::WorkItem* work[], const int* priority, ThreadPool::thread_id_t* ids ) { #if MONITOR_THREADPOOL_PERFORMANCE TIME_TYPE start_time_local; get_time(&start_time_local); #endif // If we have a very long list, break it up into smaller pieces to keep the threads busy const size_t block_size = MAX_QUEUED/4; if ( N > block_size ) { size_t N_sets = (N+block_size-1)/block_size; for (size_t i=0; i(block_size,N-index); add_work( N2, &work[index], &priority[index], &ids[index] ); } return; } // Create the thread ids (can be done without blocking) for (size_t i=0; id_tpool_index = -2; } // If there are no threads, perform the work immediately if ( d_N_threads < 1 ) { for (size_t i=0; irun(); } return; } // Wait for enough room in the queue (doesn't need blocking since it isn't that precise) if ( N > static_cast(MAX_QUEUED-d_queue_size) ) { int N_wait = static_cast( N - (MAX_QUEUED-d_queue_size) ); while ( N_wait > 0 ) { d_signal_count = static_cast(std::min(N_wait,255)); #ifdef USE_WINDOWS DWORD ret = WaitForSingleObject( d_wait_finished, INFINITE ); #elif defined(USE_LINUX) || defined(USE_MAC) lock_mutex(d_lock_queue); if ( d_signal_count > 0 ) pthread_cond_wait(d_wait_finished,d_lock_queue); unlock_mutex(d_lock_queue); #else #error Not programmed #endif N_wait = static_cast( N - (MAX_QUEUED-d_queue_size) ); } } // Get the lock and add the work items lock_mutex(d_lock_queue); #if MONITOR_THREADPOOL_PERFORMANCE TIME_TYPE stop_time_local; get_time(&stop_time_local); total_add_work_time[0] += get_diff(start_time_local,stop_time_local,frequency); #endif // Next create the work items and add them to the queue for (size_t i=0; i(&d_queue_list[d_queue_free]); d_queue_free = work_item->next; work_item->next = -1; work_item->prev = -1; d_queue_ids[work_item->position] = ids[i]; reinterpret_cast(ids[i].d_work)->d_tpool_index = work_item->position; if ( d_queue_head==-1 ) { d_queue_head = work_item->position; } else if ( ids[i] > d_queue_ids[d_queue_list[d_queue_head].position] ) { work_item->next = d_queue_head; d_queue_list[d_queue_head].prev = work_item->position; d_queue_head = work_item->position; } else { short int prev = d_queue_head; short int cur = d_queue_list[prev].next; while ( cur!=-1 ) { if ( d_queue_ids[cur] < ids[i] ) break; prev = cur; cur = d_queue_list[prev].next; } work_item->prev = prev; work_item->next = cur; if ( cur != -1 ) d_queue_list[cur].prev = work_item->position; d_queue_list[prev].next = work_item->position; } ++d_queue_size; } int num_active2 = d_num_active; // Copy the number of active threads to a local variable unlock_mutex(d_lock_queue); #if MONITOR_THREADPOOL_PERFORMANCE get_time(&stop_time_local); total_add_work_time[1] += get_diff(start_time_local,stop_time_local,frequency); #endif // Activate sleeping threads #ifdef USE_WINDOWS for (int i=0; i(id.d_work)->d_state==2; } /****************************************************************** * This function removes a finished work item * ******************************************************************/ ThreadPool::WorkItem* ThreadPool::getFinishedWorkItem(ThreadPool::thread_id_t id) const { if ( !isValid(id) ) return NULL; if ( reinterpret_cast(id.d_work)->d_state!=2 ) return NULL; // Return the result WorkItem* work = reinterpret_cast(id.d_work); return work; } 
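// Illustrative usage sketch (hypothetical driver code; the exact public interface,
// including any convenience wrappers and the WorkItem subclass shown here, is
// defined in thread_pool.h and may differ):
//
//     class MyWorkItem : public ThreadPool::WorkItem {
//     public:
//         void run() { /* do the work; run() must advance d_state to 2 (finished) */ }
//     };
//
//     ThreadPool tpool;                            // hypothetical default construction
//     tpool.setNumThreads( 4 );
//     ThreadPool::WorkItem* work[1] = { new MyWorkItem() };
//     int priority[1] = { 0 };
//     ThreadPool::thread_id_t id[1];
//     tpool.add_work( 1, work, priority, id );     // queue the item
//     // ... later, poll for completion and retrieve the finished item ...
//     if ( tpool.isFinished( id[0] ) )
//         ThreadPool::WorkItem* done = tpool.getFinishedWorkItem( id[0] );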
/****************************************************************** * This function waits for a some of the work items to finish * ******************************************************************/ static inline void check_finished( size_t N_work, const ThreadPool::thread_id_t *ids, size_t& N_finished, bool* finished) { for (size_t k=0; kN_work ) { printp("Invalid arguments in thread pool wait (%i,%i)\n",(int)N_work,(int)N_wait); return -1; } size_t N_finished = 0; memset(finished,0,N_work*sizeof(bool)); // Check that all the ids are valid size_t next_id = d_id_assign-1; for (size_t k=0; kMAXID64 || local_id<=next_id; test = test && !finished[k]; if ( test ) { printp("Invalid ids for wait\n"); return -1; } } // Check which ids have finished check_finished(N_work,ids,N_finished,finished); // If enough ids have finished return if ( N_finished >= N_wait ) { return 0; } // Acquire the lock and update the finished list // It is possible that in the time required to acquire the lock, the work items may finish lock_mutex(d_lock_queue); check_finished(N_work,ids,N_finished,finished); if ( N_finished >= N_wait ) { unlock_mutex(d_lock_queue); return 0; } // Create the wait event struct wait_event_struct* tmp = new wait_event_struct(&wait_pool); wait_type event = tmp->wait_event; tmp->count = static_cast(N_wait-N_finished); tmp->ids.reserve(N_wait-N_finished); for (size_t k=0; kids.push_back(ids[k]); } quicksort(tmp->ids); d_wait[d_N_wait] = tmp; d_N_wait++; // Wait for a signal indicating that a thread has finished #ifdef USE_WINDOWS unlock_mutex(d_lock_queue); DWORD ret = WaitForSingleObject( event, INFINITE ); lock_mutex(d_lock_queue); #elif defined(USE_LINUX) || defined(USE_MAC) pthread_cond_wait(event,d_lock_queue); #endif // Check for remaining references to the wait struct and delete the structure for (int k=0; k0 || d_queue_size>0 ) { d_signal_empty = true; #ifdef USE_WINDOWS unlock_mutex(d_lock_queue); DWORD ret = WaitForSingleObject( d_wait_finished, INFINITE ); lock_mutex(d_lock_queue); #elif defined(USE_LINUX) || defined(USE_MAC) pthread_cond_wait(d_wait_finished,d_lock_queue); #else #error Not programmed #endif } d_signal_empty = false; unlock_mutex(d_lock_queue); } /****************************************************************** * These functions create the unique id to assign each work item * * If id is a 32-bit number we have 4e9 possible work items * * If id is a 64-bit number we have 9e19 possible work items and * * we have some checking that will catch some invalid ids * ******************************************************************/ inline void ThreadPool::initialize_id() { // Note that the best option is to use a 64-bit integer if ( sizeof(size_t)==8 ) { // Set the starting value to 2^56-3 d_id_assign = MAXID64; } else if ( sizeof(size_t)==4 ) { // Set the starting value to 2^32-3 d_id_assign = MAXID32; } else { throw std::logic_error("Internal error: failed to initialize ids"); } } inline size_t ThreadPool::advance_id() { size_t id = AtomicOperations::atomic_decrement( &d_id_assign ); if ( id==0 ) throw std::logic_error("Ran out of valid ids"); return id; } /****************************************************************** * Function to check if the current thread is a member thread * ******************************************************************/ inline bool ThreadPool::isMemberThread() const { size_t id = getThreadId(); for (int i=0; ipop(); } ThreadPool::wait_event_struct::~wait_event_struct( ) { d_wait_pool->push(wait_event); } 
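// Note on wait_pool_struct (implemented below) and wait_event_struct (above):
// wait_pool_struct maintains a small, mutex-protected pool of OS wait primitives
// (Windows event handles or pthread condition variables).  wait_event_struct
// borrows a primitive from the pool via pop() in its constructor and returns it
// via push() in its destructor, so the primitives are reused rather than created
// and destroyed for every wait.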
/****************************************************************** * Member functions of wait_pool_struct * ******************************************************************/ ThreadPool::wait_pool_struct::wait_pool_struct( ) { d_size = 16; d_count = 0; d_pool = new wait_type[d_size]; memset(const_cast(d_pool),0,d_size*sizeof(wait_type)); d_lock = create_mutex( ); } ThreadPool::wait_pool_struct::~wait_pool_struct( ) { for (size_t i=0; i= d_size ) { volatile wait_type* tmp = d_pool; d_pool = new wait_type[2*d_size]; memset((void*)d_pool,0,2*d_size*sizeof(wait_type)); memcpy((void*)d_pool,(void*)tmp,d_size*sizeof(wait_type)); delete [] d_pool; d_size = 2*d_size; } d_pool[d_count] = event; ++d_count; unlock_mutex(d_lock); } ThreadPool::wait_type ThreadPool::wait_pool_struct::pop( ) { lock_mutex(d_lock); wait_type event = 0; if ( d_count == 0 ) { #ifdef USE_WINDOWS event = CreateEvent(NULL,FALSE,FALSE,NULL); #elif defined(USE_LINUX) || defined(USE_MAC) event = new pthread_cond_t; int error = pthread_cond_init(event,NULL); if ( error == -1 ) std::logic_error("Error creating wait_event"); #else #error Not programmed #endif } else { event = d_pool[d_count-1]; --d_count; } unlock_mutex(d_lock); return event; } /****************************************************************** * templated quicksort routine * ******************************************************************/ template void quicksort(std::vector &x) { int n = (int) x.size(); if ( n <= 1 ) return; T *arr = &x[0]; bool test; int i, ir, j, jstack, k, l, istack[100]; T a, tmp_a; jstack = 0; l = 0; ir = n-1; while (1) { if ( ir-l < 7 ) { // Insertion sort when subarray small enough. for ( j=l+1; j<=ir; j++ ) { a = arr[j]; test = true; for (i=j-1; i>=0; i--) { if ( arr[i] < a ) { arr[i+1] = a; test = false; break; } arr[i+1] = arr[i]; } if ( test ) { i = l-1; arr[i+1] = a; } } if ( jstack==0 ) return; ir = istack[jstack]; // Pop stack and begin a new round of partitioning. l = istack[jstack-1]; jstack -= 2; } else { k = (l+ir)/2; // Choose median of left, center and right elements as partitioning // element a. Also rearrange so that a(l) < a(l+1) < a(ir). tmp_a = arr[k]; arr[k] = arr[l+1]; arr[l+1] = tmp_a; if ( arr[l]>arr[ir] ) { tmp_a = arr[l]; arr[l] = arr[ir]; arr[ir] = tmp_a; } if ( arr[l+1] > arr[ir] ) { tmp_a = arr[l+1]; arr[l+1] = arr[ir]; arr[ir] = tmp_a; } if ( arr[l] > arr[l+1] ) { tmp_a = arr[l]; arr[l] = arr[l+1]; arr[l+1] = tmp_a; } // Scan up to find element > a j = ir; a = arr[l+1]; // Partitioning element. for (i=l+2; i<=ir; i++) { if ( arr[i]a ) // Scan down to find element < a. j--; if ( j < i ) break; // Pointers crossed. Exit with partitioning complete. tmp_a = arr[i]; // Exchange elements of both arrays. arr[i] = arr[j]; arr[j] = tmp_a; } arr[l+1] = arr[j]; // Insert partitioning element in both arrays. arr[j] = a; jstack += 2; // Push pointers to larger subarray on stack, process smaller subarray immediately. 
if ( ir-i+1 >= j-l ) { istack[jstack] = ir; istack[jstack-1] = i; ir = j-1; } else { istack[jstack] = j-1; istack[jstack-1] = l; l = i; } } } } /************************************************************************ * Function to find the id in a sorted vector * ************************************************************************/ inline bool find_id(const std::vector &x_in, const ThreadPool::thread_id_t &id ) { if ( x_in.empty() ) return false; size_t n = x_in.size(); const ThreadPool::thread_id_t *x = &x_in[0]; // Use the pointer for speed if ( n<4 ) { for (size_t i=0; i x[n-1] ) return false; // Perform the search size_t lower = 0; size_t upper = n-1; size_t index; while ( (upper-lower) != 1 ) { index = (upper+lower)/2; if ( x[index] == id ) return true; if ( x[index] >= id ) upper = index; else lower = index; } return false; } /************************************************************************ * Function to add dependencies to the work item * ************************************************************************/ void ThreadPool::WorkItem::add_dependencies( size_t N, const ThreadPool::thread_id_t* ids) { if ( d_tpool_index != -1 ) { // The item has already been added to the threadpool, // we are not allowed to add dependencies throw std::logic_error("Cannot add dependency to work item once it has been added the the threadpool"); } if ( static_cast(d_N_ids)+N > 0xFFFF ) { throw std::logic_error("Cannot add more than 65000 dependencies"); } for (size_t i=0; i= d_size ) { thread_id_t* tmp = d_ids; unsigned int N2 = d_size; if ( N2 == 0 ) { N2 = 8; } while ( N2 <= d_N_ids ) N2 *= 2; d_ids = new thread_id_t[N2]; for (size_t i=0; i