added: parallel support to HDF5File / HDF5Serializer

This commit is contained in:
Arne Morten Kvarving
2023-02-09 23:30:02 +01:00
parent 0255bcebb1
commit 8c3400f562
10 changed files with 518 additions and 68 deletions

View File

@@ -24,6 +24,8 @@
#include <opm/common/utility/String.hpp>
#include <opm/simulators/utils/DeferredLoggingErrorHelpers.hpp>
#include <filesystem>
#include <stdexcept>
@@ -46,18 +48,32 @@ bool groupExists(hid_t parent, const std::string& path)
namespace Opm {
HDF5File::HDF5File(const std::string& fileName, OpenMode mode)
HDF5File::HDF5File(const std::string& fileName,
OpenMode mode,
Parallel::Communication comm)
: comm_(comm)
{
bool exists = std::filesystem::exists(fileName);
hid_t acc_tpl = H5P_DEFAULT;
if (comm.size() > 1) {
#if HAVE_MPI
MPI_Info info = MPI_INFO_NULL;
acc_tpl = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(acc_tpl, comm_, info);
#else
assert(0); // should be unreachable
#endif
}
if (mode == OpenMode::OVERWRITE ||
(mode == OpenMode::APPEND && !exists)) {
m_file = H5Fcreate(fileName.c_str(),
H5F_ACC_TRUNC,
H5P_DEFAULT, H5P_DEFAULT);
H5P_DEFAULT, acc_tpl);
} else {
m_file = H5Fopen(fileName.c_str(),
mode == OpenMode::READ ? H5F_ACC_RDONLY : H5F_ACC_RDWR,
H5P_DEFAULT);
acc_tpl);
}
if (m_file == H5I_INVALID_HID) {
throw std::runtime_error(std::string("HDF5File: Failed to ") +
@@ -65,6 +81,10 @@ HDF5File::HDF5File(const std::string& fileName, OpenMode mode)
(mode == OpenMode::APPEND && !exists) ? "create" : "open") +
fileName);
}
if (comm_.size() > 1) {
H5Pclose(acc_tpl);
}
}
HDF5File::~HDF5File()
@@ -76,63 +96,71 @@ HDF5File::~HDF5File()
void HDF5File::write(const std::string& group,
const std::string& dset,
const std::vector<char>& buffer)
const std::vector<char>& buffer,
DataSetMode mode) const
{
hid_t grp;
if (groupExists(m_file, group)) {
grp = H5Gopen2(m_file, group.c_str(), H5P_DEFAULT);
hid_t grp = H5I_INVALID_HID;
std::string realGroup = group;
if (mode == DataSetMode::PROCESS_SPLIT) {
if (group != "/")
realGroup += '/';
realGroup += dset;
}
OPM_BEGIN_PARALLEL_TRY_CATCH();
if (groupExists(m_file, realGroup)) {
grp = H5Gopen2(m_file, realGroup.c_str(), H5P_DEFAULT);
} else {
auto grps = split_string(group, '/');
auto grps = split_string(realGroup, '/');
std::string curr;
for (size_t i = 0; i < grps.size()-1; ++i) {
for (size_t i = 0; i < grps.size(); ++i) {
if (grps[i].empty())
continue;
curr += '/';
curr += grps[i];
if (!groupExists(m_file, curr)) {
hid_t subgrp = H5Gcreate2(m_file, curr.c_str(), 0, H5P_DEFAULT, H5P_DEFAULT);
if (subgrp == H5I_INVALID_HID) {
throw std::runtime_error("HDF5File: Failed to create group '" + curr + "'");
throw std::runtime_error("Failed to create group '" + curr + "'");
}
H5Gclose(subgrp);
if (i == grps.size() - 1) {
grp = subgrp;
} else {
H5Gclose(subgrp);
}
} else if (i == grps.size() - 1) {
grp = H5Gopen2(m_file, realGroup.c_str(), H5P_DEFAULT);
}
}
grp = H5Gcreate2(m_file, group.c_str(), 0, H5P_DEFAULT, H5P_DEFAULT);
}
if (grp == H5I_INVALID_HID) {
throw std::runtime_error("HDF5File: Failed to create group '" + group + "'");
throw std::runtime_error("Failed to create group '" + realGroup + "'");
}
hsize_t size = buffer.size();
hsize_t start = 0;
hid_t space = H5Screate_simple(1, &size, nullptr);
hid_t dataset_id = H5Dcreate2(grp, dset.c_str(), H5T_NATIVE_CHAR, space,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
if (dataset_id == H5I_INVALID_HID) {
throw std::runtime_error("HDF5File: Trying to write already existing dataset '" + group + '/' + dset + "'");
if (mode == DataSetMode::PROCESS_SPLIT) {
writeSplit(grp, buffer, realGroup);
} else if (mode == DataSetMode::ROOT_ONLY) {
writeRootOnly(grp, buffer, group, dset);
}
if (size > 0) {
hid_t filespace = H5Dget_space(dataset_id);
hsize_t stride = 1;
H5Sselect_hyperslab(filespace, H5S_SELECT_SET, &start, &stride, &size, nullptr);
hid_t memspace = H5Screate_simple(1, &size, nullptr);
H5Dwrite(dataset_id, H5T_NATIVE_CHAR, memspace, filespace, H5P_DEFAULT, buffer.data());
H5Sclose(memspace);
H5Sclose(filespace);
}
H5Dclose(dataset_id);
H5Sclose(space);
H5Gclose(grp);
OPM_END_PARALLEL_TRY_CATCH("HDF5File: Error writing data: ", comm_);
}
void HDF5File::read(const std::string& group,
const std::string& dset,
std::vector<char>& buffer) const
std::vector<char>& buffer,
DataSetMode mode) const
{
hid_t dataset_id = H5Dopen2(m_file, (group + "/"+ dset).c_str(), H5P_DEFAULT);
std::string realSet = group + '/' + dset;
if (mode == DataSetMode::PROCESS_SPLIT) {
realSet += '/' + std::to_string(comm_.rank());
}
hid_t dataset_id = H5Dopen2(m_file, realSet.c_str(), H5P_DEFAULT);
if (dataset_id == H5I_INVALID_HID) {
throw std::runtime_error("HDF5File: Trying to read non-existing dataset " + group + '/' + dset);
throw std::runtime_error("Trying to read non-existing dataset " + group + '/' + dset);
}
hid_t space = H5Dget_space(dataset_id);
@@ -157,10 +185,79 @@ std::vector<std::string> HDF5File::list(const std::string& group) const
if (H5Literate_by_name(m_file, group.c_str(),
H5_INDEX_NAME, H5_ITER_INC,
&idx, list_group, &result, H5P_DEFAULT) < 0) {
throw std::runtime_error("Failure while listing HDF5 group '" + group + "'");
throw std::runtime_error("Failure while listing group '" + group + "'");
}
return result;
}
void HDF5File::writeSplit(hid_t grp,
const std::vector<char>& buffer,
const std::string& dset) const
{
std::vector<hsize_t> proc_sizes(comm_.size());
if (comm_.size() > 1) {
hsize_t lsize = buffer.size();
comm_.allgather(&lsize, 1, proc_sizes.data());
} else {
proc_sizes[0] = buffer.size();
}
for (int i = 0; i < comm_.size(); ++i) {
hid_t space = H5Screate_simple(1, &proc_sizes[i], nullptr);
hid_t dataset_id = H5Dcreate2(grp,
std::to_string(i).c_str(),
H5T_NATIVE_CHAR, space,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
if (dataset_id == H5I_INVALID_HID) {
throw std::runtime_error("Trying to write already existing dataset '" +
dset + '/' + std::to_string(i) + "'");
}
if (i == comm_.rank()) {
hid_t filespace = H5Dget_space(dataset_id);
hsize_t stride = 1;
hsize_t start = 0;
H5Sselect_hyperslab(filespace, H5S_SELECT_SET, &start, &stride, &proc_sizes[i], nullptr);
hid_t memspace = H5Screate_simple(1, &proc_sizes[i], nullptr);
H5Dwrite(dataset_id, H5T_NATIVE_CHAR, memspace, filespace, H5P_DEFAULT, buffer.data());
H5Sclose(memspace);
H5Sclose(filespace);
}
H5Dclose(dataset_id);
H5Sclose(space);
}
}
void HDF5File::writeRootOnly(hid_t grp,
const std::vector<char>& buffer,
const std::string& group,
const std::string& dset) const
{
hsize_t size = buffer.size();
comm_.broadcast(&size, 1, 0);
hid_t space = H5Screate_simple(1, &size, nullptr);
hid_t dataset_id = H5Dcreate2(grp,
dset.c_str(),
H5T_NATIVE_CHAR, space,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
if (dataset_id == H5I_INVALID_HID) {
throw std::runtime_error("Trying to write already existing dataset '" +
group + '/' + dset + "'");
}
if (comm_.rank() == 0) {
hid_t filespace = H5Dget_space(dataset_id);
hsize_t stride = 1;
hsize_t start = 0;
H5Sselect_hyperslab(filespace, H5S_SELECT_SET, &start, &stride, &size, nullptr);
hid_t memspace = H5Screate_simple(1, &size, nullptr);
H5Dwrite(dataset_id, H5T_NATIVE_CHAR, memspace, filespace, H5P_DEFAULT, buffer.data());
H5Sclose(memspace);
H5Sclose(filespace);
}
H5Dclose(dataset_id);
H5Sclose(space);
}
}

View File

@@ -21,6 +21,8 @@
#ifndef HDF5_FILE_HPP
#define HDF5_FILE_HPP
#include <opm/simulators/utils/ParallelCommunication.hpp>
#include <hdf5.h>
#include <string>
@@ -38,10 +40,18 @@ public:
READ //!< Open existing file for reading
};
//! \brief Enumeration of dataset modes.
enum class DataSetMode {
ROOT_ONLY, //!< A single dataset created at the root process
PROCESS_SPLIT //!< One separate data set for each parallel process
};
//! \brief Opens HDF5 file for I/O.
//! \param fileName Name of file to open
//! \param mode Open mode for file
HDF5File(const std::string& fileName, OpenMode mode);
HDF5File(const std::string& fileName,
OpenMode mode,
Parallel::Communication comm);
//! \brief Destructor clears up any opened files.
~HDF5File();
@@ -53,7 +63,8 @@ public:
//! \details Throws exception on failure
void write(const std::string& group,
const std::string& dset,
const std::vector<char>& buffer);
const std::vector<char>& buffer,
DataSetMode mode = DataSetMode::PROCESS_SPLIT) const;
//! \brief Read a char buffer from a specified location in file.
//! \param group Group ("directory") to read data from
@@ -62,14 +73,33 @@ public:
//! \details Throws exception on failure
void read(const std::string& group,
const std::string& dset,
std::vector<char>& buffer) const;
std::vector<char>& buffer,
DataSetMode Mode = DataSetMode::PROCESS_SPLIT) const;
//! \brief Lists the entries in a given group.
//! \details Note: Both datasets and subgroups are returned
std::vector<std::string> list(const std::string& group) const;
private:
//! \brief Write data from each process to a separate dataset.
//! \param grp Handle for group to store dataset in
//! \param buffer Data to write
//! \param dset Name of dataset
void writeSplit(hid_t grp,
const std::vector<char>& buffer,
const std::string& dset) const;
//! \brief Write data from root process only.
//! \param grp Handle for group to store dataset in
//! \param buffer Data to write
//! \param dset Name of dataset
void writeRootOnly(hid_t grp,
const std::vector<char>& buffer,
const std::string& group,
const std::string& dset) const;
hid_t m_file = H5I_INVALID_HID; //!< File handle
Parallel::Communication comm_;
};
}