Skip to content

Commit

Permalink
refactor(subscribe): add subscription token validation
Browse files Browse the repository at this point in the history
Added a method to validate the provided subscription token to conform to PubNub time token
requirements with precision.

refactor(subscription): split `subscribe` method

Split the `subscribe` method into two: `subscribe` and `subscribe_with_timetoken`.
  • Loading branch information
parfeon committed Feb 5, 2024
1 parent 8fa66c8 commit a0ee380
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 106 deletions.
6 changes: 4 additions & 2 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
channel_groups: None,
options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
});
subscription.subscribe(None);
subscription.subscribe();
let subscription_clone = subscription.clone_empty();

// Attach connection status to the PubNub client instance.
Expand Down Expand Up @@ -130,7 +130,9 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
// drop(subscription_clone);

println!(
"\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
"\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \
`subscription.subscribe_with_timetoken(Some(<timetoken>)) call."
);
// Clean up before complete work with PubNub client instance.
pubnub.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Expand Down
6 changes: 4 additions & 2 deletions examples/subscribe_with_presence_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
channel_groups: None,
options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
});
subscription.subscribe(None);
subscription.subscribe();
let subscription_clone = subscription.clone_empty();

// Attach connection status to the PubNub client instance.
Expand Down Expand Up @@ -145,7 +145,9 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
// drop(subscription_clone);

println!(
"\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
"\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \
`subscription.subscribe_with_timetoken(Some(<timetoken>)) call."
);
// Clean up before complete work with PubNub client instance.
pubnub.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Expand Down
40 changes: 39 additions & 1 deletion src/dx/presence/event_engine/effects/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ pub(super) async fn execute(
effect_id,
})
.map_ok_or_else(
|error| vec![PresenceEvent::HeartbeatFailure { reason: error }],
|error| {
log::error!("Handshake error: {:?}", error);

// Cancel is possible and no retries should be done.
(!matches!(error, PubNubError::EffectCanceled))
.then(|| vec![PresenceEvent::HeartbeatFailure { reason: error }])
.unwrap_or(vec![])
},
|_| vec![PresenceEvent::HeartbeatSuccess],
)
.await
Expand Down Expand Up @@ -183,6 +190,37 @@ mod it_should {
));
}

#[tokio::test]
async fn return_empty_event_on_effect_cancel_err() {
let mocked_heartbeat_function: Arc<HeartbeatEffectExecutor> =
Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());

let result = execute(
&PresenceInput::new(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
),
5,
Some(PubNubError::Transport {
details: "test".into(),
response: Some(Box::new(TransportResponse {
status: 500,
..Default::default()
})),
}),
"id",
&RequestRetryConfiguration::Linear {
max_retry: 5,
delay: 2,
excluded_endpoints: None,
},
&mocked_heartbeat_function,
)
.await;

assert!(result.is_empty());
}

#[tokio::test]
async fn return_heartbeat_give_up_event_on_error_with_none_auto_retry_policy() {
let mocked_heartbeat_function: Arc<HeartbeatEffectExecutor> = Arc::new(move |_| {
Expand Down
26 changes: 25 additions & 1 deletion src/dx/subscribe/event_engine/effects/handshake.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use futures::TryFutureExt;
use log::info;

use crate::core::PubNubError;
use crate::subscribe::SubscriptionCursor;
use crate::{
dx::subscribe::event_engine::{
Expand Down Expand Up @@ -36,7 +37,11 @@ pub(super) async fn execute(
.map_ok_or_else(
|error| {
log::error!("Handshake error: {:?}", error);
vec![SubscribeEvent::HandshakeFailure { reason: error }]

// Cancel is possible and no retries should be done.
(!matches!(error, PubNubError::EffectCanceled))
.then(|| vec![SubscribeEvent::HandshakeFailure { reason: error }])
.unwrap_or(vec![])
},
|subscribe_result| {
let cursor = {
Expand Down Expand Up @@ -126,4 +131,23 @@ mod should {
SubscribeEvent::HandshakeFailure { .. }
));
}

#[tokio::test]
async fn return_empty_event_on_effect_cancel_err() {
let mock_handshake_function: Arc<SubscribeEffectExecutor> =
Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());

let result = execute(
&SubscriptionInput::new(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
),
&None,
"id",
&mock_handshake_function,
)
.await;

assert!(result.is_empty());
}
}
36 changes: 35 additions & 1 deletion src/dx/subscribe/event_engine/effects/handshake_reconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ pub(super) async fn execute(
retry_policy: &RequestRetryConfiguration,
executor: &Arc<SubscribeEffectExecutor>,
) -> Vec<SubscribeEvent> {
if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) {
if !matches!(reason, PubNubError::EffectCanceled)
&& !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason))
{
return vec![SubscribeEvent::HandshakeReconnectGiveUp { reason }];
}

Expand Down Expand Up @@ -176,6 +178,38 @@ mod should {
));
}

#[tokio::test]
async fn return_empty_event_on_effect_cancel_err() {
let mock_handshake_function: Arc<SubscribeEffectExecutor> =
Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());

let result = execute(
&SubscriptionInput::new(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
),
&None,
1,
PubNubError::Transport {
details: "test".into(),
response: Some(Box::new(TransportResponse {
status: 500,
..Default::default()
})),
},
"id",
&RequestRetryConfiguration::Linear {
delay: 0,
max_retry: 1,
excluded_endpoints: None,
},
&mock_handshake_function,
)
.await;

assert!(result.is_empty());
}

#[tokio::test]
async fn return_handshake_reconnect_give_up_event_on_err_with_none_auto_retry_policy() {
let mock_handshake_function: Arc<SubscribeEffectExecutor> = Arc::new(move |_| {
Expand Down
19 changes: 19 additions & 0 deletions src/dx/subscribe/event_engine/effects/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,23 @@ mod should {
SubscribeEvent::ReceiveFailure { .. }
));
}

#[tokio::test]
async fn return_empty_event_on_effect_cancel_err() {
let mock_receive_function: Arc<SubscribeEffectExecutor> =
Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());

let result = execute(
&SubscriptionInput::new(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
),
&Default::default(),
"id",
&mock_receive_function,
)
.await;

assert!(result.is_empty());
}
}
36 changes: 35 additions & 1 deletion src/dx/subscribe/event_engine/effects/receive_reconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ pub(crate) async fn execute(
retry_policy: &RequestRetryConfiguration,
executor: &Arc<SubscribeEffectExecutor>,
) -> Vec<SubscribeEvent> {
if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) {
if !matches!(reason, PubNubError::EffectCanceled)
&& !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason))
{
return vec![SubscribeEvent::ReceiveReconnectGiveUp { reason }];
}

