Skip to content

Commit

Permalink
add option --num-threads, set it to 1 in E2E tests to avoid concurren…
Browse files Browse the repository at this point in the history
…cy issues with the std::stringstream buffer used to capture stdout and stderr
  • Loading branch information
patrickbr committed Oct 16, 2024
1 parent 7e3e294 commit e4c7cab
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 40 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ add_compile_options(-DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=0)

add_compile_options(-march=native)

set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address,undefined")

# Enable fast-math
add_compile_options(-ffast-math)

Expand Down
8 changes: 8 additions & 0 deletions apps/osm2rdf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
#include "osm2rdf/util/Ram.h"
#include "osm2rdf/util/Time.h"

#if defined(_OPENMP)
#include "omp.h"
#endif

// ____________________________________________________________________________
template <typename T>
void run(const osm2rdf::config::Config& config) {
Expand Down Expand Up @@ -71,6 +75,10 @@ int main(int argc, char** argv) {
(osm2rdf::util::ram::GIGA * 1.0)
<< "G" << std::endl;

#if defined(_OPENMP)
omp_set_num_threads(config.numThreads);
#endif

try {
if (config.outputFormat == "qlever") {
run<osm2rdf::ttl::format::QLEVER>(config);
Expand Down
3 changes: 3 additions & 0 deletions include/osm2rdf/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <unordered_set>
#include <vector>
#include <thread>

#include "osm2rdf/config/Constants.h"
#include "osm2rdf/ttl/Format.h"
Expand Down Expand Up @@ -69,6 +70,8 @@ struct Config {
bool addWayNodeSpatialMetadata = false;
bool skipWikiLinks = false;

int numThreads = std::thread::hardware_concurrency();

// Default settings for data
std::unordered_set<std::string> semicolonTagKeys;

Expand Down
12 changes: 9 additions & 3 deletions include/osm2rdf/config/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,15 @@ const static inline std::string SEMICOLON_TAG_KEYS_OPTION_HELP = "";
const static inline std::string AUX_GEO_FILES_INFO =
"Auxiliary geo files for computing spatial relations";
const static inline std::string AUX_GEO_FILES_OPTION_SHORT = "";
const static inline std::string AUX_GEO_FILES_OPTION_LONG =
"aux-geo-files";
const static inline std::string AUX_GEO_FILES_OPTION_HELP = "";
const static inline std::string AUX_GEO_FILES_OPTION_LONG = "aux-geo-files";
const static inline std::string AUX_GEO_FILES_OPTION_HELP =
"Auxiliary geo files for computing spatial relations";

const static inline std::string NUM_THREADS_INFO = "Number of threads to use";
const static inline std::string NUM_THREADS_OPTION_SHORT = "";
const static inline std::string NUM_THREADS_OPTION_LONG = "num-threads";
const static inline std::string NUM_THREADS_OPTION_HELP =
"Number of threads to use";

const static inline std::string WKT_PRECISION_INFO =
"Dumping WKT with precision: ";
Expand Down
10 changes: 10 additions & 0 deletions src/config/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ std::string osm2rdf::config::Config::getInfo(std::string_view prefix) const {
}
}
oss << "\n" << prefix << osm2rdf::config::constants::SECTION_MISCELLANEOUS;
oss << "\n"
<< prefix << "Num Threads: " << numThreads;

if (!storeLocationsOnDisk.empty()) {
oss << "\n"
Expand Down Expand Up @@ -294,6 +296,12 @@ void osm2rdf::config::Config::fromArgs(int argc, char** argv) {
osm2rdf::config::constants::AUX_GEO_FILES_OPTION_LONG,
osm2rdf::config::constants::AUX_GEO_FILES_OPTION_HELP);

auto numThreadsOp =
parser.add<popl::Value<int>, popl::Attribute::advanced>(
osm2rdf::config::constants::NUM_THREADS_OPTION_SHORT,
osm2rdf::config::constants::NUM_THREADS_OPTION_LONG,
osm2rdf::config::constants::NUM_THREADS_OPTION_HELP, numThreads);

auto semicolonTagKeysOp =
parser.add<popl::Value<std::string>, popl::Attribute::advanced>(
osm2rdf::config::constants::SEMICOLON_TAG_KEYS_OPTION_SHORT,
Expand Down Expand Up @@ -458,6 +466,8 @@ void osm2rdf::config::Config::fromArgs(int argc, char** argv) {
}
}

if (numThreadsOp->is_set()) numThreads = numThreadsOp->value();

writeRDFStatistics = writeRDFStatisticsOp->is_set();

// Output
Expand Down
8 changes: 4 additions & 4 deletions src/osm/GeometryHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ GeometryHandler<W>::GeometryHandler(const osm2rdf::config::Config& config,
: _config(config),
_writer(writer),
_sweeper(
{static_cast<size_t>(omp_get_max_threads()),
static_cast<size_t>(omp_get_max_threads()),
{static_cast<size_t>(config.numThreads),
static_cast<size_t>(config.numThreads),
"",
osm2rdf::ttl::constants::IRI__OPENGIS_INTERSECTS,
osm2rdf::ttl::constants::IRI__OPENGIS_CONTAINS,
Expand All @@ -85,7 +85,7 @@ GeometryHandler<W>::GeometryHandler(const osm2rdf::config::Config& config,
{},
[this](size_t progr) { this->progressCb(progr); }},
config.cache, ""),
_parseBatches(omp_get_max_threads()) {}
_parseBatches(config.numThreads) {}

// ___________________________________________________________________________
template <typename W>
Expand Down Expand Up @@ -275,7 +275,7 @@ void GeometryHandler<W>::calculateRelations() {
}

::util::JobQueue<ParseBatch> jobs(1000); // the WKT parse jobs
std::vector<std::thread> thrds(omp_get_max_threads()); // the parse workers
std::vector<std::thread> thrds(_config.numThreads); // the parse workers
for (size_t i = 0; i < thrds.size(); i++)
thrds[i] = std::thread(&processQueue, &jobs, i, &_sweeper);

Expand Down
25 changes: 17 additions & 8 deletions src/osm/OsmiumHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#include "osmium/io/reader_with_progress_bar.hpp"
#include "osmium/util/memory.hpp"

#if defined(_OPENMP)
#include "omp.h"
#endif

// ____________________________________________________________________________
template <typename W>
osm2rdf::osm::OsmiumHandler<W>::OsmiumHandler(
Expand Down Expand Up @@ -62,9 +66,7 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
osmium::osm_entity_bits::object};
{
while (auto buf = reader.read()) {
osmium::apply(buf, mp_manager,
_relationHandler,
countHandler);
osmium::apply(buf, mp_manager, _relationHandler, countHandler);
}
}
reader.close();
Expand All @@ -79,7 +81,15 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
std::cerr << std::endl;
std::cerr << osm2rdf::util::currentTimeFormatted()
<< "OSM Pass 2 ... (dump)" << std::endl;
osmium::io::Reader reader{input_file, osmium::osm_entity_bits::object};
osmium::thread::Pool pool(std::max(_config.numThreads - 2, 1),
osmium::thread::Pool::default_queue_size);

#if defined(_OPENMP)
omp_set_num_threads(_config.numThreads);
#endif

osmium::io::Reader reader{input_file, osmium::osm_entity_bits::object,
pool};
osm2rdf::osm::LocationHandler* locationHandler =
osm2rdf::osm::LocationHandler::create(_config);
_relationHandler.setLocationHandler(locationHandler);
Expand All @@ -99,10 +109,10 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
numTasks += countHandler.numRelations();
}
if (!_config.noFacts && !_config.noWayFacts) {
numTasks += countHandler.numWays();
numTasks += countHandler.numWays();
}
if (!_config.noGeometricRelations && !_config.noWayGeometricRelations) {
numTasks += countHandler.numWays();
numTasks += countHandler.numWays();
}

_progressBar = osm2rdf::util::ProgressBar{numTasks, true};
Expand All @@ -114,8 +124,7 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
{
while (auto buf = reader.read()) {
osmium::apply(
buf, *locationHandler,
_relationHandler,
buf, *locationHandler, _relationHandler,
mp_manager.handler([&](osmium::memory::Buffer&& buffer) {
osmium::apply(buffer, *this);
}),
Expand Down
7 changes: 1 addition & 6 deletions src/ttl/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,7 @@ osm2rdf::ttl::Writer<T>::Writer(const osm2rdf::config::Config& config,
osm2rdf::ttl::constants::LITERAL__YES = generateLiteral("yes", "");

// Prepare statistic variables
#if defined(_OPENMP)
_numOuts = std::max(std::thread::hardware_concurrency(),
static_cast<unsigned int>(omp_get_max_threads()) + 1);
#else
_numOuts = std::thread::hardware_concurrency() + 1;
#endif
_numOuts = config.numThreads + 1;
_blankNodeCount = new uint64_t[_numOuts];
_headerLines = new uint64_t[_numOuts];
_lineCount = new uint64_t[_numOuts];
Expand Down
32 changes: 13 additions & 19 deletions src/util/Output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,13 @@

#include "osm2rdf/util/Time.h"

#if defined(_OPENMP)
#include "omp.h"
#endif
#include "osm2rdf/config/Config.h"
#include "osm2rdf/util/Output.h"

// ____________________________________________________________________________
osm2rdf::util::Output::Output(const osm2rdf::config::Config& config,
const std::string& prefix)
: Output(config, prefix,
#if defined(_OPENMP)
std::max(std::thread::hardware_concurrency(),
static_cast<unsigned int>(omp_get_max_threads()) + 1)
#else
std::thread::hardware_concurrency() + 1
#endif
) {
: Output(config, prefix, config.numThreads + 1) {
}

// ____________________________________________________________________________
Expand Down Expand Up @@ -113,7 +103,7 @@ void osm2rdf::util::Output::close() {
}

if (_toStdOut) {
return;
// nothing to do
} else if (_config.outputCompress) {
#pragma omp parallel for
for (size_t i = 0; i < _partCount; ++i) {
Expand Down Expand Up @@ -209,12 +199,7 @@ void osm2rdf::util::Output::concatenate() {
// ____________________________________________________________________________
void osm2rdf::util::Output::writeNewLine(size_t part) {
write('\n', part);

if (_toStdOut) {
_outBuffers[part][_outBufPos[part]] = 0;
std::cout << _outBuffers[part];
_outBufPos[part] = 0;
}
if (_toStdOut) flush(part);
}

// ____________________________________________________________________________
Expand Down Expand Up @@ -302,6 +287,14 @@ void osm2rdf::util::Output::write(const char c, size_t t) {
}
}

if (_outBufPos[t] + 2 >= BUFFER_S) {
std::cerr << osm2rdf::util::currentTimeFormatted()
<< "Buffer size: " << BUFFER_S << std::endl;
std::cerr << osm2rdf::util::currentTimeFormatted()
<< "Buffer pos: " << _outBufPos[t] << std::endl;
throw std::runtime_error("Write buffer too small to write 1 byte");
}

*(_outBuffers[t] + _outBufPos[t]) = c;
_outBufPos[t] += 1;
}
Expand All @@ -316,7 +309,8 @@ void osm2rdf::util::Output::flush() {
// ____________________________________________________________________________
void osm2rdf::util::Output::flush(size_t i) {
if (_toStdOut) {
// on output to stdout, we only flush on newlines
_outBuffers[i][_outBufPos[i]] = '\0';
std::cout << reinterpret_cast<const char*>(_outBuffers[i]);
} else if (_config.outputCompress) {
int err = 0;
BZ2_bzWrite(&err, _files[i], _outBuffers[i], _outBufPos[i]);
Expand Down
18 changes: 18 additions & 0 deletions tests/E2E.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ TEST(E2E, singleNode) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;

Expand Down Expand Up @@ -119,6 +121,8 @@ TEST(E2E, singleNodeWithTags) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;

Expand Down Expand Up @@ -235,6 +239,8 @@ TEST(E2E, singleWayWithTagsAndNodes) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;

Expand Down Expand Up @@ -338,6 +344,8 @@ TEST(E2E, osmWikiExample) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;

Expand Down Expand Up @@ -432,6 +440,8 @@ TEST(E2E, building51NT) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.addAreaWayLinestrings = true;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;
Expand Down Expand Up @@ -599,6 +609,8 @@ TEST(E2E, building51TTL) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.addAreaWayLinestrings = true;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;
Expand Down Expand Up @@ -729,6 +741,8 @@ TEST(E2E, building51QLEVER) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.addAreaWayLinestrings = true;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;
Expand Down Expand Up @@ -859,6 +873,8 @@ TEST(E2E, tf) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.addAreaWayLinestrings = true;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;
Expand Down Expand Up @@ -950,6 +966,8 @@ TEST(E2E, building51inTF) {

osm2rdf::config::Config config;
config.output = "";
config.numThreads = 1; // set to one to avoid concurrency issues with the
// stringstream read buffer
config.outputCompress = false;
config.addAreaWayLinestrings = true;
config.mergeOutput = osm2rdf::util::OutputMergeMode::NONE;
Expand Down

0 comments on commit e4c7cab

Please sign in to comment.