Skip to content

Commit

Permalink
[indexer-alt] Re-implement coin balance bucket pruner
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Jan 7, 2025
1 parent 59fa7d9 commit e4b1f1f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 17 deletions.
10 changes: 1 addition & 9 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,8 @@ pub struct PipelineLayer {
pub sum_displays: Option<SequentialLayer>,
pub sum_packages: Option<SequentialLayer>,

// Concurrent pipelines with a lagged consistent pruner which is also a concurrent pipeline.
// Use concurrent layer for the pruner pipelines so that they could override checkpoint lag if needed.
pub coin_balance_buckets: Option<CommitterLayer>,
pub coin_balance_buckets_pruner: Option<ConcurrentLayer>,

// All concurrent pipelines
pub coin_balance_buckets: Option<ConcurrentLayer>,
pub cp_sequence_numbers: Option<ConcurrentLayer>,
pub ev_emit_mod: Option<ConcurrentLayer>,
pub ev_struct_inst: Option<ConcurrentLayer>,
Expand Down Expand Up @@ -270,7 +266,6 @@ impl PipelineLayer {
sum_displays: Some(Default::default()),
sum_packages: Some(Default::default()),
coin_balance_buckets: Some(Default::default()),
coin_balance_buckets_pruner: Some(Default::default()),
cp_sequence_numbers: Some(Default::default()),
ev_emit_mod: Some(Default::default()),
ev_struct_inst: Some(Default::default()),
Expand Down Expand Up @@ -404,9 +399,6 @@ impl Merge for PipelineLayer {
sum_displays: self.sum_displays.merge(other.sum_displays),
sum_packages: self.sum_packages.merge(other.sum_packages),
coin_balance_buckets: self.coin_balance_buckets.merge(other.coin_balance_buckets),
coin_balance_buckets_pruner: self
.coin_balance_buckets_pruner
.merge(other.coin_balance_buckets_pruner),
cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers),
ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod),
ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst),
Expand Down
50 changes: 49 additions & 1 deletion crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, bail, Result};
use diesel::sql_query;
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
Expand All @@ -19,7 +20,12 @@ use sui_types::{
TypeTag,
};

pub(crate) struct CoinBalanceBuckets;
use crate::consistent_pruning::PruningLookupTable;

#[derive(Default)]
pub(crate) struct CoinBalanceBuckets {
pruning_lookup_table: Arc<PruningLookupTable>,
}

pub(crate) struct ProcessedCoinBalanceBucket {
pub object_id: ObjectID,
Expand Down Expand Up @@ -134,6 +140,8 @@ impl Processor for CoinBalanceBuckets {

#[async_trait::async_trait]
impl Handler for CoinBalanceBuckets {
const PRUNING_REQUIRES_PROCESSED_VALUES: bool = true;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
let values = values
.iter()
Expand All @@ -145,6 +153,46 @@ impl Handler for CoinBalanceBuckets {
.execute(conn)
.await?)
}

// TODO: Add tests for this function.
async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> anyhow::Result<usize> {
use sui_indexer_alt_schema::schema::coin_balance_buckets::dsl;

let to_prune = self.pruning_lookup_table.take(from, to_exclusive)?;

// For each (object_id, cp_sequence_number_exclusive), delete all entries with
// cp_sequence_number less than cp_sequence_number_exclusive that match the object_id.

let values = to_prune
.iter()
.map(|(object_id, seq_number)| {
let object_id_hex = hex::encode(object_id);
format!("('\\x{}'::BYTEA, {}::BIGINT)", object_id_hex, seq_number)
})
.collect::<Vec<_>>()
.join(",");
let query = format!(
"
WITH to_prune_data (object_id, cp_sequence_number_exclusive) AS (
VALUES {}
)
DELETE FROM coin_balance_buckets
USING to_prune_data
WHERE coin_balance_buckets.{:?} = to_prune_data.object_id
AND coin_balance_buckets.{:?} < to_prune_data.cp_sequence_number_exclusive
",
values,
dsl::object_id,
dsl::cp_sequence_number,
);
let rows_deleted = sql_query(query).execute(conn).await?;
Ok(rows_deleted)
}
}

impl FieldCount for ProcessedCoinBalanceBucket {
Expand Down
8 changes: 1 addition & 7 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use anyhow::Context;
use bootstrap::bootstrap;
use config::{ConsistencyConfig, IndexerConfig, PipelineLayer};
use handlers::coin_balance_buckets::CoinBalanceBuckets;
use handlers::coin_balance_buckets_pruner::CoinBalanceBucketsPruner;
use handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags,
Expand Down Expand Up @@ -59,7 +58,6 @@ pub async fn start_indexer(
sum_displays,
sum_packages,
coin_balance_buckets,
coin_balance_buckets_pruner,
cp_sequence_numbers,
ev_emit_mod,
ev_struct_inst,
Expand Down Expand Up @@ -190,12 +188,8 @@ pub async fn start_indexer(
add_sequential!(SumDisplays, sum_displays);
add_sequential!(SumPackages, sum_packages);

add_consistent!(
CoinBalanceBuckets, coin_balance_buckets;
CoinBalanceBucketsPruner, coin_balance_buckets_pruner
);

// Unpruned concurrent pipelines
add_concurrent!(CoinBalanceBuckets::default(), coin_balance_buckets);
add_concurrent!(CpSequenceNumbers, cp_sequence_numbers);
add_concurrent!(EvEmitMod, ev_emit_mod);
add_concurrent!(EvStructInst, ev_struct_inst);
Expand Down

0 comments on commit e4b1f1f

Please sign in to comment.