Skip to content

Commit

Permalink
feat: seaprate blocking methods (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
heilhead authored Mar 11, 2024
1 parent f7ff25c commit 7664418
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 68 deletions.
34 changes: 16 additions & 18 deletions relay_client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,13 @@ impl Client {
/// when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
/// future.
pub async fn subscribe_blocking(&self, topic: Topic) -> Response<rpc::Subscribe> {
self.request(rpc::Subscribe { topic, block: true }).await
pub async fn subscribe_blocking(&self, topic: Topic) -> Response<rpc::SubscribeBlocking> {
self.request(rpc::SubscribeBlocking { topic }).await
}

/// Unsubscribes from a topic.
pub async fn unsubscribe(
&self,
topic: Topic,
subscription_id: SubscriptionId,
) -> Response<rpc::Unsubscribe> {
self.request(rpc::Unsubscribe {
topic,
subscription_id,
})
.await
pub async fn unsubscribe(&self, topic: Topic) -> Response<rpc::Unsubscribe> {
self.request(rpc::Unsubscribe { topic }).await
}

/// Fetch mailbox messages for a specific topic.
Expand Down Expand Up @@ -265,12 +257,18 @@ impl Client {
pub async fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> Response<rpc::BatchSubscribe> {
self.request(rpc::BatchSubscribe {
topics: topics.into(),
block: true,
})
.await
) -> Result<
Vec<Result<SubscriptionId, Error<rpc::SubscriptionError>>>,
Error<rpc::SubscriptionError>,
> {
Ok(self
.request(rpc::BatchSubscribeBlocking {
topics: topics.into(),
})
.await?
.into_iter()
.map(crate::convert_subscription_result)
.collect())
}

/// Unsubscribes from multiple topics.
Expand Down
13 changes: 12 additions & 1 deletion relay_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use {
::http::HeaderMap,
relay_rpc::{
auth::{SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS},
domain::{MessageId, ProjectId},
domain::{MessageId, ProjectId, SubscriptionId},
rpc::{SubscriptionError, SubscriptionResult},
user_agent::UserAgent,
},
serde::Serialize,
Expand Down Expand Up @@ -170,6 +171,16 @@ impl Default for MessageIdGenerator {
}
}

#[inline]
fn convert_subscription_result(
res: SubscriptionResult,
) -> Result<SubscriptionId, error::Error<SubscriptionError>> {
match res {
SubscriptionResult::Id(id) => Ok(id),
SubscriptionResult::Error(err) => Err(ClientError::from(err).into()),
}
}

#[cfg(test)]
mod tests {
use {
Expand Down
43 changes: 26 additions & 17 deletions relay_client/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
use {
self::connection::{connection_event_loop, ConnectionControl},
crate::{error::ClientError, ConnectionOptions},
crate::{
error::{ClientError, Error},
ConnectionOptions,
},
relay_rpc::{
domain::{MessageId, SubscriptionId, Topic},
rpc::{
BatchFetchMessages,
BatchReceiveMessages,
BatchSubscribe,
BatchSubscribeBlocking,
BatchUnsubscribe,
FetchMessages,
Publish,
Receipt,
Subscribe,
SubscribeBlocking,
Subscription,
SubscriptionError,
Unsubscribe,
},
},
std::{sync::Arc, time::Duration},
std::{future::Future, sync::Arc, time::Duration},
tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
Expand Down Expand Up @@ -182,24 +188,17 @@ impl Client {
/// when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
/// future.
pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<Subscribe> {
let (request, response) = create_request(Subscribe { topic, block: true });
pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<SubscribeBlocking> {
let (request, response) = create_request(SubscribeBlocking { topic });

self.request(request);

response
}

/// Unsubscribes from a topic.
pub fn unsubscribe(
&self,
topic: Topic,
subscription_id: SubscriptionId,
) -> EmptyResponseFuture<Unsubscribe> {
let (request, response) = create_request(Unsubscribe {
topic,
subscription_id,
});
pub fn unsubscribe(&self, topic: Topic) -> EmptyResponseFuture<Unsubscribe> {
let (request, response) = create_request(Unsubscribe { topic });

self.request(request);

Expand Down Expand Up @@ -240,15 +239,25 @@ impl Client {
pub fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> ResponseFuture<BatchSubscribe> {
let (request, response) = create_request(BatchSubscribe {
) -> impl Future<
Output = Result<
Vec<Result<SubscriptionId, Error<SubscriptionError>>>,
Error<SubscriptionError>,
>,
> {
let (request, response) = create_request(BatchSubscribeBlocking {
topics: topics.into(),
block: true,
});

self.request(request);

response
async move {
Ok(response
.await?
.into_iter()
.map(crate::convert_subscription_result)
.collect())
}
}

/// Unsubscribes from multiple topics.
Expand Down
100 changes: 86 additions & 14 deletions relay_rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl ErrorResponse {
}

/// Data structure representing error response params.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct ErrorData {
/// Error code.
pub code: i32,
Expand All @@ -215,7 +215,9 @@ pub enum SubscriptionError {
SubscriberLimitExceeded,
}

/// Data structure representing subscribe request params.
/// Subscription request parameters. This request does not require the
/// subscription to be fully processed, and returns as soon as the server
/// receives it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Subscribe {
/// The topic to subscribe to.
Expand Down Expand Up @@ -244,15 +246,36 @@ impl ServiceRequest for Subscribe {
}
}

/// Subscription request parameters. This request awaits the subscription to be
/// fully processed and returns possible errors.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscribeBlocking {
/// The topic to subscribe to.
pub topic: Topic,
}

impl ServiceRequest for SubscribeBlocking {
type Error = SubscriptionError;
type Response = SubscriptionId;

fn validate(&self) -> Result<(), PayloadError> {
self.topic
.decode()
.map_err(|_| PayloadError::InvalidTopic)?;

Ok(())
}

fn into_params(self) -> Params {
Params::SubscribeBlocking(self)
}
}

/// Data structure representing unsubscribe request params.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Unsubscribe {
/// The topic to unsubscribe from.
pub topic: Topic,

/// The id of the subscription to unsubscribe from.
#[serde(rename = "id")]
pub subscription_id: SubscriptionId,
}

impl ServiceRequest for Unsubscribe {
Expand Down Expand Up @@ -317,7 +340,9 @@ pub struct FetchResponse {
pub has_more: bool,
}

/// Multi-topic subscription request parameters.
/// Multi-topic subscription request parameters. This request does not require
/// all subscriptions to be fully processed, and returns as soon as the server
/// receives it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchSubscribe {
/// The topics to subscribe to.
Expand All @@ -329,12 +354,9 @@ pub struct BatchSubscribe {
pub block: bool,
}

impl ServiceRequest for BatchSubscribe {
type Error = SubscriptionError;
type Response = Vec<SubscriptionId>;

fn validate(&self) -> Result<(), PayloadError> {
let batch_size = self.topics.len();
impl BatchSubscribe {
fn validate_topics(topics: &[Topic]) -> Result<(), PayloadError> {
let batch_size = topics.len();

if batch_size == 0 {
return Err(PayloadError::BatchEmpty);
Expand All @@ -344,18 +366,55 @@ impl ServiceRequest for BatchSubscribe {
return Err(PayloadError::BatchLimitExceeded);
}

for topic in &self.topics {
for topic in topics {
topic.decode().map_err(|_| PayloadError::InvalidTopic)?;
}

Ok(())
}
}

impl ServiceRequest for BatchSubscribe {
type Error = SubscriptionError;
type Response = Vec<SubscriptionId>;

fn validate(&self) -> Result<(), PayloadError> {
Self::validate_topics(&self.topics)
}

fn into_params(self) -> Params {
Params::BatchSubscribe(self)
}
}

/// Multi-topic subscription request parameters. This request awaits all
/// subscriptions to be fully processed and returns possible errors per topic.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchSubscribeBlocking {
/// The topics to subscribe to.
pub topics: Vec<Topic>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SubscriptionResult {
Id(SubscriptionId),
Error(ErrorData),
}

impl ServiceRequest for BatchSubscribeBlocking {
type Error = SubscriptionError;
type Response = Vec<SubscriptionResult>;

fn validate(&self) -> Result<(), PayloadError> {
BatchSubscribe::validate_topics(&self.topics)
}

fn into_params(self) -> Params {
Params::BatchSubscribeBlocking(self)
}
}

/// Multi-topic unsubscription request parameters.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchUnsubscribe {
Expand Down Expand Up @@ -696,6 +755,10 @@ pub enum Params {
#[serde(rename = "irn_subscribe", alias = "iridium_subscribe")]
Subscribe(Subscribe),

/// Parameters to blocking subscribe.
#[serde(rename = "irn_subscribeBlocking", alias = "iridium_subscribeBlocking")]
SubscribeBlocking(SubscribeBlocking),

/// Parameters to unsubscribe.
#[serde(rename = "irn_unsubscribe", alias = "iridium_unsubscribe")]
Unsubscribe(Unsubscribe),
Expand All @@ -708,6 +771,13 @@ pub enum Params {
#[serde(rename = "irn_batchSubscribe", alias = "iridium_batchSubscribe")]
BatchSubscribe(BatchSubscribe),

/// Parameters to blocking batch subscribe.
#[serde(
rename = "irn_batchSubscribeBlocking",
alias = "iridium_batchSubscribeBlocking"
)]
BatchSubscribeBlocking(BatchSubscribeBlocking),

/// Parameters to batch unsubscribe.
#[serde(rename = "irn_batchUnsubscribe", alias = "iridium_batchUnsubscribe")]
BatchUnsubscribe(BatchUnsubscribe),
Expand Down Expand Up @@ -779,9 +849,11 @@ impl Request {

match &self.params {
Params::Subscribe(params) => params.validate(),
Params::SubscribeBlocking(params) => params.validate(),
Params::Unsubscribe(params) => params.validate(),
Params::FetchMessages(params) => params.validate(),
Params::BatchSubscribe(params) => params.validate(),
Params::BatchSubscribeBlocking(params) => params.validate(),
Params::BatchUnsubscribe(params) => params.validate(),
Params::BatchFetchMessages(params) => params.validate(),
Params::Publish(params) => params.validate(),
Expand Down
Loading

0 comments on commit 7664418

Please sign in to comment.