Adding parallel IO to help debug visit writer

This commit is contained in:
Mark Berrill 2014-11-11 14:05:39 -05:00
parent baef88b665
commit db1211ec6f
5 changed files with 376 additions and 29 deletions

172
IO/PIO.cpp Normal file
View File

@ -0,0 +1,172 @@
#include "IO/PIO.h"
#include "common/Utilities.h"
#include <fstream>
#include <string>
#include <cstring>
#ifdef USE_MPI
#include "mpi.h"
#endif
namespace IO {
static ParallelStreamBuffer pout_buffer;
static ParallelStreamBuffer perr_buffer;
static ParallelStreamBuffer plog_buffer;
std::ostream pout(&pout_buffer);
std::ostream perr(&perr_buffer);
std::ostream plog(&plog_buffer);
/****************************************************************************
* Functions to control logging *
****************************************************************************/
std::ofstream *global_filestream=NULL;
static void shutdownFilestream( )
{
if ( global_filestream!=NULL ) {
global_filestream->flush();
global_filestream->close();
delete global_filestream;
global_filestream = NULL;
}
}
void Utilities::logOnlyNodeZero( const std::string &filename )
{
int rank = 0;
#ifdef USE_MPI
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
#endif
if ( rank == 0 )
logAllNodes(filename,true);
}
void Utilities::logAllNodes( const std::string &filename, bool singleStream )
{
if ( singleStream )
ERROR("Not implimented yet");
// If the filestream was open, then close it and reset streams
shutdownFilestream();
// Open the log stream and redirect output
std::string full_filename = filename;
if ( !singleStream ) {
int rank = 0;
#ifdef USE_MPI
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
#endif
char tmp[100];
sprintf(tmp,".%04i",rank);
full_filename += std::string(tmp);
}
global_filestream = new std::ofstream(full_filename.c_str());
if ( !(*global_filestream) ) {
delete global_filestream;
global_filestream = NULL;
perr << "PIO: Could not open log file ``" << full_filename << "''\n";
} else {
pout_buffer.setOutputStream(global_filestream);
pout_buffer.setOutputStream(&std::cout);
perr_buffer.setOutputStream(global_filestream);
perr_buffer.setOutputStream(&std::cerr);
plog_buffer.setOutputStream(global_filestream);
}
}
/****************************************************************************
* ParallelStreamBuffer class *
****************************************************************************/
void Utilities::stopLogging( )
{
pout_buffer.reset();
perr_buffer.reset();
plog_buffer.reset();
shutdownFilestream();
delete global_filestream;
global_filestream = NULL;
}
/****************************************************************************
* ParallelStreamBuffer class *
****************************************************************************/
ParallelStreamBuffer::ParallelStreamBuffer( ):
d_rank(0), d_size(0), d_buffer_size(0), d_buffer(NULL)
{
}
ParallelStreamBuffer:: ~ParallelStreamBuffer()
{
delete [] d_buffer;
}
void ParallelStreamBuffer::setOutputStream( std::ostream *stream )
{
d_stream.push_back( stream );
}
int ParallelStreamBuffer::sync()
{
for (size_t i=0; i<d_stream.size(); i++) {
std::ostream& stream = *d_stream[i];
stream << d_buffer;
}
d_size = 0;
memset(d_buffer,0,d_buffer_size);
return 0;
}
void ParallelStreamBuffer::reserve( size_t size )
{
if ( size > d_buffer_size ) {
if ( d_buffer_size==0 ) {
d_buffer_size = 1024;
d_buffer = new char[d_buffer_size];
memset(d_buffer,0,d_buffer_size);
}
while ( size > d_buffer_size ) {
char *tmp = d_buffer;
d_buffer_size *= 2;
d_buffer = new char[d_buffer_size];
memset(d_buffer,0,d_buffer_size);
memcpy(d_buffer,tmp,d_size);
delete [] tmp;
}
}
}
std::streamsize ParallelStreamBuffer::xsputn( const std::string &text, std::streamsize n )
{
reserve(d_size+n);
memcpy(&d_buffer[d_size],text.c_str(),text.size());
if ( text[n-1]==0 || text[n-1]==10 ) { sync(); }
return n;
}
int ParallelStreamBuffer::overflow(int ch)
{
reserve(d_size+1);
d_buffer[d_size] = ch;
d_size++;
if ( ch==0 || ch==10 ) { sync(); }
return std::char_traits<char>::to_int_type(ch);
}
int ParallelStreamBuffer::underflow()
{
return -1;
}
void ParallelStreamBuffer::reset()
{
sync();
d_stream.clear();
delete [] d_buffer;
d_buffer = NULL;
d_buffer_size = 0;
}
} // IO namespace

