From 9433759eaff67eaaa51677685022faf841e7a5c6 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Mon, 1 Jul 2024 15:11:37 +0200 Subject: [PATCH] logging errors --- Cargo.toml | 1 + benches/broadcast_sink_benchmark.rs | 6 ++++-- src/lib.rs | 14 ++++++++++---- src/logger.rs | 14 ++++++++++++++ 4 files changed, 29 insertions(+), 6 deletions(-) create mode 100644 src/logger.rs diff --git a/Cargo.toml b/Cargo.toml index b8fef54..6f017ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ futures = { version = "0.3", features = ["async-await"] } tokio = { version = "1.38.0", features = ["full"] } tokio-stream = { version = "0.1.15", features = ["sync"] } pin-project-lite = "0.2" +chrono = "0.4.38" [dev-dependencies] tokio = { version = "1.38.0", features = ["full"] } diff --git a/benches/broadcast_sink_benchmark.rs b/benches/broadcast_sink_benchmark.rs index 7e4bf81..3b4e189 100644 --- a/benches/broadcast_sink_benchmark.rs +++ b/benches/broadcast_sink_benchmark.rs @@ -21,10 +21,11 @@ impl MultiplyX { } impl Consumer for MultiplyX { - fn consume(&mut self, _: &u64) { + fn consume(&mut self, _: &u64) -> Result<(), &'static str> { let mut x = self.state.x.write().unwrap(); *x *= 5; println!("Consumer 1 processed item"); + Ok(()) } } @@ -39,10 +40,11 @@ impl MultiplyY { } impl Consumer for MultiplyY { - fn consume(&mut self, _: &u64) { + fn consume(&mut self, _: &u64) -> Result<(), &'static str> { let mut y = self.state.y.write().unwrap(); *y *= 10; println!("Consumer 2 processed item"); + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 3c10030..e743031 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ extern crate doc_comment; #[cfg(test)] doctest!("../README.md"); +mod logger; + use core::future::Future; use core::marker::PhantomPinned; use core::pin::Pin; @@ -19,7 +21,7 @@ use tokio::task; use tokio_stream::wrappers::BroadcastStream; pub trait Consumer: Send + Sync { - fn consume(&mut self, item: &T); + fn consume(&mut self, item: &T) -> Result<(), &'static str>; } pin_project! { @@ -60,7 +62,9 @@ where let mut stream = BroadcastStream::new(rx); while let Some(Ok(item)) = stream.next().await { let mut consumer = consumer.lock().await; - consumer.consume(&item); + if let Err(e) = consumer.consume(&item) { + error!("BroadcastSink consumer error occurred: {:?}", e); + } barrier_clone.wait().await; active_count_clone.fetch_sub(1, Ordering::SeqCst); } @@ -144,10 +148,11 @@ mod tests { } impl Consumer for MultiplyX { - fn consume(&mut self, _: &u64) { + fn consume(&mut self, _: &u64) -> Result<(), &'static str> { let mut x = self.state.x.write().unwrap(); *x *= 5; println!("Consumer X processed item"); + Ok(()) } } @@ -162,10 +167,11 @@ mod tests { } impl Consumer for MultiplyY { - fn consume(&mut self, _: &u64) { + fn consume(&mut self, _: &u64) -> Result<(), &'static str> { let mut y = self.state.y.write().unwrap(); *y *= 10; println!("Consumer Y processed item"); + Ok(()) } } diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..20b8c99 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,14 @@ +use chrono::Local; +use std::fmt; + +pub fn error(args: fmt::Arguments) { + let now = Local::now(); + println!("[{}] ERROR {}", now.format("%Y-%m-%d %H:%M:%S"), args); +} + +#[macro_export] +macro_rules! error { + ($($arg:tt)*) => { + $crate::logger::error(format_args!($($arg)*)) + }; +}