Skip to content

Commit

Permalink
Add metrics to example channel app's store (#778)
Browse files Browse the repository at this point in the history
* added metrics to example store

* optimized storing metrics in example app

* added time observing metrics to example app

* moving SharedRegistry import below

* serving metrics in app_channel

* renamed channel_app prefix from blockstore to app_channel

* Handle errors in metrics server

* Improve metrics collection in store

* Formatting

---------

Co-authored-by: Romain Ruetschi <[email protected]>
  • Loading branch information
OakenKnight and romac authored Jan 17, 2025
1 parent 9d3dcef commit 43302df
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 24 deletions.
4 changes: 3 additions & 1 deletion code/crates/test/cli/src/cmd/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl StartCmd {
pub async fn start(node: impl Node, metrics: Option<MetricsConfig>) -> eyre::Result<()> {
// Enable Prometheus
if let Some(metrics) = metrics {
tokio::spawn(metrics::serve(metrics.clone()));
if metrics.enabled {
tokio::spawn(metrics::serve(metrics.listen_addr));
}
}

// Start the node
Expand Down
24 changes: 17 additions & 7 deletions code/crates/test/cli/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
use std::io;

use axum::routing::get;
use axum::Router;
use tokio::net::TcpListener;
use tracing::info;
use tokio::net::{TcpListener, ToSocketAddrs};
use tracing::{error, info};

use malachitebft_app::metrics::export;
use malachitebft_config::MetricsConfig;

#[tracing::instrument(name = "metrics", skip_all)]
pub async fn serve(config: MetricsConfig) {
pub async fn serve(listen_addr: impl ToSocketAddrs) {
if let Err(e) = inner(listen_addr).await {
error!("Metrics server failed: {e}");
}
}

async fn inner(listen_addr: impl ToSocketAddrs) -> io::Result<()> {
let app = Router::new().route("/metrics", get(get_metrics));
let listener = TcpListener::bind(config.listen_addr).await.unwrap();
let listener = TcpListener::bind(listen_addr).await?;
let local_addr = listener.local_addr()?;

info!(address = %local_addr, "Serving metrics");
axum::serve(listener, app).await?;

info!(address = %config.listen_addr, "Serving metrics");
axum::serve(listener, app).await.unwrap();
Ok(())
}

async fn get_metrics() -> String {
Expand Down
3 changes: 2 additions & 1 deletion code/examples/channel/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl DbMetrics {
pub fn register(registry: &SharedRegistry) -> Self {
let metrics = Self::new();

registry.with_prefix("blockstore", |registry| {
registry.with_prefix("app_channel", |registry| {
registry.register(
"db_size",
"Size of the database (bytes)",
Expand Down Expand Up @@ -149,6 +149,7 @@ impl DbMetrics {
metrics
}

#[allow(dead_code)]
pub fn set_db_size(&self, size: usize) {
self.db_size.set(size as i64);
}
Expand Down
12 changes: 11 additions & 1 deletion code/examples/channel/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::{Path, PathBuf};
use async_trait::async_trait;
use rand::{CryptoRng, RngCore};

use malachitebft_app_channel::app::metrics::SharedRegistry;
use malachitebft_app_channel::app::types::config::Config;
use malachitebft_app_channel::app::types::core::VotingPower;
use malachitebft_app_channel::app::types::Keypair;
Expand All @@ -17,7 +18,9 @@ use malachitebft_test::codec::proto::ProtobufCodec;
use malachitebft_test::{
Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet,
};
use malachitebft_test_cli::metrics;

use crate::metrics::DbMetrics;
use crate::state::State;
use crate::store::Store;

Expand Down Expand Up @@ -117,7 +120,14 @@ impl Node for App {
)
.await?;

let store = Store::open(self.get_home_dir().join("store.db"))?;
let registry = SharedRegistry::global().with_moniker(&self.config.moniker);
let metrics = DbMetrics::register(&registry);

if self.config.metrics.enabled {
tokio::spawn(metrics::serve(self.config.metrics.listen_addr));
}

let store = Store::open(self.get_home_dir().join("store.db"), metrics)?;
let start_height = self.start_height.unwrap_or_default();
let mut state = State::new(ctx, address, start_height, store);

Expand Down
98 changes: 84 additions & 14 deletions code/examples/channel/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::mem::size_of;
use std::ops::RangeBounds;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use prost::Message;
Expand All @@ -20,6 +22,8 @@ use malachitebft_test::{Height, TestContext, Value};
mod keys;
use keys::{HeightKey, UndecidedValueKey};

use crate::metrics::DbMetrics;

#[derive(Clone, Debug)]
pub struct DecidedValue {
pub value: Value,
Expand Down Expand Up @@ -71,28 +75,47 @@ const UNDECIDED_PROPOSALS_TABLE: redb::TableDefinition<UndecidedValueKey, Vec<u8

struct Db {
db: redb::Database,
metrics: DbMetrics,
}

impl Db {
fn new(path: impl AsRef<Path>) -> Result<Self, StoreError> {
fn new(path: impl AsRef<Path>, metrics: DbMetrics) -> Result<Self, StoreError> {
Ok(Self {
db: redb::Database::create(path).map_err(StoreError::Database)?,
metrics,
})
}

fn get_decided_value(&self, height: Height) -> Result<Option<DecidedValue>, StoreError> {
let start = Instant::now();
let mut read_bytes = 0;

let tx = self.db.begin_read()?;

let value = {
let table = tx.open_table(DECIDED_VALUES_TABLE)?;
let value = table.get(&height)?;
value.and_then(|value| Value::from_bytes(&value.value()).ok())
value.and_then(|value| {
let bytes = value.value();
read_bytes = bytes.len() as u64;
Value::from_bytes(&bytes).ok()
})
};

let certificate = {
let table = tx.open_table(CERTIFICATES_TABLE)?;
let value = table.get(&height)?;
value.and_then(|value| decode_certificate(&value.value()).ok())
value.and_then(|value| {
let bytes = value.value();
read_bytes += bytes.len() as u64;
decode_certificate(&bytes).ok()
})
};

self.metrics.observe_read_time(start.elapsed());
self.metrics.add_read_bytes(read_bytes);
self.metrics.add_key_read_bytes(size_of::<Height>() as u64);

let decided_value = value
.zip(certificate)
.map(|(value, certificate)| DecidedValue { value, certificate });
Expand All @@ -101,19 +124,31 @@ impl Db {
}

fn insert_decided_value(&self, decided_value: DecidedValue) -> Result<(), StoreError> {
let height = decided_value.certificate.height;
let start = Instant::now();
let mut write_bytes = 0;

let height = decided_value.certificate.height;
let tx = self.db.begin_write()?;

{
let mut values = tx.open_table(DECIDED_VALUES_TABLE)?;
values.insert(height, decided_value.value.to_bytes()?.to_vec())?;
let values_bytes = decided_value.value.to_bytes()?.to_vec();
write_bytes += values_bytes.len() as u64;
values.insert(height, values_bytes)?;
}

{
let mut certificates = tx.open_table(CERTIFICATES_TABLE)?;
certificates.insert(height, encode_certificate(&decided_value.certificate)?)?;
let encoded_certificate = encode_certificate(&decided_value.certificate)?;
write_bytes += encoded_certificate.len() as u64;
certificates.insert(height, encoded_certificate)?;
}

tx.commit()?;

self.metrics.observe_write_time(start.elapsed());
self.metrics.add_write_bytes(write_bytes);

Ok(())
}

Expand All @@ -123,34 +158,52 @@ impl Db {
height: Height,
round: Round,
) -> Result<Option<ProposedValue<TestContext>>, StoreError> {
let start = Instant::now();
let mut read_bytes = 0;

let tx = self.db.begin_read()?;
let table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?;

let value = if let Ok(Some(value)) = table.get(&(height, round)) {
Some(
ProtobufCodec
.decode(Bytes::from(value.value()))
.map_err(StoreError::Protobuf)?,
)
let bytes = value.value();
read_bytes += bytes.len() as u64;

let proposal = ProtobufCodec
.decode(Bytes::from(bytes))
.map_err(StoreError::Protobuf)?;

Some(proposal)
} else {
None
};

self.metrics.observe_read_time(start.elapsed());
self.metrics.add_read_bytes(read_bytes);
self.metrics
.add_key_read_bytes(size_of::<(Height, Round)>() as u64);

Ok(value)
}

fn insert_undecided_proposal(
&self,
proposal: ProposedValue<TestContext>,
) -> Result<(), StoreError> {
let start = Instant::now();

let key = (proposal.height, proposal.round);
let value = ProtobufCodec.encode(&proposal)?;

let tx = self.db.begin_write()?;
{
let mut table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?;
table.insert(key, value.to_vec())?;
}
tx.commit()?;

self.metrics.observe_write_time(start.elapsed());
self.metrics.add_write_bytes(value.len() as u64);

Ok(())
}

Expand Down Expand Up @@ -185,7 +238,10 @@ impl Db {
}

fn prune(&self, retain_height: Height) -> Result<Vec<Height>, StoreError> {
let start = Instant::now();

let tx = self.db.begin_write().unwrap();

let pruned = {
let mut undecided = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?;
let keys = self.undecided_proposals_range(&undecided, ..(retain_height, Round::Nil))?;
Expand All @@ -203,15 +259,25 @@ impl Db {
}
keys
};

tx.commit()?;

self.metrics.observe_delete_time(start.elapsed());

Ok(pruned)
}

fn min_decided_value_height(&self) -> Option<Height> {
let start = Instant::now();

let tx = self.db.begin_read().unwrap();
let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap();
let (key, _) = table.first().ok()??;
let (key, value) = table.first().ok()??;

self.metrics.observe_read_time(start.elapsed());
self.metrics.add_read_bytes(value.value().len() as u64);
self.metrics.add_key_read_bytes(size_of::<Height>() as u64);

Some(key.value())
}

Expand All @@ -224,11 +290,14 @@ impl Db {

fn create_tables(&self) -> Result<(), StoreError> {
let tx = self.db.begin_write()?;

// Implicitly creates the tables if they do not exist yet
let _ = tx.open_table(DECIDED_VALUES_TABLE)?;
let _ = tx.open_table(CERTIFICATES_TABLE)?;
let _ = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?;

tx.commit()?;

Ok(())
}
}
Expand All @@ -239,8 +308,8 @@ pub struct Store {
}

impl Store {
pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
let db = Db::new(path)?;
pub fn open(path: impl AsRef<Path>, metrics: DbMetrics) -> Result<Self, StoreError> {
let db = Db::new(path, metrics)?;
db.create_tables()?;

Ok(Self { db: Arc::new(db) })
Expand All @@ -267,6 +336,7 @@ impl Store {
height: Height,
) -> Result<Option<DecidedValue>, StoreError> {
let db = Arc::clone(&self.db);

tokio::task::spawn_blocking(move || db.get_decided_value(height)).await?
}

Expand Down

0 comments on commit 43302df

Please sign in to comment.