135
IO/PIO.h Normal file
View File

@ -0,0 +1,135 @@
#ifndef included_PIO
#define included_PIO
#include <iostream>
#include <vector>
namespace IO {
/*!
* Parallel output stream pout writes to the standard output from node zero
* only. Output from other nodes is ignored. If logging is enabled, then
* output is mirrored to the log stream, as well.
*/
extern std::ostream pout;
/*!
* Parallel output stream perr writes to the standard error from all nodes.
* Output is prepended with the processor number.
*/
extern std::ostream perr;
/*!
* Parallel output stream plog writes output to the log file. When logging
* from multiple processors, the processor number is appended to the filename.
*/
extern std::ostream plog;
/*!
* Parallel output printp pout writes to the standard output from node zero
* only. Output from other nodes is ignored. If logging is enabled, then
* output is mirrored to the log stream, as well.
* The format matches the format for printf
*/
inline int printp( const char *format, ... );
/*!
* Class ParallelBuffer is a simple I/O stream utility that
* intercepts output from an ostream and redirects the output as necessary
* for parallel I/O. This class defines a stream buffer class for an
* ostream class.
*/
class ParallelStreamBuffer : public std::streambuf
{
public:
/*!
* Create a parallel buffer class. The object will require further
* initialization to set up the I/O streams and prefix string.
*/
ParallelStreamBuffer( );
/*!
* Set the output file stream (multiple output streams are supported)
* @param stream Output stream
*/
void setOutputStream( std::ostream *stream );
/*!
* The destructor simply deallocates any internal data
* buffers. It does not modify the output streams.
*/
virtual ~ParallelStreamBuffer();
/*!
* Synchronize the parallel buffer (called from streambuf).
*/
virtual int sync();
/**
* Write the specified number of characters into the output stream (called
* from streambuf).
*/
virtual std::streamsize xsputn(const std::string &text, std::streamsize n);
/*!
* Write an overflow character into the parallel buffer (called from
* streambuf).
*/
virtual int overflow(int ch);
/*!
* Read an overflow character from the parallel buffer (called from
* streambuf). This is not implemented. It is needed by the
* MSVC++ stream implementation.
*/
virtual int underflow();
/*!
* Clear the internal buffer's memory
*/
virtual void reset();
private:
int d_rank;
size_t d_size;
size_t d_buffer_size;
char *d_buffer;
std::vector<std::ostream*> d_stream;
inline void reserve( size_t size );
};
namespace Utilities {
/*!
* Log messages for node zero only to the specified filename. All output
* to pout, perr, and plog on node zero will go to the log file.
*/
void logOnlyNodeZero( const std::string &filename );
/*!
* Log messages from all nodes. The diagnostic data for processor XXXXX
* will be sent to a file with the name filename.XXXXX, where filename is
* the function argument.
*/
void logAllNodes( const std::string &filename, bool singleStream=false );
/*!
* Stop logging messages, flush buffers, and reset memory.
*/
void stopLogging( );
} // namespace Utilities
} // namespace IO
#include "IO/PIO.hpp"
#endif

29
IO/PIO.hpp Normal file
View File

@ -0,0 +1,29 @@
#ifndef included_PIO_hpp
#define included_PIO_hpp
#include "IO/PIO.h"
#include <iostream>
#include <stdarg.h>
#include <cstdio>
namespace IO {
inline int printp( const char *format, ... )
{
va_list ap;
va_start(ap,format);
char tmp[1024];
int n = vsprintf(tmp,format,ap);
va_end(ap);
pout << tmp;
pout.flush();
return n;
}
} // IO namespace
#endif

View File

@ -47,7 +47,7 @@
* \param MSG Error message to print
*/
#define ERROR(MSG) do { \
Utilities::abort(MSG,__FILE__,__LINE__); \
::Utilities::abort(MSG,__FILE__,__LINE__); \
}while(0)
@ -75,7 +75,7 @@
if ( !(EXP) ) { \
std::stringstream tboxos; \
tboxos << "Failed assertion: " << #EXP << std::ends; \
Utilities::abort(tboxos.str(), __FILE__, __LINE__); \
::Utilities::abort(tboxos.str(), __FILE__, __LINE__); \
} \
}while(0)
@ -94,7 +94,7 @@
std::stringstream tboxos; \
tboxos << "Failed insist: " << #EXP << std::endl; \
tboxos << "Message: " << MSG << std::ends; \
Utilities::abort(tboxos.str(), __FILE__, __LINE__); \
::Utilities::abort(tboxos.str(), __FILE__, __LINE__); \
} \
}while(0)

