Skip to content

Commit

Permalink
Additional logging
Browse files Browse the repository at this point in the history
  • Loading branch information
vGsteiger committed Oct 24, 2023
1 parent aa1020b commit 07a6ab9
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 22 deletions.
8 changes: 4 additions & 4 deletions modyn/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ The storage abstraction is designed to be flexible and allow for different stora

The following filesystem wrappers are currently implemented:

- `local`: Accesses the local filesystem
- `LocalFilesystemWrapper`: Accesses the local filesystem

Future filesystem wrappers may include:

Expand All @@ -41,9 +41,9 @@ The class is defined in `modyn/storage/include/internal/filesystem_wrapper/files

The following file wrappers are currently implemented:

- `single_sample`: Each file contains a single sample
- `binary`: Each file contains columns and row in a binary format
- `csv`: Each file contains columns and rows in a csv format
- `SingleSampleFileWrapper`: Each file contains a single sample
- `BinaryFileWrapper`: Each file contains columns and row in a binary format
- `CsvFileWrapper`: Each file contains columns and rows in a csv format

Future file wrappers may include:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class FileWatcher {
if (!filesystem_wrapper->exists(dataset_path) || !filesystem_wrapper->is_directory(dataset_path)) {
SPDLOG_ERROR("Dataset path {} does not exist or is not a directory.", dataset_path);
*stop_file_watcher = true;
SPDLOG_INFO("Stopping file watcher for dataset {}.", dataset_id_);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ bool StorageDatabaseConnection::add_dataset(
SPDLOG_ERROR("Error adding dataset: {}", e.what());
return false;
}
SPDLOG_INFO("Added dataset {} to database", name);
break;
case DatabaseDriver::SQLITE3:
session << "INSERT INTO datasets (name, base_path, filesystem_wrapper_type, "
Expand Down
31 changes: 21 additions & 10 deletions modyn/storage/src/internal/file_watcher/file_watcher_watchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ void FileWatcherWatchdog::start_file_watcher_thread(int64_t dataset_id, int16_t
std::unique_ptr<FileWatcher> file_watcher =
std::make_unique<FileWatcher>(config_, dataset_id, &file_watcher_thread_stop_flags_[dataset_id],
config_["storage"]["insertion_threads"].as<int16_t>());
SPDLOG_INFO("FileWatcher thread for dataset {} created", dataset_id);
if (file_watcher == nullptr || file_watcher_thread_stop_flags_[dataset_id]->load()) {
SPDLOG_ERROR("Failed to create FileWatcher for dataset {}", dataset_id);
file_watcher_dataset_retries_[dataset_id] = retries + 1;
return;
}
std::thread th(&FileWatcher::run, std::move(file_watcher));
file_watcher_threads_[dataset_id] = std::move(th);
file_watcher_dataset_retries_[dataset_id] = retries;
Expand All @@ -47,13 +53,25 @@ void FileWatcherWatchdog::stop_file_watcher_thread(int64_t dataset_id) {
file_watcher_threads_[dataset_id].join();
}
auto file_watcher_thread_it = file_watcher_threads_.find(dataset_id);
file_watcher_threads_.erase(file_watcher_thread_it);
if (file_watcher_thread_it == file_watcher_threads_.end()) {
SPDLOG_ERROR("FileWatcher thread for dataset {} not found", dataset_id);
} else {
file_watcher_threads_.erase(file_watcher_thread_it);
}

auto file_watcher_dataset_retries_it = file_watcher_dataset_retries_.find(dataset_id);
file_watcher_dataset_retries_.erase(file_watcher_dataset_retries_it);
if (file_watcher_dataset_retries_it == file_watcher_dataset_retries_.end()) {
SPDLOG_ERROR("FileWatcher thread retries for dataset {} not found", dataset_id);
} else {
file_watcher_dataset_retries_.erase(file_watcher_dataset_retries_it);
}

auto file_watcher_thread_stop_flags_it = file_watcher_thread_stop_flags_.find(dataset_id);
file_watcher_thread_stop_flags_.erase(file_watcher_thread_stop_flags_it);
if (file_watcher_thread_stop_flags_it == file_watcher_thread_stop_flags_.end()) {
SPDLOG_ERROR("FileWatcher thread stop flag for dataset {} not found", dataset_id);
} else {
file_watcher_thread_stop_flags_.erase(file_watcher_thread_stop_flags_it);
}
} else {
SPDLOG_ERROR("FileWatcher thread for dataset {} not found", dataset_id);
}
Expand All @@ -63,15 +81,11 @@ void FileWatcherWatchdog::stop_file_watcher_thread(int64_t dataset_id) {
* Watch the FileWatcher threads and start/stop them as needed
*/
void FileWatcherWatchdog::watch_file_watcher_threads() {
SPDLOG_INFO("Watching FileWatcher threads");
soci::session session = storage_database_connection_.get_session();

int64_t number_of_datasets = 0;
session << "SELECT COUNT(dataset_id) FROM datasets", soci::into(number_of_datasets);

SPDLOG_INFO("Number of FileWatcher threads registered: {}", file_watcher_threads_.size());
SPDLOG_INFO("Number of datasets in database: {}", number_of_datasets);

if (number_of_datasets == 0) {
if (file_watcher_threads_.empty()) {
// There are no FileWatcher threads running, nothing to do
Expand Down Expand Up @@ -119,8 +133,6 @@ void FileWatcherWatchdog::watch_file_watcher_threads() {
}

void FileWatcherWatchdog::run() {
SPDLOG_INFO("FileWatchdog started.");

while (true) {
if (stop_file_watcher_watchdog_->load()) {
break;
Expand All @@ -140,7 +152,6 @@ void FileWatcherWatchdog::run() {
}

std::vector<int64_t> FileWatcherWatchdog::get_running_file_watcher_threads() {
SPDLOG_INFO("Getting running FileWatcher threads");
std::vector<int64_t> running_file_watcher_threads = {};
for (const auto& pair : file_watcher_threads_) {
if (pair.second.joinable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,12 @@ TEST_F(FileWatcherWatchdogTest, TestRemovingDataset) {
// The watchdog should start a FileWatcher process for the new dataset
std::this_thread::sleep_for(std::chrono::seconds(2));

SPDLOG_INFO("Sleeping done.");

// Now remove the dataset from the database
connection.delete_dataset("test_dataset");

SPDLOG_INFO("Deleted dataset from database.");

// The watchdog should stop the FileWatcher process for the removed dataset
watchdog.watch_file_watcher_threads();

SPDLOG_INFO("Watched file watcher threads.");

const std::vector<int64_t> file_watcher_threads = watchdog.get_running_file_watcher_threads();

ASSERT_TRUE(file_watcher_threads.empty());
Expand Down

0 comments on commit 07a6ab9

Please sign in to comment.