Skip to content
This repository has been archived by the owner on Nov 15, 2019. It is now read-only.

closes #1138: LRU bootstrap cache #1139

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 180 additions & 7 deletions src/main/active_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::common::{CoreTimer, CrustUser, Message, State};
use crate::common::{CoreTimer, CrustUser, Message, PeerInfo, State};
use crate::main::{ConnectionId, CrustData, Event, EventLoopCore};
use crate::PeerId;
use mio::{Poll, Ready, Token};
Expand Down Expand Up @@ -38,6 +38,7 @@ pub struct ActiveConnection {
their_role: CrustUser,
event_tx: crate::CrustEventSender,
heartbeat: Heartbeat,
peer_info: PeerInfo,
}

impl ActiveConnection {
Expand All @@ -58,6 +59,14 @@ impl ActiveConnection {
their_id
);

let their_addr = if let Ok(addr) = socket.peer_addr() {
addr
} else {
debug!("Failed to get active connection socket address.");
return;
};
let peer_info = PeerInfo::new(their_addr, their_id.pub_enc_key);

let heartbeat = Heartbeat::new(core, token);
let state = Rc::new(RefCell::new(ActiveConnection {
token,
Expand All @@ -67,6 +76,7 @@ impl ActiveConnection {
their_role,
event_tx,
heartbeat,
peer_info,
}));
let _ = core.insert_state(token, state.clone());

Expand All @@ -89,28 +99,32 @@ impl ActiveConnection {
}

fn read(&mut self, core: &mut EventLoopCore, poll: &Poll) {
let mut msg_received = false;
loop {
match self.socket.read::<Message>() {
Ok(Some(Message::Data(data))) => {
let _ =
self.event_tx
.send(Event::NewMessage(self.their_id, self.their_role, data));
self.reset_receive_heartbeat(core, poll);
}
Ok(Some(Message::Heartbeat)) => {
self.reset_receive_heartbeat(core, poll);
msg_received = true;
}
Ok(Some(Message::Heartbeat)) => msg_received = true,
Ok(Some(message)) => {
debug!("{:?} - Unexpected message: {:?}", self.our_id, message);
self.reset_receive_heartbeat(core, poll);
msg_received = true;
}
Ok(None) => return,
Ok(None) => break,
Err(e) => {
debug!("{:?} - Failed to read from socket: {:?}", self.our_id, e);
return self.terminate(core, poll);
}
}
}

if msg_received {
self.reset_receive_heartbeat(core, poll);
self.update_bootstrap_cache(core);
}
}

#[cfg(not(test))]
Expand Down Expand Up @@ -151,6 +165,12 @@ impl ActiveConnection {
self.terminate(core, poll);
}
}

/// If peer we are communicating with is in bootstrap cache, move it to the top of the cache.
fn update_bootstrap_cache(&mut self, core: &mut EventLoopCore) {
core.user_data_mut().bootstrap_cache.touch(&self.peer_info);
core.user_data_mut().bootstrap_cache.try_commit();
}
}

