Prototype implementation of CUDA-aware MPI

Georg Zitzlsberger 2024-02-01 11:15:25 +01:00 committed by Tobias Meyer Andersen
parent d3b22323f1
commit eb6f9dc1f9
5 changed files with 255 additions and 1 deletions
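
In short: a CUDA-aware MPI accepts device pointers directly in point-to-point calls, so the halo exchange can skip the device-to-host staging that copyOwnerToAll_orig below performs. A minimal sketch of the idea (the exchangeHalo helper is illustrative, not part of this commit):

#include <mpi.h>
#include <cuda_runtime.h>

// Swap 'count' doubles with a peer rank, reading and writing GPU memory directly.
void exchangeHalo(int count, int peer, MPI_Comm comm)
{
    double* deviceBuf = nullptr;
    cudaMalloc(&deviceBuf, count * sizeof(double)); // device allocation, never staged on the host
    // A CUDA-aware MPI transfers the device buffer without an intermediate
    // cudaMemcpy (e.g. via GPUDirect RDMA).
    MPI_Sendrecv_replace(deviceBuf, count, MPI_DOUBLE,
                         peer, 0, peer, 0,
                         comm, MPI_STATUS_IGNORE);
    cudaFree(deviceBuf);
}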

View File

@@ -97,7 +97,7 @@ public:
* @param[in] source
* @param[out] dest
*/
void copyOwnerToAll(const X& source, X& dest) const
void copyOwnerToAll_orig(const X& source, X& dest) const
{
// TODO: [perf] Can we reduce copying from the GPU here?
// TODO: [perf] Maybe create a global buffer instead?
@@ -107,6 +107,106 @@ public:
dest.copyFromHost(destAsDuneVector);
}
// Georg's new code, intended to use GPUDirect
void copyOwnerToAll(const X& source, X& dest) const
{
printf("\n\nGPU DIRECT CODE IS RUN\n\n");
printf("Compile time check:\n");
#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
printf("This MPI library has CUDA-aware support.\n", MPIX_CUDA_AWARE_SUPPORT);
#elif defined(MPIX_CUDA_AWARE_SUPPORT) && !MPIX_CUDA_AWARE_SUPPORT
printf("This MPI library does not have CUDA-aware support.\n");
#else
printf("This MPI library cannot determine if there is CUDA-aware support.\n");
#endif /* MPIX_CUDA_AWARE_SUPPORT */
printf("Run time check:\n");
#if defined(MPIX_CUDA_AWARE_SUPPORT)
if (1 == MPIX_Query_cuda_support()) {
printf("This MPI library has CUDA-aware support.\n");
} else {
printf("This MPI library does not have CUDA-aware support.\n");
}
#else /* !defined(MPIX_CUDA_AWARE_SUPPORT) */
printf("This MPI library cannot determine if there is CUDA-aware support.\n");
#endif /* MPIX_CUDA_AWARE_SUPPORT */
assert(&source == &dest); // In this context, source == dest!!!
std::call_once(m_initializedIndices, [&]() { initIndexSet(); });
int rank;
MPI_Comm_rank(m_cpuOwnerOverlapCopy.communicator(), &rank);
dest.prepareSendBuf(*m_GPUSendBuf, *m_commpair_indicesOwner);
// Start MPI stuff here...
// Note: This has been taken from DUNE's parallel/communicator.hh
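// The m_GPUSendBuf/m_GPURecvBuf device pointers are passed directly to
// MPI_Irecv/MPI_Issend below; this relies on the CUDA-aware MPI support
// probed above.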
MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
size_t numberOfRealRecvRequests = 0;
typedef typename InformationMap::const_iterator const_iterator;
const const_iterator end = messageInformation_.end();
size_t i=0;
int* processMap = new int[messageInformation_.size()];
for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
processMap[i]=info->first;
if(info->second.second.size_) {
MPI_Irecv(m_GPURecvBuf->data()+info->second.second.start_,
info->second.second.size_,
MPI_BYTE,
info->first,
commTag_,
m_cpuOwnerOverlapCopy.communicator(),
&recvRequests[i]);
numberOfRealRecvRequests += 1;
} else {
recvRequests[i]=MPI_REQUEST_NULL;
}
}
i=0;
for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
if(info->second.first.size_) {
MPI_Issend(m_GPUSendBuf->data()+info->second.first.start_,
info->second.first.size_,
MPI_BYTE,
info->first,
commTag_,
m_cpuOwnerOverlapCopy.communicator(),
&sendRequests[i]);
} else {
sendRequests[i]=MPI_REQUEST_NULL;
}
}
i=0;
int finished = MPI_UNDEFINED;
MPI_Status status;
for(i=0; i< numberOfRealRecvRequests; i++) {
status.MPI_ERROR=MPI_SUCCESS;
MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
if(status.MPI_ERROR!=MPI_SUCCESS) {
std::cerr<< rank << ": MPI_Error occurred while receiving message from "<< processMap[finished] << std::endl;
assert(false);
}
}
MPI_Status sendStatus;
for(i=0; i< messageInformation_.size(); i++) {
if(MPI_SUCCESS!=MPI_Wait(&sendRequests[i], &sendStatus)) {
std::cerr << rank << ": MPI_Error occurred while sending message to " << processMap[i] << std::endl;
assert(false);
}
}
delete[] processMap;
delete[] sendRequests;
delete[] recvRequests;
// ...End of MPI stuff
dest.syncFromRecvBuf(*m_GPURecvBuf, *m_commpair_indicesCopy);
}
private:
@@ -119,6 +219,96 @@ private:
mutable std::unique_ptr<CuVector<int>> m_indicesCopy;
mutable std::unique_ptr<CuVector<int>> m_indicesOwner;
mutable std::unique_ptr<CuVector<int>> m_commpair_indicesCopy;
mutable std::unique_ptr<CuVector<int>> m_commpair_indicesOwner;
mutable std::unique_ptr<CuVector<field_type>> m_GPUSendBuf;
mutable std::unique_ptr<CuVector<field_type>> m_GPURecvBuf;
struct MessageInformation
{
MessageInformation() : start_(0), size_(0) {}
MessageInformation(size_t start, size_t size) : start_(start), size_(size) {}
size_t start_; // offset in elements of "field_type"
size_t size_; // size in bytes
};
typedef std::map<int,std::pair<MessageInformation,MessageInformation> > InformationMap;
mutable InformationMap messageInformation_;
typedef std::map<int,std::pair<std::vector<int>,std::vector<int> > > IM;
mutable IM m_im;
constexpr static int commTag_ = 0; // So says DUNE
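// Builds the communication-pair index sets and the per-rank message layout:
// for every peer we record which local block indices are sent (owner entries)
// and received (copy entries), expand them by block_size, and size the
// contiguous GPU staging buffers accordingly.
// Example: with block_size = 3 and send blocks {4, 7} for one peer,
// commpair_indicesOwnerCPU gets {12,13,14, 21,22,23} and that peer's send
// message spans 2 * 3 * sizeof(field_type) bytes.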
void buildCommPairIdxs() const
{
int rank;
MPI_Comm_rank(m_cpuOwnerOverlapCopy.communicator(), &rank);
auto &ri = m_cpuOwnerOverlapCopy.remoteIndices();
auto end = ri.end();
std::vector<int> commpair_indicesCopyOnCPU;
std::vector<int> commpair_indicesOwnerCPU;
for(auto process = ri.begin(); process != end; ++process) {
int size = 0;
m_im[process->first] = std::pair(std::vector<int>(), std::vector<int>());
for(int send = 0; send < 2; ++send) {
auto remoteEnd = send ? process->second.first->end()
: process->second.second->end();
auto remote = send ? process->second.first->begin()
: process->second.second->begin();
while(remote != remoteEnd) {
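// Attribute value 1 is assumed to correspond to Dune's "owner" attribute.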
if (send ? (remote->localIndexPair().local().attribute() == 1)
: (remote->attribute() == 1)) {
++size;
if (send) {
m_im[process->first].first.push_back(remote->localIndexPair().local().local());
} else {
m_im[process->first].second.push_back(remote->localIndexPair().local().local());
}
}
++remote;
}
}
}
int sendBufIdx = 0;
int recvBufIdx = 0;
for (auto it = m_im.begin(); it != m_im.end(); it++) {
int noSend = it->second.first.size();
int noRecv = it->second.second.size();
if (noSend + noRecv > 0) {
messageInformation_.insert(
std::make_pair(it->first,
std::make_pair(MessageInformation(
sendBufIdx * block_size,
noSend * block_size * sizeof(field_type)),
MessageInformation(
recvBufIdx * block_size,
noRecv * block_size * sizeof(field_type)))));
for(int x = 0; x < noSend; x++) {
for(int bs = 0; bs < block_size; bs++) {
commpair_indicesOwnerCPU.push_back(it->second.first[x] * block_size + bs);
}
}
for(int x = 0; x < noRecv; x++) {
for(int bs = 0; bs < block_size; bs++) {
commpair_indicesCopyOnCPU.push_back(it->second.second[x] * block_size + bs);
}
}
sendBufIdx += noSend;
recvBufIdx += noRecv;
}
}
m_commpair_indicesCopy = std::make_unique<CuVector<int>>(commpair_indicesCopyOnCPU);
m_commpair_indicesOwner = std::make_unique<CuVector<int>>(commpair_indicesOwnerCPU);
m_GPUSendBuf = std::make_unique<CuVector<field_type>>(sendBufIdx * block_size);
m_GPURecvBuf = std::make_unique<CuVector<field_type>>(recvBufIdx * block_size);
}
void initIndexSet() const
{
@@ -143,6 +333,8 @@ private:
m_indicesCopy = std::make_unique<CuVector<int>>(indicesCopyOnCPU);
m_indicesOwner = std::make_unique<CuVector<int>>(indicesOwnerCPU);
buildCommPairIdxs();
}
};
} // namespace Opm::cuistl

View File

@@ -286,6 +286,20 @@ CuVector<T>::copyToHost(std::vector<T>& data) const
{
copyToHost(data.data(), data.size());
}
template <typename T>
void
CuVector<T>::prepareSendBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const
{
detail::prepareSendBuf(m_dataOnDevice, buffer.data(), indexSet.dim(), indexSet.data());
}
template <typename T>
void
CuVector<T>::syncFromRecvBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const
{
detail::syncFromRecvBuf(m_dataOnDevice, buffer.data(), indexSet.dim(), indexSet.data());
}
template class CuVector<double>;
template class CuVector<float>;
template class CuVector<int>;

View File

@@ -231,6 +231,9 @@ public:
*/
void copyToHost(std::vector<T>& data) const;
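/**
* @brief prepareSendBuf gathers the entries of this vector at the positions in indexSet into the contiguous buffer (GPU-side pack before an MPI send)
*/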
void prepareSendBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const;
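/**
* @brief syncFromRecvBuf scatters the contiguous buffer back into this vector at the positions in indexSet (GPU-side unpack after an MPI receive)
*/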
void syncFromRecvBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const;
/**
* @brief operator *= multiplies every element by scalar
* @param scalar the scalar with which to multiply every element

View File

@@ -91,6 +91,27 @@ namespace
const auto threads = getThreads(numberOfElements);
return (numberOfElements + threads - 1) / threads;
}
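// Gather kernel: buffer[i] = a[indices[i]]; packs scattered vector entries into a contiguous send buffer.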
template <class T>
__global__ void
prepareSendBufKernel(const T* a, T* buffer, size_t numberOfElements, const int* indices)
{
const auto globalIndex = blockDim.x * blockIdx.x + threadIdx.x;
if (globalIndex < numberOfElements) {
buffer[globalIndex] = a[indices[globalIndex]];
}
}
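// Scatter kernel: a[indices[i]] = buffer[i]; unpacks the contiguous receive buffer back into the vector.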
template <class T>
__global__ void
syncFromRecvBufKernel(T* a, T* buffer, size_t numberOfElements, const int* indices)
{
const auto globalIndex = blockDim.x * blockIdx.x + threadIdx.x;
if (globalIndex < numberOfElements) {
a[indices[globalIndex]] = buffer[globalIndex];
}
}
} // namespace
template <class T>
@@ -132,6 +153,25 @@ template double innerProductAtIndices(const double*, const double*, double* buffer, size_t, const int*);
template float innerProductAtIndices(const float*, const float*, float* buffer, size_t, const int*);
template int innerProductAtIndices(const int*, const int*, int* buffer, size_t, const int*);
template <class T>
void prepareSendBuf(const T* deviceA, T* buffer, size_t numberOfElements, const int* indices)
{
prepareSendBufKernel<<<getBlocks(numberOfElements), getThreads(numberOfElements)>>>(deviceA, buffer, numberOfElements, indices);
cudaDeviceSynchronize(); // Make sure the pack kernel has finished before MPI reads the send buffer.
}
template void prepareSendBuf(const double* deviceA, double* buffer, size_t numberOfElements, const int* indices);
template void prepareSendBuf(const float* deviceA, float* buffer, size_t numberOfElements, const int* indices);
template void prepareSendBuf(const int* deviceA, int* buffer, size_t numberOfElements, const int* indices);
template <class T>
void syncFromRecvBuf(T* deviceA, T* buffer, size_t numberOfElements, const int* indices)
{
syncFromRecvBufKernel<<<getBlocks(numberOfElements), getThreads(numberOfElements)>>>(deviceA, buffer, numberOfElements, indices);
//cudaDeviceSynchronize(); // Presumably unnecessary: later kernels on the same (default) stream are ordered after this one.
}
template void syncFromRecvBuf(double* deviceA, double* buffer, size_t numberOfElements, const int* indices);
template void syncFromRecvBuf(float* deviceA, float* buffer, size_t numberOfElements, const int* indices);
template void syncFromRecvBuf(int* deviceA, int* buffer, size_t numberOfElements, const int* indices);
template <class T>
void

View File

@@ -55,6 +55,11 @@ void setZeroAtIndexSet(T* deviceData, size_t numberOfElements, const int* indices);
template <class T>
T innerProductAtIndices(const T* deviceA, const T* deviceB, T* buffer, size_t numberOfElements, const int* indices);
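/**
* @brief prepareSendBuf packs deviceA's entries at the given indices into the contiguous buffer
*/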
template <class T>
void prepareSendBuf(const T* deviceA, T* buffer, size_t numberOfElements, const int* indices);
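/**
* @brief syncFromRecvBuf unpacks the contiguous buffer into deviceA at the given indices
*/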
template <class T>
void syncFromRecvBuf(T* deviceA, T* buffer, size_t numberOfElements, const int* indices);
/**
* @brief Compute the weighted matrix-vector product where the matrix is diagonal; since the diagonal is stored
* as a vector, this amounts to the Hadamard product.