diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..ac2a13a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,83 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'hyperswarm'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=hyperswarm" + ], + "filter": { + "name": "hyperswarm", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug example 'simple'", + "cargo": { + "args": [ + "build", + "--example=simple", + "--package=hyperswarm" + ], + "filter": { + "name": "simple", + "kind": "example" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in example 'simple'", + "cargo": { + "args": [ + "test", + "--no-run", + "--example=simple", + "--package=hyperswarm" + ], + "filter": { + "name": "simple", + "kind": "example" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug integration test 'test'", + "cargo": { + "args": [ + "test", + "--no-run", + "--test=test", + "--package=hyperswarm" + ], + "filter": { + "name": "test", + "kind": "test" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 274d1a3..4f09d9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ multicast-socket = "0.2.1" hex = "0.4.3" pretty-hash = "0.4.1" hyperswarm-dht = { git = "https://github.com/Frando/hyperswarm-dht.git", branch = "hyperspace" } -colmeia-hyperswarm-mdns = { git = "https://github.com/bltavares/colmeia.git", rev = "e92ab71981356197a21592b7ce6854e209582985" } +colmeia-hyperswarm-mdns = { git = "https://github.com/bltavares/colmeia.git", rev = "53761799f7a9ee123875534e0108d7483a117885" } libutp-rs = { git = "https://github.com/Frando/libutp-rs.git", branch = "feat/clone", optional = true } [dev-dependencies] diff --git a/examples/lan.rs b/examples/lan.rs new file mode 100644 index 0000000..51b5a83 --- /dev/null +++ b/examples/lan.rs @@ -0,0 +1,80 @@ +use async_std::prelude::*; +use async_std::stream::StreamExt; +use async_std::task; +// use std::net::{SocketAddr, ToSocketAddrs}; + +use hyperswarm::{ + discovery::mdns::MdnsDiscovery, DhtOptions, Hyperswarm, HyperswarmStream, TopicConfig, +}; + +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let transport = Hyperswarm::listen().await?; + let discovery = MdnsDiscovery::bind(transport.local_addr().port()).await?; + let mut swarm1 = Hyperswarm::with(transport, discovery); + + let transport = Hyperswarm::listen().await?; + let discovery = MdnsDiscovery::bind(transport.local_addr().port()).await?; + let mut swarm2 = Hyperswarm::with(transport, discovery); + + let handle1 = swarm1.handle(); + let handle2 = swarm2.handle(); + + let task1 = task::spawn(async move { + while let Some(stream) = swarm1.next().await { + let stream = stream.unwrap(); + on_connection(stream, "rust1".into()); + } + }); + + let task2 = task::spawn(async move { + while let Some(stream) = swarm2.next().await { + let stream = stream.unwrap(); + on_connection(stream, "rust2".into()); + } + }); + + let topic = [0u8; 32]; + handle1.configure(topic, TopicConfig::both()); + handle2.configure(topic, TopicConfig::both()); + + task1.await; + task2.await; + + Ok(()) +} + +fn on_connection(mut stream: HyperswarmStream, local_name: String) { + let label = format!( + "[{} -> {}://{}]", + local_name, + stream.protocol(), + stream.peer_addr() + ); + eprintln!("{} connect", label); + task::spawn(async move { + stream + .write_all(format!("hi from {}", local_name).as_bytes()) + .await + .unwrap(); + let mut buf = vec![0u8; 100]; + loop { + match stream.read(&mut buf).await { + Ok(n) if n > 0 => { + let text = String::from_utf8(buf[..n].to_vec()).unwrap(); + eprintln!("{} read: {}", label, text); + } + Ok(_) => { + eprintln!("{} close", label); + break; + } + Err(e) => { + eprintln!("{} error: {}", label, e); + break; + } + } + } + }); +} diff --git a/examples/simple.rs b/examples/simple.rs index 80e91bc..50dac26 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -3,7 +3,7 @@ use async_std::stream::StreamExt; use async_std::task; // use std::net::{SocketAddr, ToSocketAddrs}; -use hyperswarm::{run_bootstrap_node, Config, Hyperswarm, HyperswarmStream, TopicConfig}; +use hyperswarm::{run_bootstrap_node, DhtOptions, Hyperswarm, HyperswarmStream, TopicConfig}; #[async_std::main] async fn main() -> Result<(), Box> { @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { let (bs_addr, bs_task) = run_bootstrap_node(Some(bs_addr)).await?; // let bs_addr: SocketAddr = bs_addr.to_socket_addrs().unwrap().next().unwrap(); - let config = Config::default().set_bootstrap_nodes(Some(vec![bs_addr])); + let config = DhtOptions::default().set_bootstrap_nodes(Some(vec![bs_addr])); let mut swarm1 = Hyperswarm::bind(config.clone()).await?; let mut swarm2 = Hyperswarm::bind(config).await?; diff --git a/src/config.rs b/src/config.rs index d1c1892..d021fe1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,12 +1,12 @@ use std::net::SocketAddr; #[derive(Debug, Default, Clone)] -pub struct Config { +pub struct DhtOptions { pub bootstrap: Option>, pub ephemeral: bool, } -impl Config { +impl DhtOptions { pub fn set_bootstrap_nodes(mut self, nodes: Option>) -> Self { self.bootstrap = nodes; self diff --git a/src/discovery/combined.rs b/src/discovery/combined.rs index 7309d84..1951d3c 100644 --- a/src/discovery/combined.rs +++ b/src/discovery/combined.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use super::dht::DhtDiscovery; use super::mdns::MdnsDiscovery; use super::{Discovery, PeerInfo, Topic}; -use crate::config::Config; +use crate::config::DhtOptions; #[derive(Debug)] pub struct CombinedDiscovery { @@ -16,8 +16,8 @@ pub struct CombinedDiscovery { } impl CombinedDiscovery { - pub async fn bind(local_port: u16, config: Config) -> io::Result { - let mdns = MdnsDiscovery::bind(local_port, config.clone()).await?; + pub async fn bind(local_port: u16, config: DhtOptions) -> io::Result { + let mdns = MdnsDiscovery::bind(local_port).await?; let dht = DhtDiscovery::bind(local_port, config).await?; Ok(Self { mdns, dht }) } diff --git a/src/discovery/dht.rs b/src/discovery/dht.rs index f533ce7..e989e8f 100644 --- a/src/discovery/dht.rs +++ b/src/discovery/dht.rs @@ -8,7 +8,7 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::config::Config; +use crate::DhtOptions; use super::{Discovery, DiscoveryMethod, PeerInfo, Topic}; @@ -37,7 +37,7 @@ enum Command { } impl DhtDiscovery { - pub async fn bind(local_port: u16, config: Config) -> io::Result { + pub async fn bind(local_port: u16, config: DhtOptions) -> io::Result { let dht_config = DhtConfig::default(); let dht_config = if let Some(bootstrap) = config.bootstrap.as_ref() { dht_config.set_bootstrap_nodes(bootstrap) diff --git a/src/discovery/mdns.rs b/src/discovery/mdns.rs index adcb4db..d76a696 100644 --- a/src/discovery/mdns.rs +++ b/src/discovery/mdns.rs @@ -2,6 +2,7 @@ use async_std::channel; use async_std::stream::Stream; use async_std::task::{Context, Poll}; use colmeia_hyperswarm_mdns::{self_id, Announcer, Locator}; +use futures::FutureExt; use futures_lite::ready; // use log::*; use std::convert::TryInto; @@ -11,8 +12,6 @@ use std::io; use std::pin::Pin; use std::time::Duration; -use crate::Config; - use super::{Discovery, DiscoveryMethod, PeerInfo, Topic}; mod socket { @@ -52,7 +51,7 @@ impl fmt::Debug for MdnsDiscovery { } impl MdnsDiscovery { - pub async fn bind(local_port: u16, _config: Config) -> io::Result { + pub async fn bind(local_port: u16) -> io::Result { let self_id = self_id(); let socket = socket::create()?; let lookup_interval = Duration::from_secs(60); @@ -108,29 +107,26 @@ impl Stream for MdnsDiscovery { return Poll::Ready(Some(Err(e))); } - if let Poll::Ready(Some(_command)) = Pin::new(&mut this.pending_commands_rx).poll_next(cx) { - // TODO: Boxing the add_topic future does not work because there's no valid - // lifetime. Best would be to make the add_topic functions sync, or return - // a future that can be boxed. - // let fut = match command { - // Command::Lookup(topic) => { - // let fut = this.locator.add_topic(&topic); - // let fut = fut.map(|r| { - // r.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) - // }); - // let fut: CommandFut = fut.boxed(); - // fut - // } - // Command::Announce(topic) => { - // let fut = this.announcer.add_topic(&topic); - // let fut = fut.map(|r| { - // r.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) - // }); - // let fut: CommandFut = fut.boxed(); - // fut - // } - // }; - // this.pending_future = Some(fut); + if let Poll::Ready(Some(command)) = Pin::new(&mut this.pending_commands_rx).poll_next(cx) { + let fut = match command { + Command::Lookup(topic) => { + let fut = this.locator.add_topic(topic.to_vec()); + let fut = fut.map(|r| { + r.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) + }); + let fut: CommandFut = fut.boxed(); + fut + } + Command::Announce(topic) => { + let fut = this.announcer.add_topic(topic.to_vec()); + let fut = fut.map(|r| { + r.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) + }); + let fut: CommandFut = fut.boxed(); + fut + } + }; + this.pending_future = Some(fut); } if let Err(e) = ready!(this.poll_pending_future(cx)) { diff --git a/src/lib.rs b/src/lib.rs index 4ed3ca8..f316d3f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,7 @@ pub mod discovery; pub mod transport; pub use bootstrap::run_bootstrap_node; -pub use config::{Config, TopicConfig}; +pub use config::{DhtOptions, TopicConfig}; pub use swarm::Hyperswarm; use transport::combined::CombinedStream; diff --git a/src/swarm.rs b/src/swarm.rs index 1bf6358..ee5ad6b 100644 --- a/src/swarm.rs +++ b/src/swarm.rs @@ -1,4 +1,5 @@ use async_std::channel; +use futures::{AsyncRead, AsyncWrite, StreamExt}; use futures_lite::Stream; use log::*; use std::collections::HashMap; @@ -7,7 +8,7 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::config::{Config, TopicConfig}; +use crate::{config::{DhtOptions, TopicConfig}, transport::DynamicConnection}; use crate::discovery::Topic; use crate::discovery::{combined::CombinedDiscovery, Discovery}; use crate::transport::{ @@ -19,8 +20,15 @@ type ConfigureCommand = (Topic, TopicConfig); pub struct Hyperswarm { topics: HashMap, - discovery: CombinedDiscovery, - transport: CombinedTransport, + discovery: Box, + // Error: Transport requires specifing Connection + transport: Box< + dyn Transport< + Connection = Box, + Item = io::Result> + > + Send + + Unpin, + >, command_tx: channel::Sender, command_rx: channel::Receiver, } @@ -28,30 +36,40 @@ impl fmt::Debug for Hyperswarm { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Hyperswarm") .field("topics", &self.topics) - .field("discovery", &self.discovery) - .field("transport", &self.transport) .finish() } } impl Hyperswarm { - pub async fn bind(config: Config) -> io::Result { + pub async fn listen() -> io::Result { let local_addr = "localhost:0"; + CombinedTransport::bind(local_addr).await + } - let transport = CombinedTransport::bind(local_addr).await?; - let local_addr = transport.local_addr(); - let port = local_addr.port(); - let discovery = CombinedDiscovery::bind(port, config).await?; - + pub fn with( + transport: impl Transport + Send + Unpin, + discovery: impl Discovery + Send + Unpin, + ) -> Self { let (command_tx, command_rx) = channel::unbounded::(); - Ok(Self { + Self { topics: HashMap::new(), - discovery, - transport, + discovery: Box::new(discovery), + transport: Box::new(transport), command_tx, command_rx, - }) + } + } + + pub async fn bind(config: DhtOptions) -> io::Result { + let local_addr = "localhost:0"; + + let transport = CombinedTransport::bind(local_addr).await?; + let local_addr = transport.local_addr(); + let port = local_addr.port(); + let discovery = CombinedDiscovery::bind(port, config).await?; + + Ok(Self::with(transport, discovery)) } pub fn configure(&mut self, topic: Topic, config: TopicConfig) { @@ -86,7 +104,7 @@ impl SwarmHandle { } impl Stream for Hyperswarm { - type Item = io::Result>; + type Item = io::Result>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -104,7 +122,7 @@ impl Stream for Hyperswarm { } // Poll discovery results. - let discovery = Pin::new(&mut this.discovery).poll_next(cx); + let discovery = Pin::new(&mut this.discovery).poll_next_unpin(cx); match discovery { Poll::Pending | Poll::Ready(None) => {} Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), @@ -119,7 +137,7 @@ impl Stream for Hyperswarm { #[cfg(test)] mod test { - use super::{Config, Hyperswarm, TopicConfig}; + use super::{DhtOptions, Hyperswarm, TopicConfig}; use crate::run_bootstrap_node; use async_std::channel; use async_std::task; @@ -132,7 +150,7 @@ mod test { env_logger::init(); let (bs_addr, _bs_task) = run_bootstrap_node::(None).await?; // eprintln!("bootstrap node on {}", bs_addr); - let config = Config::default().set_bootstrap_nodes(Some(vec![bs_addr])); + let config = DhtOptions::default().set_bootstrap_nodes(Some(vec![bs_addr])); let mut swarm_a = Hyperswarm::bind(config.clone()).await?; // eprintln!("A {:?}", swarm_a); let mut swarm_b = Hyperswarm::bind(config).await?; diff --git a/src/transport/combined.rs b/src/transport/combined.rs index 2b398aa..75f236f 100644 --- a/src/transport/combined.rs +++ b/src/transport/combined.rs @@ -113,6 +113,7 @@ impl CombinedTransport { } impl Transport for CombinedTransport { + // ???? type Connection = CombinedStream; fn connect(&mut self, peer_addr: SocketAddr) { self.tcp.connect(peer_addr); diff --git a/src/transport/mod.rs b/src/transport/mod.rs index b39a03c..95a3565 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -12,10 +12,12 @@ pub mod tcp; #[cfg(feature = "transport_utp")] pub mod utp; +pub trait DynamicConnection: AsyncRead + AsyncWrite + Send + std::fmt::Debug {} + pub trait Transport: Stream::Connection>>> { - type Connection: AsyncRead + AsyncWrite + Send + std::fmt::Debug; + type Connection: DynamicConnection; fn connect(&mut self, peer_addr: SocketAddr); // fn poll_next( // self: Pin<&mut Self>,