Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add p2p.experimental cli options #2128

Merged
merged 10 commits into from
Jul 19, 2024
2 changes: 1 addition & 1 deletion crates/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod core_addr {
Decoder::Hex.decode(b"4737c0c1B4D5b1A687B42610DdabEE781152359c");
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EthereumStateUpdate {
pub state_root: StateCommitment,
pub block_number: BlockNumber,
Expand Down
36 changes: 25 additions & 11 deletions crates/p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,21 @@ impl Behaviour {
kademlia_config.set_provider_publication_interval(Some(PROVIDER_PUBLICATION_INTERVAL));
// This makes sure that the DHT we're implementing is incompatible with the
// "default" IPFS DHT from libp2p.
kademlia_config.set_protocol_names(vec![StreamProtocol::try_from_owned(
kademlia_protocol_name(chain_id),
)
.unwrap()]);
if cfg.kad_names.is_empty() {
kademlia_config.set_protocol_names(vec![StreamProtocol::try_from_owned(
kademlia_protocol_name(chain_id),
)
.unwrap()]);
} else {
kademlia_config.set_protocol_names(
cfg.kad_names
.iter()
.cloned()
.map(StreamProtocol::try_from_owned)
.collect::<Result<Vec<_>, _>>()
.expect("valid protocol names"),
);
}

let peer_id = identity.public().to_peer_id();

Expand All @@ -473,11 +484,14 @@ impl Behaviour {
)
.expect("valid gossipsub params");

let headers_sync = request_response_behavior::<codec::Headers>();
let classes_sync = request_response_behavior::<codec::Classes>();
let state_diffs_sync = request_response_behavior::<codec::StateDiffs>();
let transactions_sync = request_response_behavior::<codec::Transactions>();
let events_sync = request_response_behavior::<codec::Events>();
let p2p_stream_cfg = p2p_stream::Config::default()
.with_request_timeout(cfg.stream_timeout)
.with_max_concurrent_streams(cfg.max_concurrent_streams);
let headers_sync = request_response_behavior::<codec::Headers>(p2p_stream_cfg);
let classes_sync = request_response_behavior::<codec::Classes>(p2p_stream_cfg);
let state_diffs_sync = request_response_behavior::<codec::StateDiffs>(p2p_stream_cfg);
let transactions_sync = request_response_behavior::<codec::Transactions>(p2p_stream_cfg);
let events_sync = request_response_behavior::<codec::Events>(p2p_stream_cfg);

let (relay_transport, relay) = relay::client::new(peer_id);

Expand Down Expand Up @@ -842,12 +856,12 @@ impl Behaviour {
}
}

fn request_response_behavior<C>() -> p2p_stream::Behaviour<C>
fn request_response_behavior<C>(cfg: p2p_stream::Config) -> p2p_stream::Behaviour<C>
where
C: Default + p2p_stream::Codec + Clone + Send,
C::Protocol: Default,
{
p2p_stream::Behaviour::new(std::iter::once(C::Protocol::default()), Default::default())
p2p_stream::Behaviour::new(std::iter::once(C::Protocol::default()), cfg)
}

#[allow(dead_code)]
Expand Down
6 changes: 6 additions & 0 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ pub struct Config {
pub ip_whitelist: Vec<IpNet>,
pub bootstrap: BootstrapConfig,
pub inbound_connections_rate_limit: RateLimit,
/// Alternative protocol names for Kademlia
pub kad_names: Vec<String>,
/// Request timeout for p2p-stream
pub stream_timeout: Duration,
/// Applies to each of the p2p-stream protocols separately
pub max_concurrent_streams: usize,
}

#[derive(Debug, Clone)]
Expand Down
30 changes: 30 additions & 0 deletions crates/p2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ impl Default for TestPeer {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
},
Keypair::generate_ed25519(),
)
Expand Down Expand Up @@ -265,6 +268,9 @@ async fn periodic_bootstrap() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let mut boot = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -414,6 +420,9 @@ async fn reconnect_too_quickly() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -516,6 +525,9 @@ async fn duplicate_connection() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let keypair = Keypair::generate_ed25519();
let mut peer1 = TestPeer::new(cfg.clone(), keypair.clone());
Expand Down Expand Up @@ -602,6 +614,9 @@ async fn outbound_peer_eviction() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -731,6 +746,9 @@ async fn inbound_peer_eviction() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -816,6 +834,9 @@ async fn evicted_peer_reconnection() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -907,6 +928,9 @@ async fn ip_whitelist() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -940,6 +964,9 @@ async fn ip_whitelist() {
max: 1000,
interval: Duration::from_secs(1),
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};
let mut peer3 = TestPeer::new(cfg, Keypair::generate_ed25519());

Expand Down Expand Up @@ -974,6 +1001,9 @@ async fn rate_limit() {
max: 2,
interval: RATE_LIMIT_INTERVAL,
},
kad_names: Default::default(),
stream_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down
2 changes: 1 addition & 1 deletion crates/p2p_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl fmt::Display for OutboundRequestId {
}

/// The configuration for a `Behaviour` protocol.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct Config {
request_timeout: Duration,
max_concurrent_streams: usize,
Expand Down
130 changes: 126 additions & 4 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ struct P2PCli {
Example:
'/ip4/127.0.0.1/9001/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaN,/ip4/127.0.0.1/9002/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaN'"#,
value_name = "MULTIADDRESS_LIST",
value_delimiter = ',',
env = "PATHFINDER_P2P_BOOTSTRAP_ADDRESSES"
)]
bootstrap_addresses: Vec<String>,
Expand All @@ -392,6 +393,7 @@ Example:
Example:
'/ip4/127.0.0.1/9003/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaP,/ip4/127.0.0.1/9004/p2p/12D3KooWBEkKyufuqCMoZLRhVzq4xdHxVWhhYeBpjw92GSyZ6xaR'"#,
value_name = "MULTIADDRESS_LIST",
value_delimiter = ',',
env = "PATHFINDER_P2P_PREDEFINED_PEERS"
)]
predefined_peers: Vec<String>,
Expand Down Expand Up @@ -445,6 +447,65 @@ Example:
env = "IP_WHITELIST"
)]
ip_whitelist: Vec<IpNet>,

