diff --git a/examples/subscribe.rs b/examples/subscribe.rs index 6b9e0a36..f89466f0 100644 --- a/examples/subscribe.rs +++ b/examples/subscribe.rs @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box> { channel_groups: None, options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]), }); - subscription.subscribe(None); + subscription.subscribe(); let subscription_clone = subscription.clone_empty(); // Attach connection status to the PubNub client instance. @@ -130,7 +130,9 @@ async fn main() -> Result<(), Box> { // drop(subscription_clone); println!( - "\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." ); + "\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \ + `subscription.subscribe_with_timetoken(Some()) call." + ); // Clean up before complete work with PubNub client instance. pubnub.unsubscribe_all(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; diff --git a/examples/subscribe_with_presence_state.rs b/examples/subscribe_with_presence_state.rs index 10e85ce1..423ef7ac 100644 --- a/examples/subscribe_with_presence_state.rs +++ b/examples/subscribe_with_presence_state.rs @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box> { channel_groups: None, options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]), }); - subscription.subscribe(None); + subscription.subscribe(); let subscription_clone = subscription.clone_empty(); // Attach connection status to the PubNub client instance. @@ -145,7 +145,9 @@ async fn main() -> Result<(), Box> { // drop(subscription_clone); println!( - "\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." ); + "\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \ + `subscription.subscribe_with_timetoken(Some()) call." + ); // Clean up before complete work with PubNub client instance. pubnub.unsubscribe_all(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; diff --git a/src/dx/presence/event_engine/effects/heartbeat.rs b/src/dx/presence/event_engine/effects/heartbeat.rs index e987b76d..ecd14cb2 100644 --- a/src/dx/presence/event_engine/effects/heartbeat.rs +++ b/src/dx/presence/event_engine/effects/heartbeat.rs @@ -46,7 +46,14 @@ pub(super) async fn execute( effect_id, }) .map_ok_or_else( - |error| vec![PresenceEvent::HeartbeatFailure { reason: error }], + |error| { + log::error!("Handshake error: {:?}", error); + + // Cancel is possible and no retries should be done. + (!matches!(error, PubNubError::EffectCanceled)) + .then(|| vec![PresenceEvent::HeartbeatFailure { reason: error }]) + .unwrap_or(vec![]) + }, |_| vec![PresenceEvent::HeartbeatSuccess], ) .await @@ -183,6 +190,37 @@ mod it_should { )); } + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mocked_heartbeat_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &PresenceInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + 5, + Some(PubNubError::Transport { + details: "test".into(), + response: Some(Box::new(TransportResponse { + status: 500, + ..Default::default() + })), + }), + "id", + &RequestRetryConfiguration::Linear { + max_retry: 5, + delay: 2, + excluded_endpoints: None, + }, + &mocked_heartbeat_function, + ) + .await; + + assert!(result.is_empty()); + } + #[tokio::test] async fn return_heartbeat_give_up_event_on_error_with_none_auto_retry_policy() { let mocked_heartbeat_function: Arc = Arc::new(move |_| { diff --git a/src/dx/subscribe/event_engine/effects/handshake.rs b/src/dx/subscribe/event_engine/effects/handshake.rs index cc0a3315..c0fab74b 100644 --- a/src/dx/subscribe/event_engine/effects/handshake.rs +++ b/src/dx/subscribe/event_engine/effects/handshake.rs @@ -1,6 +1,7 @@ use futures::TryFutureExt; use log::info; +use crate::core::PubNubError; use crate::subscribe::SubscriptionCursor; use crate::{ dx::subscribe::event_engine::{ @@ -36,7 +37,11 @@ pub(super) async fn execute( .map_ok_or_else( |error| { log::error!("Handshake error: {:?}", error); - vec![SubscribeEvent::HandshakeFailure { reason: error }] + + // Cancel is possible and no retries should be done. + (!matches!(error, PubNubError::EffectCanceled)) + .then(|| vec![SubscribeEvent::HandshakeFailure { reason: error }]) + .unwrap_or(vec![]) }, |subscribe_result| { let cursor = { @@ -126,4 +131,23 @@ mod should { SubscribeEvent::HandshakeFailure { .. } )); } + + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_handshake_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &None, + "id", + &mock_handshake_function, + ) + .await; + + assert!(result.is_empty()); + } } diff --git a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs index 8a89ba9f..17651655 100644 --- a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs +++ b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs @@ -21,7 +21,9 @@ pub(super) async fn execute( retry_policy: &RequestRetryConfiguration, executor: &Arc, ) -> Vec { - if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) { + if !matches!(reason, PubNubError::EffectCanceled) + && !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) + { return vec![SubscribeEvent::HandshakeReconnectGiveUp { reason }]; } @@ -176,6 +178,38 @@ mod should { )); } + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_handshake_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &None, + 1, + PubNubError::Transport { + details: "test".into(), + response: Some(Box::new(TransportResponse { + status: 500, + ..Default::default() + })), + }, + "id", + &RequestRetryConfiguration::Linear { + delay: 0, + max_retry: 1, + excluded_endpoints: None, + }, + &mock_handshake_function, + ) + .await; + + assert!(result.is_empty()); + } + #[tokio::test] async fn return_handshake_reconnect_give_up_event_on_err_with_none_auto_retry_policy() { let mock_handshake_function: Arc = Arc::new(move |_| { diff --git a/src/dx/subscribe/event_engine/effects/receive.rs b/src/dx/subscribe/event_engine/effects/receive.rs index c3dc2402..fdd4cd42 100644 --- a/src/dx/subscribe/event_engine/effects/receive.rs +++ b/src/dx/subscribe/event_engine/effects/receive.rs @@ -128,4 +128,23 @@ mod should { SubscribeEvent::ReceiveFailure { .. } )); } + + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_receive_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &Default::default(), + "id", + &mock_receive_function, + ) + .await; + + assert!(result.is_empty()); + } } diff --git a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs index ca7efa7d..5e0d8e36 100644 --- a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs +++ b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs @@ -23,7 +23,9 @@ pub(crate) async fn execute( retry_policy: &RequestRetryConfiguration, executor: &Arc, ) -> Vec { - if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) { + if !matches!(reason, PubNubError::EffectCanceled) + && !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) + { return vec![SubscribeEvent::ReceiveReconnectGiveUp { reason }]; } @@ -225,6 +227,38 @@ mod should { )); } + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_receive_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &Default::default(), + 10, + PubNubError::Transport { + details: "test".into(), + response: Some(Box::new(TransportResponse { + status: 500, + ..Default::default() + })), + }, + "id", + &RequestRetryConfiguration::Linear { + max_retry: 20, + delay: 0, + excluded_endpoints: None, + }, + &mock_receive_function, + ) + .await; + + assert!(result.is_empty()); + } + #[tokio::test] async fn return_receive_reconnect_give_up_event_on_err_with_none_auto_retry_policy() { let mock_receive_function: Arc = Arc::new(move |_| { diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs index 2644d803..27c4c8e0 100644 --- a/src/dx/subscribe/mod.rs +++ b/src/dx/subscribe/mod.rs @@ -915,7 +915,7 @@ mod should { channel_groups: Some(&["group_a"]), options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]), }); - subscription.subscribe(None); + subscription.subscribe(); let status = client.status_stream().next().await.unwrap(); let _ = subscription.messages_stream().next().await.unwrap(); diff --git a/src/dx/subscribe/subscription.rs b/src/dx/subscribe/subscription.rs index 6eb2704e..9bca92ff 100644 --- a/src/dx/subscribe/subscription.rs +++ b/src/dx/subscribe/subscription.rs @@ -425,6 +425,29 @@ where *self.is_subscribed.read() } + /// Register `Subscription` within `SubscriptionManager`. + /// + /// # Arguments + /// + /// - `cursor` - Subscription real-time events catch up cursor. + fn register_with_cursor(&self, cursor: Option) { + let Some(client) = self.client.upgrade().clone() else { + return; + }; + + { + if let Some(manager) = client.subscription_manager(true).write().as_mut() { + // Mark entities as "in use" by subscription. + self.entity.increase_subscriptions_count(); + + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.register(&handler, cursor); + } + } + } + } + /// Filters the given list of `Update` events based on the subscription /// input and the current timetoken. /// @@ -482,36 +505,44 @@ where T: Transport + Send + Sync + 'static, D: Deserializer + Send + Sync + 'static, { - fn subscribe(&self, cursor: Option) { + fn subscribe(&self) { let mut is_subscribed = self.is_subscribed.write(); if *is_subscribed { return; } *is_subscribed = true; - if cursor.is_some() { - let mut cursor_slot = self.cursor.write(); - if let Some(current_cursor) = cursor_slot.as_ref() { - let catchup_cursor = cursor.clone().unwrap_or_default(); - catchup_cursor - .gt(current_cursor) - .then(|| *cursor_slot = Some(catchup_cursor)); - } else { - *cursor_slot = cursor.clone(); - } + self.register_with_cursor(self.cursor.read().clone()); + } + + fn subscribe_with_timetoken(&self, cursor: SC) + where + SC: Into, + { + let mut is_subscribed = self.is_subscribed.write(); + if *is_subscribed { + return; } + *is_subscribed = true; - if let Some(client) = self.client().upgrade().clone() { - if let Some(manager) = client.subscription_manager(true).write().as_mut() { - // Mark entities as "in use" by subscription. - self.entity.increase_subscriptions_count(); + let user_cursor = cursor.into(); + let cursor = user_cursor.is_valid().then_some(user_cursor); - if let Some((_, handler)) = self.clones.read().iter().next() { - let handler: Weak + Send + Sync> = handler.clone(); - manager.register(&handler, cursor); + { + if cursor.is_some() { + let mut cursor_slot = self.cursor.write(); + if let Some(current_cursor) = cursor_slot.as_ref() { + let catchup_cursor = cursor.clone().unwrap_or_default(); + catchup_cursor + .gt(current_cursor) + .then(|| *cursor_slot = Some(catchup_cursor)); + } else { + *cursor_slot = cursor.clone(); } } } + + self.register_with_cursor(cursor); } fn unsubscribe(&self) { diff --git a/src/dx/subscribe/subscription_set.rs b/src/dx/subscribe/subscription_set.rs index 5c2a917f..9f8347f1 100644 --- a/src/dx/subscribe/subscription_set.rs +++ b/src/dx/subscribe/subscription_set.rs @@ -692,6 +692,32 @@ where *self.is_subscribed.read() } + /// Register `Subscription` within `SubscriptionManager`. + /// + /// # Arguments + /// + /// - `cursor` - Subscription real-time events catch up cursor. + fn register_with_cursor(&self, cursor: Option) { + let Some(client) = self.client().upgrade().clone() else { + return; + }; + + { + let manager = client.subscription_manager(true); + if let Some(manager) = manager.write().as_mut() { + // Mark entities as "in-use" by subscription. + self.subscriptions.read().iter().for_each(|subscription| { + subscription.entity.increase_subscriptions_count(); + }); + + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.register(&handler, cursor); + } + }; + } + } + /// Filters the given list of `Update` events based on the subscription /// input and the current timetoken. /// @@ -749,43 +775,44 @@ where T: Transport + Send + Sync + 'static, D: Deserializer + Send + Sync + 'static, { - fn subscribe(&self, cursor: Option) { + fn subscribe(&self) { let mut is_subscribed = self.is_subscribed.write(); if *is_subscribed { return; } *is_subscribed = true; - if cursor.is_some() { - let mut cursor_slot = self.cursor.write(); - if let Some(current_cursor) = cursor_slot.as_ref() { - let catchup_cursor = cursor.clone().unwrap_or_default(); - catchup_cursor - .gt(current_cursor) - .then(|| *cursor_slot = Some(catchup_cursor)); - } else { - *cursor_slot = cursor.clone(); - } - } + self.register_with_cursor(self.cursor.read().clone()) + } - let Some(client) = self.client().upgrade().clone() else { + fn subscribe_with_timetoken(&self, cursor: SC) + where + SC: Into, + { + let mut is_subscribed = self.is_subscribed.write(); + if *is_subscribed { return; - }; + } + *is_subscribed = true; - { - let manager = client.subscription_manager(true); - if let Some(manager) = manager.write().as_mut() { - // Mark entities as "in-use" by subscription. - self.subscriptions.read().iter().for_each(|subscription| { - subscription.entity.increase_subscriptions_count(); - }); + let user_cursor = cursor.into(); + let cursor = user_cursor.is_valid().then_some(user_cursor); - if let Some((_, handler)) = self.clones.read().iter().next() { - let handler: Weak + Send + Sync> = handler.clone(); - manager.register(&handler, cursor); + { + if cursor.is_some() { + let mut cursor_slot = self.cursor.write(); + if let Some(current_cursor) = cursor_slot.as_ref() { + let catchup_cursor = cursor.clone().unwrap_or_default(); + catchup_cursor + .gt(current_cursor) + .then(|| *cursor_slot = Some(catchup_cursor)); + } else { + *cursor_slot = cursor.clone(); } - }; + } } + + self.register_with_cursor(cursor); } fn unsubscribe(&self) { diff --git a/src/dx/subscribe/traits/event_subscribe.rs b/src/dx/subscribe/traits/event_subscribe.rs index 9778fee4..2f01b7a1 100644 --- a/src/dx/subscribe/traits/event_subscribe.rs +++ b/src/dx/subscribe/traits/event_subscribe.rs @@ -11,8 +11,21 @@ use crate::subscribe::SubscriptionCursor; /// Types that implement this trait can change activity of real-time events /// processing for specific or set of entities. pub trait EventSubscriber { - /// Use receiver to subscribe for real-time updates. - fn subscribe(&self, cursor: Option); + /// Use the receiver to subscribe for real-time updates. + fn subscribe(&self); + + /// Use the receiver to subscribe for real-time updates starting at a + /// specific time. + /// + /// # Arguments + /// + /// - `cursor` - `SubscriptionCursor` from which, the client should try to + /// catch up on real-time events. `cursor` also can be provided as `usize` + /// or `String` with a 17-digit PubNub timetoken. If `cursor` doesn't + /// satisfy the requirements, it will be ignored. + fn subscribe_with_timetoken(&self, cursor: SC) + where + SC: Into; /// Use receiver to stop receiving real-time updates. fn unsubscribe(&self); diff --git a/src/dx/subscribe/types.rs b/src/dx/subscribe/types.rs index 6dfb20b4..12709f37 100644 --- a/src/dx/subscribe/types.rs +++ b/src/dx/subscribe/types.rs @@ -128,52 +128,6 @@ pub struct SubscriptionCursor { pub region: u32, } -impl PartialOrd for SubscriptionCursor { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } - - fn lt(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs < rhs - } - - fn le(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs <= rhs - } - - fn gt(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs > rhs - } - - fn ge(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs >= rhs - } -} - -impl Ord for SubscriptionCursor { - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap_or(Ordering::Equal) - } -} - -impl Debug for SubscriptionCursor { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - write!( - f, - "SubscriptionCursor {{ timetoken: {}, region: {} }}", - self.timetoken, self.region - ) - } -} - /// Subscription statuses. #[derive(Clone, PartialEq)] pub enum ConnectionStatus { @@ -602,6 +556,20 @@ pub enum MessageActionEvent { Delete, } +impl SubscriptionCursor { + /// Checks if the `timetoken` is valid. + /// + /// A valid `timetoken` should have a length of 17 and contain only numeric + /// characters. + /// + /// # Returns + /// + /// Returns `true` if the `timetoken` is valid, otherwise `false`. + pub(crate) fn is_valid(&self) -> bool { + self.timetoken.len() == 17 && self.timetoken.chars().all(char::is_numeric) + } +} + impl Default for SubscriptionCursor { fn default() -> Self { Self { @@ -611,10 +579,89 @@ impl Default for SubscriptionCursor { } } +impl PartialOrd for SubscriptionCursor { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + + fn lt(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs < rhs + } + + fn le(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs <= rhs + } + + fn gt(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs > rhs + } + + fn ge(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs >= rhs + } +} + +impl Ord for SubscriptionCursor { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap_or(Ordering::Equal) + } +} + +impl Debug for SubscriptionCursor { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + write!( + f, + "SubscriptionCursor {{ timetoken: {}, region: {} }}", + self.timetoken, self.region + ) + } +} + impl From for SubscriptionCursor { fn from(value: String) -> Self { - Self { - timetoken: value, + let mut timetoken = value; + if timetoken.len() != 17 || !timetoken.chars().all(char::is_numeric) { + timetoken = "-1".into(); + } + + SubscriptionCursor { + timetoken, + ..Default::default() + } + } +} + +impl From for SubscriptionCursor { + fn from(value: usize) -> Self { + let mut timetoken = value.to_string(); + if timetoken.len() != 17 { + timetoken = "-1".into(); + } + + SubscriptionCursor { + timetoken, + ..Default::default() + } + } +} + +impl From for SubscriptionCursor { + fn from(value: u64) -> Self { + let mut timetoken = value.to_string(); + if timetoken.len() != 17 { + timetoken = "-1".into(); + } + + SubscriptionCursor { + timetoken, ..Default::default() } } diff --git a/tests/presence/presence_steps.rs b/tests/presence/presence_steps.rs index 51a6fa41..ad8e7404 100644 --- a/tests/presence/presence_steps.rs +++ b/tests/presence/presence_steps.rs @@ -138,7 +138,7 @@ async fn join( subscriptions.values().cloned().collect(), options.clone(), ); - subscription.subscribe(None); + subscription.subscribe(); world.subscription = Some(subscription); world.subscriptions = Some(subscriptions); } diff --git a/tests/subscribe/subscribe_steps.rs b/tests/subscribe/subscribe_steps.rs index 529c39eb..b5d05aca 100644 --- a/tests/subscribe/subscribe_steps.rs +++ b/tests/subscribe/subscribe_steps.rs @@ -4,7 +4,7 @@ use cucumber::gherkin::Table; use cucumber::{codegen::Regex, gherkin::Step, then, when}; use futures::{select_biased, FutureExt, StreamExt}; use pubnub::core::RequestRetryConfiguration; -use pubnub::subscribe::{EventEmitter, EventSubscriber, SubscriptionParams}; +use pubnub::subscribe::{EventEmitter, EventSubscriber, SubscriptionCursor, SubscriptionParams}; use std::fs::read_to_string; /// Extract list of events and invocations from log. @@ -122,7 +122,7 @@ async fn subscribe(world: &mut PubNubWorld) { channel_groups: None, options: None, }); - subscription.subscribe(None); + subscription.subscribe(); world.subscription = Some(subscription); }); } @@ -139,7 +139,11 @@ async fn subscribe_with_timetoken(world: &mut PubNubWorld, timetoken: u64) { channel_groups: None, options: None, }); - subscription.subscribe(Some(timetoken.to_string().into())); + + subscription.subscribe_with_timetoken(SubscriptionCursor { + timetoken: timetoken.to_string(), + ..Default::default() + }); world.subscription = Some(subscription); }); }