Skip to content

Commit

Permalink
Remove checkpoint lag and consistency configs
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Jan 7, 2025
1 parent e4b1f1f commit 0ed3b65
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 253 deletions.
116 changes: 2 additions & 114 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ impl<H: Handler> From<IndexedCheckpoint<H>> for PendingCheckpoint<H> {
/// closed.
pub(super) fn collector<H: Handler + 'static>(
config: CommitterConfig,
checkpoint_lag: Option<u64>,
mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
tx: mpsc::Sender<BatchedRows<H>>,
metrics: Arc<IndexerMetrics>,
Expand All @@ -93,10 +92,6 @@ pub(super) fn collector<H: Handler + 'static>(
let mut poll = interval(config.collect_interval());
poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

// Data for checkpoints that have been received but not yet ready to be sent to committer due to lag constraint.
let mut received: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
let checkpoint_lag = checkpoint_lag.unwrap_or_default();

let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
&metrics.collected_checkpoint_timestamp_lag,
&metrics.latest_collected_checkpoint_timestamp_lag_ms,
Expand Down Expand Up @@ -186,8 +181,8 @@ pub(super) fn collector<H: Handler + 'static>(
.with_label_values(&[H::NAME])
.inc();

received.insert(indexed.checkpoint(), indexed);
pending_rows += move_ready_checkpoints(&mut received, &mut pending, checkpoint_lag);
pending_rows += indexed.len();
pending.insert(indexed.checkpoint(), indexed.into());

if pending_rows >= H::MIN_EAGER_ROWS {
poll.reset_immediately()
Expand All @@ -198,34 +193,6 @@ pub(super) fn collector<H: Handler + 'static>(
})
}

/// Move all checkpoints from `received` that are within the lag range into `pending`.
/// Returns the number of rows moved.
fn move_ready_checkpoints<H: Handler>(
received: &mut BTreeMap<u64, IndexedCheckpoint<H>>,
pending: &mut BTreeMap<u64, PendingCheckpoint<H>>,
checkpoint_lag: u64,
) -> usize {
let tip = match (received.last_key_value(), pending.last_key_value()) {
(Some((cp, _)), None) | (None, Some((cp, _))) => *cp,
(Some((cp1, _)), Some((cp2, _))) => std::cmp::max(*cp1, *cp2),
(None, None) => return 0,
};

let mut moved_rows = 0;
while let Some(entry) = received.first_entry() {
let cp = *entry.key();
if cp + checkpoint_lag > tip {
break;
}

let indexed = entry.remove();
moved_rows += indexed.len();
pending.insert(cp, indexed.into());
}

moved_rows
}

#[cfg(test)]
mod tests {
use sui_field_count::FieldCount;
Expand Down Expand Up @@ -271,82 +238,6 @@ mod tests {
}
}

#[test]
fn test_move_ready_checkpoints_empty() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 10);
assert_eq!(moved, 0);
assert!(received.is_empty());
assert!(pending.is_empty());
}

#[test]
fn test_move_ready_checkpoints_within_lag() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();

// Add checkpoints 1-5 to received
for i in 1..=5 {
received.insert(
i,
IndexedCheckpoint::new(0, i, 0, 0, vec![Entry, Entry, Entry]),
);
}

// With lag of 2 and tip at 5, only checkpoints 1-3 should move
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 2);

assert_eq!(moved, 9); // 3 checkpoints * 3 rows each
assert_eq!(received.len(), 2); // 4,5 remain
assert_eq!(pending.len(), 3); // 1,2,3 moved
assert!(pending.contains_key(&1));
assert!(pending.contains_key(&2));
assert!(pending.contains_key(&3));
}

#[test]
fn test_move_ready_checkpoints_tip_from_pending() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();

// Add checkpoint 10 to pending to establish tip
pending.insert(
10,
PendingCheckpoint::from(IndexedCheckpoint::new(0, 10, 0, 0, vec![Entry])),
);

// Add checkpoints 1-5 to received
for i in 1..=5 {
received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry]));
}

// With lag of 3 and tip at 10, checkpoints 1-7 can move
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 3);

assert_eq!(moved, 5); // All 5 checkpoints moved, 1 row each
assert!(received.is_empty());
assert_eq!(pending.len(), 6); // Original + 5 new
}

#[test]
fn test_move_ready_checkpoints_no_eligible() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();

// Add checkpoints 8-10 to received
for i in 8..=10 {
received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry]));
}

// With lag of 5 and tip at 10, no checkpoints can move
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 5);

assert_eq!(moved, 0);
assert_eq!(received.len(), 3);
assert!(pending.is_empty());
}

#[tokio::test]
async fn test_collector_batches_data() {
let (processor_tx, processor_rx) = mpsc::channel(10);
Expand All @@ -356,7 +247,6 @@ mod tests {

let _collector = collector::<TestHandler>(
CommitterConfig::default(),
None,
processor_rx,
collector_tx,
metrics,
Expand Down Expand Up @@ -399,7 +289,6 @@ mod tests {

let collector = collector::<TestHandler>(
CommitterConfig::default(),
None,
processor_rx,
collector_tx,
metrics,
Expand Down Expand Up @@ -440,7 +329,6 @@ mod tests {

let _collector = collector::<TestHandler>(
CommitterConfig::default(),
None,
processor_rx,
collector_tx,
metrics.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ pub struct ConcurrentConfig {

/// Configuration for the pruner, that deletes old data.
pub pruner: Option<PrunerConfig>,

/// How many checkpoints lagged behind latest seen checkpoint to hold back writes for.
/// This is useful if pruning is implemented as a concurrent pipeline, and it must be behind
/// the pipeline it tries to prune from by a certain number of checkpoints, to ensure
/// consistency reads remain valid for a certain amount of time.
pub checkpoint_lag: Option<u64>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -204,7 +198,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
let ConcurrentConfig {
committer: committer_config,
pruner: pruner_config,
checkpoint_lag,
} = config;

let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
Expand All @@ -230,7 +223,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(

let collector = collector::<H>(
committer_config.clone(),
checkpoint_lag,
collector_rx,
collector_tx,
metrics.clone(),
Expand Down
Loading

0 comments on commit 0ed3b65

Please sign in to comment.