Skip to content

Commit

Permalink
Merge pull request #4289 from driftluo/fix-peer-store-random-fetch
Browse files Browse the repository at this point in the history
fix: fix peer store random fetch
  • Loading branch information
zhangsoledad authored Jan 3, 2024
2 parents 07f01e9 + cbf0b18 commit ec399af
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 19 deletions.
3 changes: 2 additions & 1 deletion network/src/peer_store/addr_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ impl AddrManager {
let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned();
if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) {
let ip = socket_addr.ip();
let is_unique_ip = duplicate_ips.insert(ip);
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
}
if addr_infos.len() == count {
Expand Down
8 changes: 2 additions & 6 deletions network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,15 @@ impl PeerStore {
/// Return valid addrs that success connected, used for discovery.
pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
// Get info:
// 1. Already connected or Connected within 7 days
// 1. Connected within 7 days

let now_ms = ckb_systemtime::unix_time_as_millis();
let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
let peers = &self.connected_peers;
// get success connected addrs.
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
&& (extract_peer_id(&peer_addr.addr)
.map(|peer_id| peers.contains_key(&peer_id))
.unwrap_or_default()
|| peer_addr.connected(|t| t > addr_expired_ms))
&& peer_addr.connected(|t| t > addr_expired_ms)
})
}

Expand Down
2 changes: 1 addition & 1 deletion network/src/protocols/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use p2p::{
use rand::seq::SliceRandom;

pub use self::{
addr::{AddrKnown, AddressManager, MisbehaveResult, Misbehavior},
addr::{AddressManager, MisbehaveResult, Misbehavior},
protocol::{DiscoveryMessage, Node, Nodes},
state::SessionState,
};
Expand Down
11 changes: 7 additions & 4 deletions network/src/protocols/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,14 @@ fn test_discovery_behavior() {

wait_connect_state(&node2, 2);

wait_discovery(&node3, |num| num >= 2);
wait_discovery(&node3, |num| num >= 3);

// use node1 instead of node3 because ANNOUNCE_INTERVAL is 24 hour
// node2 can't announce node1 address to node3 within 24 hours
// but the reverse can
let addrs = {
let listen_addr = &node3.listen_addr;
let mut locked = node3.network_state.peer_store.lock();
let listen_addr = &node1.listen_addr;
let mut locked = node1.network_state.peer_store.lock();

locked
.fetch_addrs_to_feeler(6)
Expand All @@ -508,7 +511,7 @@ fn test_discovery_behavior() {
};

for addr in addrs {
node3.dial_addr(
node1.dial_addr(
addr,
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
);
Expand Down
53 changes: 47 additions & 6 deletions network/src/tests/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn test_add_connected_peer() {
0
);
peer_store.add_connected_peer(addr.clone(), SessionType::Outbound);
peer_store.add_addr(addr, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
Expand Down Expand Up @@ -314,14 +314,14 @@ fn test_fetch_random_addrs() {
.is_empty());
// get peer addr from outbound
peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound);
peer_store.add_addr(addr1, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr1, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);
// get peer addrs by limit
peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound);
peer_store.add_addr(addr2, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
2
Expand All @@ -345,7 +345,7 @@ fn test_fetch_random_addrs() {
.mark_connected(0);
assert_eq!(
peer_store.fetch_random_addrs(3, Flags::COMPATIBILITY).len(),
3
2
);
peer_store.remove_disconnected_peer(&addr3);
assert_eq!(
Expand Down Expand Up @@ -424,12 +424,53 @@ fn test_get_random_restrict_addrs_from_same_ip() {
.unwrap();
peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound);
peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound);
peer_store.add_addr(addr1, Flags::COMPATIBILITY).unwrap();
peer_store.add_addr(addr2, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr1, Flags::COMPATIBILITY);
peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);
}

#[test]
fn test_get_random_with_connected_peer_and_same_peerid() {
let mut peer_store: PeerStore = Default::default();

let peer_id = PeerId::random().to_base58();
let addr1: Multiaddr = format!("/ip4/225.0.0.1/tcp/1867/p2p/{}", peer_id)
.parse()
.unwrap();
let addr2: Multiaddr = format!("/ip4/225.0.0.2/tcp/43/p2p/{}", peer_id)
.parse()
.unwrap();

peer_store
.add_addr(addr1.clone(), Flags::COMPATIBILITY)
.unwrap();
peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY);

// Node information that has not been connected must not be selected.
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);

// add remains connected node info
peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound);

// Even if the node remains connected, node's info without connection information cannot be selected.
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);

peer_store.update_outbound_addr_last_connected_ms(addr1);

// Set connected info to address, it can be selected
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
2
);
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion util/dao/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ pub fn pack_dao_data(ar: u64, c: Capacity, s: Capacity, u: Capacity) -> Byte32 {
Byte32::from_slice(&buf).expect("impossible: fail to read array")
}

#[cfg(test)]
mod tests {
pub use super::{extract_dao_data, pack_dao_data};
pub use ckb_types::core::Capacity;
pub use ckb_types::h256;
pub use ckb_types::packed::Byte32;
pub use ckb_types::prelude::Pack;
pub use ckb_types::{h256, H256};

#[test]
#[allow(clippy::unreadable_literal)]
Expand Down

0 comments on commit ec399af

Please sign in to comment.