View File

@ -48,6 +48,7 @@
// LBPM headers
#include "IO/Reader.h"
#include "IO/IOHelpers.h"
#include "IO/PIO.h"
#include "common/Utilities.h"
// vtk headers
@ -98,6 +99,13 @@
#endif
// Output streams
static IO::ParallelStreamBuffer DebugStreamBuffer1;
static IO::ParallelStreamBuffer DebugStreamBuffer2;
std::ostream DebugStream1(&DebugStreamBuffer1);
std::ostream DebugStream2(&DebugStreamBuffer2);
// ****************************************************************************
// Method: avtLBMFileFormat constructor
//
@ -109,9 +117,13 @@
avtLBMFileFormat::avtLBMFileFormat(const char *filename)
: avtMTMDFileFormat(filename)
{
DebugStream::Stream1() << "avtLBMFileFormat::avtLBMFileFormat: " << filename << std::endl;
// Set abort behavior
Utilities::setAbortBehavior(true,true,true);
Utilities::setErrorHandlers();
// Set debug streams
DebugStreamBuffer1.setOutputStream( &DebugStream::Stream1() );
DebugStreamBuffer2.setOutputStream( &DebugStream::Stream2() );
DebugStreamBuffer1.setOutputStream( &std::cout );
// Get the path to the input file
std::string file(filename);
size_t k1 = file.rfind(47);
@ -120,7 +132,7 @@ avtLBMFileFormat::avtLBMFileFormat(const char *filename)
if ( k2==std::string::npos ) { k2=0; }
d_path = file.substr(0,std::max(k1,k2));
// Load the summary file
DebugStream::Stream1() << "Loading " << filename << std::endl;
DebugStream1 << "Loading " << filename << std::endl;
d_timesteps = IO::readTimesteps(filename);
// Read the mesh dabases
d_database.clear();
@ -144,7 +156,7 @@ avtLBMFileFormat::avtLBMFileFormat(const char *filename)
int
avtLBMFileFormat::GetNTimesteps(void)
{
DebugStream::Stream1() << "avtLBMFileFormat::GetNTimesteps" << std::endl;
DebugStream2 << "avtLBMFileFormat::GetNTimesteps" << std::endl;
return static_cast<int>(d_timesteps.size());
}
@ -166,16 +178,16 @@ avtLBMFileFormat::GetNTimesteps(void)
void
avtLBMFileFormat::FreeUpResources(void)
{
DebugStream::Stream1() << "avtLBMFileFormat::FreeUpResources" << std::endl;
DebugStream1 << "avtLBMFileFormat::FreeUpResources" << std::endl;
std::map<std::string,vtkObjectBase*>::iterator it;
for ( it=d_meshcache.begin(); it!=d_meshcache.end(); ++it ) {
DebugStream::Stream2() << " deleting: " << it->first << std::endl;
DebugStream2 << " deleting: " << it->first << std::endl;
vtkObjectBase* obj = it->second;
it->second = NULL;
if ( obj!=NULL )
obj->Delete();
}
DebugStream::Stream2() << " finished" << std::endl;
DebugStream2 << " finished" << std::endl;
}
@ -195,11 +207,11 @@ avtLBMFileFormat::FreeUpResources(void)
void
avtLBMFileFormat::PopulateDatabaseMetaData(avtDatabaseMetaData *md, int timestate)
{
DebugStream::Stream1() << "avtLBMFileFormat::PopulateDatabaseMetaData: " << timestate << std::endl;
DebugStream1 << "avtLBMFileFormat::PopulateDatabaseMetaData: " << timestate << std::endl;
// Add the mesh domains to the meta data
const std::vector<IO::MeshDatabase> database = d_database[timestate];
for (size_t i=0; i<database.size(); i++) {
DebugStream::Stream1() << " Adding " << database[i].name << std::endl;
DebugStream2 << " Adding " << database[i].name << std::endl;
avtMeshMetaData *mmd = new avtMeshMetaData;
mmd->name = database[i].name;
mmd->meshType = vtkMeshType(database[i].type);
@ -244,7 +256,7 @@ avtLBMFileFormat::PopulateDatabaseMetaData(avtDatabaseMetaData *md, int timestat
}
}
}
DebugStream::Stream1() << " Finished" << std::endl;
DebugStream2 << " Finished" << std::endl;
//
// CODE TO ADD A MATERIAL
@ -307,7 +319,7 @@ avtLBMFileFormat::PopulateDatabaseMetaData(avtDatabaseMetaData *md, int timestat
vtkDataSet *
avtLBMFileFormat::GetMesh(int timestate, int domain, const char *meshname)
{
DebugStream::Stream1() << "avtLBMFileFormat::GetMesh - " << meshname
DebugStream1 << "avtLBMFileFormat::GetMesh - " << meshname
<< "," << timestate << "," << domain << std::endl;
TIME_TYPE start, stop, freq;
get_frequency(&freq);
@ -323,33 +335,32 @@ avtLBMFileFormat::GetMesh(int timestate, int domain, const char *meshname)
const std::string timestep = d_timesteps[timestate];
for (size_t i=0; i<database.size(); i++) {
if ( database[i].name==std::string(meshname) ) {
DebugStream::Stream1() << " calling getMesh" << std::endl;
DebugStream2 << " calling getMesh" << std::endl;
try {
mesh = IO::getMesh(d_path,timestep,database[i],domain);
} catch (const std::exception &err) {
DebugStream::Stream1() << " Caught errror calling getMesh:" << std::endl;
DebugStream::Stream1() << err.what() << std::endl;
DebugStream1 << " Caught errror calling getMesh:" << std::endl;
DebugStream1 << err.what() << std::endl;
} catch (...) {
DebugStream::Stream1() << " Caught unknown errror calling getMesh" << std::endl;
DebugStream1 << " Caught unknown errror calling getMesh" << std::endl;
return NULL;
}
}
}
if ( mesh==NULL ) {
DebugStream::Stream1() << " Error loading mesh" << std::endl;
DebugStream1 << " Error loading mesh" << std::endl;
return NULL;
}
// Create the mesh in vtk
vtkDataSet* vtkMesh = meshToVTK(mesh);
vtkMesh->PrintSelf(std::cerr,vtkIndent(6));
DebugStream::Stream2() << " mesh created:" << std::endl;
DebugStream2 << " mesh created:" << std::endl;
ASSERT(vtkMesh!=NULL);
DebugStream::Stream2() << " " << vtkMesh->GetNumberOfCells() << std::endl;
vtkMesh->PrintSelf(DebugStream::Stream2(),vtkIndent(6));
DebugStream2 << " " << vtkMesh->GetNumberOfCells() << std::endl;
vtkMesh->PrintSelf(DebugStream2,vtkIndent(6));
// Cache the mesh and return
// meshcache[cache_name] = mesh;
get_time(&stop);
DebugStream::Stream2() << " Time required: " << get_diff(start,stop,freq) << std::endl;
DebugStream2 << " Time required: " << get_diff(start,stop,freq) << std::endl;
return vtkMesh;
}
@ -378,7 +389,7 @@ vtkMesh->PrintSelf(std::cerr,vtkIndent(6));
vtkDataArray *
avtLBMFileFormat::GetVar(int timestate, int domain, const char *meshvarname)
{
DebugStream::Stream1() << "avtLBMFileFormat::GetVar: " << meshvarname
DebugStream1 << "avtLBMFileFormat::GetVar: " << meshvarname
<< "," << timestate << "," << domain << std::endl;
std::vector<std::string> tmp = IO::splitList(meshvarname,'/');
ASSERT(tmp.size()==2);
@ -389,14 +400,14 @@ avtLBMFileFormat::GetVar(int timestate, int domain, const char *meshvarname)
std::shared_ptr<const IO::Variable> variable;
for (size_t i=0; i<database.size(); i++) {
if ( database[i].name==std::string(meshname) ) {
DebugStream::Stream1() << " calling getVar" << std::endl;
DebugStream2 << " calling getVar" << std::endl;
try {
variable = IO::getVariable(d_path,timestep,database[i],domain,varname);
} catch (const std::exception &err) {
DebugStream::Stream1() << " Caught errror calling getVar:" << std::endl;
DebugStream::Stream1() << err.what() << std::endl;
DebugStream1 << " Caught errror calling getVar:" << std::endl;
DebugStream1 << err.what() << std::endl;
} catch (...) {
DebugStream::Stream1() << " Caught unknown errror calling getVar" << std::endl;
DebugStream1 << " Caught unknown errror calling getVar" << std::endl;
return NULL;
}
}
@ -434,7 +445,7 @@ avtLBMFileFormat::GetVectorVar(int timestate, int domain, const char *varname)
{
cerr << "avtLBMFileFormat::GetVectorVar - " << varname
<< "," << timestate << "," << domain << std::endl;
DebugStream::Stream1() << "avtLBMFileFormat::GetVectorVar" << std::endl;
DebugStream1 << "avtLBMFileFormat::GetVectorVar" << std::endl;
EXCEPTION1(InvalidVariableException, varname);
return NULL;