Skip to content

Commit

Permalink
feat: from url and from iterator for streameruri (#28)
Browse files Browse the repository at this point in the history
* feat: from url and from iterator for streameruri

* feat: streamer connect function accept impl Into<StreamerUri>  to be more generic over the parameter

* revert: cannot have impl due to Send

* feat: accept generic into streameruri in streamer connect function to be more generic

* test: specify streameruri type in parse
  • Loading branch information
carlocorradini authored Jun 2, 2024
1 parent a5bf5ba commit b824c71
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 26 deletions.
6 changes: 5 additions & 1 deletion sea-streamer-file/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ impl StreamerTrait for FileStreamer {

/// First check whether the file exists.
/// If not, depending on the options, either create it, or error.
async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> FileResult<Self> {
async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> FileResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
if uri.nodes().is_empty() {
return Err(StreamErr::StreamUrlErr(StreamUrlErr::ZeroNode));
}
Expand Down
6 changes: 5 additions & 1 deletion sea-streamer-kafka/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ impl Streamer for KafkaStreamer {
type ConsumerOptions = KafkaConsumerOptions;
type ProducerOptions = KafkaProducerOptions;

async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> KafkaResult<Self> {
async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> KafkaResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
let admin = create_admin(&uri, &options).map_err(StreamErr::Backend)?;
let timeout = options.timeout().unwrap_or(Duration::from_secs(1));
spawn_blocking(move || admin.inner().fetch_cluster_id(timeout))
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-kafka/tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ async fn main() -> anyhow::Result<()> {
use sea_streamer_kafka::{AutoOffsetReset, KafkaConsumer, KafkaConsumerOptions, KafkaStreamer};
use sea_streamer_types::{
export::futures::StreamExt, Buffer, Consumer, ConsumerMode, ConsumerOptions, Message,
Producer, SeqPos, ShardId, StreamKey, Streamer, Timestamp,
Producer, SeqPos, ShardId, StreamKey, Streamer, StreamerUri, Timestamp,
};

let streamer = KafkaStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "localhost:9092".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
Default::default(),
)
Expand Down
6 changes: 5 additions & 1 deletion sea-streamer-redis/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ impl Streamer for RedisStreamer {
type ConsumerOptions = RedisConsumerOptions;
type ProducerOptions = RedisProducerOptions;

async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> RedisResult<Self> {
async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> RedisResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
if uri.protocol().is_none() {
return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired));
}
Expand Down
5 changes: 3 additions & 2 deletions sea-streamer-redis/tests/consumer-group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ async fn consumer_group() -> anyhow::Result<()> {
AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer,
};
use sea_streamer_types::{
Consumer, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp,
Consumer, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri,
Timestamp,
};

const TEST: &str = "group-1";
Expand All @@ -29,7 +30,7 @@ async fn consumer_group() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down
9 changes: 5 additions & 4 deletions sea-streamer-redis/tests/load-balanced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn load_balance() -> anyhow::Result<()> {
use sea_streamer_runtime::{sleep, spawn_task};
use sea_streamer_types::{
export::futures::stream::StreamExt, Buffer, Consumer, ConsumerMode, ConsumerOptions,
Message, Producer, StreamKey, Streamer, Timestamp,
Message, Producer, StreamKey, Streamer, StreamerUri, Timestamp,
};
use std::time::Duration;

Expand All @@ -39,7 +39,7 @@ async fn load_balance() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down Expand Up @@ -176,7 +176,8 @@ async fn failover() -> anyhow::Result<()> {
AutoCommit, AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer,
};
use sea_streamer_types::{
ConsumerGroup, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp,
ConsumerGroup, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri,
Timestamp,
};
use std::time::Duration;

Expand All @@ -192,7 +193,7 @@ async fn failover() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down
5 changes: 3 additions & 2 deletions sea-streamer-redis/tests/realtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ async fn main() -> anyhow::Result<()> {
};
use sea_streamer_runtime::sleep;
use sea_streamer_types::{
ConsumerMode, ConsumerOptions, Producer, ShardId, StreamKey, Streamer, Timestamp,
ConsumerMode, ConsumerOptions, Producer, ShardId, StreamKey, Streamer, StreamerUri,
Timestamp,
};
use std::time::Duration;

Expand All @@ -29,7 +30,7 @@ async fn main() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down
8 changes: 4 additions & 4 deletions sea-streamer-redis/tests/resumable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn immediate_and_delayed() -> anyhow::Result<()> {
use sea_streamer_runtime::timeout;
use sea_streamer_types::{
Consumer, ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, Producer, ShardId,
StreamKey, Streamer, Timestamp,
StreamKey, Streamer, StreamerUri, Timestamp,
};
use std::time::Duration;

Expand All @@ -37,7 +37,7 @@ async fn immediate_and_delayed() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down Expand Up @@ -126,7 +126,7 @@ async fn rolling_and_disabled() -> anyhow::Result<()> {
};
use sea_streamer_types::{
export::futures::StreamExt, Buffer, Consumer, ConsumerGroup, ConsumerId, ConsumerMode,
ConsumerOptions, Message, Producer, StreamKey, Streamer, Timestamp,
ConsumerOptions, Message, Producer, StreamKey, Streamer, StreamerUri, Timestamp,
};
use std::time::Duration;

Expand All @@ -147,7 +147,7 @@ async fn rolling_and_disabled() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down
5 changes: 3 additions & 2 deletions sea-streamer-redis/tests/seek-rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ async fn main() -> anyhow::Result<()> {
};
use sea_streamer_runtime::{sleep, timeout};
use sea_streamer_types::{
Consumer, ConsumerMode, ConsumerOptions, Producer, SeqPos, StreamKey, Streamer, Timestamp,
Consumer, ConsumerMode, ConsumerOptions, Producer, SeqPos, StreamKey, Streamer,
StreamerUri, Timestamp,
};
use std::time::Duration;

Expand All @@ -34,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-redis/tests/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async fn main() -> anyhow::Result<()> {
};
use sea_streamer_runtime::sleep;
use sea_streamer_types::{
ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp,
ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri, Timestamp,
};
use std::time::Duration;

Expand All @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse()
.parse::<StreamerUri>()
.unwrap(),
options,
)
Expand Down
6 changes: 5 additions & 1 deletion sea-streamer-socket/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,11 @@ impl Streamer for SeaStreamer {
type ConsumerOptions = SeaConsumerOptions;
type ProducerOptions = SeaProducerOptions;

async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> SeaResult<Self> {
async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> SeaResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
let backend = match uri.protocol() {
Some(protocol) => match protocol {
#[cfg(feature = "backend-kafka")]
Expand Down
5 changes: 4 additions & 1 deletion sea-streamer-stdio/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ impl StreamerTrait for StdioStreamer {
type ProducerOptions = StdioProducerOptions;

/// Nothing will happen until you create a producer/consumer
async fn connect(_: StreamerUri, options: Self::ConnectOptions) -> StdioResult<Self> {
async fn connect<S>(_: S, options: Self::ConnectOptions) -> StdioResult<Self>
where
S: Into<StreamerUri> + Send,
{
let StdioConnectOptions { loopback } = options;
Ok(StdioStreamer { loopback })
}
Expand Down
39 changes: 36 additions & 3 deletions sea-streamer-types/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ pub trait Streamer: Sized {
type ProducerOptions: ProducerOptions;

/// Establish a connection to the streaming server.
fn connect(
streamer: StreamerUri,
fn connect<S>(
streamer: S,
options: Self::ConnectOptions,
) -> impl Future<Output = StreamResult<Self, Self::Error>> + Send;
) -> impl Future<Output = StreamResult<Self, Self::Error>> + Send
where
S: Into<StreamerUri> + Send;

/// Flush and disconnect from the streaming server.
fn disconnect(self) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
Expand Down Expand Up @@ -98,6 +100,20 @@ impl Display for StreamerUri {
}
}

impl From<Url> for StreamerUri {
fn from(value: Url) -> Self {
Self { nodes: vec![value] }
}
}

impl FromIterator<Url> for StreamerUri {
fn from_iter<T: IntoIterator<Item = Url>>(iter: T) -> Self {
Self {
nodes: iter.into_iter().collect(),
}
}
}

impl StreamerUri {
pub fn zero() -> Self {
Self { nodes: Vec::new() }
Expand Down Expand Up @@ -347,6 +363,23 @@ mod test {
assert_eq!(uri.nodes(), &["file:///path/to/hi".parse().unwrap()]);
}

#[test]
fn test_into_streamer_uri() {
let url: Url = "proto://sea-ql.org:1234".parse().unwrap();
let uri: StreamerUri = url.clone().into();
assert!(uri.nodes.len() == 1);
assert_eq!(url, uri.nodes.first().unwrap().clone());

let urls: [Url; 3] = [
"proto://sea-ql.org:1".parse().unwrap(),
"proto://sea-ql.org:2".parse().unwrap(),
"proto://sea-ql.org:3".parse().unwrap(),
];
let uri: StreamerUri = StreamerUri::from_iter(urls.clone().into_iter());
assert!(uri.nodes.len() == 3);
assert!(uri.nodes.iter().eq(urls.iter()));
}

#[test]
fn test_parse_stream_url_err() {
use crate::StreamKeyErr;
Expand Down

0 comments on commit b824c71

Please sign in to comment.