From 7b3c87c656cf45c5eae4a9869dcf623276f50641 Mon Sep 17 00:00:00 2001
From: Robin
Date: Mon, 23 Dec 2024 20:48:00 +0100
Subject: [PATCH] still need to modify datareader to use netcdf on only one thread

---
 src/hurricanedata/datareader.cu       |  58 +++++++
 src/hurricanedata/datareader.h        |  26 ++++
 src/hurricanedata/filepathmanager.cpp |  40 +++++
 src/hurricanedata/filepathmanager.h   |  19 +++
 src/hurricanedata/gpubuffer.cu        | 213 ++++++++++++++++++++------
 src/hurricanedata/gpubuffer.h         |  27 ++--
 src/hurricanedata/gpubufferhandler.cu |  65 ++++++++
 src/hurricanedata/gpubufferhandler.h  |  28 ++++
 src/hurricanedata/gpubufferhelper.h   |   6 +-
 src/main.cu                           |  59 +++++--
 test_read.py                          |  23 ++-
 11 files changed, 487 insertions(+), 77 deletions(-)
 create mode 100644 src/hurricanedata/datareader.cu
 create mode 100644 src/hurricanedata/datareader.h
 create mode 100644 src/hurricanedata/filepathmanager.cpp
 create mode 100644 src/hurricanedata/filepathmanager.h
 create mode 100644 src/hurricanedata/gpubufferhandler.cu
 create mode 100644 src/hurricanedata/gpubufferhandler.h

diff --git a/src/hurricanedata/datareader.cu b/src/hurricanedata/datareader.cu
new file mode 100644
index 0000000..6e26003
--- /dev/null
+++ b/src/hurricanedata/datareader.cu
@@ -0,0 +1,58 @@
+#include "datareader.h"
+
+#include <iostream>
+#include <netcdf>
+
+using namespace std;
+using namespace netCDF;
+
+DataReader::DataReader(const std::string &path, std::string variableName):
+filePathManager(path), variableName(variableName) { }
+
+size_t DataReader::fileLength(size_t fileIndex) {
+    cout << "filePathMan = " << filePathManager.getPath(fileIndex) << " bla = " << fileIndex << "\n";
+
+    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
+
+    multimap<string, NcVar> vars = data.getVars();
+
+    NcVar var = vars.find(variableName)->second;
+
+    size_t length = 1;
+    for (NcDim dim: var.getDims()) {
+        length *= dim.getSize();
+    }
+
+    // TODO: Turns out c-NetCDF is not thread safe :( https://github.com/Unidata/netcdf-c/issues/1373
+    // size_t length = 34933248;
+    std::cout << "return length " << length << "\n";
+    return length;
+}
+
+template <typename T>
+void DataReader::loadFile(T* dataOut, size_t fileIndex) {
+    std::cout << "loading file " << fileIndex << "\n";
+    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
+
+    // multimap<string, NcVar> vars = data.getVars();
+
+    // NcVar var = vars.find(variableName)->second;
+
+    // var.getVar(dataOut);
+}
+
+template void DataReader::loadFile<float>(float* dataOut, size_t fileIndex);
+template void DataReader::loadFile<int>(int* dataOut, size_t fileIndex);
+
+DataReader::~DataReader() {
+
+}
+
+// template <typename T>
+// void DataReader::readAndAllocateAxis(T** axis_ptr, size_t *size, const string &varName) {
+//     assert(var.getDimCount() == 1);
+//     netCDF::NcDim dim = var.getDim(0);
+//     *size = dim.getSize();
+//     cudaError_t status = cudaMallocManaged(axis_ptr, *size*sizeof(T));
+//     var.getVar(*axis_ptr);
+// }
\ No newline at end of file
diff --git a/src/hurricanedata/datareader.h b/src/hurricanedata/datareader.h
new file mode 100644
index 0000000..eee9605
--- /dev/null
+++ b/src/hurricanedata/datareader.h
@@ -0,0 +1,26 @@
+#ifndef DATAREADER_H
+#define DATAREADER_H
+
+#include <cstddef>
+#include <string>
+#include <vector>
+
+#include "filepathmanager.h"
+
+
+class DataReader {
+public:
+    DataReader(const std::string &path, std::string variableName);
+    size_t fileLength(size_t fileIndex);
+
+    template <typename T>
+    void loadFile(T* dataOut, size_t fileIndex);
+
+    ~DataReader();
+private:
+    FilePathManager filePathManager;
+    std::string variableName;
+};
+
+
+#endif //DATAREADER_H
diff --git a/src/hurricanedata/filepathmanager.cpp b/src/hurricanedata/filepathmanager.cpp
new file mode 100644
index 0000000..03c7f39
--- /dev/null
+++ b/src/hurricanedata/filepathmanager.cpp
@@ -0,0 +1,40 @@
+#include <algorithm>
+#include <cstdlib>
+#include <filesystem>
+#include <iostream>
+#include <vector>
+
+#include "filepathmanager.h"
+
+namespace fs = std::filesystem;
+
+FilePathManager::FilePathManager(const std::string& path) {
+    // Check if the directory exists
+    if (!fs::exists(path) || !fs::is_directory(path)) {
+        std::cerr << "Error: Directory does not exist or is not valid." << std::endl;
+        exit(1);
+    }
+
+    // Iterate over directory entries
+    for (const auto& entry : fs::directory_iterator(path)) {
+        if (entry.is_regular_file() && entry.path().extension() == ".nc4") {
+            fileNames.push_back(entry.path().string());
+        }
+    }
+
+    // Sort file names lexicographically
+    std::sort(fileNames.begin(), fileNames.end());
+
+    // Print sorted file names
+    std::cout << "Files in directory '" << path << "' sorted lexicographically:\n";
+    for (const auto& fileName : fileNames) {
+        std::cout << fileName << std::endl;
+    }
+}
+
+const char* FilePathManager::getPath(size_t index) const {
+    return fileNames.at(index).c_str();
+}
+
+
+FilePathManager::~FilePathManager() { }
\ No newline at end of file
diff --git a/src/hurricanedata/filepathmanager.h b/src/hurricanedata/filepathmanager.h
new file mode 100644
index 0000000..2b1304d
--- /dev/null
+++ b/src/hurricanedata/filepathmanager.h
@@ -0,0 +1,19 @@
+#ifndef FILEPATHMANAGER_H
+#define FILEPATHMANAGER_H
+
+#include <string>
+#include <vector>
+
+class FilePathManager {
+public:
+    FilePathManager(const std::string& path);
+    size_t getNumberOfFiles();
+    const char* getPath(size_t index) const;
+
+    ~FilePathManager();
+
+private:
+    std::vector<std::string> fileNames;
+};
+
+#endif //FILEPATHMANAGER_H
\ No newline at end of file
diff --git a/src/hurricanedata/gpubuffer.cu b/src/hurricanedata/gpubuffer.cu
index 40ab5b7..e9a8f40 100644
--- a/src/hurricanedata/gpubuffer.cu
+++ b/src/hurricanedata/gpubuffer.cu
@@ -1,66 +1,183 @@
 #include "gpubuffer.h"
-#include "fielddata.h"
-#include "gpubufferhelper.h"
+
+#include <cassert>
+#include <condition_variable>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
 #include <netcdf>
-
-
-using namespace std;
 using namespace netCDF;
+using namespace std;
 
-GPUBuffer::GPUBuffer(std::string path, std::string variableName):
-path(path), variableName(variableName) {
-    NcFile data(path, NcFile::read);
+struct File {
+    condition_variable cv;
+    mutex m;
+    bool valid;
+    size_t size;
+    float *h_data; // host data
+    float *d_data; // device data
+};
 
-    multimap<string, NcVar> vars = data.getVars();
+struct LoadFileJob {
+    size_t fileIndex;
+    size_t bufferIndex;
+};
 
-    cudaMallocManaged(&fmd, sizeof(FieldMetadata));
+class GPUBuffer::impl {
+public:
+    impl(DataReader& dataReader);
+    void loadFile(LoadFileJob job);
+    ~impl();
 
-    readAndAllocateAxis(&fmd->lons, &fmd->widthSize, vars.find("lon")->second);
-    readAndAllocateAxis(&fmd->lats, &fmd->heightSize, vars.find("lat")->second);
-    readAndAllocateAxis(&fmd->levs, &fmd->depthSize, vars.find("lev")->second);
-}
+    File buffer[numBufferedFiles];
 
-FieldData GPUBuffer::nextFieldData() {
-    NcFile data(path, NcFile::read);
+    // Thread worker things
+    void worker();
+    queue<LoadFileJob> jobs;
+    unique_ptr<thread> ioworker;
+    cudaStream_t iostream;
+    DataReader& dataReader;
+    condition_variable queuecv;
+    mutex queuecv_m;
+    bool workerRunning = true;
+};
 
-    multimap<string, NcVar> vars = data.getVars();
-
-    FieldData fd;
-    size_t timeSize;
-    readAndAllocateAxis(&fd.times, &fd.timeSize, vars.find("time")->second);
+GPUBuffer::GPUBuffer(DataReader& dataReader):
+pImpl(make_unique<impl>(dataReader)) { }
+
+GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) {
+    cudaStreamCreate(&iostream);
+    // size_t size = dataReader.fileLength(0);
+    size_t size = 5;
+
+    for (size_t i = 0; i < numBufferedFiles; i++) {
+        {
+            File &file = buffer[i];
+            lock_guard<mutex> lk(file.m);
+            cudaMallocHost(&file.h_data, sizeof(float)*size);
+            cudaError_t status = cudaMalloc(&file.d_data, sizeof(float)*size);
+            if (status != cudaSuccess)
+                cerr << "Error allocating device memory. Status code: " << status << "\n";
+            file.size = size;
+            file.valid = false;
+        }
+        {
+            lock_guard<mutex> lk(queuecv_m);
+            LoadFileJob job = {
+                .fileIndex = i,
+                .bufferIndex = i
+            };
+            jobs.push(job);
+        }
 
-    NcVar var = vars.find(variableName)->second;
-
-    int length = 1;
-    for (NcDim dim: var.getDims()) {
-        length *= dim.getSize();
+    }
+    auto t = thread([]() {
+        NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read);
+        multimap<string, NcVar> vars = data.getVars();
+    });
+    t.join();
 
-    // Store NetCDF variable in pinned memory on host
-    float *h_array;
+    auto tt = thread([]() {
+        NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read);
+        multimap<string, NcVar> vars = data.getVars();
+    });
+    tt.join();
 
-    cudaMallocHost(&h_array, sizeof(float)*length);
-
-    var.getVar(h_array);
-
-    // Copy data to device
-    cudaError_t status = cudaMalloc(&fd.valArrays[0], sizeof(float)*length);
-    if (status != cudaSuccess)
-        cout << "Error allocating memory: " << status << "\n";
-
-    cudaMemcpyAsync(fd.valArrays[0], h_array, sizeof(float)*length, cudaMemcpyHostToDevice);
-
-    cudaDeviceSynchronize(); // Heavy hammer synchronisation // TODO: Use streams
-
-    cudaFreeHost(h_array);
-
-    return fd;
+    ioworker = make_unique<thread>([this]() { worker(); });
 }
 
-GPUBuffer::~GPUBuffer() {
-    cudaFree(fmd->lons);
-    cudaFree(fmd->lats);
-    cudaFree(fmd->levs);
-    cudaFree(fmd);
+GPUBuffer::~GPUBuffer() { }
+
+GPUBuffer::impl::~impl() {
+    {
+        lock_guard<mutex> lk(queuecv_m);
+        workerRunning = false;
+    }
+    queuecv.notify_all();
+    ioworker->join();
+    for (size_t i = 0; i < numBufferedFiles; i++) {
+        File &file = buffer[i];
+        cudaFree(file.d_data);
+        cudaFreeHost(file.h_data); // h_data is pinned memory from cudaMallocHost, so release it with cudaFreeHost
+    }
+    cudaStreamDestroy(iostream);
+}
+
+void GPUBuffer::impl::loadFile(LoadFileJob job) {
+    File &file = buffer[job.bufferIndex];
+
+    {
+        lock_guard<mutex> lk(file.m);
+        assert(!file.valid);
+        dataReader.loadFile(file.h_data, job.fileIndex);
+        cudaMemcpyAsync(file.d_data, file.h_data, sizeof(float)*file.size, cudaMemcpyHostToDevice, iostream);
+        cudaStreamSynchronize(iostream);
+        buffer[job.bufferIndex].valid = true;
+        cout << "loaded file with index " << job.bufferIndex << "\n";
+    }
+    file.cv.notify_all();
+}
+
+void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) {
+    LoadFileJob job = {
+        .fileIndex = fileIndex,
+        .bufferIndex = bufferIndex
+    };
+
+    // Main thread theoretically blocks on 2 mutexes here
+    // but it _should_ never have to wait for them.
+    {
+        File &file = pImpl->buffer[bufferIndex];
+
+        std::unique_lock<std::mutex> lk(file.m, std::defer_lock);
+        bool lockval = lk.try_lock();
+        if (!lockval) {
+            cout << "waited on GPUBuffer during loadFile orchestration :(\n";
+            lk.lock();
+        }
+        file.valid = false;
+    }
+    {
+        std::unique_lock<std::mutex> lk(pImpl->queuecv_m, std::defer_lock);
+        bool lockval = lk.try_lock();
+        if (!lockval) {
+            cout << "waited on IOworker queue during loadFile orchestration :(\n";
+            lk.lock();
+        }
+        pImpl->jobs.push(job);
+    }
+    pImpl->queuecv.notify_all();
+}
+
+void GPUBuffer::impl::worker() {
+    while (workerRunning) {
+        LoadFileJob job;
+        {
+            unique_lock<mutex> lk(queuecv_m);
+            queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); });
+            if (!workerRunning) {
+                return;
+            }
+
+            job = jobs.front();
+            jobs.pop();
+        }
+        loadFile(job);
+    }
+}
+
+DataHandle GPUBuffer::getDataHandle(size_t bufferIndex) {
+    File &file = pImpl->buffer[bufferIndex];
+
+    // TODO: Might be nice to measure the blocking time here.
+    unique_lock<mutex> lk(file.m);
+    file.cv.wait(lk, [this, bufferIndex]{ return pImpl->buffer[bufferIndex].valid; });
+
+    DataHandle dh = {
+        .d_data = file.d_data,
+        .size = file.size
+    };
+    return dh;
 }
\ No newline at end of file
diff --git a/src/hurricanedata/gpubuffer.h b/src/hurricanedata/gpubuffer.h
index 06ee9c6..4b1b5b3 100644
--- a/src/hurricanedata/gpubuffer.h
+++ b/src/hurricanedata/gpubuffer.h
@@ -1,24 +1,31 @@
 #ifndef GPUBUFFER_H
 #define GPUBUFFER_H
 
-#include "fielddata.h"
-
 #include <string>
+#include <experimental/propagate_const>
+#include <memory>
+
+#include "datareader.h"
+
+struct DataHandle {
+    float *d_data;
+    size_t size;
+};
 
 class GPUBuffer {
 public:
-    GPUBuffer(std::string path, std::string variableName);
+    static constexpr size_t numBufferedFiles = 3;
 
-    FieldData nextFieldData();
+    GPUBuffer(DataReader& dataReader);
+
+    void loadFile(size_t fileIndex, size_t bufferIndex); // Async call
+
+    DataHandle getDataHandle(size_t bufferIndex); // Potentially blocking
 
     ~GPUBuffer();
-
-    FieldMetadata *fmd;
 private:
-    // TODO: Implement GPUBuffer
-    std::string path;
-    std::string variableName;
-
+    class impl;
+    std::experimental::propagate_const<std::unique_ptr<impl>> pImpl;
 };
 
 #endif //GPUBUFFER_H
diff --git a/src/hurricanedata/gpubufferhandler.cu b/src/hurricanedata/gpubufferhandler.cu
new file mode 100644
index 0000000..f01f8c2
--- /dev/null
+++ b/src/hurricanedata/gpubufferhandler.cu
@@ -0,0 +1,65 @@
+#include "gpubufferhandler.h"
+#include "fielddata.h"
+#include "gpubufferhelper.h"
+
+#include <netcdf>
+
+using namespace std;
+using namespace netCDF;
+
+GPUBufferHandler::GPUBufferHandler(const std::string &path, std::string variableName):
+filePathManager(path), variableName(variableName), presentTimeIndex(0), fileIndex(0) {
+    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
+
+    multimap<string, NcVar> vars = data.getVars();
+
+    cudaMallocManaged(&fmd, sizeof(FieldMetadata));
+
+    readAndAllocateAxis(&fmd->lons, &fmd->widthSize, vars.find("lon")->second);
+    readAndAllocateAxis(&fmd->lats, &fmd->heightSize, vars.find("lat")->second);
+    readAndAllocateAxis(&fmd->levs, &fmd->depthSize, vars.find("lev")->second);
+}
+
+FieldData GPUBufferHandler::nextFieldData() {
+    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
+
+    multimap<string, NcVar> vars = data.getVars();
+
+    FieldData fd;
+    size_t timeSize;
+    readAndAllocateAxis(&fd.times, &fd.timeSize, vars.find("time")->second);
+
+    NcVar var = vars.find(variableName)->second;
+
+    int length = 1;
+    for (NcDim dim: var.getDims()) {
+        length *= dim.getSize();
+    }
+
+    // Store NetCDF variable in pinned memory on host
+    float *h_array;
+
+    cudaMallocHost(&h_array, sizeof(float)*length);
+
+    var.getVar(h_array);
+
+    // Copy data to device
+    cudaError_t status = cudaMalloc(&fd.valArrays[0], sizeof(float)*length);
+    if (status != cudaSuccess)
+        cout << "Error allocating memory: " << status << "\n";
+
+    cudaMemcpyAsync(fd.valArrays[0], h_array, sizeof(float)*length, cudaMemcpyHostToDevice);
+
+    cudaDeviceSynchronize(); // Heavy hammer synchronisation // TODO: Use streams
+
+    cudaFreeHost(h_array);
+
+    return fd;
+}
+
+GPUBufferHandler::~GPUBufferHandler() {
+    cudaFree(fmd->lons);
+    cudaFree(fmd->lats);
+    cudaFree(fmd->levs);
+    cudaFree(fmd);
+}
\ No newline at end of file
diff --git a/src/hurricanedata/gpubufferhandler.h b/src/hurricanedata/gpubufferhandler.h
new file mode 100644
index 0000000..259562e
--- /dev/null
+++ b/src/hurricanedata/gpubufferhandler.h
@@ -0,0 +1,28 @@
+#ifndef GPUBUFFERHANDLER_H
+#define GPUBUFFERHANDLER_H
+
+#include "fielddata.h"
+#include "filepathmanager.h"
+
+#include <string>
+
+class GPUBufferHandler {
+public:
+    GPUBufferHandler(const std::string &path, std::string variableName);
+
+    FieldData nextFieldData();
+
+    ~GPUBufferHandler();
+
+    FieldMetadata *fmd;
+
+private:
+    // TODO: Implement GPUBuffer
+    FilePathManager filePathManager;
+    std::string variableName;
+
+    size_t presentTimeIndex;
+    size_t fileIndex;
+};
+
+#endif //GPUBUFFERHANDLER_H
diff --git a/src/hurricanedata/gpubufferhelper.h b/src/hurricanedata/gpubufferhelper.h
index 2f54634..91e7d8c 100644
--- a/src/hurricanedata/gpubufferhelper.h
+++ b/src/hurricanedata/gpubufferhelper.h
@@ -1,7 +1,11 @@
 #include <cassert>
 #include <netcdf>
+
+using namespace std;
+using namespace netCDF;
+
 template <typename T>
-void readAndAllocateAxis(T** axis_ptr, size_t *size, const netCDF::NcVar &var) {
+void readAndAllocateAxis(T** axis_ptr, size_t *size, NcVar var) {
     assert(var.getDimCount() == 1);
     netCDF::NcDim dim = var.getDim(0);
     *size = dim.getSize();
diff --git a/src/main.cu b/src/main.cu
index 5e81a5c..09e5ce5 100644
--- a/src/main.cu
+++ b/src/main.cu
@@ -1,19 +1,35 @@
-#include "hurricanedata/fielddata.h"
+// #include "hurricanedata/fielddata.h"
+// #include "hurricanedata/gpubufferhandler.h"
+#include "hurricanedata/datareader.h"
 #include "hurricanedata/gpubuffer.h"
 
 #include <iostream>
 #include <iomanip>
 #include <string>
 #include <vector>
+#include <memory>
 #include <cuda_runtime.h>
 
 // Not parallel computation
-__global__ void computeMean(float *ans, const FieldMetadata &fmd, FieldData fd) {
+// __global__ void computeMean(float *ans, const FieldMetadata &fmd, FieldData fd) {
+//     float sum = 0;
+//     size_t num_not_masked_values = 0;
+//     for (int i = 0; i < fmd.widthSize; i++) {
+//         double xi = getVal(fmd, fd, 2, 20, 100, i);
+//         if (xi < 1E14) { /* If x is not missing value */
+//             num_not_masked_values++;
+//             sum += xi;
+//         }
+//     }
+//     *ans = sum/num_not_masked_values;
+// }
+
+__global__ void computeMean(float *ans, DataHandle dh) {
     float sum = 0;
     size_t num_not_masked_values = 0;
-    for (int i = 0; i < fmd.widthSize; i++) {
-        double xi = getVal(fmd, fd, 2, 20, 100, i);
+    for (int i = 0; i < dh.size; i++) {
+        double xi = dh.d_data[i];
         if (xi < 1E14) { /* If x is not missing value */
             num_not_masked_values++;
             sum += xi;
@@ -23,22 +39,37 @@ __global__ void computeMean(float *ans, const FieldMetadata &fmd, FieldData fd)
 }
 
 int main() {
-    std::string path = "data/MERRA2_400.inst6_3d_ana_Np.20120101.nc4";
+    std::string path = "data";
+
     std::string variable = "T";
 
-    GPUBuffer buffer{path, variable};
-    auto fd = buffer.nextFieldData();
+    // std::unique_ptr<DataReader> dataReader = std::make_unique<DataReader>(path, variable);
+    DataReader dataReader{path, variable};
 
-    float *ptr_mean;
-    cudaMallocManaged(&ptr_mean, sizeof(float));
+    std::cout << "created datareader\n";
 
-    computeMean<<<1, 1>>>(ptr_mean, *buffer.fmd, fd);
+    GPUBuffer buffer (dataReader);
 
-    cudaDeviceSynchronize();
+    std::cout << "created buffer\n";
 
-    std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n";
+    auto dataHandle = buffer.getDataHandle(0);
 
-    cudaFree(fd.valArrays[0]);
-    cudaFree(ptr_mean);
+    // std::cout << "got a data handle\n";
+
+    // GPUBufferHandler buffer{path, variable};
+
+    // auto fd = buffer.nextFieldData();
+
+    // float *ptr_mean;
+    // cudaMallocManaged(&ptr_mean, sizeof(float));
+
+    // computeMean<<<1, 1>>>(ptr_mean, dataHandle);
+
+    // cudaDeviceSynchronize();
+
+    // std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n";
+
+    // // cudaFree(fd.valArrays[0]);
+    // cudaFree(ptr_mean);
 
     return 0;
 }
diff --git a/test_read.py b/test_read.py
index 8a2806b..2342990 100644
--- a/test_read.py
+++ b/test_read.py
@@ -24,12 +24,27 @@ print("Sum of U:", U_sum)
 
 sumval = 0
 row = U[2,20,100]
+
+print("Shape of row", row.shape)
+print("Row data for first 5 entries", row[0:5])
+
+row = row[0:5]
+print(f"{type(row)} {row.dtype}")
+
 n = 0
 for val in row:
-    if not np.ma.is_masked(val):
-        n+=1
-        sumval += val
-print(f"Why does {np.mean(row)=} not equal {sumval/n=} ?!")
+    #if not np.ma.is_masked(val):
+    n+=1
+    sumval += np.float64(val)
+Mean1 = np.mean(row)
+Mean2 = sumval/n
+
+print(type(Mean1))
+print(type(Mean2))
+print(Mean1)
+print(Mean2)
+
+print(f"Why does {np.mean(row):.10f} not equal {sumval/n:.10f} ?!")
 
 # Close the NetCDF file
 ncfile.close()
\ No newline at end of file
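
Note (outside the patch itself): a minimal sketch of how the new double-buffered API introduced above is meant to be driven once the kernel launch in main.cu is re-enabled. It only uses names this patch defines (DataReader, GPUBuffer, DataHandle, and the DataHandle-based computeMean kernel from main.cu); the extra loadFile call and the printed mean are illustrative assumptions, not something this commit does.

    #include "hurricanedata/datareader.h"
    #include "hurricanedata/gpubuffer.h"
    #include <iostream>

    int main() {
        DataReader dataReader{"data", "T"};
        GPUBuffer buffer(dataReader);            // constructor pre-queues loads for slots 0..numBufferedFiles-1

        buffer.loadFile(/*fileIndex=*/3, /*bufferIndex=*/1);  // async: job is queued for the IO worker thread

        DataHandle dh = buffer.getDataHandle(0); // blocks until the worker marks slot 0 valid

        float *ptr_mean;
        cudaMallocManaged(&ptr_mean, sizeof(float));
        computeMean<<<1, 1>>>(ptr_mean, dh);     // single-thread kernel defined in main.cu
        cudaDeviceSynchronize();
        std::cout << "Mean = " << *ptr_mean << "\n";
        cudaFree(ptr_mean);
        return 0;
    }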