still need to modify datareader to use netcdf on only one thread

Robin 2024-12-23 20:48:00 +01:00
parent a7ad248c38
commit 7b3c87c656
11 changed files with 487 additions and 77 deletions


@@ -0,0 +1,58 @@
#include "datareader.h"
#include <netcdf>
#include <optional>

using namespace std;
using namespace netCDF;

DataReader::DataReader(const std::string &path, std::string variableName):
    filePathManager(path), variableName(variableName) { }

size_t DataReader::fileLength(size_t fileIndex) {
    cout << "filePathMan = " << filePathManager.getPath(fileIndex) << " bla = " << fileIndex << "\n";
    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
    multimap<string, NcVar> vars = data.getVars();
    NcVar var = vars.find(variableName)->second;
    size_t length = 1;
    for (NcDim dim: var.getDims()) {
        length *= dim.getSize();
    }
    // TODO: Turns out c-NetCDF is not thread safe :( https://github.com/Unidata/netcdf-c/issues/1373
    // size_t length = 34933248;
    std::cout << "return length" << length << "\n";
    return length;
}

template <typename T>
void DataReader::loadFile(T* dataOut, size_t fileIndex) {
    std::cout << "loading file" << fileIndex << "\n";
    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
    // multimap<string, NcVar> vars = data.getVars();
    // NcVar var = vars.find(variableName)->second;
    // var.getVar(dataOut);
}

template void DataReader::loadFile<float>(float* dataOut, size_t fileIndex);
template void DataReader::loadFile<int>(int* dataOut, size_t fileIndex);

DataReader::~DataReader() {
}

// template <typename T>
// void DataReader::readAndAllocateAxis(T** axis_ptr, size_t *size, const string &varName) {
//     assert(var.getDimCount() == 1);
//     netCDF::NcDim dim = var.getDim(0);
//     *size = dim.getSize();
//     cudaError_t status = cudaMallocManaged(axis_ptr, *size*sizeof(T));
//     var.getVar(*axis_ptr);
// }
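Note: the TODO above points at netcdf-c issue 1373 (the underlying C library is not thread safe), which is also what the commit message refers to. Purely as a minimal sketch of one possible direction, and not something this commit implements, all NetCDF access could be funnelled through a single process-wide mutex; netcdfMutex and readWholeVariable are hypothetical names used only for illustration.

// Sketch only (not part of this commit): serialize every NetCDF call behind one global mutex.
#include <mutex>
#include <netcdf>
#include <string>

static std::mutex netcdfMutex;

void readWholeVariable(const std::string &path, const std::string &varName, float *dataOut) {
    std::lock_guard<std::mutex> lk(netcdfMutex);           // only one thread touches NetCDF at a time
    netCDF::NcFile data(path, netCDF::NcFile::read);       // open the file under the lock
    netCDF::NcVar var = data.getVars().find(varName)->second;
    var.getVar(dataOut);                                   // read the full variable into the caller's buffer
}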


@@ -0,0 +1,26 @@
#ifndef DATAREADER_H
#define DATAREADER_H

#include <string>
#include <experimental/propagate_const>
#include <memory>
#include "filepathmanager.h"

class DataReader {
public:
    DataReader(const std::string &path, std::string variableName);
    size_t fileLength(size_t fileIndex);

    template <typename T>
    void loadFile(T* dataOut, size_t fileIndex);

    ~DataReader();
private:
    FilePathManager filePathManager;
    std::string variableName;
};

#endif //DATAREADER_H


@@ -0,0 +1,40 @@
#include <iostream>
#include <vector>
#include <string>
#include <filesystem>
#include <algorithm>
#include "filepathmanager.h"

namespace fs = std::filesystem;

FilePathManager::FilePathManager(const std::string& path) {
    // Check if the directory exists
    if (!fs::exists(path) || !fs::is_directory(path)) {
        std::cerr << "Error: Directory does not exist or is not valid." << std::endl;
        exit(1);
    }
    // Iterate over directory entries
    for (const auto& entry : fs::directory_iterator(path)) {
        if (entry.is_regular_file() && entry.path().extension() == ".nc4") {
            fileNames.push_back(entry.path().string());
        }
    }
    // Sort file names lexicographically
    std::sort(fileNames.begin(), fileNames.end());
    // Print sorted file names
    std::cout << "Files in directory '" << path << "' sorted lexicographically:\n";
    for (const auto& fileName : fileNames) {
        std::cout << fileName << std::endl;
    }
}

const char* FilePathManager::getPath(size_t index) const {
    return fileNames.at(index).c_str();
}

FilePathManager::~FilePathManager() { }


@@ -0,0 +1,19 @@
#ifndef FILEPATHMANAGER_H
#define FILEPATHMANAGER_H

#include <string>
#include <vector>

class FilePathManager {
public:
    FilePathManager(const std::string& path);
    size_t getNumberOfFiles();
    const char* getPath(size_t index) const;
    ~FilePathManager();
private:
    std::vector<std::string> fileNames;
};

#endif //FILEPATHMANAGER_H


@@ -1,66 +1,183 @@
 #include "gpubuffer.h"
-#include "fielddata.h"
-#include "gpubufferhelper.h"
+#include <mutex>
+#include <thread>
+#include <queue>
+#include <cassert>
+#include <condition_variable>
+#include <iostream>
 #include <netcdf>
-using namespace std;
 using namespace netCDF;
+using namespace std;
-GPUBuffer::GPUBuffer(std::string path, std::string variableName):
-    path(path), variableName(variableName) {
-    NcFile data(path, NcFile::read);
-    multimap<string, NcVar> vars = data.getVars();
-    cudaMallocManaged(&fmd, sizeof(FieldMetadata));
-    readAndAllocateAxis<double>(&fmd->lons, &fmd->widthSize, vars.find("lon")->second);
-    readAndAllocateAxis<double>(&fmd->lats, &fmd->heightSize, vars.find("lat")->second);
-    readAndAllocateAxis<double>(&fmd->levs, &fmd->depthSize, vars.find("lev")->second);
-}
-FieldData GPUBuffer::nextFieldData() {
-    NcFile data(path, NcFile::read);
-    multimap<string, NcVar> vars = data.getVars();
-    FieldData fd;
-    size_t timeSize;
-    readAndAllocateAxis(&fd.times, &fd.timeSize, vars.find("time")->second);
-    NcVar var = vars.find(variableName)->second;
-    int length = 1;
-    for (NcDim dim: var.getDims()) {
-        length *= dim.getSize();
-    }
-    // Store NetCDF variable in pinned memory on host
-    float *h_array;
-    cudaMallocHost(&h_array, sizeof(float)*length);
-    var.getVar(h_array);
-    // Copy data to device
-    cudaError_t status = cudaMalloc(&fd.valArrays[0], sizeof(float)*length);
-    if (status != cudaSuccess)
-        cout << "Error allocating memory: " << status << "\n";
-    cudaMemcpyAsync(fd.valArrays[0], h_array, sizeof(float)*length, cudaMemcpyHostToDevice);
-    cudaDeviceSynchronize(); // Heavy hammer synchronisation // TODO: Use streams
-    cudaFreeHost(h_array);
-    return fd;
-}
-GPUBuffer::~GPUBuffer() {
-    cudaFree(fmd->lons);
-    cudaFree(fmd->lats);
-    cudaFree(fmd->levs);
-    cudaFree(fmd);
-}
+
+struct File {
+    condition_variable cv;
+    mutex m;
+    bool valid;
+    size_t size;
+    float *h_data; // host data
+    float *d_data; // device data
+};
+
+struct LoadFileJob {
+    size_t fileIndex;
+    size_t bufferIndex;
+};
+
+class GPUBuffer::impl {
+public:
+    impl(DataReader& dataReader);
+    void loadFile(LoadFileJob job);
+    ~impl();
+    File buffer[numBufferedFiles];
+
+    // Thread worker things
+    void worker();
+    queue<LoadFileJob> jobs;
+    unique_ptr<thread> ioworker;
+    cudaStream_t iostream;
+    DataReader& dataReader;
+    condition_variable queuecv;
+    mutex queuecv_m;
+    bool workerRunning = true;
+};
+
+GPUBuffer::GPUBuffer(DataReader& dataReader): pImpl(make_unique<impl>(dataReader)) { }
+
+GPUBuffer::impl::impl(DataReader& dataReader): dataReader(dataReader) {
+    cudaStreamCreate(&iostream);
+    // size_t size = dataReader.fileLength(0);
+    size_t size = 5;
+    for (size_t i = 0; i < numBufferedFiles; i++) {
+        {
+            File &file = buffer[i];
+            lock_guard<mutex> lk(file.m);
+            cudaMallocHost(&file.h_data, sizeof(float)*size);
+            cudaError_t status = cudaMalloc(&file.d_data, sizeof(float)*size);
+            if (status != cudaSuccess)
+                cerr << "Error allocating device memory. Status code: " << status << "\n";
+            file.size = size;
+            file.valid = false;
+        }
+        {
+            lock_guard<mutex> lk(queuecv_m);
+            LoadFileJob job = {
+                .fileIndex = i,
+                .bufferIndex = i
+            };
+            jobs.push(job);
+        }
+    }
+    auto t = thread([]() {
+        NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read);
+        multimap<string, NcVar> vars = data.getVars();
+    });
+    t.join();
+    auto tt = thread([]() {
+        NcFile data("data/MERRA2_400.inst6_3d_ana_Np.20120911.nc4", NcFile::read);
+        multimap<string, NcVar> vars = data.getVars();
+    });
+    tt.join();
+    ioworker = make_unique<thread>([this]() { worker(); });
+}
+
+GPUBuffer::~GPUBuffer() { }
+
+GPUBuffer::impl::~impl() {
+    {
+        lock_guard<mutex> lk(queuecv_m);
+        workerRunning = false;
+    }
+    queuecv.notify_all();
+    ioworker->join();
+    for (size_t i = 0; i < numBufferedFiles; i++) {
+        File &file = buffer[i];
+        cudaFree(file.d_data);
+        cudaFree(file.h_data);
+    }
+    cudaStreamDestroy(iostream);
+}
+
+void GPUBuffer::impl::loadFile(LoadFileJob job) {
+    File &file = buffer[job.bufferIndex];
+    {
+        lock_guard<mutex> lk(file.m);
+        assert(!file.valid);
+        dataReader.loadFile<float>(file.h_data, job.fileIndex);
+        cudaMemcpyAsync(file.d_data, file.h_data, sizeof(float)*file.size, cudaMemcpyHostToDevice, iostream);
+        cudaStreamSynchronize(iostream);
+        buffer[job.bufferIndex].valid = true;
+        cout << "loaded file with index" << job.bufferIndex << "\n";
+    }
+    file.cv.notify_all();
+}
+
+void GPUBuffer::loadFile(size_t fileIndex, size_t bufferIndex) {
+    LoadFileJob job = {
+        .fileIndex = fileIndex,
+        .bufferIndex = bufferIndex
+    };
+    // Main thread theoretically blocks on 2 mutexes here
+    // but it _should_ never have to wait for them.
+    {
+        File &file = pImpl->buffer[bufferIndex];
+        std::unique_lock<std::mutex> lk(file.m, std::defer_lock);
+        bool lockval = lk.try_lock();
+        if (!lockval) {
+            cout << "waited on GPUBuffer during loadFile orchestration :(\n";
+            lk.lock();
+        }
+        file.valid = false;
+    }
+    {
+        std::unique_lock<std::mutex> lk(pImpl->queuecv_m, std::defer_lock);
+        bool lockval = lk.try_lock();
+        if (!lockval) {
+            cout << "waited on IOworker queue during loadFile orchestration :(\n";
+            lk.lock();
+        }
+        pImpl->jobs.push(job);
+    }
+    pImpl->queuecv.notify_all();
+}
+
+void GPUBuffer::impl::worker() {
+    while(workerRunning) {
+        LoadFileJob job;
+        {
+            unique_lock<mutex> lk(queuecv_m);
+            queuecv.wait(lk, [this]{ return !workerRunning || !jobs.empty(); });
+            if (!workerRunning) {
+                return;
+            }
+            job = jobs.front();
+            jobs.pop();
+        }
+        loadFile(job);
+    }
+}
+
+DataHandle GPUBuffer::getDataHandle(size_t bufferIndex) {
+    File &file = pImpl->buffer[bufferIndex];
+    // TODO: Might be nice to measure the blocking time here.
+    unique_lock<mutex> lk(file.m);
+    file.cv.wait(lk, [this, bufferIndex]{ return pImpl->buffer[bufferIndex].valid; });
+    DataHandle dh = {
+        .d_data = file.d_data,
+        .size = file.size
+    };
+    return dh;
+}
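Note: getDataHandle above carries a TODO about measuring how long the caller blocks. A small helper along these lines could wrap the condition-variable wait in a std::chrono timer; this is only a sketch with ad-hoc naming (timedWait), not part of the commit.

// Sketch only: wait on a condition variable and report how long the caller blocked.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>

template <typename Pred>
void timedWait(std::condition_variable &cv, std::unique_lock<std::mutex> &lk, Pred ready) {
    auto start = std::chrono::steady_clock::now();
    cv.wait(lk, ready);                                   // same wait as in getDataHandle
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start).count();
    std::cout << "blocked for " << ms << " ms\n";         // crude measure of buffer starvation
}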


@@ -1,24 +1,31 @@
 #ifndef GPUBUFFER_H
 #define GPUBUFFER_H
-#include "fielddata.h"
 #include <string>
+#include <memory>
+#include <experimental/propagate_const>
+#include "datareader.h"
+
+struct DataHandle {
+    float *d_data;
+    size_t size;
+};
+
 class GPUBuffer {
 public:
-    GPUBuffer(std::string path, std::string variableName);
-    FieldData nextFieldData();
+    static constexpr size_t numBufferedFiles = 3;
+    GPUBuffer(DataReader& dataReader);
+    void loadFile(size_t fileIndex, size_t bufferIndex); // Async call
+    DataHandle getDataHandle(size_t bufferIndex); // Potentially blocking
     ~GPUBuffer();
-    FieldMetadata *fmd;
 private:
-    // TODO: Implement GPUBuffer
-    std::string path;
-    std::string variableName;
+    class impl;
+    std::experimental::propagate_const<std::unique_ptr<impl>> pImpl;
 };
 #endif //GPUBUFFER_H
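Note: given the header above, a plausible calling pattern looks like the sketch below. It assumes the impl constructor pre-queues loads for buffers 0..numBufferedFiles-1 (as gpubuffer.cpp above does) and that device work on a handle finishes before its slot is reused; the six-file loop and the "data"/"T" arguments are taken from main.cu and otherwise arbitrary.

// Sketch only: cycling the buffer slots with the async loadFile / blocking getDataHandle API.
#include "hurricanedata/datareader.h"
#include "hurricanedata/gpubuffer.h"

int main() {
    DataReader dataReader{"data", "T"};            // directory of .nc4 files, variable name
    GPUBuffer buffer(dataReader);                  // constructor queues the first numBufferedFiles loads

    size_t nextFile = GPUBuffer::numBufferedFiles;
    for (size_t i = 0; i < 6; i++) {               // process 6 files as an example
        size_t slot = i % GPUBuffer::numBufferedFiles;
        DataHandle dh = buffer.getDataHandle(slot); // blocks until the slot is valid
        // ... launch kernels that read dh.d_data[0 .. dh.size) and wait for them ...
        buffer.loadFile(nextFile++, slot);          // asynchronously refill the slot afterwards
    }
    return 0;
}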


@@ -0,0 +1,65 @@
#include "gpubufferhandler.h"
#include "fielddata.h"
#include "gpubufferhelper.h"
#include <netcdf>

using namespace std;
using namespace netCDF;

GPUBufferHandler::GPUBufferHandler(const std::string &path, std::string variableName):
    filePathManager(path), variableName(variableName), presentTimeIndex(0), fileIndex(0) {
    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
    multimap<string, NcVar> vars = data.getVars();
    cudaMallocManaged(&fmd, sizeof(FieldMetadata));
    readAndAllocateAxis<double>(&fmd->lons, &fmd->widthSize, vars.find("lon")->second);
    readAndAllocateAxis<double>(&fmd->lats, &fmd->heightSize, vars.find("lat")->second);
    readAndAllocateAxis<double>(&fmd->levs, &fmd->depthSize, vars.find("lev")->second);
}

FieldData GPUBufferHandler::nextFieldData() {
    NcFile data(filePathManager.getPath(fileIndex), NcFile::read);
    multimap<string, NcVar> vars = data.getVars();
    FieldData fd;
    size_t timeSize;
    readAndAllocateAxis(&fd.times, &fd.timeSize, vars.find("time")->second);
    NcVar var = vars.find(variableName)->second;
    int length = 1;
    for (NcDim dim: var.getDims()) {
        length *= dim.getSize();
    }
    // Store NetCDF variable in pinned memory on host
    float *h_array;
    cudaMallocHost(&h_array, sizeof(float)*length);
    var.getVar(h_array);
    // Copy data to device
    cudaError_t status = cudaMalloc(&fd.valArrays[0], sizeof(float)*length);
    if (status != cudaSuccess)
        cout << "Error allocating memory: " << status << "\n";
    cudaMemcpyAsync(fd.valArrays[0], h_array, sizeof(float)*length, cudaMemcpyHostToDevice);
    cudaDeviceSynchronize(); // Heavy hammer synchronisation // TODO: Use streams
    cudaFreeHost(h_array);
    return fd;
}

GPUBufferHandler::~GPUBufferHandler() {
    cudaFree(fmd->lons);
    cudaFree(fmd->lats);
    cudaFree(fmd->levs);
    cudaFree(fmd);
}


@@ -0,0 +1,28 @@
#ifndef GPUBUFFERHANDLER_H
#define GPUBUFFERHANDLER_H

#include "fielddata.h"
#include "filepathmanager.h"
#include <string>

class GPUBufferHandler {
public:
    GPUBufferHandler(const std::string &path, std::string variableName);
    FieldData nextFieldData();
    ~GPUBufferHandler();
    FieldMetadata *fmd;
private:
    // TODO: Implement GPUBuffer
    FilePathManager filePathManager;
    std::string variableName;
    size_t presentTimeIndex;
    size_t fileIndex;
};

#endif //GPUBUFFERHANDLER_H


@@ -1,7 +1,11 @@
 #include <netcdf>
 #include <cassert>
+using namespace std;
+using namespace netCDF;
 template <typename T>
-void readAndAllocateAxis(T** axis_ptr, size_t *size, const netCDF::NcVar &var) {
+void readAndAllocateAxis(T** axis_ptr, size_t *size, NcVar var) {
     assert(var.getDimCount() == 1);
     netCDF::NcDim dim = var.getDim(0);
     *size = dim.getSize();


@@ -1,19 +1,35 @@
-#include "hurricanedata/fielddata.h"
+// #include "hurricanedata/fielddata.h"
+// #include "hurricanedata/gpubufferhandler.h"
+#include "hurricanedata/datareader.h"
 #include "hurricanedata/gpubuffer.h"
 #include <cuda_runtime.h>
 #include <device_launch_parameters.h>
 #include <iostream>
 #include <cmath>
+#include <memory>
 #include <iomanip>
 // Not parallel computation
-__global__ void computeMean(float *ans, const FieldMetadata &fmd, FieldData fd) {
+// __global__ void computeMean(float *ans, const FieldMetadata &fmd, FieldData fd) {
+//     float sum = 0;
+//     size_t num_not_masked_values = 0;
+//     for (int i = 0; i < fmd.widthSize; i++) {
+//         double xi = getVal(fmd, fd, 2, 20, 100, i);
+//         if (xi < 1E14) { /* If x is not missing value */
+//             num_not_masked_values++;
+//             sum += xi;
+//         }
+//     }
+//     *ans = sum/num_not_masked_values;
+// }
+__global__ void computeMean(float *ans, DataHandle dh) {
     float sum = 0;
     size_t num_not_masked_values = 0;
-    for (int i = 0; i < fmd.widthSize; i++) {
-        double xi = getVal(fmd, fd, 2, 20, 100, i);
+    for (int i = 0; i < dh.size; i++) {
+        double xi = dh.d_data[i];
         if (xi < 1E14) { /* If x is not missing value */
             num_not_masked_values++;
             sum += xi;
@@ -23,22 +39,37 @@ __global__ void computeMean(float *ans, const FieldMetadata &fmd, FieldData fd)
 }
 int main() {
-    std::string path = "data/MERRA2_400.inst6_3d_ana_Np.20120101.nc4";
+    std::string path = "data";
     std::string variable = "T";
-    GPUBuffer buffer{path, variable};
-    auto fd = buffer.nextFieldData();
-    float *ptr_mean;
-    cudaMallocManaged(&ptr_mean, sizeof(float));
-    computeMean<<<1, 1>>>(ptr_mean, *buffer.fmd, fd);
-    cudaDeviceSynchronize();
-    std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n";
-    cudaFree(fd.valArrays[0]);
-    cudaFree(ptr_mean);
+    // std::unique_ptr<DataReader> dataReader = std::make_unique<DataReader>(path, variable);
+    DataReader dataReader{path, variable};
+    std::cout << "created datareader\n";
+    GPUBuffer buffer (dataReader);
+    std::cout << "created buffer\n";
+    auto dataHandle = buffer.getDataHandle(0);
+    // std::cout << "got a data handle\n";
+    // GPUBufferHandler buffer{path, variable};
+    // auto fd = buffer.nextFieldData();
+    // float *ptr_mean;
+    // cudaMallocManaged(&ptr_mean, sizeof(float));
+    // computeMean<<<1, 1>>>(ptr_mean, dataHandle);
+    // cudaDeviceSynchronize();
+    // std::cout << "Mean = " << std::fixed << std::setprecision(6) << *ptr_mean << "\n";
+    // // cudaFree(fd.valArrays[0]);
+    // cudaFree(ptr_mean);
     return 0;
 }
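Note: the kernel above is explicitly marked "Not parallel computation" and sums on a single thread. A parallel variant, shown only as a sketch and not part of this commit, could use a grid-stride loop with atomics; computeMeanParallel, d_sum and d_count are hypothetical names, and DataHandle is assumed to come from hurricanedata/gpubuffer.h.

// Sketch only: grid-stride mean over the valid (non-missing) values of a DataHandle.
#include "hurricanedata/gpubuffer.h"

__global__ void computeMeanParallel(float *d_sum, unsigned long long *d_count, DataHandle dh) {
    for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < dh.size;
         i += (size_t)gridDim.x * blockDim.x) {
        float xi = dh.d_data[i];
        if (xi < 1E14f) {                 // skip missing values, as in computeMean
            atomicAdd(d_sum, xi);         // accumulate valid samples
            atomicAdd(d_count, 1ULL);     // count them
        }
    }
}
// Host side: zero-initialize d_sum and d_count, launch e.g. <<<256, 256>>>(...),
// synchronize, then mean = *d_sum / *d_count.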


@@ -24,12 +24,27 @@ print("Sum of U:", U_sum)
 sumval = 0
 row = U[2,20,100]
+print("Shape of row", row.shape)
+print("Row data for first 20 entries", row[0:5])
+row = row[0:5]
+print(f"{type(row)} {row.dtype}")
 n = 0
 for val in row:
-    if not np.ma.is_masked(val):
+    #if not np.ma.is_masked(val):
     n+=1
-    sumval += val
+    sumval += np.float64(val)
-print(f"Why does {np.mean(row)=} not equal {sumval/n=} ?!")
+Mean1 = np.mean(row)
+Mean2 = sumval/n
+print(type(Mean1))
+print(type(Mean2))
+print(Mean1)
+print(Mean2)
+print(f"Why does {np.mean(row):.10f} not equal {sumval/n:.10f} ?!")
 # Close the NetCDF file
 ncfile.close()
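Note on the question in the script: np.mean keeps its accumulator in the array's own float32 dtype (using pairwise summation), while the loop promotes every value to float64 before adding, so the two results differ in the low decimal places. A small C++ sketch of the same effect with made-up values follows; it uses naive summation, so the drift is larger than what numpy produces.

// Sketch only: why a float32 accumulator and a float64 accumulator disagree.
#include <cstdio>

int main() {
    float  fsum = 0.0f;
    double dsum = 0.0;
    const float v = 0.1f;                    // stored as ~0.100000001490116
    for (int i = 0; i < 10000000; i++) {
        fsum += v;                           // float32 accumulator: rounding error builds up
        dsum += static_cast<double>(v);      // float64 accumulator: stays essentially exact
    }
    std::printf("float  accumulator: %.4f\n", fsum);  // ends up well above the exact sum
    std::printf("double accumulator: %.4f\n", dsum);  // ~1000000.0149 (1e7 times the stored value)
    return 0;
}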