diff --git a/IO/PIO.cpp b/IO/PIO.cpp new file mode 100644 index 00000000..83277ff7 --- /dev/null +++ b/IO/PIO.cpp @@ -0,0 +1,172 @@ +#include "IO/PIO.h" +#include "common/Utilities.h" + +#include +#include +#include + + +#ifdef USE_MPI + #include "mpi.h" +#endif + + +namespace IO { + + +static ParallelStreamBuffer pout_buffer; +static ParallelStreamBuffer perr_buffer; +static ParallelStreamBuffer plog_buffer; + + +std::ostream pout(&pout_buffer); +std::ostream perr(&perr_buffer); +std::ostream plog(&plog_buffer); + + + +/**************************************************************************** +* Functions to control logging * +****************************************************************************/ +std::ofstream *global_filestream=NULL; +static void shutdownFilestream( ) +{ + if ( global_filestream!=NULL ) { + global_filestream->flush(); + global_filestream->close(); + delete global_filestream; + global_filestream = NULL; + } +} +void Utilities::logOnlyNodeZero( const std::string &filename ) +{ + int rank = 0; + #ifdef USE_MPI + MPI_Comm_rank( MPI_COMM_WORLD, &rank ); + #endif + if ( rank == 0 ) + logAllNodes(filename,true); +} +void Utilities::logAllNodes( const std::string &filename, bool singleStream ) +{ + if ( singleStream ) + ERROR("Not implimented yet"); + + // If the filestream was open, then close it and reset streams + shutdownFilestream(); + + // Open the log stream and redirect output + std::string full_filename = filename; + if ( !singleStream ) { + int rank = 0; + #ifdef USE_MPI + MPI_Comm_rank( MPI_COMM_WORLD, &rank ); + #endif + char tmp[100]; + sprintf(tmp,".%04i",rank); + full_filename += std::string(tmp); + } + global_filestream = new std::ofstream(full_filename.c_str()); + + if ( !(*global_filestream) ) { + delete global_filestream; + global_filestream = NULL; + perr << "PIO: Could not open log file ``" << full_filename << "''\n"; + } else { + pout_buffer.setOutputStream(global_filestream); + pout_buffer.setOutputStream(&std::cout); + perr_buffer.setOutputStream(global_filestream); + perr_buffer.setOutputStream(&std::cerr); + plog_buffer.setOutputStream(global_filestream); + } +} + + +/**************************************************************************** +* ParallelStreamBuffer class * +****************************************************************************/ +void Utilities::stopLogging( ) +{ + pout_buffer.reset(); + perr_buffer.reset(); + plog_buffer.reset(); + shutdownFilestream(); + delete global_filestream; + global_filestream = NULL; +} + + +/**************************************************************************** +* ParallelStreamBuffer class * +****************************************************************************/ +ParallelStreamBuffer::ParallelStreamBuffer( ): + d_rank(0), d_size(0), d_buffer_size(0), d_buffer(NULL) +{ +} +ParallelStreamBuffer:: ~ParallelStreamBuffer() +{ + delete [] d_buffer; +} +void ParallelStreamBuffer::setOutputStream( std::ostream *stream ) +{ + d_stream.push_back( stream ); +} +int ParallelStreamBuffer::sync() +{ + for (size_t i=0; i d_buffer_size ) { + if ( d_buffer_size==0 ) { + d_buffer_size = 1024; + d_buffer = new char[d_buffer_size]; + memset(d_buffer,0,d_buffer_size); + } + while ( size > d_buffer_size ) { + char *tmp = d_buffer; + d_buffer_size *= 2; + d_buffer = new char[d_buffer_size]; + memset(d_buffer,0,d_buffer_size); + memcpy(d_buffer,tmp,d_size); + delete [] tmp; + } + } +} +std::streamsize ParallelStreamBuffer::xsputn( const std::string &text, std::streamsize n ) +{ + reserve(d_size+n); + memcpy(&d_buffer[d_size],text.c_str(),text.size()); + if ( text[n-1]==0 || text[n-1]==10 ) { sync(); } + return n; +} +int ParallelStreamBuffer::overflow(int ch) +{ + reserve(d_size+1); + d_buffer[d_size] = ch; + d_size++; + if ( ch==0 || ch==10 ) { sync(); } + return std::char_traits::to_int_type(ch); +} +int ParallelStreamBuffer::underflow() +{ + return -1; +} +void ParallelStreamBuffer::reset() +{ + sync(); + d_stream.clear(); + delete [] d_buffer; + d_buffer = NULL; + d_buffer_size = 0; +} + + +} // IO namespace + diff --git a/IO/PIO.h b/IO/PIO.h new file mode 100644 index 00000000..e21d7624 --- /dev/null +++ b/IO/PIO.h @@ -0,0 +1,135 @@ +#ifndef included_PIO +#define included_PIO + +#include +#include + + +namespace IO { + + +/*! + * Parallel output stream pout writes to the standard output from node zero + * only. Output from other nodes is ignored. If logging is enabled, then + * output is mirrored to the log stream, as well. + */ +extern std::ostream pout; + +/*! + * Parallel output stream perr writes to the standard error from all nodes. + * Output is prepended with the processor number. + */ +extern std::ostream perr; + +/*! + * Parallel output stream plog writes output to the log file. When logging + * from multiple processors, the processor number is appended to the filename. + */ +extern std::ostream plog; + +/*! + * Parallel output printp pout writes to the standard output from node zero + * only. Output from other nodes is ignored. If logging is enabled, then + * output is mirrored to the log stream, as well. + * The format matches the format for printf + */ +inline int printp( const char *format, ... ); + + +/*! + * Class ParallelBuffer is a simple I/O stream utility that + * intercepts output from an ostream and redirects the output as necessary + * for parallel I/O. This class defines a stream buffer class for an + * ostream class. + */ +class ParallelStreamBuffer : public std::streambuf +{ +public: + + /*! + * Create a parallel buffer class. The object will require further + * initialization to set up the I/O streams and prefix string. + */ + ParallelStreamBuffer( ); + + /*! + * Set the output file stream (multiple output streams are supported) + * @param stream Output stream + */ + void setOutputStream( std::ostream *stream ); + + /*! + * The destructor simply deallocates any internal data + * buffers. It does not modify the output streams. + */ + virtual ~ParallelStreamBuffer(); + + /*! + * Synchronize the parallel buffer (called from streambuf). + */ + virtual int sync(); + + /** + * Write the specified number of characters into the output stream (called + * from streambuf). + */ + virtual std::streamsize xsputn(const std::string &text, std::streamsize n); + + /*! + * Write an overflow character into the parallel buffer (called from + * streambuf). + */ + virtual int overflow(int ch); + + /*! + * Read an overflow character from the parallel buffer (called from + * streambuf). This is not implemented. It is needed by the + * MSVC++ stream implementation. + */ + virtual int underflow(); + + /*! + * Clear the internal buffer's memory + */ + virtual void reset(); + +private: + int d_rank; + size_t d_size; + size_t d_buffer_size; + char *d_buffer; + std::vector d_stream; + inline void reserve( size_t size ); +}; + + +namespace Utilities { + + /*! + * Log messages for node zero only to the specified filename. All output + * to pout, perr, and plog on node zero will go to the log file. + */ + void logOnlyNodeZero( const std::string &filename ); + + /*! + * Log messages from all nodes. The diagnostic data for processor XXXXX + * will be sent to a file with the name filename.XXXXX, where filename is + * the function argument. + */ + void logAllNodes( const std::string &filename, bool singleStream=false ); + + /*! + * Stop logging messages, flush buffers, and reset memory. + */ + void stopLogging( ); + + +} // namespace Utilities + + +} // namespace IO + + +#include "IO/PIO.hpp" + +#endif diff --git a/IO/PIO.hpp b/IO/PIO.hpp new file mode 100644 index 00000000..67b32cdb --- /dev/null +++ b/IO/PIO.hpp @@ -0,0 +1,29 @@ +#ifndef included_PIO_hpp +#define included_PIO_hpp + +#include "IO/PIO.h" + +#include +#include +#include + + +namespace IO { + + +inline int printp( const char *format, ... ) +{ + va_list ap; + va_start(ap,format); + char tmp[1024]; + int n = vsprintf(tmp,format,ap); + va_end(ap); + pout << tmp; + pout.flush(); + return n; +} + + +} // IO namespace + +#endif diff --git a/common/UtilityMacros.h b/common/UtilityMacros.h index 88d15ec7..2165b1d5 100644 --- a/common/UtilityMacros.h +++ b/common/UtilityMacros.h @@ -47,7 +47,7 @@ * \param MSG Error message to print */ #define ERROR(MSG) do { \ - Utilities::abort(MSG,__FILE__,__LINE__); \ + ::Utilities::abort(MSG,__FILE__,__LINE__); \ }while(0) @@ -75,7 +75,7 @@ if ( !(EXP) ) { \ std::stringstream tboxos; \ tboxos << "Failed assertion: " << #EXP << std::ends; \ - Utilities::abort(tboxos.str(), __FILE__, __LINE__); \ + ::Utilities::abort(tboxos.str(), __FILE__, __LINE__); \ } \ }while(0) @@ -94,7 +94,7 @@ std::stringstream tboxos; \ tboxos << "Failed insist: " << #EXP << std::endl; \ tboxos << "Message: " << MSG << std::ends; \ - Utilities::abort(tboxos.str(), __FILE__, __LINE__); \ + ::Utilities::abort(tboxos.str(), __FILE__, __LINE__); \ } \ }while(0) diff --git a/visit/avtLBMFileFormat.C b/visit/avtLBMFileFormat.C index ae7f9afb..1a27c519 100644 --- a/visit/avtLBMFileFormat.C +++ b/visit/avtLBMFileFormat.C @@ -48,6 +48,7 @@ // LBPM headers #include "IO/Reader.h" #include "IO/IOHelpers.h" +#include "IO/PIO.h" #include "common/Utilities.h" // vtk headers @@ -98,6 +99,13 @@ #endif +// Output streams +static IO::ParallelStreamBuffer DebugStreamBuffer1; +static IO::ParallelStreamBuffer DebugStreamBuffer2; +std::ostream DebugStream1(&DebugStreamBuffer1); +std::ostream DebugStream2(&DebugStreamBuffer2); + + // **************************************************************************** // Method: avtLBMFileFormat constructor // @@ -109,9 +117,13 @@ avtLBMFileFormat::avtLBMFileFormat(const char *filename) : avtMTMDFileFormat(filename) { - DebugStream::Stream1() << "avtLBMFileFormat::avtLBMFileFormat: " << filename << std::endl; + // Set abort behavior Utilities::setAbortBehavior(true,true,true); Utilities::setErrorHandlers(); + // Set debug streams + DebugStreamBuffer1.setOutputStream( &DebugStream::Stream1() ); + DebugStreamBuffer2.setOutputStream( &DebugStream::Stream2() ); + DebugStreamBuffer1.setOutputStream( &std::cout ); // Get the path to the input file std::string file(filename); size_t k1 = file.rfind(47); @@ -120,7 +132,7 @@ avtLBMFileFormat::avtLBMFileFormat(const char *filename) if ( k2==std::string::npos ) { k2=0; } d_path = file.substr(0,std::max(k1,k2)); // Load the summary file - DebugStream::Stream1() << "Loading " << filename << std::endl; + DebugStream1 << "Loading " << filename << std::endl; d_timesteps = IO::readTimesteps(filename); // Read the mesh dabases d_database.clear(); @@ -144,7 +156,7 @@ avtLBMFileFormat::avtLBMFileFormat(const char *filename) int avtLBMFileFormat::GetNTimesteps(void) { - DebugStream::Stream1() << "avtLBMFileFormat::GetNTimesteps" << std::endl; + DebugStream2 << "avtLBMFileFormat::GetNTimesteps" << std::endl; return static_cast(d_timesteps.size()); } @@ -166,16 +178,16 @@ avtLBMFileFormat::GetNTimesteps(void) void avtLBMFileFormat::FreeUpResources(void) { - DebugStream::Stream1() << "avtLBMFileFormat::FreeUpResources" << std::endl; + DebugStream1 << "avtLBMFileFormat::FreeUpResources" << std::endl; std::map::iterator it; for ( it=d_meshcache.begin(); it!=d_meshcache.end(); ++it ) { - DebugStream::Stream2() << " deleting: " << it->first << std::endl; + DebugStream2 << " deleting: " << it->first << std::endl; vtkObjectBase* obj = it->second; it->second = NULL; if ( obj!=NULL ) obj->Delete(); } - DebugStream::Stream2() << " finished" << std::endl; + DebugStream2 << " finished" << std::endl; } @@ -195,11 +207,11 @@ avtLBMFileFormat::FreeUpResources(void) void avtLBMFileFormat::PopulateDatabaseMetaData(avtDatabaseMetaData *md, int timestate) { - DebugStream::Stream1() << "avtLBMFileFormat::PopulateDatabaseMetaData: " << timestate << std::endl; + DebugStream1 << "avtLBMFileFormat::PopulateDatabaseMetaData: " << timestate << std::endl; // Add the mesh domains to the meta data const std::vector database = d_database[timestate]; for (size_t i=0; iname = database[i].name; mmd->meshType = vtkMeshType(database[i].type); @@ -244,7 +256,7 @@ avtLBMFileFormat::PopulateDatabaseMetaData(avtDatabaseMetaData *md, int timestat } } } - DebugStream::Stream1() << " Finished" << std::endl; + DebugStream2 << " Finished" << std::endl; // // CODE TO ADD A MATERIAL @@ -307,7 +319,7 @@ avtLBMFileFormat::PopulateDatabaseMetaData(avtDatabaseMetaData *md, int timestat vtkDataSet * avtLBMFileFormat::GetMesh(int timestate, int domain, const char *meshname) { - DebugStream::Stream1() << "avtLBMFileFormat::GetMesh - " << meshname + DebugStream1 << "avtLBMFileFormat::GetMesh - " << meshname << "," << timestate << "," << domain << std::endl; TIME_TYPE start, stop, freq; get_frequency(&freq); @@ -323,33 +335,32 @@ avtLBMFileFormat::GetMesh(int timestate, int domain, const char *meshname) const std::string timestep = d_timesteps[timestate]; for (size_t i=0; iPrintSelf(std::cerr,vtkIndent(6)); - DebugStream::Stream2() << " mesh created:" << std::endl; + DebugStream2 << " mesh created:" << std::endl; ASSERT(vtkMesh!=NULL); - DebugStream::Stream2() << " " << vtkMesh->GetNumberOfCells() << std::endl; - vtkMesh->PrintSelf(DebugStream::Stream2(),vtkIndent(6)); + DebugStream2 << " " << vtkMesh->GetNumberOfCells() << std::endl; + vtkMesh->PrintSelf(DebugStream2,vtkIndent(6)); // Cache the mesh and return // meshcache[cache_name] = mesh; get_time(&stop); - DebugStream::Stream2() << " Time required: " << get_diff(start,stop,freq) << std::endl; + DebugStream2 << " Time required: " << get_diff(start,stop,freq) << std::endl; return vtkMesh; } @@ -378,7 +389,7 @@ vtkMesh->PrintSelf(std::cerr,vtkIndent(6)); vtkDataArray * avtLBMFileFormat::GetVar(int timestate, int domain, const char *meshvarname) { - DebugStream::Stream1() << "avtLBMFileFormat::GetVar: " << meshvarname + DebugStream1 << "avtLBMFileFormat::GetVar: " << meshvarname << "," << timestate << "," << domain << std::endl; std::vector tmp = IO::splitList(meshvarname,'/'); ASSERT(tmp.size()==2); @@ -389,14 +400,14 @@ avtLBMFileFormat::GetVar(int timestate, int domain, const char *meshvarname) std::shared_ptr variable; for (size_t i=0; i