working metadata reading

This commit is contained in:
Robin 2024-12-23 23:42:51 +01:00
parent 7b3c87c656
commit 109566d7b6
6 changed files with 186 additions and 64 deletions

View File

@ -1,7 +1,7 @@
#include "datareader.h"
#include <netcdf>
#include <optional>
#include <cassert>
using namespace std;
using namespace netCDF;
@ -10,8 +10,6 @@ DataReader::DataReader(const std::string &path, std::string variableName):
filePathManager(path), variableName(variableName) { }
size_t DataReader::fileLength(size_t fileIndex) {
cout << "filePathMan = " << filePathManager.getPath(fileIndex) << " bla = " << fileIndex << "\n";
NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
multimap<string, NcVar> vars = data.getVars();
@ -23,24 +21,42 @@ size_t DataReader::fileLength(size_t fileIndex) {
length *= dim.getSize();
}
// TODO: Turns out c-NetCDF is not thread safe :( https://github.com/Unidata/netcdf-c/issues/1373
// size_t length = 34933248;
std::cout << "return length" << length << "\n";
return length;
}
size_t DataReader::axisLength(size_t fileIndex, const std::string& axisName) {
NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
multimap<string, NcVar> vars = data.getVars();
NcVar var = vars.find(axisName)->second;
assert(var.getDimCount() == 1);
netCDF::NcDim dim = var.getDim(0);
return dim.getSize();
}
template <typename T>
void DataReader::loadFile(T* dataOut, size_t fileIndex) {
std::cout << "loading file" << fileIndex <<"\n";
NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
// multimap<string, NcVar> vars = data.getVars();
// NcVar var = vars.find(variableName)->second;
// var.getVar(dataOut);
loadFile(dataOut, fileIndex, variableName);
}
template <typename T>
void DataReader::loadFile(T* dataOut, size_t fileIndex, const string& variableName) {
NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
multimap<string, NcVar> vars = data.getVars();
NcVar var = vars.find(variableName)->second;
var.getVar(dataOut);
}
template void DataReader::loadFile<float>(float* dataOut, size_t fileIndex, const string& variableName);
template void DataReader::loadFile<int>(int* dataOut, size_t fileIndex, const string& variableName);
template void DataReader::loadFile<double>(double* dataOut, size_t fileIndex, const string& variableName);
template void DataReader::loadFile<double>(double* dataOut, size_t fileIndex);
template void DataReader::loadFile<float>(float* dataOut, size_t fileIndex);
template void DataReader::loadFile<int>(int* dataOut, size_t fileIndex);

View File

@ -16,6 +16,14 @@ public:
template <typename T>
void loadFile(T* dataOut, size_t fileIndex);
template <typename T>
void loadFile(T* dataOut, size_t fileIndex, const std::string& variableName);
size_t axisLength(size_t fileIndex, const std::string& axisName);
// template <typename T>
// void readAndAllocateAxis(T** axis_ptr, size_t *size, NcVar var) {
~DataReader();
private:
FilePathManager filePathManager;

View File

