Skip to content

Commit

Permalink
logging errors
Browse files Browse the repository at this point in the history
  • Loading branch information
pragmaxim committed Jul 1, 2024
1 parent 0dae943 commit 9433759
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ futures = { version = "0.3", features = ["async-await"] }
tokio = { version = "1.38.0", features = ["full"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
pin-project-lite = "0.2"
chrono = "0.4.38"

[dev-dependencies]
tokio = { version = "1.38.0", features = ["full"] }
Expand Down
6 changes: 4 additions & 2 deletions benches/broadcast_sink_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ impl MultiplyX {
}

impl Consumer<u64> for MultiplyX {
fn consume(&mut self, _: &u64) {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
let mut x = self.state.x.write().unwrap();
*x *= 5;
println!("Consumer 1 processed item");
Ok(())
}
}

Expand All @@ -39,10 +40,11 @@ impl MultiplyY {
}

impl Consumer<u64> for MultiplyY {
fn consume(&mut self, _: &u64) {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
let mut y = self.state.y.write().unwrap();
*y *= 10;
println!("Consumer 2 processed item");
Ok(())
}
}

Expand Down
14 changes: 10 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ extern crate doc_comment;
#[cfg(test)]
doctest!("../README.md");

mod logger;

use core::future::Future;
use core::marker::PhantomPinned;
use core::pin::Pin;
Expand All @@ -19,7 +21,7 @@ use tokio::task;
use tokio_stream::wrappers::BroadcastStream;

pub trait Consumer<T>: Send + Sync {
fn consume(&mut self, item: &T);
fn consume(&mut self, item: &T) -> Result<(), &'static str>;
}

pin_project! {
Expand Down Expand Up @@ -60,7 +62,9 @@ where
let mut stream = BroadcastStream::new(rx);
while let Some(Ok(item)) = stream.next().await {
let mut consumer = consumer.lock().await;
consumer.consume(&item);
if let Err(e) = consumer.consume(&item) {
error!("BroadcastSink consumer error occurred: {:?}", e);
}
barrier_clone.wait().await;
active_count_clone.fetch_sub(1, Ordering::SeqCst);
}
Expand Down Expand Up @@ -144,10 +148,11 @@ mod tests {
}

impl Consumer<u64> for MultiplyX {
fn consume(&mut self, _: &u64) {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
let mut x = self.state.x.write().unwrap();
*x *= 5;
println!("Consumer X processed item");
Ok(())
}
}

Expand All @@ -162,10 +167,11 @@ mod tests {
}

impl Consumer<u64> for MultiplyY {
fn consume(&mut self, _: &u64) {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
let mut y = self.state.y.write().unwrap();
*y *= 10;
println!("Consumer Y processed item");
Ok(())
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use chrono::Local;
use std::fmt;

pub fn error(args: fmt::Arguments) {
let now = Local::now();
println!("[{}] ERROR {}", now.format("%Y-%m-%d %H:%M:%S"), args);
}

#[macro_export]
macro_rules! error {
($($arg:tt)*) => {
$crate::logger::error(format_args!($($arg)*))
};
}

0 comments on commit 9433759

Please sign in to comment.