From c4a2ce1b44b86ba8e2d8612b1013edfe4100abcc Mon Sep 17 00:00:00 2001
From: Robin
Date: Thu, 26 Dec 2024 12:33:07 +0100
Subject: [PATCH] working buffering

---
 Makefile                              |   2 +-
 src/hurricanedata/datareader.cu       |   5 +-
 src/hurricanedata/datareader.h        |   3 -
 src/hurricanedata/fielddata.cu        |   6 +-
 src/hurricanedata/fielddata.h         |  17 ++--
 src/hurricanedata/gpubuffer.cu        |  86 ++++++++++---------
 src/hurricanedata/gpubuffer.h         |   4 +-
 src/hurricanedata/gpubufferhandler.cu | 117 +++++++++++++++++---------
 src/hurricanedata/gpubufferhandler.h  |  11 ++-
 src/hurricanedata/gpubufferhelper.h   |  14 ---
 src/main.cu                           |  58 ++++---------
 test_read.py                          |   1 +
 test_read2.py                         |  18 ++++
 13 files changed, 187 insertions(+), 155 deletions(-)
 delete mode 100644 src/hurricanedata/gpubufferhelper.h
 create mode 100644 test_read2.py

diff --git a/Makefile b/Makefile
index b728fc5..ad3faba 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 # Compiler and flags
 NVCC = nvcc
-CXXFLAGS = -I./src -I./hurricanedata -std=c++17 $(shell nc-config --cxx4flags) $(shell nc-config --cxx4libs)
+CXXFLAGS = -I./src -I./hurricanedata -std=c++17 $(shell nc-config --cxx4flags) $(shell nc-config --cxx4libs) -g -G
 COMPILE_OBJ_FLAGS = --device-c
 
 # Directories
diff --git a/src/hurricanedata/datareader.cu b/src/hurricanedata/datareader.cu
index 70e727d..50619ba 100644
--- a/src/hurricanedata/datareader.cu
+++ b/src/hurricanedata/datareader.cu
@@ -43,12 +43,13 @@ void DataReader::loadFile(T* dataOut, size_t fileIndex) {
 }
 
 template <typename T>
-void DataReader::loadFile(T* dataOut, size_t fileIndex, const string& variableName) {
+void DataReader::loadFile(T* dataOut, size_t fileIndex, const string& varName) {
     NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
 
     multimap<string, NcVar> vars = data.getVars();
 
-    NcVar var = vars.find(variableName)->second;
+    NcVar var = vars.find(varName)->second;
+    cout << "var = " << varName << " with size = " << var.getDim(0).getSize() << "\n";
 
     var.getVar(dataOut);
 }
diff --git a/src/hurricanedata/datareader.h b/src/hurricanedata/datareader.h
index c193905..209132f 100644
--- a/src/hurricanedata/datareader.h
+++ b/src/hurricanedata/datareader.h
@@ -21,9 +21,6 @@ public:
 
     size_t axisLength(size_t fileIndex, const std::string& axisName);
 
-    // template <typename T>
-    // void readAndAllocateAxis(T** axis_ptr, size_t *size, NcVar var) {
-
     ~DataReader();
 private:
     FilePathManager filePathManager;
diff --git a/src/hurricanedata/fielddata.cu b/src/hurricanedata/fielddata.cu
index c53adc4..38b0d25 100644
--- a/src/hurricanedata/fielddata.cu
+++ b/src/hurricanedata/fielddata.cu
@@ -8,10 +8,12 @@ __device__ float getVal(
     const size_t &latInd,
     const size_t &lonInd
 ) {
+    size_t valArrInd = (d.fieldInd + timeInd) / md.numberOfTimeStepsPerFile;
+    size_t trueTimeInd = (d.fieldInd + timeInd) % md.numberOfTimeStepsPerFile;
     size_t sizeSpatialData = md.widthSize*md.heightSize*md.depthSize;
     size_t size2DMapData = md.widthSize*md.heightSize;
-    return d.valArrays[0][
-        timeInd*sizeSpatialData
+    return d.valArrays[valArrInd][
+        trueTimeInd*sizeSpatialData
         + levInd*size2DMapData
         + latInd*md.widthSize
         + lonInd
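
Note on the getVal change above: a field window can now start mid-file, so the
logical timeInd is offset by d.fieldInd and then split into an index into
valArrays plus an in-file time index. A quick worked check, using the
numberOfTimeStepsPerFile = 4 that gpubufferhandler.cu hardcodes below: for
fieldInd = 3 and timeInd = 2, valArrInd = (3 + 2) / 4 = 1 and
trueTimeInd = (3 + 2) % 4 = 1, so the read lands on in-file step 1 of buffered
file 1, i.e. overall offset 5 from the start of the window's first file.
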
diff --git a/src/hurricanedata/fielddata.h b/src/hurricanedata/fielddata.h
index 4633784..e3073a3 100644
--- a/src/hurricanedata/fielddata.h
+++ b/src/hurricanedata/fielddata.h
@@ -14,6 +14,10 @@ struct FieldMetadata {
     double *lons;
     double *lats;
     double *levs;
+
+    size_t timeSize; // Number of different times
+
+    size_t numberOfTimeStepsPerFile;
 };
 
 using FieldMetadata = FieldMetadata;
 
 struct FieldData {
     static constexpr size_t FILESNUM = 2; // Number of files stored in a FieldData struct.
 
-    // An array of length FILESNUM storing pointers to 4D arrays stored in device memory.
-    float *valArrays[FILESNUM];
+    // A Unified Memory array of length FILESNUM storing pointers to 4D arrays stored in device memory.
+    float **valArrays;
 
-    size_t timeSize; // Number of different times
-
-    // times is a managed Unified Memory array of size timeSize that indicates
-    // that getVal(md, d, t, i, j, k) is a value at time times[t].
-    int *times;
+    size_t fieldInd;
+
+    // times is a managed Unified Memory array of size (FILESNUM, numberOfTimeStepsPerFile)
+    int **times;
 };
 
-using FieldData = FieldData;
 
 extern __device__ float getVal(
     const FieldMetadata &md,
diff --git a/src/hurricanedata/gpubuffer.cu b/src/hurricanedata/gpubuffer.cu
index 753f301..c0392ea 100644
--- a/src/hurricanedata/gpubuffer.cu
+++ b/src/hurricanedata/gpubuffer.cu
@@ -18,6 +18,8 @@ struct File {
     size_t size;
     float *h_data; // host data
     float *d_data; // device data
+    int *times;
+    size_t timeSize;
 };
 
 struct LoadFileJob {
@@ -53,6 +55,9 @@ public:
     size_t getSize(size_t fileIndex); // Most probably blocking
     void getAxis(GetAxisDoubleJob job);
     void getAxis(GetAxisIntJob job);
+
+    template <typename T>
+    std::pair<size_t, T*> getAxis(size_t fileIndex, const std::string& axisName); // Most probably blocking
 
     ~impl();
 
     File buffer[numBufferedFiles];
@@ -84,37 +89,44 @@ size_t GPUBuffer::impl::getSize(size_t fileIndex) {
     return future.get();
 }
 
-template <>
-pair<size_t, double*> GPUBuffer::getAxis(size_t fileIndex, const string& axisName) {
-    promise<pair<size_t, double*>> promise;
-    future<pair<size_t, double*>> future = promise.get_future();
-    {
-        lock_guard<mutex> lk(pImpl->queuecv_m);
-        pImpl->jobs.push(GetAxisDoubleJob{fileIndex, axisName, move(promise)});
-    }
-    pImpl->queuecv.notify_all();
-
-    future.wait();
-
-    return future.get();
+template <typename T>
+pair<size_t, T*> GPUBuffer::getAxis(size_t fileIndex, const string& axisName) {
+    return pImpl->getAxis<T>(fileIndex, axisName);
 }
+template pair<size_t, double*> GPUBuffer::getAxis(size_t fileIndex, const string& axisName);
+template pair<size_t, int*> GPUBuffer::getAxis(size_t fileIndex, const string& axisName);
 
 template <>
-pair<size_t, int*> GPUBuffer::getAxis(size_t fileIndex, const string& axisName) {
-    promise<pair<size_t, int*>> promise;
-    future<pair<size_t, int*>> future = promise.get_future();
+pair<size_t, double*> GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName) {
+    promise<pair<size_t, double*>> promise;
+    future<pair<size_t, double*>> future = promise.get_future();
     {
-        lock_guard<mutex> lk(pImpl->queuecv_m);
-        pImpl->jobs.push(GetAxisIntJob{fileIndex, axisName, move(promise)});
+        lock_guard<mutex> lk(queuecv_m);
+        jobs.push(GetAxisDoubleJob{fileIndex, axisName, move(promise)});
     }
-    pImpl->queuecv.notify_all();
+    queuecv.notify_all();
 
     future.wait();
 
     return future.get();
 }
-template pair<size_t, int*> GPUBuffer::getAxis(size_t fileIndex, const string& axisName);
+template pair<size_t, double*> GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName);
+
+template <>
+pair<size_t, int*> GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName) {
+    promise<pair<size_t, int*>> promise;
+    future<pair<size_t, int*>> future = promise.get_future();
+    {
+        lock_guard<mutex> lk(queuecv_m);
+        jobs.push(GetAxisIntJob{fileIndex, axisName, move(promise)});
+    }
+    queuecv.notify_all();
+
+    future.wait();
+
+    return future.get();
+}
+template pair<size_t, int*> GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName);
"time"); + size_t sizeTime = x.first; + cudaFree(x.second); for (size_t i = 0; i < numBufferedFiles; i++) { { File &file = buffer[i]; lock_guard lk(file.m); cudaMallocHost(&file.h_data, sizeof(float)*size); - cudaError_t status = cudaMalloc(&file.d_data, sizeof(float)*size); - if (status != cudaSuccess) - cerr << "Error allocating device memory. Status code: " << status << "\n"; + cudaMalloc(&file.d_data, sizeof(float)*size); + cudaMallocManaged(&file.times, sizeof(int)*sizeTime); file.size = size; file.valid = false; + file.timeSize = sizeTime; } - loadFile(i, i); - { - // lock_guard lk(queuecv_m); - // LoadFileJob job = { - // .fileIndex = i, - // .bufferIndex = i - // }; - // cout << "enqueue file load job\n"; - // jobs.push(job); - - } + // loadFile(i, i); } } -GPUBuffer::~GPUBuffer() { } +GPUBuffer::~GPUBuffer() { +} GPUBuffer::impl::~impl() { { @@ -162,6 +166,7 @@ GPUBuffer::impl::~impl() { File &file = buffer[i]; cudaFree(file.d_data); cudaFree(file.h_data); + cudaFree(file.times); } cudaStreamDestroy(iostream); } @@ -172,11 +177,12 @@ void GPUBuffer::impl::loadFile(LoadFileJob job) { { lock_guard lk(file.m); assert(!file.valid); + dataReader.loadFile(file.times, job.fileIndex, "time"); // TODO: Times dont store anything useful :( + cout << "times[1] (inside inside) " << file.times[1] << " for file with fileindex = " << job.fileIndex << "\n"; dataReader.loadFile(file.h_data, job.fileIndex); cudaMemcpyAsync(file.d_data, file.h_data, sizeof(float)*file.size, cudaMemcpyHostToDevice, iostream); cudaStreamSynchronize(iostream); buffer[job.bufferIndex].valid = true; - cout << "loaded file with index" << job.bufferIndex << "\n"; } file.cv.notify_all(); } @@ -190,7 +196,7 @@ void GPUBuffer::impl::getAxis(GetAxisDoubleJob job) { pair array; array.first = dataReader.axisLength(job.fileIndex, job.axisName); cudaError_t status = cudaMallocManaged(&array.second, array.first*sizeof(double)); - dataReader.loadFile(array.second, job.fileIndex); + dataReader.loadFile(array.second, job.fileIndex, job.axisName); job.axis.set_value(array); } @@ -272,6 +278,8 @@ DataHandle GPUBuffer::getDataHandle(size_t bufferIndex) { DataHandle dh = { .d_data = file.d_data, + .times = file.times, + .timeSize = file.timeSize, .size = file.size }; return dh; diff --git a/src/hurricanedata/gpubuffer.h b/src/hurricanedata/gpubuffer.h index b014cd7..46d031d 100644 --- a/src/hurricanedata/gpubuffer.h +++ b/src/hurricanedata/gpubuffer.h @@ -8,7 +8,9 @@ #include "datareader.h" struct DataHandle { - float *d_data; + float *d_data; // Device memory + int* times; // Uniform memory + size_t timeSize; size_t size; }; diff --git a/src/hurricanedata/gpubufferhandler.cu b/src/hurricanedata/gpubufferhandler.cu index f01f8c2..9eb7eb7 100644 --- a/src/hurricanedata/gpubufferhandler.cu +++ b/src/hurricanedata/gpubufferhandler.cu @@ -1,60 +1,93 @@ #include "gpubufferhandler.h" #include "fielddata.h" -#include "gpubufferhelper.h" -#include +#include using namespace std; -using namespace netCDF; - -GPUBufferHandler::GPUBufferHandler(const std::string &path, std::string variableName): -filePathManager(path), variableName(variableName), presentTimeIndex(0), fileIndex(0) { - NcFile data(filePathManager.getPath(fileIndex), NcFile::read); - - multimap vars = data.getVars(); +GPUBufferHandler::GPUBufferHandler(GPUBuffer& gpuBuffer): +gpuBuffer(gpuBuffer), fieldInd(0), bufferInd(0), fileInd(0) { cudaMallocManaged(&fmd, sizeof(FieldMetadata)); - readAndAllocateAxis(&fmd->lons, &fmd->widthSize, vars.find("lon")->second); - 
diff --git a/src/hurricanedata/gpubufferhandler.cu b/src/hurricanedata/gpubufferhandler.cu
index f01f8c2..9eb7eb7 100644
--- a/src/hurricanedata/gpubufferhandler.cu
+++ b/src/hurricanedata/gpubufferhandler.cu
@@ -1,60 +1,93 @@
 #include "gpubufferhandler.h"
 #include "fielddata.h"
-#include "gpubufferhelper.h"
-#include <netcdf>
+#include <iostream>
 
 using namespace std;
-using namespace netCDF;
-
-GPUBufferHandler::GPUBufferHandler(const std::string &path, std::string variableName):
-filePathManager(path), variableName(variableName), presentTimeIndex(0), fileIndex(0) {
-    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
-
-    multimap<string, NcVar> vars = data.getVars();
 
+GPUBufferHandler::GPUBufferHandler(GPUBuffer& gpuBuffer):
+gpuBuffer(gpuBuffer), fieldInd(0), bufferInd(0), fileInd(0) {
     cudaMallocManaged(&fmd, sizeof(FieldMetadata));
-    readAndAllocateAxis(&fmd->lons, &fmd->widthSize, vars.find("lon")->second);
-    readAndAllocateAxis(&fmd->lats, &fmd->heightSize, vars.find("lat")->second);
-    readAndAllocateAxis(&fmd->levs, &fmd->depthSize, vars.find("lev")->second);
+    auto [widthSize, lons] = gpuBuffer.getAxis<double>(0, "lon");
+    fmd->widthSize = widthSize;
+    fmd->lons = lons;
+
+    auto [heightSize, lats] = gpuBuffer.getAxis<double>(0, "lat");
+    fmd->heightSize = heightSize;
+    fmd->lats = lats;
+
+    auto [depthSize, levs] = gpuBuffer.getAxis<double>(0, "lev");
+    fmd->depthSize = depthSize;
+    fmd->levs = levs;
+
+    for (size_t i = 0; i < GPUBuffer::numBufferedFiles; i++) {
+        gpuBuffer.loadFile(fileInd, fileInd);
+        fileInd++;
+    }
+
+    fmd->numberOfTimeStepsPerFile = 4; // TODO: Maybe find a better way to do this.
+    fmd->timeSize = GPUBufferHandler::numberOfTimeStepsPerField;
+}
+
+FieldData GPUBufferHandler::setupField(size_t newEndBufferInd) {
+    FieldData fd;
+    cudaMallocManaged(&fd.valArrays, sizeof(float *)*FieldData::FILESNUM);
+    cudaMallocManaged(&fd.times, sizeof(int *)*FieldData::FILESNUM);
+    size_t fieldDataInd = 0;
+    cout << "getting field from files " << bufferInd << " to " << newEndBufferInd << "\n";
+    for (int i = bufferInd; i <= newEndBufferInd; i++) {
+        cout << "getting handle for " << i << "\n";
+        DataHandle x = gpuBuffer.getDataHandle(i);
+        fd.valArrays[fieldDataInd] = x.d_data;
+        fd.times[fieldDataInd] = x.times;
+        fieldDataInd++;
+    }
+    fd.fieldInd = fieldInd;
+
+    return fd;
 }
 
 FieldData GPUBufferHandler::nextFieldData() {
-    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
-
-    multimap<string, NcVar> vars = data.getVars();
-
-    FieldData fd;
-    size_t timeSize;
-    readAndAllocateAxis(&fd.times, &fd.timeSize, vars.find("time")->second);
-
-    NcVar var = vars.find(variableName)->second;
-    int length = 1;
-    for (NcDim dim: var.getDims()) {
-        length *= dim.getSize();
+    DataHandle x = gpuBuffer.getDataHandle(bufferInd);
+    size_t newFieldInd = (fieldInd + 1) % fmd->numberOfTimeStepsPerFile;
+    size_t newBufferInd = (bufferInd + ((fieldInd + 1) / fmd->numberOfTimeStepsPerFile)) % GPUBuffer::numBufferedFiles;
+
+    size_t endFieldInd = (fieldInd + GPUBufferHandler::numberOfTimeStepsPerField - 1) % fmd->numberOfTimeStepsPerFile;
+    size_t endBufferInd = (bufferInd + (fieldInd + GPUBufferHandler::numberOfTimeStepsPerField - 1)/fmd->numberOfTimeStepsPerFile) % GPUBuffer::numBufferedFiles;
+
+    size_t newEndFieldInd = (endFieldInd + 1) % fmd->numberOfTimeStepsPerFile;
+    size_t newEndBufferInd = (endBufferInd + ((endFieldInd + 1) / fmd->numberOfTimeStepsPerFile)) % GPUBuffer::numBufferedFiles;
+
+    // size_t newBufferInd = (bufferInd + 1) % GPUBuffer::numBufferedFiles;
+    // size_t newFieldInd = (fieldInd + ((bufferInd + 1) / 4)) % x.timeSize;
+
+    // size_t endBufferInd = (bufferInd + GPUBufferHandler::numberOfTimeStepsPerField) % GPUBuffer::numBufferedFiles;
+    // size_t endFieldInd = (fieldInd + ((bufferInd + GPUBufferHandler::numberOfTimeStepsPerField) / 4)) % x.timeSize;
+
+    // size_t newEndBufferInd = (endBufferInd + 1) % GPUBuffer::numBufferedFiles;
+    // size_t newEndFieldInd = (endFieldInd + ((endBufferInd + 1) / 4)) % x.timeSize;
+
+    if (firstTimeStep) {
+        firstTimeStep = false;
+        return setupField(endBufferInd);
+    }
+
+    if (newBufferInd != bufferInd) {
+        fileInd++;
+        gpuBuffer.loadFile(fileInd, bufferInd);
+        bufferInd = newBufferInd;
+        fieldInd = newFieldInd;
     }
 
-    // Store NetCDF variable in pinned memory on host
-    float *h_array;
+    if (newEndBufferInd != endBufferInd) {
+        // maybe don't do things?
+    }
 
-    cudaMallocHost(&h_array, sizeof(float)*length);
-
-    var.getVar(h_array);
-
-    // Copy data to device
-    cudaError_t status = cudaMalloc(&fd.valArrays[0], sizeof(float)*length);
-    if (status != cudaSuccess)
-        cout << "Error allocating memory: " << status << "\n";
-
-    cudaMemcpyAsync(fd.valArrays[0], h_array, sizeof(float)*length, cudaMemcpyHostToDevice);
-
-    cudaDeviceSynchronize(); // Heavy hammer synchronisation // TODO: Use streams
-
-    cudaFreeHost(h_array);
-
-    return fd;
+    return setupField(newEndBufferInd);
 }
 
 GPUBufferHandler::~GPUBufferHandler() {
diff --git a/src/hurricanedata/gpubufferhandler.h b/src/hurricanedata/gpubufferhandler.h
index 75863ff..9210eea 100644
--- a/src/hurricanedata/gpubufferhandler.h
+++ b/src/hurricanedata/gpubufferhandler.h
@@ -8,7 +8,7 @@
 
 class GPUBufferHandler {
 public:
-    GPUBufferHandler();
+    GPUBufferHandler(GPUBuffer& gpuBuffer);
 
     FieldData nextFieldData();
 
@@ -16,8 +16,15 @@ public:
 
     FieldMetadata *fmd;
 
+    static constexpr size_t numberOfTimeStepsPerField = 2; // TODO: Move this to fielddata
+
 private:
-    GPUBuffer
+    FieldData setupField(size_t endBufferInd);
+    GPUBuffer& gpuBuffer;
+    size_t fileInd;
+    size_t bufferInd;
+    size_t fieldInd;
+    bool firstTimeStep = true;
 };
 
 #endif //GPUBUFFERHANDLER_H
diff --git a/src/hurricanedata/gpubufferhelper.h b/src/hurricanedata/gpubufferhelper.h
deleted file mode 100644
index 91e7d8c..0000000
--- a/src/hurricanedata/gpubufferhelper.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#include <netcdf>
-#include <cassert>
-
-using namespace std;
-using namespace netCDF;
-
-template <typename T>
-void readAndAllocateAxis(T** axis_ptr, size_t *size, NcVar var) {
-    assert(var.getDimCount() == 1);
-    netCDF::NcDim dim = var.getDim(0);
-    *size = dim.getSize();
-    cudaError_t status = cudaMallocManaged(axis_ptr, *size*sizeof(T));
-    var.getVar(*axis_ptr);
-}
\ No newline at end of file
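
Note on the index arithmetic in nextFieldData: the handler slides a window of
numberOfTimeStepsPerField time steps over a ring of numBufferedFiles files,
each holding numberOfTimeStepsPerFile steps. The division carries an index
into the next buffered file and the modulo keeps the in-file offset. A
self-contained toy that mirrors just that arithmetic (not part of the patch;
numBufferedFiles = 3 is an assumed value):

    #include <cstdio>

    int main() {
        const size_t stepsPerFile = 4;      // fmd->numberOfTimeStepsPerFile
        const size_t numBufferedFiles = 3;  // assumed ring size
        size_t bufferInd = 0, fieldInd = 0;
        for (int step = 0; step < 10; step++) {
            // Window tail, as in nextFieldData (numberOfTimeStepsPerField = 2):
            size_t endBufferInd = (bufferInd + (fieldInd + 2 - 1) / stepsPerFile) % numBufferedFiles;
            std::printf("head: file %zu step %zu, tail: file %zu\n", bufferInd, fieldInd, endBufferInd);
            // Advance one step; the division carries into the next file.
            size_t newFieldInd = (fieldInd + 1) % stepsPerFile;
            bufferInd = (bufferInd + (fieldInd + 1) / stepsPerFile) % numBufferedFiles;
            fieldInd = newFieldInd;
        }
        return 0;
    }
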
diff --git a/src/main.cu b/src/main.cu
index 5072a15..7eeadf9 100644
--- a/src/main.cu
+++ b/src/main.cu
@@ -1,5 +1,5 @@
 // #include "hurricanedata/fielddata.h"
-// #include "hurricanedata/gpubufferhandler.h"
+#include "hurricanedata/gpubufferhandler.h"
 #include "hurricanedata/datareader.h"
 #include "hurricanedata/gpubuffer.h"
 
@@ -10,32 +10,9 @@
 #include <iostream>
 #include <iomanip>
 
-
-// Not parallel computation
-// __global__ void computeMean(float *ans, const FieldMetadata &fmd, FieldData fd) {
-//     float sum = 0;
-//     size_t num_not_masked_values = 0;
-//     for (int i = 0; i < fmd.widthSize; i++) {
-//         double xi = getVal(fmd, fd, 2, 20, 100, i);
-//         if (xi < 1E14) { /* If x is not missing value */
-//             num_not_masked_values++;
-//             sum += xi;
-//         }
-//     }
-//     *ans = sum/num_not_masked_values;
-// }
-
-__global__ void computeMean(float *ans, DataHandle dh) {
-    float sum = 0;
-    size_t num_not_masked_values = 0;
-    for (int i = 0; i < dh.size; i++) {
-        double xi = dh.d_data[i];
-        if (xi < 1E14) { /* If x is not missing value */
-            num_not_masked_values++;
-            sum += xi;
-        }
-    }
-    *ans = sum/num_not_masked_values;
+__global__ void getSingleValue(float *ans, const FieldMetadata &fmd, FieldData fd) {
+    float xi = getVal(fmd, fd, 1, 20, 100, 100);
+    *ans = xi;
 }
 
 int main() {
 
     std::cout << "created buffer\n";
 
-    auto dataHandle = buffer.getDataHandle(0);
+    GPUBufferHandler bufferHandler(buffer);
 
-    std::cout << "got a data handle\n";
+    std::cout << "created buffer handler\n";
 
-    auto x = buffer.getAxis(0, "time");
-    std::cout << "size of x=" << x.first << "\n";
-    std::cout << "x[1]= " << x.second[1] << "\n";
+    FieldData fd = bufferHandler.nextFieldData();
 
-    float *ptr_mean;
-    cudaMallocManaged(&ptr_mean, sizeof(float));
+    float *ptr_test_read;
+    cudaMallocManaged(&ptr_test_read, sizeof(float));
 
-    computeMean<<<1, 1>>>(ptr_mean, dataHandle);
+    getSingleValue<<<1, 1>>>(ptr_test_read, *bufferHandler.fmd, fd);
 
     cudaDeviceSynchronize();
 
-    std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n";
+    std::cout << "ptr_test_read = " << std::fixed << std::setprecision(6) << *ptr_test_read << "\n";
 
-    // cudaFree(fd.valArrays[0]);
-    cudaFree(ptr_mean);
+    // TODO: Write an example loop using buffering and measure it.
+
+    cudaFree(fd.valArrays[0]); // TODO: Free data properly in FieldData (maybe make an iterator)
+    cudaFree(ptr_test_read);
 
     return 0;
 }
diff --git a/test_read.py b/test_read.py
index 2342990..e37c4a3 100644
--- a/test_read.py
+++ b/test_read.py
@@ -46,5 +46,6 @@ print(Mean2)
 
 print(f"Why does {np.mean(row):.10f} not equal {sumval/n:.10f} ?!")
 
+
 # Close the NetCDF file
 ncfile.close()
\ No newline at end of file
diff --git a/test_read2.py b/test_read2.py
new file mode 100644
index 0000000..b137d14
--- /dev/null
+++ b/test_read2.py
@@ -0,0 +1,18 @@
+import numpy as np
+from netCDF4 import Dataset
+
+# Load the NetCDF file
+file_path = 'data/MERRA2_400.inst6_3d_ana_Np.20120101.nc4'
+ncfile = Dataset(file_path, 'r')
+
+# Check the available variables in the file
+print(ncfile.variables.keys())
+
+Temp = ncfile.variables['T'][:]
+
+print(f"{Temp[1, 20, 100, 100]=}")
+print(f"{Temp.flat[12949732]=}")
+
+
+# Close the NetCDF file
+ncfile.close()
\ No newline at end of file
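
Sketch for the TODO left in main.cu ("write an example loop using buffering
and measure it"): one way to do it with CUDA events, reusing the kernel and
handler from this patch. The loop count of 16 is an arbitrary assumption, and
FieldData's managed arrays still leak per iteration, as the other TODO notes:

    cudaEvent_t start, stop;
    cudaEventCreate(&start);
    cudaEventCreate(&stop);

    cudaEventRecord(start);
    for (int i = 0; i < 16; i++) {
        // Each call advances the window; buffered files are reused until
        // the window crosses a file boundary and a new load is enqueued.
        FieldData fd = bufferHandler.nextFieldData();
        getSingleValue<<<1, 1>>>(ptr_test_read, *bufferHandler.fmd, fd);
        cudaDeviceSynchronize();
    }
    cudaEventRecord(stop);
    cudaEventSynchronize(stop);

    float ms = 0.0f;
    cudaEventElapsedTime(&ms, start, stop);
    std::cout << "16 fields in " << ms << " ms\n";
    cudaEventDestroy(start);
    cudaEventDestroy(stop);
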