@ -6,9 +6,8 @@
#include <cassert>
#include <condition_variable>
#include <iostream>
#include <netcdf>
using namespace netCDF;
#include <future>
#include <variant>
using namespace std;
@ -26,17 +25,41 @@ struct LoadFileJob {
size_t bufferIndex;
};
struct GetSizeJob {
size_t fileIndex;
promise<size_t> size;
};
struct GetAxisDoubleJob {
size_t fileIndex;
string axisName;
promise<pair<size_t, double *>> axis;
};
struct GetAxisIntJob {
size_t fileIndex;
string axisName;
promise<pair<size_t, int *>> axis;
};
using Job = variant<LoadFileJob, GetSizeJob, GetAxisIntJob, GetAxisDoubleJob>;
class GPUBuffer::impl {
public:
impl(DataReader& dataReader);
void loadFile(LoadFileJob job);
void loadFile(size_t fileIndex, size_t bufferIndex);
void getSize(GetSizeJob job);
size_t getSize(size_t fileIndex); // Most probably blocking
void getAxis(GetAxisDoubleJob job);
void getAxis(GetAxisIntJob job);
~impl();
File buffer[numBufferedFiles];
// Thread worker things
void worker();
queue<LoadFileJob> jobs;
queue<Job> jobs;
unique_ptr<thread> ioworker;
cudaStream_t iostream;
DataReader& dataReader;
@ -47,10 +70,59 @@ public:
GPUBuffer::GPUBuffer(DataReader& dataReader): pImpl(make_unique<impl>(dataReader)) { }
size_t GPUBuffer::impl::getSize(size_t fileIndex) {
promise<size_t> promise;
future<size_t> future = promise.get_future();
{
lock_guard<mutex> lk(queuecv_m);
jobs.push(GetSizeJob{fileIndex, move(promise)});
}
queuecv.notify_all();
future.wait();
return future.get();
}
template <>
pair<size_t, double *> GPUBuffer::getAxis(size_t fileIndex, const string& axisName) {
promise<pair<size_t, double *>> promise;
future<pair<size_t, double *>> future = promise.get_future();
{
lock_guard<mutex> lk(pImpl->queuecv_m);
pImpl->jobs.push(GetAxisDoubleJob{fileIndex, axisName, move(promise)});
}
pImpl->queuecv.notify_all();
future.wait();
return future.get();
}
template pair<size_t, double *> GPUBuffer::getAxis<double>(size_t fileIndex, const string& axisName);
template <>
pair<size_t, int *> GPUBuffer::getAxis(size_t fileIndex, const string& axisName) {
promise<pair<size_t, int *>> promise;
future<pair<size_t, int *>> future = promise.get_future();
{
lock_guard<mutex> lk(pImpl->queuecv_m);
pImpl->jobs.push(GetAxisIntJob{fileIndex, axisName, move(promise)});
}
pImpl->queuecv.notify_all();
future.wait();
return future.get();
}
template pair<size_t, int *> GPUBuffer::getAxis<int>(size_t fileIndex, const string& axisName);
GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) {
cudaStreamCreate(&iostream);
// size_t size = dataReader.fileLength(0);
size_t size = 5;
ioworker = make_unique<thread>([this]() { worker(); });
size_t size = getSize(0);
cout << "size = " << size << "\n";
for (size_t i = 0; i < numBufferedFiles; i++) {
{
@ -63,29 +135,18 @@ GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) {
file.size = size;
file.valid = false;
}
loadFile(i, i);
{
lock_guard<mutex> lk(queuecv_m);
LoadFileJob job = {
.fileIndex = i,
.bufferIndex = i
};
jobs.push(job);
// lock_guard<mutex> lk(queuecv_m);
// LoadFileJob job = {
// .fileIndex = i,
// .bufferIndex = i
// };
// cout << "enqueue file load job\n";
// jobs.push(job);
}
}
auto t = thread([]() {
NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read);
multimap<string, NcVar> vars = data.getVars();
});
t.join();
auto tt = thread([]() {
NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read);
multimap<string, NcVar> vars = data.getVars();
});
tt.join();
ioworker = make_unique<thread>([this]() { worker(); });
}
GPUBuffer::~GPUBuffer() { }
@ -120,7 +181,32 @@ void GPUBuffer::impl::loadFile(LoadFileJob job) {
file.cv.notify_all();
}
void GPUBuffer::impl::getSize(GetSizeJob job) {
size_t size = dataReader.fileLength(job.fileIndex);
job.size.set_value(size);
}
void GPUBuffer::impl::getAxis(GetAxisDoubleJob job) {
pair<size_t, double *> array;
array.first = dataReader.axisLength(job.fileIndex, job.axisName);
cudaError_t status = cudaMallocManaged(&array.second, array.first*sizeof(double));
dataReader.loadFile<double>(array.second, job.fileIndex);
job.axis.set_value(array);
}
void GPUBuffer::impl::getAxis(GetAxisIntJob job) {
pair<size_t, int *> array;
array.first = dataReader.axisLength(job.fileIndex, job.axisName);
cudaError_t status = cudaMallocManaged(&array.second, array.first*sizeof(int));
dataReader.loadFile<int>(array.second, job.fileIndex, job.axisName);
job.axis.set_value(array);
}
void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) {
pImpl->loadFile(fileIndex, bufferIndex);
}
void GPUBuffer::impl::loadFile(size_t fileIndex, size_t bufferIndex) {
LoadFileJob job = {
.fileIndex = fileIndex,
.bufferIndex = bufferIndex
@ -129,7 +215,7 @@ void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) {
// Main thread theoretically blocks on 2 mutexes here
// but it _should_ never have to wait for them.
{
File &file = pImpl->buffer[bufferIndex];
File &file = buffer[bufferIndex];
std::unique_lock<std::mutex> lk(file.m, std::defer_lock);
bool lockval = lk.try_lock();
@ -140,20 +226,20 @@ void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) {
file.valid = false;
}
{
std::unique_lock<std::mutex> lk(pImpl->queuecv_m, std::defer_lock);
std::unique_lock<std::mutex> lk(queuecv_m, std::defer_lock);
bool lockval = lk.try_lock();
if (!lockval) {
cout << "waited on IOworker queue during loadFile orchestration :(\n";
lk.lock();
}
pImpl->jobs.push(job);
jobs.push(job);
}
pImpl->queuecv.notify_all();
queuecv.notify_all();
}
void GPUBuffer::impl::worker() {
while(workerRunning) {
LoadFileJob job;
Job job;
{
unique_lock<mutex> lk(queuecv_m);
queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); });
@ -161,10 +247,19 @@ void GPUBuffer::impl::worker() {
return;
}
job = jobs.front();
job = move(jobs.front());
jobs.pop();
}
loadFile(job);
if(holds_alternative<LoadFileJob>(job)) {
loadFile(get<LoadFileJob>(job));
} else if(holds_alternative<GetSizeJob>(job)) {
getSize(move(get<GetSizeJob>(job)));
} else if(holds_alternative<GetAxisDoubleJob>(job)) {
getAxis(move(get<GetAxisDoubleJob>(job)));
} else if(holds_alternative<GetAxisIntJob>(job)) {
getAxis(move(get<GetAxisIntJob>(job)));
}
}
}

