[consensus] logging and metrics improvements (#20807)
## Description 

- Include peer hostname in round prober errors.
- Fix the `block_proposal_interval` metric by recording the interval before the newly proposed block is accepted (a sketch follows this list).
- Log missing blocks and the peers they are fetched from, at info level.
- Add `synchronizer_current_missing_blocks_by_authority` metric.
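
The interval fix is mainly a reordering: in the removed code the histogram was updated after the new block had already been accepted, at which point `last_proposed_timestamp_ms()` presumably already returns the new block's timestamp, so the observed interval collapses to zero. Below is a minimal sketch of the corrected ordering, using simplified stand-in types and a hypothetical `record_proposal_interval` helper rather than the real `Core` internals:

```rust
use std::time::Duration;

/// Simplified stand-ins for the real block and histogram types.
struct Block {
    round: u32,
    timestamp_ms: u64,
}
struct Histogram;
impl Histogram {
    fn observe(&self, _secs: f64) {}
}

/// Record the proposal interval *before* accepting the new block,
/// so `last_proposed` still refers to the previous proposal.
fn record_proposal_interval(interval_hist: &Histogram, last_proposed: &Block, new_block: &Block) {
    // Round 0 means there is no previous proposal to measure against.
    if last_proposed.round > 0 {
        let interval_ms = new_block.timestamp_ms.saturating_sub(last_proposed.timestamp_ms);
        interval_hist.observe(Duration::from_millis(interval_ms).as_secs_f64());
    }
}
```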

## Test plan 

CI
PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
mwtian committed Jan 9, 2025
1 parent 5ff708b commit 2519c82
Showing 4 changed files with 72 additions and 30 deletions.
31 changes: 18 additions & 13 deletions consensus/core/src/core.rs
@@ -497,9 +497,26 @@ impl Core {
.node_metrics
.proposed_block_size
.observe(serialized.len() as f64);
// Unnecessary to verify own blocks.
// Own blocks are assumed to be valid.
let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

// Record the interval from last proposal, before accepting the proposed block.
let last_proposed_block = self.last_proposed_block();
if last_proposed_block.round() > 0 {
self.context
.metrics
.node_metrics
.block_proposal_interval
.observe(
Duration::from_millis(
verified_block
.timestamp_ms()
.saturating_sub(last_proposed_block.timestamp_ms()),
)
.as_secs_f64(),
);
}

// Accept the block into BlockManager and DagState.
let (accepted_blocks, missing) = self
.block_manager
@@ -513,18 +530,6 @@
// Ensure the new block and its ancestors are persisted, before broadcasting it.
self.dag_state.write().flush();

let current_proposal_duration = Duration::from_millis(verified_block.timestamp_ms());
let previous_proposal_duration = Duration::from_millis(self.last_proposed_timestamp_ms());
self.context
.metrics
.node_metrics
.block_proposal_interval
.observe(
current_proposal_duration
.saturating_sub(previous_proposal_duration)
.as_secs_f64(),
);

// Now acknowledge the transactions for their inclusion in the block
ack_transactions(verified_block.reference());

21 changes: 14 additions & 7 deletions consensus/core/src/metrics.rs
@@ -16,7 +16,7 @@ use crate::network::metrics::NetworkMetrics;
const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
0.000_001, 0.000_050, 0.000_100, 0.000_500, 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25,
0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.5, 3.0, 3.5,
4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.,
4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10., 20., 30., 60., 120.,
];

const NUM_BUCKETS: &[f64] = &[
@@ -126,6 +126,7 @@ pub(crate) struct NodeMetrics {
pub(crate) fetch_blocks_scheduler_skipped: IntCounterVec,
pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec,
pub(crate) synchronizer_missing_blocks_by_authority: IntCounterVec,
pub(crate) synchronizer_current_missing_blocks_by_authority: IntGaugeVec,
pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec,
pub(crate) invalid_blocks: IntCounterVec,
pub(crate) rejected_blocks: IntCounterVec,
@@ -356,18 +357,24 @@ impl NodeMetrics {
&["peer", "type"],
registry,
).unwrap(),
synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_fetched_blocks_by_authority",
"Number of fetched blocks per block author via the synchronizer",
&["authority", "type"],
registry,
).unwrap(),
synchronizer_missing_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_missing_blocks_by_authority",
"Number of missing blocks per block author, as observed by the synchronizer during periodic sync.",
&["authority"],
registry,
).unwrap(),
synchronizer_current_missing_blocks_by_authority: register_int_gauge_vec_with_registry!(
"synchronizer_current_missing_blocks_by_authority",
"Current number of missing blocks per block author, as observed by the synchronizer during periodic sync.",
&["authority"],
registry,
).unwrap(),
synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_fetched_blocks_by_authority",
"Number of fetched blocks per block author via the synchronizer",
&["authority", "type"],
registry,
).unwrap(),
last_known_own_block_round: register_int_gauge_with_registry!(
"last_known_own_block_round",
"The highest round of our own block as this has been synced from peers during an amnesia recovery",
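
The new gauge complements the existing counter: `synchronizer_missing_blocks_by_authority` only ever accumulates over the process lifetime, while `synchronizer_current_missing_blocks_by_authority` is overwritten on each periodic sync and so answers "how many blocks from this author are missing right now". A rough sketch of the two update patterns with the prometheus crate, using standalone handles instead of the real `NodeMetrics` struct and its `register_*_with_registry!` macros:

```rust
use prometheus::{IntCounterVec, IntGaugeVec, Opts, Registry};

/// Register both metrics once at startup (simplified stand-in for NodeMetrics::new).
fn register_missing_block_metrics(
    registry: &Registry,
) -> prometheus::Result<(IntCounterVec, IntGaugeVec)> {
    let total = IntCounterVec::new(
        Opts::new(
            "synchronizer_missing_blocks_by_authority",
            "Cumulative missing blocks per block author",
        ),
        &["authority"],
    )?;
    let current = IntGaugeVec::new(
        Opts::new(
            "synchronizer_current_missing_blocks_by_authority",
            "Current missing blocks per block author",
        ),
        &["authority"],
    )?;
    registry.register(Box::new(total.clone()))?;
    registry.register(Box::new(current.clone()))?;
    Ok((total, current))
}

/// Called on every periodic sync: the counter accumulates, the gauge is overwritten.
fn record_missing(total: &IntCounterVec, current: &IntGaugeVec, hostname: &str, missing: u64) {
    total.with_label_values(&[hostname]).inc_by(missing);
    current.with_label_values(&[hostname]).set(missing as i64);
}
```
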
9 changes: 5 additions & 4 deletions consensus/core/src/round_prober.rs
@@ -170,14 +170,15 @@ impl<C: NetworkClient> RoundProber<C> {
tokio::select! {
result = requests.next() => {
let Some((peer, result)) = result else { break };
let peer_name = &self.context.committee.authority(peer).hostname;
match result {
Ok(Ok((received, accepted))) => {
if received.len() == self.context.committee.size()
{
highest_received_rounds[peer] = received;
} else {
node_metrics.round_prober_request_errors.with_label_values(&["invalid_received_rounds"]).inc();
tracing::warn!("Received invalid number of received rounds from peer {}", peer);
tracing::warn!("Received invalid number of received rounds from peer {}", peer_name);
}

if self
@@ -188,7 +189,7 @@ impl<C: NetworkClient> RoundProber<C> {
highest_accepted_rounds[peer] = accepted;
} else {
node_metrics.round_prober_request_errors.with_label_values(&["invalid_accepted_rounds"]).inc();
tracing::warn!("Received invalid number of accepted rounds from peer {}", peer);
tracing::warn!("Received invalid number of accepted rounds from peer {}", peer_name);
}
}

@@ -205,11 +206,11 @@
// own probing failures and actual propagation issues.
Ok(Err(err)) => {
node_metrics.round_prober_request_errors.with_label_values(&["failed_fetch"]).inc();
tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer, err);
tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer_name, err);
},
Err(_) => {
node_metrics.round_prober_request_errors.with_label_values(&["timeout"]).inc();
tracing::warn!("Timeout while getting latest rounds from peer {}", peer);
tracing::warn!("Timeout while getting latest rounds from peer {}", peer_name);
},
}
}
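
The round prober change swaps the numeric `AuthorityIndex` for the committee hostname in its warning messages. An alternative worth noting (not what this commit does) is tracing's structured fields, which can carry both identifiers without lengthening the message; a small sketch with a hypothetical `log_fetch_failure` helper:

```rust
use tracing::warn;

/// Hypothetical helper: attach both the authority index and its hostname as
/// structured fields, so log aggregation can filter on either one.
fn log_fetch_failure(peer_index: usize, peer_hostname: &str, err: &dyn std::error::Error) {
    warn!(peer = peer_index, host = %peer_hostname, error = ?err, "Failed to get latest rounds from peer");
}
```
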
41 changes: 35 additions & 6 deletions consensus/core/src/synchronizer.rs
@@ -893,10 +893,9 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
fail_point_async!("consensus-delay");

// Fetch blocks from peers
let results = Self::fetch_blocks_from_authorities(context.clone(), blocks_to_fetch.clone(), network_client, missing_blocks, core_dispatcher.clone(), dag_state).await;
let results = Self::fetch_blocks_from_authorities(context.clone(), blocks_to_fetch.clone(), network_client, missing_blocks, dag_state).await;
context.metrics.node_metrics.fetch_blocks_scheduler_inflight.dec();
if results.is_empty() {
warn!("No results returned while requesting missing blocks");
return;
}

@@ -935,7 +934,6 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
inflight_blocks: Arc<InflightBlocksMap>,
network_client: Arc<C>,
missing_blocks: BTreeSet<BlockRef>,
_core_dispatcher: Arc<D>,
dag_state: Arc<RwLock<DagState>>,
) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
const MAX_PEERS: usize = 3;
@@ -945,6 +943,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
.into_iter()
.take(MAX_PEERS * MAX_BLOCKS_PER_FETCH)
.collect::<Vec<_>>();

let mut missing_blocks_per_authority = vec![0; context.committee.size()];
for block in &missing_blocks {
missing_blocks_per_authority[block.author] += 1;
@@ -959,6 +958,12 @@
.synchronizer_missing_blocks_by_authority
.with_label_values(&[&authority.hostname])
.inc_by(missing as u64);
context
.metrics
.node_metrics
.synchronizer_current_missing_blocks_by_authority
.with_label_values(&[&authority.hostname])
.set(missing as i64);
}

let mut peers = context
@@ -983,10 +988,21 @@
let peer = peers
.next()
.expect("Possible misconfiguration as a peer should be found");
let peer_hostname = &context.committee.authority(peer).hostname;
let block_refs = blocks.iter().cloned().collect::<BTreeSet<_>>();

// Lock the blocks to be fetched. If no lock can be acquired for any of the blocks, don't bother.
if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
info!(
"Fetching {} missing blocks from peer {}: {}",
block_refs.len(),
peer_hostname,
block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
peer,
@@ -1005,9 +1021,11 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C

loop {
tokio::select! {
Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() =>
Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
let peer_hostname = &context.committee.authority(peer_index).hostname;
match response {
Ok(fetched_blocks) => {
info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
results.push((blocks_guard, fetched_blocks, peer_index));

// no more pending requests are left, just break the loop
@@ -1020,6 +1038,16 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
if let Some(next_peer) = peers.next() {
// Make a best effort to lock the guards. If we can't lock them, skip this run.
if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
info!(
"Retrying fetching {} missing blocks from peer {}: {}",
blocks_guard.block_refs.len(),
peer_hostname,
blocks_guard.block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
next_peer,
@@ -1035,9 +1063,10 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
debug!("No more peers left to fetch blocks");
}
}
},
}
},
_ = &mut fetcher_timeout => {
debug!("Timed out while fetching all the blocks");
debug!("Timed out while fetching missing blocks");
break;
}
}
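
Both new `info!` logs in the synchronizer format the missing block refs with the same `iter().map(..).join(", ")` chain. If that pattern spreads further it could be factored into a small helper; a sketch assuming only that the block reference type implements `Display` (this helper is not part of the commit):

```rust
use std::fmt::Display;

/// Join any displayable items into one comma-separated string for logging.
fn join_refs<T: Display>(refs: impl IntoIterator<Item = T>) -> String {
    refs.into_iter()
        .map(|r| r.to_string())
        .collect::<Vec<_>>()
        .join(", ")
}

// Hypothetical usage at the first call site:
// info!("Fetching {} missing blocks from peer {}: {}",
//       block_refs.len(), peer_hostname, join_refs(&block_refs));
```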
