From be0659c06fbe726738101eb4856c1f645918efa2 Mon Sep 17 00:00:00 2001 From: Robin Date: Wed, 8 Jan 2025 09:49:29 +0100 Subject: [PATCH] buffer multiple files --- src/hurricanedata/gpubuffer.cu | 143 +++++++++++---------------- src/hurricanedata/gpubufferhandler.h | 2 - src/hurricanedata/worker.cpp | 49 +++++++++ src/hurricanedata/worker.h | 27 +++++ src/main.cu | 26 +++-- 5 files changed, 149 insertions(+), 98 deletions(-) create mode 100644 src/hurricanedata/worker.cpp create mode 100644 src/hurricanedata/worker.h diff --git a/src/hurricanedata/gpubuffer.cu b/src/hurricanedata/gpubuffer.cu index 7edac38..b14bce1 100644 --- a/src/hurricanedata/gpubuffer.cu +++ b/src/hurricanedata/gpubuffer.cu @@ -1,4 +1,5 @@ #include "gpubuffer.h" +#include "worker.h" #include #include @@ -20,11 +21,6 @@ struct File { float *d_data; // device data }; -struct LoadFileJob { - size_t fileIndex; - size_t bufferIndex; -}; - struct GetSizeJob { size_t fileIndex; promise size; @@ -42,12 +38,10 @@ struct GetAxisIntJob { promise> axis; }; -using Job = variant; - class GPUBuffer::impl { public: impl(DataReader& dataReader); - void loadFile(LoadFileJob job); + void enqueueLoadFileJob(size_t fileIndex, size_t bufferIndex); void loadFile(size_t fileIndex, size_t bufferIndex); void getSize(GetSizeJob job); size_t getSize(size_t fileIndex); // Most probably blocking @@ -61,26 +55,30 @@ public: File buffer[numBufferedFiles]; // Thread worker things - void worker(); - queue jobs; - unique_ptr ioworker; - cudaStream_t iostream; + cudaStream_t iostream; // TODO: Make this static? DataReader& dataReader; - condition_variable queuecv; - mutex queuecv_m; - bool workerRunning = true; + static Worker worker; }; +Worker GPUBuffer::impl::worker; + GPUBuffer::GPUBuffer(DataReader& dataReader): pImpl(make_unique(dataReader)) { } size_t GPUBuffer::impl::getSize(size_t fileIndex) { promise promise; future future = promise.get_future(); - { - lock_guard lk(queuecv_m); - jobs.push(GetSizeJob{fileIndex, move(promise)}); - } - queuecv.notify_all(); + + // worker.enqueueJob(std::make_unique>( + // [this, fileIndex, promise = move(promise)]() mutable { + // getSize(GetSizeJob{fileIndex, move(promise)}); + // }) + // ); + auto task = std::packaged_task( + [this, fileIndex, promise = move(promise)]() mutable { + getSize(GetSizeJob{fileIndex, move(promise)}); + } + ); + worker.enqueueJob(move(task)); future.wait(); @@ -98,11 +96,20 @@ template <> pair GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName) { promise> promise; future> future = promise.get_future(); - { - lock_guard lk(queuecv_m); - jobs.push(GetAxisDoubleJob{fileIndex, axisName, move(promise)}); - } - queuecv.notify_all(); + + + // worker.enqueueJob(std::make_unique>( + // [this, fileIndex, axisName, promise = move(promise)]() mutable { + // getAxis(GetAxisDoubleJob{fileIndex, axisName, move(promise)}); + // }) + // ); + // worker.enqueueJob([this, fileIndex, axisName, promise = move(promise)]() mutable { getAxis(GetAxisDoubleJob{fileIndex, axisName, move(promise)}); }); + auto task = std::packaged_task( + [this, fileIndex, axisName, promise = move(promise)]() mutable { + getAxis(GetAxisDoubleJob{fileIndex, axisName, move(promise)}); + } + ); + worker.enqueueJob(move(task)); future.wait(); @@ -114,11 +121,12 @@ template <> pair GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName) { promise> promise; future> future = promise.get_future(); - { - lock_guard lk(queuecv_m); - jobs.push(GetAxisIntJob{fileIndex, axisName, move(promise)}); - } - queuecv.notify_all(); + auto task = std::packaged_task( + [this, fileIndex, axisName, promise = move(promise)]() mutable { + getAxis(GetAxisIntJob{fileIndex, axisName, move(promise)}); + } + ); + worker.enqueueJob(move(task)); future.wait(); @@ -129,8 +137,6 @@ template pair GPUBuffer::impl::getAxis(size_t fileIndex, con GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) { cudaStreamCreate(&iostream); - ioworker = make_unique([this]() { worker(); }); - size_t size = getSize(0); auto x = getAxis(0, "time"); size_t sizeTime = x.first; @@ -144,7 +150,6 @@ GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) { file.size = size; file.valid = false; } - // loadFile(i, i); } } @@ -152,30 +157,26 @@ GPUBuffer::~GPUBuffer() { } GPUBuffer::impl::~impl() { - { - lock_guard lk(queuecv_m); - workerRunning = false; - } - queuecv.notify_all(); - ioworker->join(); + worker.stop(); for (size_t i = 0; i < numBufferedFiles; i++) { File &file = buffer[i]; cudaFree(file.d_data); + cudaFreeHost(file.h_data); } cudaStreamDestroy(iostream); } -void GPUBuffer::impl::loadFile(LoadFileJob job) { - File &file = buffer[job.bufferIndex]; +void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) { + File &file = buffer[bufferIndex]; { lock_guard lk(file.m); + cout << "loading file with index " << fileIndex << "\n"; assert(!file.valid); - cout << "loading file with index " << job.fileIndex << "\n"; - dataReader.loadFile(file.h_data, job.fileIndex); + dataReader.loadFile(file.h_data, fileIndex); cudaMemcpyAsync(file.d_data, file.h_data, sizeof(float)*file.size, cudaMemcpyHostToDevice, iostream); cudaStreamSynchronize(iostream); - buffer[job.bufferIndex].valid = true; + buffer[bufferIndex].valid = true; } file.cv.notify_all(); } @@ -202,17 +203,10 @@ void GPUBuffer::impl::getAxis(GetAxisIntJob job) { } void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) { - pImpl->loadFile(fileIndex, bufferIndex); + pImpl->enqueueLoadFileJob(fileIndex, bufferIndex); } -void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) { - LoadFileJob job = { - .fileIndex = fileIndex, - .bufferIndex = bufferIndex - }; - - // Main thread theoretically blocks on 2 mutexes here - // but it _should_ never have to wait for them. +void GPUBuffer::impl::enqueueLoadFileJob(size_t fileIndex, size_t bufferIndex) { { File &file = buffer[bufferIndex]; @@ -224,42 +218,15 @@ void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) { } file.valid = false; } - { - std::unique_lock lk(queuecv_m, std::defer_lock); - bool lockval = lk.try_lock(); - if (!lockval) { - cout << "waited on IOworker queue during loadFile orchestration :(\n"; - lk.lock(); - } - jobs.push(job); - } - queuecv.notify_all(); -} -void GPUBuffer::impl::worker() { - while(workerRunning) { - Job job; - { - unique_lock lk(queuecv_m); - queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); }); - if (!workerRunning) { - return; - } - - job = move(jobs.front()); - jobs.pop(); - } - if(holds_alternative(job)) { - loadFile(get(job)); - } else if(holds_alternative(job)) { - getSize(move(get(job))); - } else if(holds_alternative(job)) { - getAxis(move(get(job))); - } else if(holds_alternative(job)) { - getAxis(move(get(job))); - } - - } + auto task = std::packaged_task(bind( + &GPUBuffer::impl::impl::loadFile, + this, + fileIndex, + bufferIndex + )); + worker.enqueueJob(move(task)); + // worker.enqueueJob([this, fileIndex, bufferIndex](){ loadFile(fileIndex, bufferIndex); }); } DataHandle GPUBuffer::getDataHandle(size_t bufferIndex) { diff --git a/src/hurricanedata/gpubufferhandler.h b/src/hurricanedata/gpubufferhandler.h index 2b4338b..e02ae91 100644 --- a/src/hurricanedata/gpubufferhandler.h +++ b/src/hurricanedata/gpubufferhandler.h @@ -48,8 +48,6 @@ public: */ FieldMetadata *fmd; - static void freeFieldData(); - private: FieldData setupField(size_t endBufferInd); GPUBuffer& gpuBuffer; diff --git a/src/hurricanedata/worker.cpp b/src/hurricanedata/worker.cpp new file mode 100644 index 0000000..e5d2f18 --- /dev/null +++ b/src/hurricanedata/worker.cpp @@ -0,0 +1,49 @@ +#include "worker.h" +#include + +using namespace std; + +Worker::Worker(): workerRunning(false) { + ioworker = thread([this]() { jobRunner(); }); +} + +void Worker::enqueueJob(Job job) { + { + lock_guard lk(queuecv_m); + jobs.push(move(job)); + } + queuecv.notify_all(); +} + +void Worker::jobRunner() { + workerRunning = true; + while(workerRunning) { + Job job; + { + unique_lock lk(queuecv_m); + queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); }); + if (!workerRunning) { + return; + } + + job = move(jobs.front()); + jobs.pop(); + } + + job(); + } +} + +void Worker::stop() { + if (workerRunning == false) return; + { + lock_guard lk(queuecv_m); + workerRunning = false; + } + queuecv.notify_all(); + ioworker.join(); +} + +Worker::~Worker() { + stop(); +} \ No newline at end of file diff --git a/src/hurricanedata/worker.h b/src/hurricanedata/worker.h new file mode 100644 index 0000000..edb6dd1 --- /dev/null +++ b/src/hurricanedata/worker.h @@ -0,0 +1,27 @@ +#ifndef WORKER_H +#define WORKER_H + +#include +#include +#include +#include +#include +#include + +class Worker { +public: + using Job = std::packaged_task; + Worker(); + void enqueueJob(Job job); + void stop(); + ~Worker(); +private: + void jobRunner(); + bool workerRunning; + std::queue jobs; + std::condition_variable queuecv; + std::mutex queuecv_m; + std::thread ioworker; +}; + +#endif //WORKER_H diff --git a/src/main.cu b/src/main.cu index ddc9569..95cbe83 100644 --- a/src/main.cu +++ b/src/main.cu @@ -19,29 +19,39 @@ __global__ void middleOfTwoValues(float *ans, const FieldMetadata &fmd, FieldDat int main() { std::string path = "data/atmosphere_MERRA-wind-speed[179253532]"; - std::string variable = "T"; + DataReader dataReaderU{path, "U"}; - DataReader dataReader{path, variable}; + DataReader dataReaderV{path, "V"}; std::cout << "created datareader\n"; - GPUBuffer buffer (dataReader); + GPUBuffer bufferU (dataReaderU); + + GPUBuffer bufferV (dataReaderV); std::cout << "created buffer\n"; - GPUBufferHandler bufferHandler(buffer); + GPUBufferHandler bufferHandlerU(bufferU); + + GPUBufferHandler bufferHandlerV(bufferV); float *ptr_test_read; cudaMallocManaged(&ptr_test_read, sizeof(float)); std::cout << "created buffer handler\n"; - for (int i = 0; i < 10; i++) { - FieldData fd = bufferHandler.nextFieldData(); + for (int i = 0; i < 20; i++) { + FieldData fdU = bufferHandlerU.nextFieldData(); + FieldData fdV = bufferHandlerV.nextFieldData(); - middleOfTwoValues<<<1, 1>>>(ptr_test_read, *bufferHandler.fmd, fd); + middleOfTwoValues<<<1, 1>>>(ptr_test_read, *bufferHandlerU.fmd, fdU); cudaDeviceSynchronize(); - std::cout << "ptr_test_read = " << std::fixed << std::setprecision(6) << *ptr_test_read << "\n"; + std::cout << "ptr_test_read U = " << std::fixed << std::setprecision(6) << *ptr_test_read << "\n"; + + middleOfTwoValues<<<1, 1>>>(ptr_test_read, *bufferHandlerV.fmd, fdV); + + cudaDeviceSynchronize(); + std::cout << "ptr_test_read V = " << std::fixed << std::setprecision(6) << *ptr_test_read << "\n"; } // TODO: measure data transfer time in this example code.