From cbf0b182a84bb29e9e1082d263dd042765f34b35 Mon Sep 17 00:00:00 2001 From: driftluo Date: Tue, 2 Jan 2024 15:27:41 +0800 Subject: [PATCH] fix: fix peer store random fetch --- network/src/peer_store/addr_manager.rs | 3 +- network/src/peer_store/peer_store_impl.rs | 8 +--- network/src/protocols/discovery/mod.rs | 2 +- network/src/protocols/tests/mod.rs | 11 +++-- network/src/tests/peer_store.rs | 53 ++++++++++++++++++++--- util/dao/utils/src/lib.rs | 3 +- 6 files changed, 61 insertions(+), 19 deletions(-) diff --git a/network/src/peer_store/addr_manager.rs b/network/src/peer_store/addr_manager.rs index 218d28769e..cadac4b0e9 100644 --- a/network/src/peer_store/addr_manager.rs +++ b/network/src/peer_store/addr_manager.rs @@ -57,7 +57,7 @@ impl AddrManager { let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned(); if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) { let ip = socket_addr.ip(); - let is_unique_ip = duplicate_ips.insert(ip); + let is_unique_ip = !duplicate_ips.contains(&ip); // A trick to make our tests work // TODO remove this after fix the network tests. let is_test_ip = ip.is_unspecified() || ip.is_loopback(); @@ -65,6 +65,7 @@ impl AddrManager { && addr_info.is_connectable(now_ms) && filter(&addr_info) { + duplicate_ips.insert(ip); addr_infos.push(addr_info); } if addr_infos.len() == count { diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 1f347a58ff..f7e39c3cd8 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -185,19 +185,15 @@ impl PeerStore { /// Return valid addrs that success connected, used for discovery. pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec { // Get info: - // 1. Already connected or Connected within 7 days + // 1. Connected within 7 days let now_ms = ckb_systemtime::unix_time_as_millis(); let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS); - let peers = &self.connected_peers; // get success connected addrs. self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) - && (extract_peer_id(&peer_addr.addr) - .map(|peer_id| peers.contains_key(&peer_id)) - .unwrap_or_default() - || peer_addr.connected(|t| t > addr_expired_ms)) + && peer_addr.connected(|t| t > addr_expired_ms) }) } diff --git a/network/src/protocols/discovery/mod.rs b/network/src/protocols/discovery/mod.rs index 97f931abf1..b3e4548b26 100644 --- a/network/src/protocols/discovery/mod.rs +++ b/network/src/protocols/discovery/mod.rs @@ -16,7 +16,7 @@ use p2p::{ use rand::seq::SliceRandom; pub use self::{ - addr::{AddrKnown, AddressManager, MisbehaveResult, Misbehavior}, + addr::{AddressManager, MisbehaveResult, Misbehavior}, protocol::{DiscoveryMessage, Node, Nodes}, state::SessionState, }; diff --git a/network/src/protocols/tests/mod.rs b/network/src/protocols/tests/mod.rs index 140e9625d8..a2df501448 100644 --- a/network/src/protocols/tests/mod.rs +++ b/network/src/protocols/tests/mod.rs @@ -479,11 +479,14 @@ fn test_discovery_behavior() { wait_connect_state(&node2, 2); - wait_discovery(&node3, |num| num >= 2); + wait_discovery(&node3, |num| num >= 3); + // use node1 instead of node3 because ANNOUNCE_INTERVAL is 24 hour + // node2 can't announce node1 address to node3 within 24 hours + // but the reverse can let addrs = { - let listen_addr = &node3.listen_addr; - let mut locked = node3.network_state.peer_store.lock(); + let listen_addr = &node1.listen_addr; + let mut locked = node1.network_state.peer_store.lock(); locked .fetch_addrs_to_feeler(6) @@ -508,7 +511,7 @@ fn test_discovery_behavior() { }; for addr in addrs { - node3.dial_addr( + node1.dial_addr( addr, TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), ); diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index 75348801c9..ee713f5a7f 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -19,7 +19,7 @@ fn test_add_connected_peer() { 0 ); peer_store.add_connected_peer(addr.clone(), SessionType::Outbound); - peer_store.add_addr(addr, Flags::COMPATIBILITY).unwrap(); + peer_store.add_outbound_addr(addr, Flags::COMPATIBILITY); assert_eq!( peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(), 1 @@ -314,14 +314,14 @@ fn test_fetch_random_addrs() { .is_empty()); // get peer addr from outbound peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound); - peer_store.add_addr(addr1, Flags::COMPATIBILITY).unwrap(); + peer_store.add_outbound_addr(addr1, Flags::COMPATIBILITY); assert_eq!( peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(), 1 ); // get peer addrs by limit peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound); - peer_store.add_addr(addr2, Flags::COMPATIBILITY).unwrap(); + peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY); assert_eq!( peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(), 2 @@ -345,7 +345,7 @@ fn test_fetch_random_addrs() { .mark_connected(0); assert_eq!( peer_store.fetch_random_addrs(3, Flags::COMPATIBILITY).len(), - 3 + 2 ); peer_store.remove_disconnected_peer(&addr3); assert_eq!( @@ -424,12 +424,53 @@ fn test_get_random_restrict_addrs_from_same_ip() { .unwrap(); peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound); peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound); - peer_store.add_addr(addr1, Flags::COMPATIBILITY).unwrap(); - peer_store.add_addr(addr2, Flags::COMPATIBILITY).unwrap(); + peer_store.add_outbound_addr(addr1, Flags::COMPATIBILITY); + peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY); + assert_eq!( + peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(), + 1 + ); +} + +#[test] +fn test_get_random_with_connected_peer_and_same_peerid() { + let mut peer_store: PeerStore = Default::default(); + + let peer_id = PeerId::random().to_base58(); + let addr1: Multiaddr = format!("/ip4/225.0.0.1/tcp/1867/p2p/{}", peer_id) + .parse() + .unwrap(); + let addr2: Multiaddr = format!("/ip4/225.0.0.2/tcp/43/p2p/{}", peer_id) + .parse() + .unwrap(); + + peer_store + .add_addr(addr1.clone(), Flags::COMPATIBILITY) + .unwrap(); + peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY); + + // Node information that has not been connected must not be selected. assert_eq!( peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(), 1 ); + + // add remains connected node info + peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound); + + // Even if the node remains connected, node's info without connection information cannot be selected. + assert_eq!( + peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(), + 1 + ); + + peer_store.update_outbound_addr_last_connected_ms(addr1); + + // Set connected info to address, it can be selected + assert_eq!( + peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(), + 2 + ); } #[test] diff --git a/util/dao/utils/src/lib.rs b/util/dao/utils/src/lib.rs index 22d4b3ad3c..3e67098a43 100644 --- a/util/dao/utils/src/lib.rs +++ b/util/dao/utils/src/lib.rs @@ -122,12 +122,13 @@ pub fn pack_dao_data(ar: u64, c: Capacity, s: Capacity, u: Capacity) -> Byte32 { Byte32::from_slice(&buf).expect("impossible: fail to read array") } +#[cfg(test)] mod tests { pub use super::{extract_dao_data, pack_dao_data}; pub use ckb_types::core::Capacity; + pub use ckb_types::h256; pub use ckb_types::packed::Byte32; pub use ckb_types::prelude::Pack; - pub use ckb_types::{h256, H256}; #[test] #[allow(clippy::unreadable_literal)]