From eb6f9dc1f9aa5a3a7416a1b6e6f375e138edd4dd Mon Sep 17 00:00:00 2001
From: Georg Zitzlsberger
Date: Thu, 1 Feb 2024 11:15:25 +0100
Subject: [PATCH] Prototype implementation of CUDA-aware MPI

---
 .../linalg/cuistl/CuOwnerOverlapCopy.hpp      | 194 +++++++++++++++++-
 opm/simulators/linalg/cuistl/CuVector.cpp     |  14 ++
 opm/simulators/linalg/cuistl/CuVector.hpp     |   3 +
 .../linalg/cuistl/detail/vector_operations.cu |  40 ++++
 .../cuistl/detail/vector_operations.hpp       |   5 +
 5 files changed, 255 insertions(+), 1 deletion(-)

diff --git a/opm/simulators/linalg/cuistl/CuOwnerOverlapCopy.hpp b/opm/simulators/linalg/cuistl/CuOwnerOverlapCopy.hpp
index a39b98b5e..36b41dcac 100644
--- a/opm/simulators/linalg/cuistl/CuOwnerOverlapCopy.hpp
+++ b/opm/simulators/linalg/cuistl/CuOwnerOverlapCopy.hpp
@@ -97,7 +97,7 @@ public:
      * @param[in] source
      * @param[out] dest
      */
-    void copyOwnerToAll(const X& source, X& dest) const
+    void copyOwnerToAll_orig(const X& source, X& dest) const
     {
         // TODO: [perf] Can we reduce copying from the GPU here?
         // TODO: [perf] Maybe create a global buffer instead?
@@ -107,6 +107,106 @@ public:
         dest.copyFromHost(destAsDuneVector);
     }
+    // Georg's new code, intended to use GPU-direct MPI
+    void copyOwnerToAll(const X& source, X& dest) const
+    {
+
+        printf("\n\nGPU DIRECT CODE IS RUN\n\n");
+        printf("Compile time check:\n");
+#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
+        printf("This MPI library has CUDA-aware support.\n");
+#elif defined(MPIX_CUDA_AWARE_SUPPORT) && !MPIX_CUDA_AWARE_SUPPORT
+        printf("This MPI library does not have CUDA-aware support.\n");
+#else
+        printf("This MPI library cannot determine if there is CUDA-aware support.\n");
+#endif /* MPIX_CUDA_AWARE_SUPPORT */
+
+        printf("Run time check:\n");
+#if defined(MPIX_CUDA_AWARE_SUPPORT)
+        if (1 == MPIX_Query_cuda_support()) {
+            printf("This MPI library has CUDA-aware support.\n");
+        } else {
+            printf("This MPI library does not have CUDA-aware support.\n");
+        }
+#else /* !defined(MPIX_CUDA_AWARE_SUPPORT) */
+        printf("This MPI library cannot determine if there is CUDA-aware support.\n");
+#endif /* MPIX_CUDA_AWARE_SUPPORT */
+
+
+        assert(&source == &dest); // In this context, source == dest!!!
+        std::call_once(m_initializedIndices, [&]() { initIndexSet(); });
+
+        int rank;
+        MPI_Comm_rank(m_cpuOwnerOverlapCopy.communicator(), &rank);
+        dest.prepareSendBuf(*m_GPUSendBuf, *m_commpair_indicesOwner);
+
+        // Start MPI stuff here...
+        // Note: This has been taken from DUNE's parallel/communicator.hh
+        MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
+        MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
+        size_t numberOfRealRecvRequests = 0;
+
+        typedef typename InformationMap::const_iterator const_iterator;
+        const const_iterator end = messageInformation_.end();
+        size_t i=0;
+        int* processMap = new int[messageInformation_.size()];
+        for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
+            processMap[i]=info->first;
+            if(info->second.second.size_) {
+                MPI_Irecv(m_GPURecvBuf->data()+info->second.second.start_,
+                          info->second.second.size_,
+                          MPI_BYTE,
+                          info->first,
+                          commTag_,
+                          m_cpuOwnerOverlapCopy.communicator(),
+                          &recvRequests[i]);
+                numberOfRealRecvRequests += 1;
+            } else {
+                recvRequests[i]=MPI_REQUEST_NULL;
+            }
+        }
+
+        i=0;
+        for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
+            if(info->second.first.size_) {
+                MPI_Issend(m_GPUSendBuf->data()+info->second.first.start_,
+                           info->second.first.size_,
+                           MPI_BYTE,
+                           info->first,
+                           commTag_,
+                           m_cpuOwnerOverlapCopy.communicator(),
+                           &sendRequests[i]);
+            } else {
+                sendRequests[i]=MPI_REQUEST_NULL;
+            }
+        }
+        i=0;
+        int finished = MPI_UNDEFINED;
+        MPI_Status status;
+        for(i=0; i< numberOfRealRecvRequests; i++) {
+            status.MPI_ERROR=MPI_SUCCESS;
+            MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
+
+            if(status.MPI_ERROR!=MPI_SUCCESS) {
+                std::cerr<< rank << ": MPI_Error occurred while receiving message from "<< processMap[finished] << std::endl;
+                assert(false);
+            }
+        }
+        MPI_Status recvStatus;
+        for(i=0; i< messageInformation_.size(); i++) {
+            if(MPI_SUCCESS!=MPI_Wait(&sendRequests[i], &recvStatus)) {
+                std::cerr << rank << ": MPI_Error occurred while sending message to " << processMap[i] << std::endl;
+                assert(false);
+            }
+        }
+        delete[] processMap;
+        delete[] sendRequests;
+        delete[] recvRequests;
+        // ...End of MPI stuff
+
+        dest.syncFromRecvBuf(*m_GPURecvBuf, *m_commpair_indicesCopy);
+    }
+
 private:
@@ -119,6 +219,96 @@ private:
     mutable std::unique_ptr<CuVector<int>> m_indicesCopy;
     mutable std::unique_ptr<CuVector<int>> m_indicesOwner;
+    mutable std::unique_ptr<CuVector<int>> m_commpair_indicesCopy;
+    mutable std::unique_ptr<CuVector<int>> m_commpair_indicesOwner;
+    mutable std::unique_ptr<CuVector<field_type>> m_GPUSendBuf;
+    mutable std::unique_ptr<CuVector<field_type>> m_GPURecvBuf;
+
+    struct MessageInformation
+    {
+        MessageInformation() : start_(0), size_(0) {}
+        MessageInformation(size_t start, size_t size) : start_(start), size_(size) {}
+        size_t start_; // offset in elements of "field_type"
+        size_t size_; // size in bytes
+    };
+
+    typedef std::map<int, std::pair<MessageInformation, MessageInformation>> InformationMap;
+    mutable InformationMap messageInformation_;
+    typedef std::map<int, std::pair<std::vector<int>, std::vector<int>>> IM;
+    mutable IM m_im;
+
+    constexpr static int commTag_ = 0; // So says DUNE
+
+    void buildCommPairIdxs() const
+    {
+        int rank;
+        MPI_Comm_rank(m_cpuOwnerOverlapCopy.communicator(), &rank);
+        auto &ri = m_cpuOwnerOverlapCopy.remoteIndices();
+        auto end = ri.end();
+        std::vector<int> commpair_indicesCopyOnCPU;
+        std::vector<int> commpair_indicesOwnerCPU;
+
+        for(auto process = ri.begin(); process != end; ++process) {
+            int size = 0;
+            m_im[process->first] = std::pair(std::vector<int>(), std::vector<int>());
+            for(int send = 0; send < 2; ++send) {
+                auto remoteEnd = send ? process->second.first->end()
+                                      : process->second.second->end();
+                auto remote = send ? process->second.first->begin()
+                                   : process->second.second->begin();
+
+                while(remote != remoteEnd) {
+                    if (send ? (remote->localIndexPair().local().attribute() == 1)
+                             : (remote->attribute() == 1)) {
+                        ++size;
+                        if (send) {
+                            m_im[process->first].first.push_back(remote->localIndexPair().local().local());
+                        } else {
+                            m_im[process->first].second.push_back(remote->localIndexPair().local().local());
+                        }
+                    }
+                    ++remote;
+                }
+            }
+        }
+
+        int sendBufIdx = 0;
+        int recvBufIdx = 0;
+        for (auto it = m_im.begin(); it != m_im.end(); it++) {
+            int noSend = it->second.first.size();
+            int noRecv = it->second.second.size();
+
+            if (noSend + noRecv > 0) {
+                messageInformation_.insert(
+                        std::make_pair(it->first,
+                                       std::make_pair(MessageInformation(
+                                                          sendBufIdx * block_size,
+                                                          noSend * block_size * sizeof(field_type)),
+                                                      MessageInformation(
+                                                          recvBufIdx * block_size,
+                                                          noRecv * block_size * sizeof(field_type)))));
+
+                for(int x = 0; x < noSend; x++) {
+                    for(int bs = 0; bs < block_size; bs++) {
+                        commpair_indicesOwnerCPU.push_back(it->second.first[x] * block_size + bs);
+                    }
+                }
+                for(int x = 0; x < noRecv; x++) {
+                    for(int bs = 0; bs < block_size; bs++) {
+                        commpair_indicesCopyOnCPU.push_back(it->second.second[x] * block_size + bs);
+                    }
+                }
+                sendBufIdx += noSend;
+                recvBufIdx += noRecv;
+            }
+        }
+
+        m_commpair_indicesCopy = std::make_unique<CuVector<int>>(commpair_indicesCopyOnCPU);
+        m_commpair_indicesOwner = std::make_unique<CuVector<int>>(commpair_indicesOwnerCPU);
+
+        m_GPUSendBuf = std::make_unique<CuVector<field_type>>(sendBufIdx * block_size);
+        m_GPURecvBuf = std::make_unique<CuVector<field_type>>(recvBufIdx * block_size);
+    }
 
     void initIndexSet() const
     {
@@ -143,6 +333,8 @@ private:
         m_indicesCopy = std::make_unique<CuVector<int>>(indicesCopyOnCPU);
         m_indicesOwner = std::make_unique<CuVector<int>>(indicesOwnerCPU);
+
+        buildCommPairIdxs();
     }
 };
 } // namespace Opm::cuistl
diff --git a/opm/simulators/linalg/cuistl/CuVector.cpp b/opm/simulators/linalg/cuistl/CuVector.cpp
index baf9b6fa8..cbcde5084 100644
--- a/opm/simulators/linalg/cuistl/CuVector.cpp
+++ b/opm/simulators/linalg/cuistl/CuVector.cpp
@@ -286,6 +286,20 @@ CuVector<T>::copyToHost(std::vector<T>& data) const
 {
     copyToHost(data.data(), data.size());
 }
+
+template <class T>
+void
+CuVector<T>::prepareSendBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const
+{
+    return detail::prepareSendBuf(m_dataOnDevice, buffer.data(), indexSet.dim(), indexSet.data());
+}
+template <class T>
+void
+CuVector<T>::syncFromRecvBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const
+{
+    return detail::syncFromRecvBuf(m_dataOnDevice, buffer.data(), indexSet.dim(), indexSet.data());
+}
+
 template class CuVector<double>;
 template class CuVector<float>;
 template class CuVector<int>;
diff --git a/opm/simulators/linalg/cuistl/CuVector.hpp b/opm/simulators/linalg/cuistl/CuVector.hpp
index 4ab8392f6..004014315 100644
--- a/opm/simulators/linalg/cuistl/CuVector.hpp
+++ b/opm/simulators/linalg/cuistl/CuVector.hpp
@@ -231,6 +231,9 @@ public:
      */
     void copyToHost(std::vector<T>& data) const;
+    void prepareSendBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const;
+    void syncFromRecvBuf(CuVector<T>& buffer, const CuVector<int>& indexSet) const;
+
     /**
      * @brief operator *= multiplies every element by scalar
      * @param scalar the scalar to with which to multiply every element
diff --git a/opm/simulators/linalg/cuistl/detail/vector_operations.cu b/opm/simulators/linalg/cuistl/detail/vector_operations.cu
index 18ccf2ac2..b3e9716ee 100644
--- a/opm/simulators/linalg/cuistl/detail/vector_operations.cu
+++ b/opm/simulators/linalg/cuistl/detail/vector_operations.cu
@@ -91,6 +91,27 @@ namespace
         const auto threads = getThreads(numberOfElements);
         return (numberOfElements + threads - 1) / threads;
     }
+
+    template <class T>
+    __global__ void
+    prepareSendBufKernel(const T* a, T* buffer, size_t numberOfElements, const int* indices)
+    {
+        const auto globalIndex = blockDim.x * blockIdx.x + threadIdx.x;
+
+        if (globalIndex < numberOfElements) {
+            buffer[globalIndex] = a[indices[globalIndex]];
+        }
+    }
+    template <class T>
+    __global__ void
+    syncFromRecvBufKernel(T* a, T* buffer, size_t numberOfElements, const int* indices)
+    {
+        const auto globalIndex = blockDim.x * blockIdx.x + threadIdx.x;
+
+        if (globalIndex < numberOfElements) {
+            a[indices[globalIndex]] = buffer[globalIndex];
+        }
+    }
 } // namespace
 
 template <class T>
 void
@@ -132,6 +153,25 @@ template double innerProductAtIndices(const double*, const double*, double* buffer, size_t, const int*);
 template float innerProductAtIndices(const float*, const float*, float* buffer, size_t, const int*);
 template int innerProductAtIndices(const int*, const int*, int* buffer, size_t, const int*);
 
+template <class T>
+void prepareSendBuf(const T* deviceA, T* buffer, size_t numberOfElements, const int* indices)
+{
+    prepareSendBufKernel<<<getBlocks(numberOfElements), getThreads(numberOfElements)>>>(deviceA, buffer, numberOfElements, indices);
+    cudaDeviceSynchronize(); // The buffers are prepared for MPI. Wait for them to finish.
+}
+template void prepareSendBuf(const double* deviceA, double* buffer, size_t numberOfElements, const int* indices);
+template void prepareSendBuf(const float* deviceA, float* buffer, size_t numberOfElements, const int* indices);
+template void prepareSendBuf(const int* deviceA, int* buffer, size_t numberOfElements, const int* indices);
+
+template <class T>
+void syncFromRecvBuf(T* deviceA, T* buffer, size_t numberOfElements, const int* indices)
+{
+    syncFromRecvBufKernel<<<getBlocks(numberOfElements), getThreads(numberOfElements)>>>(deviceA, buffer, numberOfElements, indices);
+    //cudaDeviceSynchronize(); // Not needed, I guess...
+}
+template void syncFromRecvBuf(double* deviceA, double* buffer, size_t numberOfElements, const int* indices);
+template void syncFromRecvBuf(float* deviceA, float* buffer, size_t numberOfElements, const int* indices);
+template void syncFromRecvBuf(int* deviceA, int* buffer, size_t numberOfElements, const int* indices);
 
 template <class T>
 void
diff --git a/opm/simulators/linalg/cuistl/detail/vector_operations.hpp b/opm/simulators/linalg/cuistl/detail/vector_operations.hpp
index 0ea08155b..58e8c3f25 100644
--- a/opm/simulators/linalg/cuistl/detail/vector_operations.hpp
+++ b/opm/simulators/linalg/cuistl/detail/vector_operations.hpp
@@ -55,6 +55,11 @@ void setZeroAtIndexSet(T* deviceData, size_t numberOfElements, const int* indice
 template <class T>
 T innerProductAtIndices(const T* deviceA, const T* deviceB, T* buffer, size_t numberOfElements, const int* indices);
 
+template <class T>
+void prepareSendBuf(const T* deviceA, T* buffer, size_t numberOfElements, const int* indices);
+template <class T>
+void syncFromRecvBuf(T* deviceA, T* buffer, size_t numberOfElements, const int* indices);
+
 /**
  * @brief Compue the weighted matrix vector product where the matrix is diagonal, the diagonal is a vector, meaning we
  * compute the Hadamard product.
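
For orientation, here is a stand-alone sketch of the communication pattern the new copyOwnerToAll() implements, reduced to one neighbouring rank and a plain double array: gather the owned entries into a contiguous device buffer, hand the device pointers directly to MPI (which requires a CUDA-aware MPI library), and scatter the received entries into their halo positions. The names gatherKernel, scatterKernel and exchangeHalo are invented for this sketch; they are not part of the patch.

#include <mpi.h>
#include <cuda_runtime.h>

__global__ void gatherKernel(const double* src, double* buf, size_t n, const int* idx)
{
    const auto i = blockDim.x * blockIdx.x + threadIdx.x;
    if (i < n) {
        buf[i] = src[idx[i]]; // pack owned entries into a contiguous send buffer
    }
}

__global__ void scatterKernel(double* dst, const double* buf, size_t n, const int* idx)
{
    const auto i = blockDim.x * blockIdx.x + threadIdx.x;
    if (i < n) {
        dst[idx[i]] = buf[i]; // unpack received entries into their halo positions
    }
}

void exchangeHalo(double* x,            // device vector being updated
                  const int* d_sendIdx, // device indices of owned entries to send
                  const int* d_recvIdx, // device indices of halo entries to fill
                  double* d_sendBuf,    // contiguous device send buffer
                  double* d_recvBuf,    // contiguous device receive buffer
                  size_t n,             // number of entries exchanged with the neighbour
                  int neighbourRank,
                  MPI_Comm comm)
{
    const int threads = 256;
    const int blocks = static_cast<int>((n + threads - 1) / threads);

    gatherKernel<<<blocks, threads>>>(x, d_sendBuf, n, d_sendIdx);
    cudaDeviceSynchronize(); // the send buffer must be complete before MPI reads it

    MPI_Request requests[2];
    // With a CUDA-aware MPI these are device addresses; no staging copy to the host.
    MPI_Irecv(d_recvBuf, static_cast<int>(n), MPI_DOUBLE, neighbourRank, 0, comm, &requests[0]);
    MPI_Isend(d_sendBuf, static_cast<int>(n), MPI_DOUBLE, neighbourRank, 0, comm, &requests[1]);
    MPI_Waitall(2, requests, MPI_STATUSES_IGNORE);

    scatterKernel<<<blocks, threads>>>(x, d_recvBuf, n, d_recvIdx);
    // No host synchronisation needed here as long as the next consumer of x is
    // another kernel on the same (default) stream.
}

In the patch, prepareSendBufKernel and syncFromRecvBufKernel play the roles of gatherKernel and scatterKernel, the per-rank offsets and byte counts come from messageInformation_, and the expanded block-wise index sets come from buildCommPairIdxs().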
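
The index and offset bookkeeping in buildCommPairIdxs() is easiest to see on a small worked example. The numbers below are invented for illustration (block_size = 2, field_type = double, neighbouring ranks 1 and 3):

// rank 1: owner (send) local indices {4, 7},  copy (recv) local indices {9}
// rank 3: owner (send) local indices {2},     copy (recv) local indices {5, 6}
//
// commpair_indicesOwner (gather indices, idx * block_size + bs, rank 1 first):
//   { 8, 9, 14, 15,  4, 5 }
// commpair_indicesCopy (scatter indices for the receive side):
//   { 18, 19,  10, 11, 12, 13 }
// messageInformation_ (start_ in field_type elements, size_ in bytes):
//   rank 1: send { start_ = 0, size_ = 2*2*8 = 32 }   recv { start_ = 0, size_ = 1*2*8 = 16 }
//   rank 3: send { start_ = 4, size_ = 1*2*8 = 16 }   recv { start_ = 2, size_ = 2*2*8 = 32 }
//
// m_GPUSendBuf and m_GPURecvBuf each hold 3 * block_size = 6 doubles.

prepareSendBuf() then gathers x[8], x[9], x[14], x[15], x[4], x[5] into the send buffer; the MPI_Issend for rank 1 transfers bytes [0, 32) of it and the one for rank 3 bytes [32, 48), since the messages use MPI_BYTE while the element offset start_ is applied to the typed data() pointer.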
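
The MPIX_CUDA_AWARE_SUPPORT macro and MPIX_Query_cuda_support() used by the new checks are MPI extensions (Open MPI ships them in <mpi-ext.h>); the patch does not show where that header is pulled in. A hedged sketch of the usual way to wrap the check portably, with mpiIsCudaAware() being a hypothetical helper rather than anything in the patch:

#include <mpi.h>
#if defined(OPEN_MPI) && OPEN_MPI
#include <mpi-ext.h> // declares MPIX_CUDA_AWARE_SUPPORT and MPIX_Query_cuda_support()
#endif

// Sketch only: true when the MPI library reports CUDA awareness at run time.
static bool mpiIsCudaAware()
{
#if defined(MPIX_CUDA_AWARE_SUPPORT)
    return 1 == MPIX_Query_cuda_support();
#else
    return false; // cannot tell at compile time; assume the safe default
#endif
}

Guarding the GPU-direct path behind such a check, and falling back to copyOwnerToAll_orig() otherwise, would keep the code usable on MPI builds without CUDA support.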