Expand Down Expand Up @@ -225,6 +227,38 @@ mod should {
));
}

#[tokio::test]
async fn return_empty_event_on_effect_cancel_err() {
let mock_receive_function: Arc<SubscribeEffectExecutor> =
Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());

let result = execute(
&SubscriptionInput::new(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
),
&Default::default(),
10,
PubNubError::Transport {
details: "test".into(),
response: Some(Box::new(TransportResponse {
status: 500,
..Default::default()
})),
},
"id",
&RequestRetryConfiguration::Linear {
max_retry: 20,
delay: 0,
excluded_endpoints: None,
},
&mock_receive_function,
)
.await;

assert!(result.is_empty());
}

#[tokio::test]
async fn return_receive_reconnect_give_up_event_on_err_with_none_auto_retry_policy() {
let mock_receive_function: Arc<SubscribeEffectExecutor> = Arc::new(move |_| {
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ mod should {
channel_groups: Some(&["group_a"]),
options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
});
subscription.subscribe(None);
subscription.subscribe();

let status = client.status_stream().next().await.unwrap();
let _ = subscription.messages_stream().next().await.unwrap();
Expand Down
67 changes: 49 additions & 18 deletions src/dx/subscribe/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,29 @@ where
*self.is_subscribed.read()
}

/// Register `Subscription` within `SubscriptionManager`.
///
/// # Arguments
///
/// - `cursor` - Subscription real-time events catch up cursor.
fn register_with_cursor(&self, cursor: Option<SubscriptionCursor>) {
let Some(client) = self.client.upgrade().clone() else {
return;
};

{
if let Some(manager) = client.subscription_manager(true).write().as_mut() {
// Mark entities as "in use" by subscription.
self.entity.increase_subscriptions_count();

if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.register(&handler, cursor);
}
}
}
}

/// Filters the given list of `Update` events based on the subscription
/// input and the current timetoken.
///
Expand Down Expand Up @@ -482,36 +505,44 @@ where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
fn subscribe(&self, cursor: Option<SubscriptionCursor>) {
fn subscribe(&self) {
let mut is_subscribed = self.is_subscribed.write();
if *is_subscribed {
return;
}
*is_subscribed = true;

if cursor.is_some() {
let mut cursor_slot = self.cursor.write();
if let Some(current_cursor) = cursor_slot.as_ref() {
let catchup_cursor = cursor.clone().unwrap_or_default();
catchup_cursor
.gt(current_cursor)
.then(|| *cursor_slot = Some(catchup_cursor));
} else {
*cursor_slot = cursor.clone();
}
self.register_with_cursor(self.cursor.read().clone());
}

fn subscribe_with_timetoken<SC>(&self, cursor: SC)
where
SC: Into<SubscriptionCursor>,
{
let mut is_subscribed = self.is_subscribed.write();
if *is_subscribed {
return;
}
*is_subscribed = true;

if let Some(client) = self.client().upgrade().clone() {
if let Some(manager) = client.subscription_manager(true).write().as_mut() {
// Mark entities as "in use" by subscription.
self.entity.increase_subscriptions_count();
let user_cursor = cursor.into();
let cursor = user_cursor.is_valid().then_some(user_cursor);

if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.register(&handler, cursor);
{
if cursor.is_some() {
let mut cursor_slot = self.cursor.write();
if let Some(current_cursor) = cursor_slot.as_ref() {
let catchup_cursor = cursor.clone().unwrap_or_default();
catchup_cursor
.gt(current_cursor)
.then(|| *cursor_slot = Some(catchup_cursor));
} else {
*cursor_slot = cursor.clone();
}
}
}

self.register_with_cursor(cursor);
}

fn unsubscribe(&self) {
Expand Down
Loading

0 comments on commit a0ee380

Please sign in to comment.