impl State<CrustData> for ActiveConnection {
Expand Down Expand Up @@ -267,3 +287,156 @@ enum HeartbeatAction {
Send,
Terminate,
}

#[cfg(test)]
mod tests {
use super::*;
use crate::common::ipv4_addr;
use crate::main::bootstrap;
use crate::tests::utils::{
get_event_sender, peer_info_with_rand_key, rand_peer_id_and_enc_sk, test_core,
};
use hamcrest2::prelude::*;
use mio::net::TcpListener;
use mio::{Events, Poll, PollOpt, Ready, Token};
use std::sync::mpsc;
use std::thread;

fn wait_for(token: Token, el: &Poll, timeout: Option<Duration>) -> Option<()> {
let mut events = Events::with_capacity(1);
unwrap!(el.poll(&mut events, timeout));
if events.is_empty() {
None
} else {
assert_eq!(events.iter().next().map(|ev| ev.token()), Some(token));
Some(())
}
}

/// Spawns connection listener that accepts single connection and sends some dummy message to
/// it.
///
/// Returns listener port.
fn spawn_listener(port_tx: mpsc::Sender<u16>) {
let listener = unwrap!(TcpListener::bind(&ipv4_addr(0, 0, 0, 0, 0)));
let listener_port = unwrap!(listener.local_addr()).port();
unwrap!(port_tx.send(listener_port));
let mut socket = None;

const LISTENER_TOKEN: Token = Token(0);
const SOCKET_TOKEN: Token = Token(1);

let el = unwrap!(Poll::new());
unwrap!(el.register(
&listener,
LISTENER_TOKEN,
Ready::readable(),
PollOpt::edge()
));

let mut events = Events::with_capacity(16);
loop {
unwrap!(el.poll(&mut events, None));
for ev in events.iter() {
match ev.token() {
LISTENER_TOKEN => {
let (sock, _) = unwrap!(listener.accept());
let sock = TcpSock::wrap(sock);
unwrap!(el.register(
&sock,
SOCKET_TOKEN,
Ready::writable(),
PollOpt::edge()
));
socket = Some(sock);
}
SOCKET_TOKEN => {
if let Some(ref mut sock) = socket {
let buffered =
unwrap!(sock.write(Some((Message::Data(vec![1, 2, 3]), 1))));
assert!(buffered);
unwrap!(el.deregister(sock));
}
}
_ => panic!("Unexpected event"),
}
}
}
}

mod active_connection {
use super::*;

mod read {
use super::*;

#[test]
fn it_moves_peer_to_the_top_of_the_bootstrap_cache() {
let (tx, rx) = mpsc::channel();
let _listener_thread = thread::spawn(move || spawn_listener(tx));
let listener_port = unwrap!(rx.recv());

let poll = unwrap!(Poll::new());
let sock_token = Token(1);
let (peer1_id, _) = rand_peer_id_and_enc_sk();
let peer1_sock = unwrap!(TcpSock::connect(&ipv4_addr(127, 0, 0, 1, listener_port)));
unwrap!(poll.register(&peer1_sock, sock_token, Ready::writable(), PollOpt::edge()));

unwrap!(wait_for(sock_token, &poll, None));
// check for errors on connect()
assert!(unwrap!(peer1_sock.take_error()).is_none());
let peer1_addr = unwrap!(peer1_sock.peer_addr());
let peer1_info = PeerInfo::new(peer1_addr, peer1_id.pub_enc_key);

let mut cache = bootstrap::Cache::new(Default::default());
cache.put(peer1_info);
cache.put(peer_info_with_rand_key(ipv4_addr(1, 2, 3, 4, 4000)));

let mut core = test_core(cache);
let (event_tx, _event_rx) = get_event_sender();
let (our_id, _) = rand_peer_id_and_enc_sk();
let event = Event::ConnectSuccess(peer1_id);
unwrap!(poll.reregister(
&peer1_sock,
sock_token,
Ready::readable(),
PollOpt::edge()
));
ActiveConnection::start(
&mut core,
&poll,
sock_token,
peer1_sock,
our_id,
peer1_id,
CrustUser::Client,
event,
event_tx,
);

// As there may be a race condition between data received and start() being called
// above (because it calls read() before returning), tries to wait for a readable
// event and follows through
let _ = wait_for(sock_token, &poll, Some(Duration::from_secs(1)));
let state = unwrap!(core.get_state(sock_token));
let mut state = state.borrow_mut();
let active_conn = unwrap!(state.as_any().downcast_mut::<ActiveConnection>());
active_conn.update_bootstrap_cache(&mut core);

let cached_addrs: Vec<_> = core
.user_data()
.bootstrap_cache
.peers()
.iter()
.map(|peer| peer.addr)
.collect();
assert_that!(
&cached_addrs,
contains(vec![peer1_addr, ipv4_addr(1, 2, 3, 4, 4000),])
.in_order()
.exactly()
);
}
}
}
}
Loading