Skip to content
This repository has been archived by the owner on Nov 15, 2019. It is now read-only.

Commit

Permalink
feat/active_conn: move peer to bootstrap cache front when it sends data
Browse files Browse the repository at this point in the history
When a packet is received from remote peer that was previously put in
bootstrap cache, make sure we let the cache know that the peer is
active.
  • Loading branch information
povilasb authored and Douglas Caetano dos Santos committed Mar 15, 2019
1 parent 7e7e3c1 commit 7dbeab1
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 7 deletions.
187 changes: 180 additions & 7 deletions src/main/active_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::common::{CoreTimer, CrustUser, Message, State};
use crate::common::{CoreTimer, CrustUser, Message, PeerInfo, State};
use crate::main::{ConnectionId, CrustData, Event, EventLoopCore};
use crate::PeerId;
use mio::{Poll, Ready, Token};
Expand Down Expand Up @@ -38,6 +38,7 @@ pub struct ActiveConnection {
their_role: CrustUser,
event_tx: crate::CrustEventSender,
heartbeat: Heartbeat,
peer_info: PeerInfo,
}

impl ActiveConnection {
Expand All @@ -58,6 +59,14 @@ impl ActiveConnection {
their_id
);

let their_addr = if let Ok(addr) = socket.peer_addr() {
addr
} else {
debug!("Failed to get active connection socket address.");
return;
};
let peer_info = PeerInfo::new(their_addr, their_id.pub_enc_key);

let heartbeat = Heartbeat::new(core, token);
let state = Rc::new(RefCell::new(ActiveConnection {
token,
Expand All @@ -67,6 +76,7 @@ impl ActiveConnection {
their_role,
event_tx,
heartbeat,
peer_info,
}));
let _ = core.insert_state(token, state.clone());

Expand All @@ -89,28 +99,32 @@ impl ActiveConnection {
}

fn read(&mut self, core: &mut EventLoopCore, poll: &Poll) {
let mut msg_received = false;
loop {
match self.socket.read::<Message>() {
Ok(Some(Message::Data(data))) => {
let _ =
self.event_tx
.send(Event::NewMessage(self.their_id, self.their_role, data));
self.reset_receive_heartbeat(core, poll);
}
Ok(Some(Message::Heartbeat)) => {
self.reset_receive_heartbeat(core, poll);
msg_received = true;
}
Ok(Some(Message::Heartbeat)) => msg_received = true,
Ok(Some(message)) => {
debug!("{:?} - Unexpected message: {:?}", self.our_id, message);
self.reset_receive_heartbeat(core, poll);
msg_received = true;
}
Ok(None) => return,
Ok(None) => break,
Err(e) => {
debug!("{:?} - Failed to read from socket: {:?}", self.our_id, e);
return self.terminate(core, poll);
}
}
}

if msg_received {
self.reset_receive_heartbeat(core, poll);
self.update_bootstrap_cache(core);
}
}

#[cfg(not(test))]
Expand Down Expand Up @@ -151,6 +165,12 @@ impl ActiveConnection {
self.terminate(core, poll);
}
}

/// If peer we are communicating with is in bootstrap cache, move it to the top of the cache.
fn update_bootstrap_cache(&mut self, core: &mut EventLoopCore) {
core.user_data_mut().bootstrap_cache.touch(&self.peer_info);
core.user_data_mut().bootstrap_cache.try_commit();
}
}

