Skip to content

Commit

Permalink
[feat] Add a channel filter on the WS integration. Progress for #322
Browse files Browse the repository at this point in the history
  • Loading branch information
jbtrystram committed Oct 10, 2022
1 parent b64d5d1 commit c929ad6
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions docs/modules/user-guide/pages/integration-ws.adoc
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
= WebSocket integration

The WebSocket integration allows consuming device events using a simple WebSockets based protocol.Events are
encoded as CloudEvents.
The WebSocket integration allows consuming device events using a simple WebSockets based protocol.
Events are encoded as CloudEvents.

The application you want to consume must be passed as a path parameter.
As an example, here is the url to connect to if you want to stream events for the application `example-app` :

[source]
----
wss://ws-integration.sandbox.drogue.cloud/example-app
----

== Authentication

Expand All @@ -19,10 +27,20 @@ a| `username` and `api_key` | Provide username and API key

|===

== Channel filter

You can set an additional path segment after the application ID to select a specific channel you want to consume the events from.

For example, if you want to consume events only from the `sensor` channel you would connect to :
[source]
----
wss://ws-integration.sandbox.drogue.cloud/example-app/sensor
----

== Consumer Group ID

You can set the consumer group ID by providing a query parameter named `group_id`. This will set the Kafka consumer
group ID to the provided value, which allows to share the load between different consumers.
group ID to the provided value, allowing to share the load between different consumers.

If no consumer group id is provided, a temporary one will be provided.

Expand Down
1 change: 1 addition & 0 deletions websocket-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ actix = "0.13.0"
actix-web-actors = "4"
anyhow = "1"
chrono = "0.4"
cloudevents-sdk = { version = "0.5"}
dotenv = "0.15"
drogue-client = "0.10"
env_logger = "0.9"
Expand Down
4 changes: 4 additions & 0 deletions websocket-integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
token: user_auth.clone(),
enable_access_token,
})
.service(
web::resource("/{channel}")
.route(web::get().to(route::start_connection_with_channel_filter)),
)
.service(web::resource("").route(web::get().to(route::start_connection))),
);
})
Expand Down
3 changes: 2 additions & 1 deletion websocket-integration/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use actix::prelude::{Message, Recipient};
use cloudevents::Event;
use drogue_client::integration::ws::v1::client;
use drogue_cloud_integration_common::stream::EventStream;
use drogue_cloud_service_common::error::ServiceError;
Expand All @@ -7,7 +8,7 @@ use uuid::Uuid;
// Service sends the kafka events in this message to WSHandler
#[derive(Message)]
#[rtype(result = "()")]
pub struct WsEvent(pub String);
pub struct WsEvent(pub Event);

// WsHandler sends this to service to subscribe to the stream
#[derive(Message)]
Expand Down
66 changes: 63 additions & 3 deletions websocket-integration/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use actix_web::{
use actix_web_actors::ws;
use drogue_cloud_service_api::webapp as actix_web;
use drogue_cloud_service_common::actix_auth::authentication::AuthenticatedUntil;
use drogue_cloud_service_common::client::UserAuthClient;
use drogue_cloud_service_common::openid::Authenticator;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
Expand All @@ -23,7 +25,6 @@ pub async fn start_connection(
auth_expiration: Option<web::ReqData<AuthenticatedUntil>>,
) -> Result<HttpResponse, Error> {
let application = application.into_inner();
let auth_expiration = auth_expiration.map(|e| e.into_inner().0);

let authenticator = req.app_data().cloned();
let user_auth = req.app_data().cloned();
Expand All @@ -34,15 +35,74 @@ pub async fn start_connection(
user_auth.is_some()
);

start_websocket(
req,
stream,
application,
None,
service_addr,
group_id.group_id,
auth_expiration,
authenticator,
user_auth,
)
}

pub async fn start_connection_with_channel_filter(
req: HttpRequest,
stream: Payload,
params: web::Path<(String, String)>,
service_addr: web::Data<Addr<Service>>,
web::Query(group_id): web::Query<GroupId>,
auth_expiration: Option<web::ReqData<AuthenticatedUntil>>,
) -> Result<HttpResponse, Error> {
let (application, channel) = params.into_inner();

let authenticator = req.app_data().cloned();
let user_auth = req.app_data().cloned();

log::debug!(
"Auth state - authenticator: {}, userAuth: {}",
authenticator.is_some(),
user_auth.is_some()
);

start_websocket(
req,
stream,
application,
Some(channel),
service_addr,
group_id.group_id,
auth_expiration,
authenticator,
user_auth,
)
}

fn start_websocket(
req: HttpRequest,
stream: Payload,
application: String,
channel: Option<String>,
service_addr: web::Data<Addr<Service>>,
group_id: Option<String>,
auth_expiration: Option<web::ReqData<AuthenticatedUntil>>,
authenticator: Option<Authenticator>,
user_auth: Option<UserAuthClient>,
) -> Result<HttpResponse, Error> {
let auth_expiration = auth_expiration.map(|e| e.into_inner().0);

// launch web socket actor
let ws = WsHandler::new(
application,
group_id.group_id,
group_id,
channel,
service_addr.get_ref().clone(),
auth_expiration,
authenticator,
user_auth,
);

Ok(ws::start(ws, &req, stream)?)
ws::start(ws, &req, stream)
}
4 changes: 1 addition & 3 deletions websocket-integration/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,8 @@ impl Service {
while let Some(event) = stream.next().await {
log::debug!("Topic: {} - Event: {:?}", application, event);

// Convert the event to a JSON string
let event = serde_json::to_string(&event?)?;
// Send the event as an Actor message
recipient.send(WsEvent(event.to_string())).await?;
recipient.send(WsEvent(event?)).await?;

log::debug!("Sent message - go back to sleep");
}
Expand Down
17 changes: 16 additions & 1 deletion websocket-integration/src/wshandler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use actix::{
};
use actix_web_actors::ws::{self, CloseReason};
use chrono::{DateTime, TimeZone, Utc};
use cloudevents::AttributesReader;
use drogue_client::integration::ws::v1::client;
use drogue_cloud_service_api::{
auth::user::{
Expand Down Expand Up @@ -89,6 +90,8 @@ pub struct WsHandler {
application: String,
/// the optional consumer group
group_id: Option<String>,
/// the optional channel filter
channel: Option<String>,
/// to exit the actor if the client was disconnected
heartbeat: Instant,
service_addr: Addr<Service>,
Expand All @@ -103,6 +106,7 @@ impl WsHandler {
pub fn new(
application: String,
group_id: Option<String>,
channel: Option<String>,
service_addr: Addr<Service>,
auth_expiration: Option<DateTime<Utc>>,
authenticator: Option<Authenticator>,
Expand All @@ -122,6 +126,7 @@ impl WsHandler {
WsHandler {
application,
group_id,
channel,
heartbeat: Instant::now(),
service_addr,
id: Uuid::new_v4(),
Expand Down Expand Up @@ -280,7 +285,17 @@ impl Handler<WsEvent> for WsHandler {
type Result = ();

fn handle(&mut self, msg: WsEvent, ctx: &mut Self::Context) {
ctx.text(msg.0);
if let Some(channel) = &self.channel {
if msg.0.subject() != Some(channel.as_str()) {
return;
}
}

// Convert the event to a JSON string
match serde_json::to_string(&msg.0) {
Ok(evt) => ctx.text(evt),
Err(e) => log::warn!("Could not deserialize event : {e}"),
}
}
}

Expand Down

0 comments on commit c929ad6

Please sign in to comment.