From 109566d7b66cb2250cf772fd379974ef65a8eabc Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 23 Dec 2024 23:42:51 +0100 Subject: [PATCH] working metadata reading --- src/hurricanedata/datareader.cu | 44 +++++--- src/hurricanedata/datareader.h | 8 ++ src/hurricanedata/gpubuffer.cu | 161 +++++++++++++++++++++------ src/hurricanedata/gpubuffer.h | 5 +- src/hurricanedata/gpubufferhandler.h | 11 +- src/main.cu | 21 ++-- 6 files changed, 186 insertions(+), 64 deletions(-) diff --git a/src/hurricanedata/datareader.cu b/src/hurricanedata/datareader.cu index 6e26003..70e727d 100644 --- a/src/hurricanedata/datareader.cu +++ b/src/hurricanedata/datareader.cu @@ -1,7 +1,7 @@ #include "datareader.h" #include -#include +#include using namespace std; using namespace netCDF; @@ -10,8 +10,6 @@ DataReader::DataReader(const std::string &path, std::string variableName): filePathManager(path), variableName(variableName) { } size_t DataReader::fileLength(size_t fileIndex) { - cout << "filePathMan = " << filePathManager.getPath(fileIndex) << " bla = " << fileIndex << "\n"; - NcFile data(filePathManager.getPath(fileIndex), NcFile::read); multimap vars = data.getVars(); @@ -23,24 +21,42 @@ size_t DataReader::fileLength(size_t fileIndex) { length *= dim.getSize(); } - // TODO: Turns out c-NetCDF is not thread safe :( https://github.com/Unidata/netcdf-c/issues/1373 - // size_t length = 34933248; - std::cout << "return length" << length << "\n"; return length; } +size_t DataReader::axisLength(size_t fileIndex, const std::string& axisName) { + NcFile data(filePathManager.getPath(fileIndex), NcFile::read); + + multimap vars = data.getVars(); + + NcVar var = vars.find(axisName)->second; + + assert(var.getDimCount() == 1); + + netCDF::NcDim dim = var.getDim(0); + return dim.getSize(); +} + template void DataReader::loadFile(T* dataOut, size_t fileIndex) { - std::cout << "loading file" << fileIndex <<"\n"; - NcFile data(filePathManager.getPath(fileIndex), NcFile::read); - - // multimap vars = data.getVars(); - - // NcVar var = vars.find(variableName)->second; - - // var.getVar(dataOut); + loadFile(dataOut, fileIndex, variableName); } +template +void DataReader::loadFile(T* dataOut, size_t fileIndex, const string& variableName) { + NcFile data(filePathManager.getPath(fileIndex), NcFile::read); + + multimap vars = data.getVars(); + + NcVar var = vars.find(variableName)->second; + + var.getVar(dataOut); +} + +template void DataReader::loadFile(float* dataOut, size_t fileIndex, const string& variableName); +template void DataReader::loadFile(int* dataOut, size_t fileIndex, const string& variableName); +template void DataReader::loadFile(double* dataOut, size_t fileIndex, const string& variableName); +template void DataReader::loadFile(double* dataOut, size_t fileIndex); template void DataReader::loadFile(float* dataOut, size_t fileIndex); template void DataReader::loadFile(int* dataOut, size_t fileIndex); diff --git a/src/hurricanedata/datareader.h b/src/hurricanedata/datareader.h index eee9605..c193905 100644 --- a/src/hurricanedata/datareader.h +++ b/src/hurricanedata/datareader.h @@ -16,6 +16,14 @@ public: template void loadFile(T* dataOut, size_t fileIndex); + template + void loadFile(T* dataOut, size_t fileIndex, const std::string& variableName); + + size_t axisLength(size_t fileIndex, const std::string& axisName); + + // template + // void readAndAllocateAxis(T** axis_ptr, size_t *size, NcVar var) { + ~DataReader(); private: FilePathManager filePathManager; diff --git a/src/hurricanedata/gpubuffer.cu b/src/hurricanedata/gpubuffer.cu index e9a8f40..753f301 100644 --- a/src/hurricanedata/gpubuffer.cu +++ b/src/hurricanedata/gpubuffer.cu @@ -6,9 +6,8 @@ #include #include #include - -#include -using namespace netCDF; +#include +#include using namespace std; @@ -26,17 +25,41 @@ struct LoadFileJob { size_t bufferIndex; }; +struct GetSizeJob { + size_t fileIndex; + promise size; +}; + +struct GetAxisDoubleJob { + size_t fileIndex; + string axisName; + promise> axis; +}; + +struct GetAxisIntJob { + size_t fileIndex; + string axisName; + promise> axis; +}; + +using Job = variant; + class GPUBuffer::impl { public: impl(DataReader& dataReader); void loadFile(LoadFileJob job); + void loadFile(size_t fileIndex, size_t bufferIndex); + void getSize(GetSizeJob job); + size_t getSize(size_t fileIndex); // Most probably blocking + void getAxis(GetAxisDoubleJob job); + void getAxis(GetAxisIntJob job); ~impl(); File buffer[numBufferedFiles]; // Thread worker things void worker(); - queue jobs; + queue jobs; unique_ptr ioworker; cudaStream_t iostream; DataReader& dataReader; @@ -47,10 +70,59 @@ public: GPUBuffer::GPUBuffer(DataReader& dataReader): pImpl(make_unique(dataReader)) { } +size_t GPUBuffer::impl::getSize(size_t fileIndex) { + promise promise; + future future = promise.get_future(); + { + lock_guard lk(queuecv_m); + jobs.push(GetSizeJob{fileIndex, move(promise)}); + } + queuecv.notify_all(); + + future.wait(); + + return future.get(); +} + +template <> +pair GPUBuffer::getAxis(size_t fileIndex, const string& axisName) { + promise> promise; + future> future = promise.get_future(); + { + lock_guard lk(pImpl->queuecv_m); + pImpl->jobs.push(GetAxisDoubleJob{fileIndex, axisName, move(promise)}); + } + pImpl->queuecv.notify_all(); + + future.wait(); + + return future.get(); +} +template pair GPUBuffer::getAxis(size_t fileIndex, const string& axisName); + +template <> +pair GPUBuffer::getAxis(size_t fileIndex, const string& axisName) { + promise> promise; + future> future = promise.get_future(); + { + lock_guard lk(pImpl->queuecv_m); + pImpl->jobs.push(GetAxisIntJob{fileIndex, axisName, move(promise)}); + } + pImpl->queuecv.notify_all(); + + future.wait(); + + return future.get(); +} +template pair GPUBuffer::getAxis(size_t fileIndex, const string& axisName); + GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) { cudaStreamCreate(&iostream); - // size_t size = dataReader.fileLength(0); - size_t size = 5; + + ioworker = make_unique([this]() { worker(); }); + + size_t size = getSize(0); + cout << "size = " << size << "\n"; for (size_t i = 0; i < numBufferedFiles; i++) { { @@ -63,29 +135,18 @@ GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) { file.size = size; file.valid = false; } + loadFile(i, i); { - lock_guard lk(queuecv_m); - LoadFileJob job = { - .fileIndex = i, - .bufferIndex = i - }; - jobs.push(job); + // lock_guard lk(queuecv_m); + // LoadFileJob job = { + // .fileIndex = i, + // .bufferIndex = i + // }; + // cout << "enqueue file load job\n"; + // jobs.push(job); + } - } - auto t = thread([]() { - NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read); - multimap vars = data.getVars(); - }); - t.join(); - - auto tt = thread([]() { - NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read); - multimap vars = data.getVars(); - }); - tt.join(); - - ioworker = make_unique([this]() { worker(); }); } GPUBuffer::~GPUBuffer() { } @@ -120,7 +181,32 @@ void GPUBuffer::impl::loadFile(LoadFileJob job) { file.cv.notify_all(); } +void GPUBuffer::impl::getSize(GetSizeJob job) { + size_t size = dataReader.fileLength(job.fileIndex); + job.size.set_value(size); +} + +void GPUBuffer::impl::getAxis(GetAxisDoubleJob job) { + pair array; + array.first = dataReader.axisLength(job.fileIndex, job.axisName); + cudaError_t status = cudaMallocManaged(&array.second, array.first*sizeof(double)); + dataReader.loadFile(array.second, job.fileIndex); + job.axis.set_value(array); +} + +void GPUBuffer::impl::getAxis(GetAxisIntJob job) { + pair array; + array.first = dataReader.axisLength(job.fileIndex, job.axisName); + cudaError_t status = cudaMallocManaged(&array.second, array.first*sizeof(int)); + dataReader.loadFile(array.second, job.fileIndex, job.axisName); + job.axis.set_value(array); +} + void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) { + pImpl->loadFile(fileIndex, bufferIndex); +} + +void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) { LoadFileJob job = { .fileIndex = fileIndex, .bufferIndex = bufferIndex @@ -129,7 +215,7 @@ void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) { // Main thread theoretically blocks on 2 mutexes here // but it _should_ never have to wait for them. { - File &file = pImpl->buffer[bufferIndex]; + File &file = buffer[bufferIndex]; std::unique_lock lk(file.m, std::defer_lock); bool lockval = lk.try_lock(); @@ -140,20 +226,20 @@ void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) { file.valid = false; } { - std::unique_lock lk(pImpl->queuecv_m, std::defer_lock); + std::unique_lock lk(queuecv_m, std::defer_lock); bool lockval = lk.try_lock(); if (!lockval) { cout << "waited on IOworker queue during loadFile orchestration :(\n"; lk.lock(); } - pImpl->jobs.push(job); + jobs.push(job); } - pImpl->queuecv.notify_all(); + queuecv.notify_all(); } void GPUBuffer::impl::worker() { while(workerRunning) { - LoadFileJob job; + Job job; { unique_lock lk(queuecv_m); queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); }); @@ -161,10 +247,19 @@ void GPUBuffer::impl::worker() { return; } - job = jobs.front(); + job = move(jobs.front()); jobs.pop(); } - loadFile(job); + if(holds_alternative(job)) { + loadFile(get(job)); + } else if(holds_alternative(job)) { + getSize(move(get(job))); + } else if(holds_alternative(job)) { + getAxis(move(get(job))); + } else if(holds_alternative(job)) { + getAxis(move(get(job))); + } + } } diff --git a/src/hurricanedata/gpubuffer.h b/src/hurricanedata/gpubuffer.h index 4b1b5b3..b014cd7 100644 --- a/src/hurricanedata/gpubuffer.h +++ b/src/hurricanedata/gpubuffer.h @@ -18,10 +18,13 @@ public: GPUBuffer(DataReader& dataReader); - void loadFile(size_t fileIndex, size_t bufferIndex); // Async call + void loadFile(size_t fileIndex, size_t bufferIndex); // No blocking DataHandle getDataHandle(size_t bufferIndex); // Potentially blocking + template + std::pair getAxis(size_t fileIndex, const std::string& axisName); // Most probably blocking + ~GPUBuffer(); private: class impl; diff --git a/src/hurricanedata/gpubufferhandler.h b/src/hurricanedata/gpubufferhandler.h index 259562e..75863ff 100644 --- a/src/hurricanedata/gpubufferhandler.h +++ b/src/hurricanedata/gpubufferhandler.h @@ -2,13 +2,13 @@ #define GPUBUFFERHANDLER_H #include "fielddata.h" -#include "filepathmanager.h" +#include "gpubuffer.h" #include class GPUBufferHandler { public: - GPUBufferHandler(const std::string &path, std::string variableName); + GPUBufferHandler(); FieldData nextFieldData(); @@ -17,12 +17,7 @@ public: FieldMetadata *fmd; private: - // TODO: Implement GPUBuffer - FilePathManager filePathManager; - std::string variableName; - - size_t presentTimeIndex; - size_t fileIndex; + GPUBuffer }; #endif //GPUBUFFERHANDLER_H diff --git a/src/main.cu b/src/main.cu index 09e5ce5..5072a15 100644 --- a/src/main.cu +++ b/src/main.cu @@ -54,22 +54,27 @@ int main() { auto dataHandle = buffer.getDataHandle(0); - // std::cout << "got a data handle\n"; + std::cout << "got a data handle\n"; + + auto x = buffer.getAxis(0, "time"); + std::cout << "size of x=" << x.first << "\n"; + std::cout << "x[1]= " <>>(ptr_mean, dataHandle); + computeMean<<<1, 1>>>(ptr_mean, dataHandle); - // cudaDeviceSynchronize(); + cudaDeviceSynchronize(); - // std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n"; + std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n"; - // // cudaFree(fd.valArrays[0]); - // cudaFree(ptr_mean); + // cudaFree(fd.valArrays[0]); + cudaFree(ptr_mean); return 0; }