Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch: toggle_written_columns for multiple files #602

Merged
merged 7 commits into from
Oct 11, 2023
Merged
11 changes: 7 additions & 4 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ impl EventLogger {
std::fs::create_dir_all(&event_dir).unwrap();
self.events.spawn(async move {
let mut stream = event.stream().await.unwrap();
let mut toggle_written_columns = false;
let mut files: BTreeMap<String, tokio::fs::File> = BTreeMap::new();
let mut columns_written: BTreeMap<String, bool> = BTreeMap::new();
while let Some(Ok(log)) = stream.next().await {
let serialized = serde_json::to_string(&log).unwrap();
let deserialized: BTreeMap<String, Value> =
Expand All @@ -96,6 +96,7 @@ impl EventLogger {
let file_name = event_dir.join(format!("{}.csv", key));
let file_key = file_name.to_str().unwrap();
let file_value = files.get(file_key);
let toggle_written_columns = columns_written.get(file_key).unwrap_or(&false);
if file_value.is_none() {
files.insert(
file_key.into(),
Expand All @@ -108,9 +109,10 @@ impl EventLogger {
.unwrap(),
);
}
let file: &mut tokio::fs::File = files.get_mut(file_key).unwrap();

if toggle_written_columns {
let file = files.get_mut(file_key).unwrap();

if toggle_written_columns == &true {
let values = value
.as_object()
.unwrap()
Expand All @@ -121,7 +123,7 @@ impl EventLogger {
file.write_all(values.as_bytes()).await.unwrap();
file.write_all("\n".as_bytes()).await.unwrap();
} else {
toggle_written_columns = true;
columns_written.entry(file_key.into()).or_insert(true);
let columns = value
.as_object()
.unwrap()
Expand All @@ -139,6 +141,7 @@ impl EventLogger {
.collect::<Vec<String>>()
.join(",");
file.write_all(values.as_bytes()).await.unwrap();
file.write_all("\n".as_bytes()).await.unwrap();
}
continue;
}
Expand Down