Created standalone MPI failure test

Mark Berrill 2022-02-10 14:02:40 -05:00
parent e657971ee0
commit 7787f9e8f1


@@ -1,9 +1,12 @@
#include <csignal>
#include <algorithm>
#include <array>
#include <cstring>
#include <iostream>
#include <fstream>
#include <math.h>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <stdio.h>
@@ -11,7 +14,13 @@
#include <time.h>
#include <thread>
#include <vector>
#include <dlfcn.h>
#include <execinfo.h>
#include <sched.h>
#include <sys/time.h>
#include <ctime>
#include <unistd.h>
#include <sys/syscall.h>
#include "mpi.h"
@@ -392,59 +401,126 @@ private:
};
// Code to mimic StackTrace backend
static MPI_Comm globalCommForGlobalCommStack = MPI_COMM_NULL;
static volatile int globalMonitorThreadStatus = -1;
static std::shared_ptr<std::thread> globalMonitorThread;
static void runGlobalMonitorThread()
// Return the number of threads
static std::mutex StackTrace_mutex;
typedef void ( *handle_type )( int );
static int reset_signal_count[128];
static handle_type reset_signal_handler[128] = { nullptr };
static int global_thread_backtrace_count;
static void* global_thread_backtrace[1000];
static std::thread::native_handle_type thread_handle;
static bool thread_id_finished;
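// Signal handler used by getActiveThreads: the interrupted thread records its own
// pthread handle and flags the request as finished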
static void _activeThreads_signal_handler( int )
{
int rank = 0;
int size = 1;
MPI_Comm_size( globalCommForGlobalCommStack, &size );
MPI_Comm_rank( globalCommForGlobalCommStack, &rank );
while ( globalMonitorThreadStatus == 1 ) {
// Check for any messages
int flag = 0;
MPI_Status status;
int err = MPI_Iprobe( MPI_ANY_SOURCE, 1, globalCommForGlobalCommStack, &flag, &status );
if ( err != MPI_SUCCESS ) {
printf( "Internal error in runGlobalMonitorThread\n" );
break;
} else if ( flag != 0 ) {
// We received a request
int src_rank = status.MPI_SOURCE;
int tag;
MPI_Recv( &tag, 1, MPI_INT, src_rank, 1, globalCommForGlobalCommStack, &status );
// Do stuff
throw std::logic_error( "This should not happen for this test" );
} else {
// No requests received
std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
thread_handle = pthread_self();
thread_id_finished = true;
}
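// Choose the signal used to interrupt threads: prefer 39 if it lies in the
// real-time range, otherwise SIGRTMIN+4 clamped to SIGRTMAX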
static int get_thread_callstack_signal()
{
if ( 39 >= SIGRTMIN && 39 <= SIGRTMAX )
return 39;
return std::min<int>( SIGRTMIN+4, SIGRTMAX );
}
static int thread_callstack_signal = get_thread_callstack_signal();
static bool initialize_reset_signal_count()
{
for ( int i = 0; i < 128; i++ )
reset_signal_count[i] = 0;
return true;
}
static bool reset_signal_vars_initialize = initialize_reset_signal_count();
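// Temporarily ignore a signal (saving the old handler) and restore it afterwards;
// used around popen/pclose to suppress the SIGCHLD raised when the child exits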
static void clearSignal( int sig )
{
reset_signal_handler[sig] = signal( sig, SIG_IGN );
}
static void resetSignal( int sig )
{
if ( reset_signal_count[sig] == 0 )
signal( sig, reset_signal_handler[sig] );
}
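// Parse one line of "ps -T" output: the first field is the PID and the second the TID;
// return the TID when the PID matches, or -1 otherwise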
static int get_tid( int pid, const char *line )
{
char buf2[128]={0};
int i1 = 0;
while ( line[i1]==' ' ) { i1++; }
int i2 = i1;
while ( line[i2]!=' ' ) { i2++; }
memcpy(buf2,&line[i1],i2-i1);
buf2[i2-i1] = 0;
int pid2 = atoi(buf2);
if ( pid2 != pid )
return -1;
i1 = i2;
while ( line[i1]==' ' ) { i1++; }
i2 = i1;
while ( line[i2]!=' ' ) { i2++; }
memcpy(buf2,&line[i1],i2-i1);
buf2[i2-i1] = 0;
int tid = atoi(buf2);
return tid;
}
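// Run a shell command through popen, passing each line of output to fun;
// returns the exit code from pclose (treating ECHILD as success)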
template<class FUNCTION>
static inline int exec3( const char *cmd, FUNCTION &fun )
{
clearSignal( SIGCHLD ); // Clear child exited
auto pipe = popen( cmd, "r" );
if ( pipe == nullptr )
return -1;
while ( !feof( pipe ) ) {
char buffer[0x2000];
buffer[0] = 0;
auto ptr = fgets( buffer, sizeof( buffer ), pipe );
if ( buffer[0] != 0 )
fun( buffer );
}
int code = pclose( pipe );
if ( errno == ECHILD ) {
errno = 0;
code = 0;
}
std::this_thread::yield(); // Allow any signals to process
resetSignal( SIGCHLD ); // Clear child exited
return code;
}
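// Enumerate the threads of the current process by parsing "ps -T -p <pid>",
// then signal each thread so its handler reports the matching pthread handle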
static std::vector<std::thread::native_handle_type> getActiveThreads( )
{
std::vector<std::thread::native_handle_type> threads;
int N_tid = 0, tid[1024];
int pid = getpid();
char cmd[128];
sprintf( cmd, "ps -T -p %i", pid );
auto fun = [&N_tid,&tid,pid]( const char* line ) {
int id = get_tid( pid, line );
if ( id != -1 && N_tid < 1024 )
tid[N_tid++] = id;
};
exec3( cmd, fun );
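// Drop the calling thread's own TID so only the other threads are signaled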
int myid = syscall(SYS_gettid);
for ( int i=0; i<N_tid; i++) {
if ( tid[i] == myid )
std::swap( tid[i], tid[--N_tid] );
}
auto old = signal( thread_callstack_signal, _activeThreads_signal_handler );
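// Signal each thread in turn and spin (up to ~0.1 s) until its handler stores the handle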
for ( int i=0; i<N_tid; i++) {
StackTrace_mutex.lock();
thread_id_finished = false;
thread_handle = pthread_self();
syscall( SYS_tgkill, pid, tid[i], thread_callstack_signal );
auto t1 = std::chrono::high_resolution_clock::now();
auto t2 = std::chrono::high_resolution_clock::now();
while ( !thread_id_finished && std::chrono::duration<double>(t2-t1).count()<0.1 ) {
std::this_thread::yield();
t2 = std::chrono::high_resolution_clock::now();
}
threads.push_back( thread_handle );
StackTrace_mutex.unlock();
}
}
void globalCallStackInitialize( MPI_Comm comm )
{
globalMonitorThreadStatus = 3;
// Check that we have support to get call stacks from threads
int N_threads = 2;
MPI_Bcast( &N_threads, 1, MPI_INT, 0, comm );
// Create the communicator and initialize the helper thread
globalMonitorThreadStatus = 1;
MPI_Comm_dup( comm, &globalCommForGlobalCommStack );
globalMonitorThread.reset( new std::thread( runGlobalMonitorThread ) );
std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
}
void globalCallStackFinalize()
{
if ( globalMonitorThread ) {
globalMonitorThreadStatus = 2;
globalMonitorThread->join();
globalMonitorThread.reset();
}
if ( globalCommForGlobalCommStack != MPI_COMM_NULL )
MPI_Comm_free( &globalCommForGlobalCommStack );
globalCommForGlobalCommStack = MPI_COMM_NULL;
signal( thread_callstack_signal, old );
// Add the current thread
threads.push_back( pthread_self() );
// Sort the threads, remove any duplicates and remove the globalMonitorThread
std::sort( threads.begin(), threads.end() );
return threads;
}
@@ -468,17 +544,12 @@ std::array<int,3> get_nproc( int P )
int main(int argc, char **argv)
{
// Start MPI
bool multiple = true;
if (multiple) {
int provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
if (provided < MPI_THREAD_MULTIPLE)
std::cerr << "Warning: Failed to start MPI with thread support\n";
globalCallStackInitialize(MPI_COMM_WORLD);
} else {
MPI_Init(&argc, &argv);
}
MPI_Init(&argc, &argv);
// Initialize the bug?
auto thread_ids = getActiveThreads();
std::cout << "N_threads = " << thread_ids.size() << std::endl << std::flush;
// Run the problem
int size = 0;
MPI_Comm_size( MPI_COMM_WORLD, &size );
@@ -494,7 +565,6 @@ int main(int argc, char **argv)
std::cout << "step 3" << std::endl << std::flush;
// Shutdown MPI
globalCallStackFinalize();
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
std::cout << "step 4" << std::endl << std::flush;