#[arg(
long = "p2p.experimental.kad-names",
long_help = "Comma separated list of custom Kademlia protocol names.",
value_name = "LIST",
default_value = "/starknet/kad/<STARKNET_CHAIN_ID>/1.0.0",
value_delimiter = ',',
env = "PATHFINDER_P2P_EXPERIMENTAL_KAD_NAMES"
)]
kad_names: Vec<String>,

#[arg(
long = "p2p.experimental.l1-checkpoint-override",
long_help = "Override L1 sync checkpoint retrieved from the Ethereum API. This option \
points to a json encoded file containing an L1 checkpoint from which \
pathfinder will sync backwards till genesis before switching to syncing \
forward and following the head of the chain. Example contents: { \
\"block_hash\": \"0x1\", \"block_number\": 2, \"state_root\": \"0x3\" }",
value_name = "JSON_FILE",
env = "PATHFINDER_P2P_EXPERIMENTAL_L1_CHECKPOINT_OVERRIDE"
)]
l1_checkpoint_override: Option<String>,

#[arg(
long = "p2p.experimental.stream-timeout",
long_help = "Timeout of the request/response-stream protocol.",
value_name = "SECONDS",
default_value = "60",
env = "PATHFINDER_P2P_EXPERIMENTAL_STREAM_TIMEOUT"
)]
stream_timeout: u32,

#[arg(
long = "p2p.experimental.max-concurrent-streams",
long_help = "Maximum allowed number of concurrent streams per each \
request/response-stream protocol.",
value_name = "LIMIT",
default_value = "100",
env = "PATHFINDER_P2P_EXPERIMENTAL_MAX_CONCURRENT_STREAMS"
)]
max_concurrent_streams: usize,

#[arg(
long = "p2p.experimental.direct-connection-timeout",
long_help = "A direct (not relayed) peer can only connect once in this period.",
value_name = "SECONDS",
default_value = "30",
env = "PATHFINDER_P2P_EXPERIMENTAL_DIRECT_CONNECTION_TIMEOUT"
)]
direct_connection_timeout: u32,

#[arg(
long = "p2p.experimental.eviction-timeout",
long_help = "How long to prevent evicted peers from reconnecting.",
value_name = "SECONDS",
default_value = "900",
env = "PATHFINDER_P2P_EXPERIMENTAL_EVICTION_TIMEOUT"
)]
eviction_timeout: u32,
}

