buffer multiple files

Robin 2025-01-08 09:49:29 +01:00
parent dbebb6d9f2
commit be0659c06f
5 changed files with 149 additions and 98 deletions

View File

@@ -1,4 +1,5 @@
 #include "gpubuffer.h"
+#include "worker.h"
 #include <mutex>
 #include <thread>
@@ -20,11 +21,6 @@ struct File {
     float *d_data; // device data
 };
-struct LoadFileJob {
-    size_t fileIndex;
-    size_t bufferIndex;
-};
 struct GetSizeJob {
     size_t fileIndex;
     promise<size_t> size;
@@ -42,12 +38,10 @@ struct GetAxisIntJob {
     promise<pair<size_t, int *>> axis;
 };
-using Job = variant<LoadFileJob, GetSizeJob, GetAxisIntJob, GetAxisDoubleJob>;
 class GPUBuffer::impl {
 public:
     impl(DataReader& dataReader);
-    void loadFile(LoadFileJob job);
+    void enqueueLoadFileJob(size_t fileIndex, size_t bufferIndex);
     void loadFile(size_t fileIndex, size_t bufferIndex);
     void getSize(GetSizeJob job);
     size_t getSize(size_t fileIndex); // Most probably blocking
@@ -61,26 +55,30 @@ public:
     File buffer[numBufferedFiles];

     // Thread worker things
-    void worker();
-    queue<Job> jobs;
-    unique_ptr<thread> ioworker;
-    cudaStream_t iostream;
+    cudaStream_t iostream; // TODO: Make this static?
     DataReader& dataReader;
-    condition_variable queuecv;
-    mutex queuecv_m;
-    bool workerRunning = true;
+    static Worker worker;
 };
+
+Worker GPUBuffer::impl::worker;

 GPUBuffer::GPUBuffer(DataReader& dataReader): pImpl(make_unique<impl>(dataReader)) { }

 size_t GPUBuffer::impl::getSize(size_t fileIndex) {
     promise<size_t> promise;
     future<size_t> future = promise.get_future();
-    {
-        lock_guard<mutex> lk(queuecv_m);
-        jobs.push(GetSizeJob{fileIndex, move(promise)});
-    }
-    queuecv.notify_all();
+    // worker.enqueueJob(std::make_unique<std::function<void()>>(
+    //     [this, fileIndex, promise = move(promise)]() mutable {
+    //         getSize(GetSizeJob{fileIndex, move(promise)});
+    //     })
+    // );
+    auto task = std::packaged_task<void()>(
+        [this, fileIndex, promise = move(promise)]() mutable {
+            getSize(GetSizeJob{fileIndex, move(promise)});
+        }
+    );
+    worker.enqueueJob(move(task));

     future.wait();
@@ -98,11 +96,20 @@ template <>
 pair<size_t, double *> GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName) {
     promise<pair<size_t, double *>> promise;
     future<pair<size_t, double *>> future = promise.get_future();
-    {
-        lock_guard<mutex> lk(queuecv_m);
-        jobs.push(GetAxisDoubleJob{fileIndex, axisName, move(promise)});
-    }
-    queuecv.notify_all();
+    // worker.enqueueJob(std::make_unique<std::function<void()>>(
+    //     [this, fileIndex, axisName, promise = move(promise)]() mutable {
+    //         getAxis(GetAxisDoubleJob{fileIndex, axisName, move(promise)});
+    //     })
+    // );
+    // worker.enqueueJob([this, fileIndex, axisName, promise = move(promise)]() mutable { getAxis(GetAxisDoubleJob{fileIndex, axisName, move(promise)}); });
+    auto task = std::packaged_task<void()>(
+        [this, fileIndex, axisName, promise = move(promise)]() mutable {
+            getAxis(GetAxisDoubleJob{fileIndex, axisName, move(promise)});
+        }
+    );
+    worker.enqueueJob(move(task));

     future.wait();
@@ -114,11 +121,12 @@ template <>
 pair<size_t, int *> GPUBuffer::impl::getAxis(size_t fileIndex, const string& axisName) {
     promise<pair<size_t, int *>> promise;
     future<pair<size_t, int *>> future = promise.get_future();
-    {
-        lock_guard<mutex> lk(queuecv_m);
-        jobs.push(GetAxisIntJob{fileIndex, axisName, move(promise)});
-    }
-    queuecv.notify_all();
+    auto task = std::packaged_task<void()>(
+        [this, fileIndex, axisName, promise = move(promise)]() mutable {
+            getAxis(GetAxisIntJob{fileIndex, axisName, move(promise)});
+        }
+    );
+    worker.enqueueJob(move(task));

     future.wait();
@@ -129,8 +137,6 @@ template pair<size_t, int *> GPUBuffer::impl::getAxis<int>(size_t fileIndex, con
 GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) {
     cudaStreamCreate(&iostream);
-    ioworker = make_unique<thread>([this]() { worker(); });
     size_t size = getSize(0);
     auto x = getAxis<int>(0, "time");
     size_t sizeTime = x.first;
@@ -144,7 +150,6 @@ GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) {
             file.size = size;
             file.valid = false;
         }
-        // loadFile(i, i);
     }
 }
@@ -152,30 +157,26 @@ GPUBuffer::~GPUBuffer() {
 }

 GPUBuffer::impl::~impl() {
-    {
-        lock_guard<mutex> lk(queuecv_m);
-        workerRunning = false;
-    }
-    queuecv.notify_all();
-    ioworker->join();
+    worker.stop();
     for (size_t i = 0; i < numBufferedFiles; i++) {
         File &file = buffer[i];
         cudaFree(file.d_data);
         cudaFreeHost(file.h_data);
     }
     cudaStreamDestroy(iostream);
 }

-void GPUBuffer::impl::loadFile(LoadFileJob job) {
-    File &file = buffer[job.bufferIndex];
+void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) {
+    File &file = buffer[bufferIndex];
     {
         lock_guard<mutex> lk(file.m);
+        cout << "loading file with index " << fileIndex << "\n";
         assert(!file.valid);
-        cout << "loading file with index " << job.fileIndex << "\n";
-        dataReader.loadFile<float>(file.h_data, job.fileIndex);
+        dataReader.loadFile<float>(file.h_data, fileIndex);
         cudaMemcpyAsync(file.d_data, file.h_data, sizeof(float)*file.size, cudaMemcpyHostToDevice, iostream);
         cudaStreamSynchronize(iostream);
-        buffer[job.bufferIndex].valid = true;
+        buffer[bufferIndex].valid = true;
     }
     file.cv.notify_all();
 }
@@ -202,17 +203,10 @@ void GPUBuffer::impl::getAxis(GetAxisIntJob job) {
 }

 void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) {
-    pImpl->loadFile(fileIndex, bufferIndex);
+    pImpl->enqueueLoadFileJob(fileIndex, bufferIndex);
 }

-void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) {
-    LoadFileJob job = {
-        .fileIndex = fileIndex,
-        .bufferIndex = bufferIndex
-    };
-    // Main thread theoretically blocks on 2 mutexes here
-    // but it _should_ never have to wait for them.
+void GPUBuffer::impl::enqueueLoadFileJob(size_t fileIndex, size_t bufferIndex) {
     {
         File &file = buffer[bufferIndex];
@@ -224,42 +218,15 @@ void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) {
         }
         file.valid = false;
     }
-    {
-        std::unique_lock<std::mutex> lk(queuecv_m, std::defer_lock);
-        bool lockval = lk.try_lock();
-        if (!lockval) {
-            cout << "waited on IOworker queue during loadFile orchestration :(\n";
-            lk.lock();
-        }
-        jobs.push(job);
-    }
-    queuecv.notify_all();
-}
-
-void GPUBuffer::impl::worker() {
-    while(workerRunning) {
-        Job job;
-        {
-            unique_lock<mutex> lk(queuecv_m);
-            queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); });
-            if (!workerRunning) {
-                return;
-            }
-            job = move(jobs.front());
-            jobs.pop();
-        }
-        if(holds_alternative<LoadFileJob>(job)) {
-            loadFile(get<LoadFileJob>(job));
-        } else if(holds_alternative<GetSizeJob>(job)) {
-            getSize(move(get<GetSizeJob>(job)));
-        } else if(holds_alternative<GetAxisDoubleJob>(job)) {
-            getAxis(move(get<GetAxisDoubleJob>(job)));
-        } else if(holds_alternative<GetAxisIntJob>(job)) {
-            getAxis(move(get<GetAxisIntJob>(job)));
-        }
-    }
+    auto task = std::packaged_task<void()>(bind(
+        &GPUBuffer::impl::impl::loadFile,
+        this,
+        fileIndex,
+        bufferIndex
+    ));
+    worker.enqueueJob(move(task));
+    // worker.enqueueJob([this, fileIndex, bufferIndex](){ loadFile(fileIndex, bufferIndex); });
 }

 DataHandle GPUBuffer::getDataHandle(size_t bufferIndex) {
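The blocking accessors above (getSize and both getAxis overloads) now share one round trip: the caller creates a promise, moves it into a std::packaged_task<void()> handed to the shared Worker, and blocks on the matching future until the worker thread fills it. Below is a minimal standalone sketch of that pattern; the names (task, io) and the value 42 are made up, and a plain std::thread stands in for the Worker queue.

#include <cstddef>
#include <future>
#include <iostream>
#include <thread>
#include <utility>

int main() {
    std::promise<std::size_t> promise;
    std::future<std::size_t> future = promise.get_future();

    // The caller packages the work; the promise travels inside the task.
    std::packaged_task<void()> task(
        [p = std::move(promise)]() mutable {
            p.set_value(42);                 // stand-in for the DataReader call
        });

    std::thread io(std::move(task));         // stand-in for Worker::enqueueJob

    future.wait();                           // the caller blocks, as getSize() does
    std::cout << future.get() << "\n";       // prints 42
    io.join();
    return 0;
}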

View File

@@ -48,8 +48,6 @@ public:
     */
     FieldMetadata *fmd;
-    static void freeFieldData();
 private:
     FieldData setupField(size_t endBufferInd);
     GPUBuffer& gpuBuffer;

View File

@@ -0,0 +1,49 @@
+#include "worker.h"
+
+#include <iostream>
+
+using namespace std;
+
+Worker::Worker(): workerRunning(false) {
+    ioworker = thread([this]() { jobRunner(); });
+}
+
+void Worker::enqueueJob(Job job) {
+    {
+        lock_guard<mutex> lk(queuecv_m);
+        jobs.push(move(job));
+    }
+    queuecv.notify_all();
+}
+
+void Worker::jobRunner() {
+    workerRunning = true;
+    while(workerRunning) {
+        Job job;
+        {
+            unique_lock<mutex> lk(queuecv_m);
+            queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); });
+            if (!workerRunning) {
+                return;
+            }
+            job = move(jobs.front());
+            jobs.pop();
+        }
+        job();
+    }
+}
+
+void Worker::stop() {
+    if (workerRunning == false) return;
+    {
+        lock_guard<mutex> lk(queuecv_m);
+        workerRunning = false;
+    }
+    queuecv.notify_all();
+    ioworker.join();
+}
+
+Worker::~Worker() {
+    stop();
+}

View File

@@ -0,0 +1,27 @@
+#ifndef WORKER_H
+#define WORKER_H
+
+#include <thread>
+#include <queue>
+#include <functional>
+#include <mutex>
+#include <condition_variable>
+#include <future>
+
+class Worker {
+public:
+    using Job = std::packaged_task<void()>;
+
+    Worker();
+    void enqueueJob(Job job);
+    void stop();
+    ~Worker();
+
+private:
+    void jobRunner();
+
+    bool workerRunning;
+    std::queue<Job> jobs;
+    std::condition_variable queuecv;
+    std::mutex queuecv_m;
+    std::thread ioworker;
+};
+
+#endif //WORKER_H
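A short usage sketch for the Worker declared above (not part of the commit; the job body and the names job/done are made up). Because a job is a std::packaged_task<void()>, a caller that wants to wait for completion can keep the task's future before enqueueing it.

#include "worker.h"

#include <future>
#include <iostream>
#include <utility>

int main() {
    Worker worker;                        // constructor spawns the ioworker thread

    std::packaged_task<void()> job([] {
        std::cout << "running on the worker thread\n";
    });
    std::future<void> done = job.get_future();

    worker.enqueueJob(std::move(job));
    done.wait();                          // block until the job has executed

    worker.stop();                        // also called by ~Worker()
    return 0;
}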

View File

@@ -19,29 +19,39 @@ __global__ void middleOfTwoValues(float *ans, const FieldMetadata &fmd, FieldDat
 int main() {
     std::string path = "data/atmosphere_MERRA-wind-speed[179253532]";
-    std::string variable = "T";
-    DataReader dataReader{path, variable};
+    DataReader dataReaderU{path, "U"};
+    DataReader dataReaderV{path, "V"};
     std::cout << "created datareader\n";
-    GPUBuffer buffer (dataReader);
+    GPUBuffer bufferU (dataReaderU);
+    GPUBuffer bufferV (dataReaderV);
     std::cout << "created buffer\n";
-    GPUBufferHandler bufferHandler(buffer);
+    GPUBufferHandler bufferHandlerU(bufferU);
+    GPUBufferHandler bufferHandlerV(bufferV);

     float *ptr_test_read;
     cudaMallocManaged(&ptr_test_read, sizeof(float));
     std::cout << "created buffer handler\n";

-    for (int i = 0; i < 10; i++) {
-        FieldData fd = bufferHandler.nextFieldData();
+    for (int i = 0; i < 20; i++) {
+        FieldData fdU = bufferHandlerU.nextFieldData();
+        FieldData fdV = bufferHandlerV.nextFieldData();
-        middleOfTwoValues<<<1, 1>>>(ptr_test_read, *bufferHandler.fmd, fd);
+        middleOfTwoValues<<<1, 1>>>(ptr_test_read, *bufferHandlerU.fmd, fdU);
         cudaDeviceSynchronize();
-        std::cout << "ptr_test_read = " << std::fixed << std::setprecision(6) << *ptr_test_read << "\n";
+        std::cout << "ptr_test_read U = " << std::fixed << std::setprecision(6) << *ptr_test_read << "\n";
+        middleOfTwoValues<<<1, 1>>>(ptr_test_read, *bufferHandlerV.fmd, fdV);
+        cudaDeviceSynchronize();
+        std::cout << "ptr_test_read V = " << std::fixed << std::setprecision(6) << *ptr_test_read << "\n";
     }

     // TODO: measure data transfer time in this example code.
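For the TODO above, one option is to time the host-to-device copy with CUDA events recorded on the copy stream. The sketch below is self-contained and not part of the commit; the buffer size n and the names stream/h_data/d_data are made up, and in the buffer code the two cudaEventRecord calls would bracket the cudaMemcpyAsync on iostream inside loadFile().

#include <cuda_runtime.h>
#include <cstddef>
#include <iostream>

int main() {
    const std::size_t n = 1 << 24;              // stand-in for file.size
    const std::size_t bytes = n * sizeof(float);

    float *h_data, *d_data;
    cudaMallocHost((void**)&h_data, bytes);     // pinned, like the buffer's h_data
    cudaMalloc((void**)&d_data, bytes);

    cudaStream_t stream;                        // stands in for the impl's iostream
    cudaStreamCreate(&stream);

    cudaEvent_t start, stop;
    cudaEventCreate(&start);
    cudaEventCreate(&stop);

    // Bracket the async copy with events on the same stream, then wait for it.
    cudaEventRecord(start, stream);
    cudaMemcpyAsync(d_data, h_data, bytes, cudaMemcpyHostToDevice, stream);
    cudaEventRecord(stop, stream);
    cudaEventSynchronize(stop);

    float ms = 0.0f;
    cudaEventElapsedTime(&ms, start, stop);
    std::cout << "H2D copy: " << ms << " ms, "
              << bytes / (ms * 1.0e6) << " GB/s\n";

    cudaEventDestroy(start);
    cudaEventDestroy(stop);
    cudaStreamDestroy(stream);
    cudaFree(d_data);
    cudaFreeHost(h_data);
    return 0;
}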