impl State<CrustData> for ActiveConnection {
Expand Down Expand Up @@ -267,3 +287,156 @@ enum HeartbeatAction {
Send,
Terminate,
}

#[cfg(test)]
mod tests {
use super::*;
use crate::common::ipv4_addr;
use crate::main::bootstrap;
use crate::tests::utils::{
get_event_sender, peer_info_with_rand_key, rand_peer_id_and_enc_sk, test_core,
};
use hamcrest2::prelude::*;
use mio::net::TcpListener;
use mio::{Events, Poll, PollOpt, Ready, Token};
use std::sync::mpsc;
use std::thread;

fn wait_for(token: Token, el: &Poll, timeout: Option<Duration>) -> Option<()> {
let mut events = Events::with_capacity(1);
unwrap!(el.poll(&mut events, timeout));
if events.is_empty() {
None
} else {
assert_eq!(events.iter().next().map(|ev| ev.token()), Some(token));
Some(())
}
}

/// Spawns connection listener that accepts single connection and sends some dummy message to
/// it.
///
/// Returns listener port.
fn spawn_listener(port_tx: mpsc::Sender<u16>) {
let listener = unwrap!(TcpListener::bind(&ipv4_addr(0, 0, 0, 0, 0)));
let listener_port = unwrap!(listener.local_addr()).port();
unwrap!(port_tx.send(listener_port));
let mut socket = None;

const LISTENER_TOKEN: Token = Token(0);
const SOCKET_TOKEN: Token = Token(1);

let el = unwrap!(Poll::new());
unwrap!(el.register(
&listener,
LISTENER_TOKEN,
Ready::readable(),
PollOpt::edge()
));

let mut events = Events::with_capacity(16);
loop {
unwrap!(el.poll(&mut events, None));
for ev in events.iter() {
match ev.token() {
LISTENER_TOKEN => {
let (sock, _) = unwrap!(listener.accept());
let sock = TcpSock::wrap(sock);
unwrap!(el.register(
&sock,
SOCKET_TOKEN,
Ready::writable(),
PollOpt::edge()
));
socket = Some(sock);
}
SOCKET_TOKEN => {
if let Some(ref mut sock) = socket {
let buffered =
unwrap!(sock.write(Some((Message::Data(vec![1, 2, 3]), 1))));
assert!(buffered);
unwrap!(el.deregister(sock));
}
}
_ => panic!("Unexpected event"),
}
}
}
}

mod active_connection {
use super::*;

mod read {
use super::*;

#[test]
fn it_moves_peer_to_the_top_of_the_bootstrap_cache() {
let (tx, rx) = mpsc::channel();
let _listener_thread = thread::spawn(move || spawn_listener(tx));
let listener_port = unwrap!(rx.recv());

let poll = unwrap!(Poll::new());
let sock_token = Token(1);
let (peer1_id, _) = rand_peer_id_and_enc_sk();
let peer1_sock = unwrap!(TcpSock::connect(&ipv4_addr(127, 0, 0, 1, listener_port)));
unwrap!(poll.register(&peer1_sock, sock_token, Ready::writable(), PollOpt::edge()));

unwrap!(wait_for(sock_token, &poll, None));
// check for errors on connect()
assert!(unwrap!(peer1_sock.take_error()).is_none());
let peer1_addr = unwrap!(peer1_sock.peer_addr());
let peer1_info = PeerInfo::new(peer1_addr, peer1_id.pub_enc_key);

let mut cache = bootstrap::Cache::new(Default::default());
cache.put(peer1_info);
cache.put(peer_info_with_rand_key(ipv4_addr(1, 2, 3, 4, 4000)));

let mut core = test_core(cache);
let (event_tx, _event_rx) = get_event_sender();
let (our_id, _) = rand_peer_id_and_enc_sk();
let event = Event::ConnectSuccess(peer1_id);
unwrap!(poll.reregister(
&peer1_sock,
sock_token,
Ready::readable(),
PollOpt::edge()
));
ActiveConnection::start(
&mut core,
&poll,
sock_token,
peer1_sock,
our_id,
peer1_id,
CrustUser::Client,
event,
event_tx,
);

// As there may be a race condition between data received and start() being called
// above (because it calls read() before returning), tries to wait for a readable
// event and follows through
let _ = wait_for(sock_token, &poll, Some(Duration::from_secs(1)));
let state = unwrap!(core.get_state(sock_token));
let mut state = state.borrow_mut();
let active_conn = unwrap!(state.as_any().downcast_mut::<ActiveConnection>());
active_conn.update_bootstrap_cache(&mut core);

let cached_addrs: Vec<_> = core
.user_data()
.bootstrap_cache
.peers()
.iter()
.map(|peer| peer.addr)
.collect();
assert_that!(
&cached_addrs,
contains(vec![peer1_addr, ipv4_addr(1, 2, 3, 4, 4000),])
.in_order()
.exactly()
);
}
}
}
}
71 changes: 71 additions & 0 deletions src/main/bootstrap/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ impl Cache {
self.peers.peek_iter().map(|(_, peer)| *peer).collect()
}

/// Moves given peer to the cache head. The difference from `put()` is that `touch()` won't
/// insert new peer, if it does not exist in the cache.
pub fn touch(&mut self, peer: &PeerInfo) {
let _ = self.peers.get(&peer.pub_key);
}

fn open_file(&self) -> crate::Res<FileHandler<Vec<PeerInfo>>> {
let fname = self
.file_name
Expand All @@ -117,6 +123,7 @@ impl Cache {
#[cfg(test)]
mod tests {
use super::*;
use hamcrest2::prelude::*;

mod cache {
use super::*;
Expand Down Expand Up @@ -309,5 +316,69 @@ mod tests {
assert_eq!(addrs[2], ipv4_addr(1, 2, 3, 6, 6000));
}
}

mod touch {
use super::*;

#[test]
fn it_doesnt_change_cache_if_given_peer_is_not_in_it() {
let mut cache = Cache::new(CacheConfig {
file_name: None,
max_size: 5,
});
cache.put(peer_info_with_rand_key(ipv4_addr(1, 2, 3, 4, 4000)));
cache.put(peer_info_with_rand_key(ipv4_addr(1, 2, 3, 5, 5000)));

cache.touch(&peer_info_with_rand_key(ipv4_addr(1, 2, 3, 6, 6000)));

let addrs: Vec<_> = cache.peers().iter().map(|peer| peer.addr).collect();
assert_that!(
&addrs,
contains(vec![
ipv4_addr(1, 2, 3, 5, 5000),
ipv4_addr(1, 2, 3, 4, 4000)
])
.in_order()
.exactly()
);
}

#[test]
fn it_moves_given_peer_to_cache_front() {
let mut cache = Cache::new(CacheConfig {
file_name: None,
max_size: 5,
});
let peer1 = peer_info_with_rand_key(ipv4_addr(1, 2, 3, 4, 4000));
cache.put(peer1);
cache.put(peer_info_with_rand_key(ipv4_addr(1, 2, 3, 5, 5000)));
cache.put(peer_info_with_rand_key(ipv4_addr(1, 2, 3, 6, 6000)));
let addrs: Vec<_> = cache.peers().iter().map(|peer| peer.addr).collect();
assert_that!(
&addrs,
contains(vec![
ipv4_addr(1, 2, 3, 6, 6000),
ipv4_addr(1, 2, 3, 5, 5000),
ipv4_addr(1, 2, 3, 4, 4000)
])
.in_order()
.exactly()
);

cache.touch(&peer1);

let addrs: Vec<_> = cache.peers().iter().map(|peer| peer.addr).collect();
assert_that!(
&addrs,
contains(vec![
ipv4_addr(1, 2, 3, 4, 4000),
ipv4_addr(1, 2, 3, 6, 6000),
ipv4_addr(1, 2, 3, 5, 5000)
])
.in_order()
.exactly()
);
}
}
}
}

0 comments on commit 7dbeab1

Please sign in to comment.