#[cfg(feature = "p2p")]
Expand Down Expand Up @@ -615,6 +676,12 @@ pub struct P2PConfig {
pub max_outbound_connections: usize,
pub ip_whitelist: Vec<IpNet>,
pub low_watermark: usize,
pub kad_names: Vec<String>,
pub l1_checkpoint_override: Option<pathfinder_ethereum::EthereumStateUpdate>,
pub stream_timeout: Duration,
pub max_concurrent_streams: usize,
pub direct_connection_timeout: Duration,
pub eviction_timeout: Duration,
}

#[cfg(not(feature = "p2p"))]
Expand Down Expand Up @@ -689,14 +756,14 @@ impl P2PConfig {
use clap::error::ErrorKind;
use p2p::libp2p::multiaddr::Result;

let parse_multiaddr_vec = |multiaddrs: Vec<String>| -> Vec<Multiaddr> {
let parse_multiaddr_vec = |field: &str, multiaddrs: Vec<String>| -> Vec<Multiaddr> {
multiaddrs
.into_iter()
.map(|addr| Multiaddr::from_str(&addr))
.collect::<Result<Vec<_>>>()
.unwrap_or_else(|error| {
Cli::command()
.error(ErrorKind::ValueValidation, error)
.error(ErrorKind::ValueValidation, format!("{field}: {error}"))
.exit()
})
};
Expand Down Expand Up @@ -728,6 +795,17 @@ impl P2PConfig {
.exit()
}

if args.kad_names.is_empty() || args.kad_names.iter().any(|x| x.is_empty()) {
Cli::command()
.error(
ErrorKind::ValueValidation,
"p2p.experimental.kad-names must contain at least one non-empty string",
)
.exit()
}

let l1_checkpoint_override = parse_l1_checkpoint_or_exit(args.l1_checkpoint_override);

Self {
max_inbound_direct_connections: args.max_inbound_direct_connections.try_into().unwrap(),
max_inbound_relayed_connections: args
Expand All @@ -738,14 +816,58 @@ impl P2PConfig {
proxy: args.proxy,
identity_config_file: args.identity_config_file,
listen_on: args.listen_on,
bootstrap_addresses: parse_multiaddr_vec(args.bootstrap_addresses),
predefined_peers: parse_multiaddr_vec(args.predefined_peers),
bootstrap_addresses: parse_multiaddr_vec(
"p2p.bootstrap-addresses",
args.bootstrap_addresses,
),
predefined_peers: parse_multiaddr_vec("p2p.predefined-peers", args.predefined_peers),
ip_whitelist: args.ip_whitelist,
low_watermark: 0,
kad_names: args.kad_names,
l1_checkpoint_override,
stream_timeout: Duration::from_secs(args.stream_timeout.into()),
max_concurrent_streams: args.max_concurrent_streams,
direct_connection_timeout: Duration::from_secs(args.direct_connection_timeout.into()),
eviction_timeout: Duration::from_secs(args.eviction_timeout.into()),
}
}
}

#[cfg(feature = "p2p")]
fn parse_l1_checkpoint_or_exit(
l1_checkpoint_override: Option<String>,
) -> Option<pathfinder_ethereum::EthereumStateUpdate> {
use clap::error::ErrorKind;
use pathfinder_common::{BlockHash, BlockNumber, StateCommitment};

#[derive(serde::Deserialize)]
struct Dto {
state_root: StateCommitment,
block_number: BlockNumber,
block_hash: BlockHash,
}

fn exit_now(e: impl std::fmt::Display) {
Cli::command()
.error(
ErrorKind::ValueValidation,
format!("p2p.experimental.l1-anchor: {e}"),
)
.exit()
}

l1_checkpoint_override.map(|f| {
// SAFETY: unwraps are safe because we exit the process on error
let f = std::fs::File::open(f).map_err(exit_now).unwrap();
let dto: Dto = serde_json::from_reader(f).map_err(exit_now).unwrap();
pathfinder_ethereum::EthereumStateUpdate {
state_root: dto.state_root,
block_number: dto.block_number,
block_hash: dto.block_hash,
}
})
}

#[cfg(not(feature = "p2p"))]
impl DebugConfig {
fn parse(_: ()) -> Self {
Expand Down
Loading