Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write prebuffered data #18

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ endif()
# Dependencies
# =============================================================================

find_package(HDF5)
if(SONATA_REPORT_ENABLE_MPI)
find_package(MPI REQUIRED)
if (MPI_FOUND)
# With MPI we prefer HDF5 parallel
set(HDF5_PREFER_PARALLEL "ON")
endif()

find_package(HDF5)
if (MPI_FOUND)
if (HDF5_FOUND)
if (HDF5_IS_PARALLEL)
Expand All @@ -102,6 +101,7 @@ if(SONATA_REPORT_ENABLE_MPI)
endif()
endif()
else()
find_package(HDF5)
jorblancoa marked this conversation as resolved.
Show resolved Hide resolved
if (HDF5_IS_PARALLEL)
message(FATAL_ERROR "HDF5 parallel IO found without MPI")
else()
Expand Down
9 changes: 9 additions & 0 deletions include/bbp/sonata/reports.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ int sonata_record_node_data(double step,
*/
int sonata_record_data(double step);

/**
* \brief Write steps_to_write steps given a pre-buffered data
* \return -3 if the Sonata report doesn't exist, -1 if the report name doesn't exist, 0 otherwise
*/
int sonata_write_buffered_data(const char* report_name,
const float* buffered_data,
size_t buffered_data_size,
uint32_t steps_to_write);

/**
* \brief Check status of the recordings/buffers and flush if necessary
* \return 0
Expand Down
55 changes: 30 additions & 25 deletions src/data/sonata_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ SonataData::SonataData(const std::string& report_name,
time_ = {round(tstart / dt) * dt, tend, dt};

reporting_period_ = static_cast<int>(dt / SonataReport::atomic_step_);
last_step_recorded_ = tstart / SonataReport::atomic_step_;
last_step_ = tend / SonataReport::atomic_step_;
previous_step_recorded_ = tstart / SonataReport::atomic_step_;
final_step_ = tend / SonataReport::atomic_step_;
jorblancoa marked this conversation as resolved.
Show resolved Hide resolved
}

SonataData::SonataData(const std::string& report_name)
Expand Down Expand Up @@ -96,30 +96,30 @@ void SonataData::prepare_buffer(size_t max_buffer_size) {

bool SonataData::is_due_to_report(double step) const noexcept {
// Dont record data if current step < tstart
if (step < last_step_recorded_) {
if (step < previous_step_recorded_) {
return false;
// Dont record data if current step > tend
} else if (step > last_step_) {
} else if (step > final_step_) {
return false;
// Dont record data if is not a reporting step (step%period)
} else if (static_cast<int>(step - last_step_recorded_) % reporting_period_ != 0) {
} else if (static_cast<int>(step - previous_step_recorded_) % reporting_period_ != 0) {
return false;
}
return true;
}

void SonataData::record_data(double step, const std::vector<uint64_t>& node_ids) {
// Calculate the offset to write into the buffer
uint32_t offset = static_cast<uint32_t>((step - last_step_recorded_) / reporting_period_);
uint32_t local_position = last_position_ + total_elements_ * offset;
uint32_t offset = static_cast<uint32_t>((step - previous_step_recorded_) / reporting_period_);
uint32_t local_position = previous_position_ + total_elements_ * offset;
if (SonataReport::rank_ == 0) {
logger->trace(
"RANK={} Recording data for step={} last_step_recorded={} first node_id={} "
"RANK={} Recording data for step={} previous_step_recorded={} first node_id={} "
"buffer_size={} "
"and offset={}",
SonataReport::rank_,
step,
last_step_recorded_,
previous_step_recorded_,
node_ids[0],
report_buffer_.size(),
local_position);
Expand All @@ -145,13 +145,14 @@ void SonataData::record_data(double step, const std::vector<uint64_t>& node_ids)
}

void SonataData::record_data(double step) {
uint32_t local_position = last_position_;
uint32_t local_position = previous_position_;
if (SonataReport::rank_ == 0) {
logger->trace(
"RANK={} Recording data for step={} last_step_recorded={} buffer_size={} and offset={}",
"RANK={} Recording data for step={} previous_step_recorded={} buffer_size={} and "
"offset={}",
SonataReport::rank_,
step,
last_step_recorded_,
previous_step_recorded_,
report_buffer_.size(),
local_position);
}
Expand All @@ -160,11 +161,11 @@ void SonataData::record_data(double step) {
local_position += kv.second->get_num_elements();
}
current_step_++;
last_position_ += total_elements_;
last_step_recorded_ += reporting_period_;
previous_position_ += total_elements_;
previous_step_recorded_ += reporting_period_;

if (current_step_ == steps_to_write_) {
write_data();
write_data(report_buffer_, current_step_);
jorblancoa marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -177,8 +178,8 @@ void SonataData::check_and_write(double timestep) {
logger->trace("Updating timestep t={}", timestep);
}
current_step_ += steps_recorded_;
last_position_ += total_elements_ * steps_recorded_;
last_step_recorded_ += reporting_period_ * steps_recorded_;
previous_position_ += total_elements_ * steps_recorded_;
previous_step_recorded_ += reporting_period_ * steps_recorded_;
nodes_recorded_.clear();
// Write when buffer is full, finish all remaining recordings or when record several steps in a
// row
Expand All @@ -194,7 +195,7 @@ void SonataData::check_and_write(double timestep) {
remaining_steps_,
steps_recorded_);
}
write_data();
write_data(report_buffer_, current_step_);
}
steps_recorded_ = 0;
}
Expand Down Expand Up @@ -303,24 +304,28 @@ void SonataData::write_spike_populations() {
}
}

void SonataData::write_data() {
void SonataData::write_data(const std::vector<float>& buffered_data, uint32_t steps_to_write) {
if (remaining_steps_ <= 0) { // Nothing left to write
return;
}
if (current_step_ >= remaining_steps_) { // Avoid writing out of bounds
current_step_ = remaining_steps_;
if (steps_to_write >= remaining_steps_) { // Avoid writing out of bounds
steps_to_write = remaining_steps_;
}
hdf5_writer_->write_2D(report_buffer_, current_step_, total_elements_);
remaining_steps_ -= current_step_;
hdf5_writer_->write_2D(buffered_data, steps_to_write, total_elements_);
remaining_steps_ -= steps_to_write;
if (SonataReport::rank_ == 0) {
logger->debug("Writing timestep data to file {}", report_name_);
logger->debug("-Steps written: {}", current_step_);
logger->debug("-Steps written: {}", steps_to_write);
logger->debug("-Remaining steps: {}", remaining_steps_);
}
last_position_ = 0;
previous_position_ = 0;
current_step_ = 0;
}

void SonataData::flush() {
write_data(report_buffer_, current_step_);
}

void SonataData::close() {
hdf5_writer_->close();
}
Expand Down
9 changes: 5 additions & 4 deletions src/data/sonata_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class SonataData
void write_spike_populations();
void add_population(std::unique_ptr<Population>&& population);

void write_data();
void write_data(const std::vector<float>& buffered_data, uint32_t steps_to_write);
void flush();
void close();

bool is_due_to_report(double step) const noexcept;
Expand Down Expand Up @@ -106,11 +107,11 @@ class SonataData
uint32_t steps_to_write_ = 0;
uint32_t current_step_ = 0;
uint32_t steps_recorded_ = 0;
uint32_t last_position_ = 0;
uint32_t previous_position_ = 0;
uint32_t remaining_steps_ = 0;
uint32_t reporting_period_ = 0;
double last_step_recorded_ = 0.;
double last_step_ = 0.;
double previous_step_recorded_ = 0.;
double final_step_ = 0.;

std::vector<uint64_t> node_ids_;
std::vector<uint64_t> index_pointers_;
Expand Down
8 changes: 7 additions & 1 deletion src/library/report.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ void Report::record_data(double step) {
}
}

void Report::write_buffered_data(const std::vector<float>& buffered_data, uint32_t steps_to_write) {
for (const auto& sonata_data : sonata_populations_) {
sonata_data->write_data(buffered_data, steps_to_write);
}
}

void Report::check_and_flush(double timestep) {
for (const auto& sonata_data : sonata_populations_) {
sonata_data->check_and_write(timestep);
Expand All @@ -115,7 +121,7 @@ void Report::flush(double time) {
}
for (const auto& sonata_data : sonata_populations_) {
// Write if there are any remaining steps to write
sonata_data->write_data();
sonata_data->flush();
if (time - tend_ + dt_ / 2 > 1e-6) {
if (!report_is_closed_) {
sonata_data->close();
Expand Down
2 changes: 2 additions & 0 deletions src/library/report.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class Report

virtual void record_data(double step, const std::vector<uint64_t>& node_ids);
virtual void record_data(double step);
virtual void write_buffered_data(const std::vector<float>& buffered_data,
uint32_t steps_to_write);
virtual void check_and_flush(double timestep);
virtual void flush(double time);
void refresh_pointers(std::function<double*(double*)> refresh_function);
Expand Down
16 changes: 16 additions & 0 deletions src/library/reports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ int sonata_record_data(double step) {
return 0;
}

int sonata_write_buffered_data(const char* report_name,
const float* buffered_data,
size_t buffered_data_size,
uint32_t steps_to_write) {
if (sonata_report.is_empty()) {
return -3;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] You could also use standard errno values:
https://man7.org/linux/man-pages/man3/errno.3.html

}
if (!sonata_report.report_exists(report_name)) {
return -1;
}
const std::vector<float> data(buffered_data, buffered_data + buffered_data_size);
auto report = sonata_report.get_report(report_name);
report->write_buffered_data(data, steps_to_write);
return 0;
}

int sonata_check_and_flush(double timestep) {
auto functor = std::mem_fn(&bbp::sonata::Report::check_and_flush);
sonata_report.apply_all(functor, timestep);
Expand Down
Loading