View File

@ -18,10 +18,13 @@ public:
GPUBuffer(DataReader& dataReader);
void loadFile(size_t fileIndex, size_t bufferIndex); // Async call
void loadFile(size_t fileIndex, size_t bufferIndex); // No blocking
DataHandle getDataHandle(size_t bufferIndex); // Potentially blocking
template <typename T>
std::pair<size_t, T *> getAxis(size_t fileIndex, const std::string& axisName); // Most probably blocking
~GPUBuffer();
private:
class impl;

View File

@ -2,13 +2,13 @@
#define GPUBUFFERHANDLER_H
#include "fielddata.h"
#include "filepathmanager.h"
#include "gpubuffer.h"
#include <string>
class GPUBufferHandler {
public:
GPUBufferHandler(const std::string &path, std::string variableName);
GPUBufferHandler();
FieldData nextFieldData();
@ -17,12 +17,7 @@ public:
FieldMetadata *fmd;
private:
// TODO: Implement GPUBuffer
FilePathManager filePathManager;
std::string variableName;
size_t presentTimeIndex;
size_t fileIndex;
GPUBuffer
};
#endif //GPUBUFFERHANDLER_H

View File

@ -54,22 +54,27 @@ int main() {
auto dataHandle = buffer.getDataHandle(0);
// std::cout << "got a data handle\n";
std::cout << "got a data handle\n";
auto x = buffer.getAxis<int>(0, "time");
std::cout << "size of x=" << x.first << "\n";
std::cout << "x[1]= " <<x.second[1] << "\n";
// GPUBufferHandler buffer{path, variable};
// auto fd = buffer.nextFieldData();
// float *ptr_mean;
// cudaMallocManaged(&ptr_mean, sizeof(float));
float *ptr_mean;
cudaMallocManaged(&ptr_mean, sizeof(float));
// computeMean<<<1, 1>>>(ptr_mean, dataHandle);
computeMean<<<1, 1>>>(ptr_mean, dataHandle);
// cudaDeviceSynchronize();
cudaDeviceSynchronize();
// std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n";
std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n";
// // cudaFree(fd.valArrays[0]);
// cudaFree(ptr_mean);
// cudaFree(fd.valArrays[0]);
cudaFree(ptr_mean);
return 0;
}