Skip to content

Commit

Permalink
indexer-alt: allow sequential pipeline immediate reset (#20119)
Browse files Browse the repository at this point in the history
## Description

If the sequential pipeline committer can guarantee that it could process
more checkpoints by looking at its pending buffer, we now reset the
polling interval immediately, so it does not wait to issue the next
write. This mimics a similar behaviour in the concurrent pipeline.

I made this change after noticing how the pipeline behaves when
ingestion is stuck retrying a checkpoint, on my local machine. Usually
when running locally, performance is limited by checkpoint download
rate, but in a sequential pipeline, if a checkpoint failed to download,
it is possible for many checkpoints to end up processed and pending.

With the previous implementation, once ingestion had recovered (the
checkpoint is fetched), the pending buffer kept growing because it was
only able to land `MAX_BATCH_CHECKPOINTS / commit_interval`, so if
checkpoints were getting added faster than that, it would never recover.

With this change, the pipeline recovers almost instantly, and I expect
that in GCP where bandwidth is not the rate limiting factor, this should
improve throughput during backfill, and synthetic benchmarks.

## Test plan

Run the indexer with a large ingestion buffer and concurrency, wait for
ingestion to fail to fetch a checkpoint, and then notice how the
situation recovers (instead of getting worse until the pipeline
eventually complains that it has too many pending checkpoints):

```
sui$ cargo run -p sui-indexer-alt --release --                                   \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                  \
  --last-checkpoint 1200000 --pipeline sum_packages                              \
  --checkpoint-buffer-size 50000 --ingest-concurrency 20000
```

## Stack

- #20089 
- #20114 
- #20116 
- #20117 

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn authored Nov 1, 2024
1 parent eb0305f commit 9b8c9fb
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions crates/sui-indexer-alt/src/pipeline/sequential/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub(super) fn committer<H: Handler + 'static>(
// because of checkpoint lag.
//
// TODO(amnn): Test this (depends on migrations and tempdb)
let commit_hi = match (checkpoint_lag, pending.last_key_value()) {
let commit_hi_inclusive = match (checkpoint_lag, pending.last_key_value()) {
(Some(lag), None) => {
debug!(pipeline = H::NAME, lag, "No pending checkpoints");
if rx.is_closed() && rx.is_empty() {
Expand Down Expand Up @@ -157,7 +157,7 @@ pub(super) fn committer<H: Handler + 'static>(
break;
};

if matches!(commit_hi, Some(hi) if hi < *entry.key()) {
if matches!(commit_hi_inclusive, Some(hi) if hi < *entry.key()) {
break;
}

Expand Down Expand Up @@ -319,26 +319,21 @@ pub(super) fn committer<H: Handler + 'static>(
batch_rows = 0;
attempt = 0;

// Keep going if we might get more rows from the processor.
if !rx.is_closed() || !rx.is_empty() {
continue;
}

// If there are no more pending checkpoints, stop processing, because there are
// no more coming.
let Some((next, _)) = pending.first_key_value() else {
break;
};

// If there is a gap between the next checkpoint and the pending checkpoint,
// stop because there are no more coming.
if next_checkpoint < *next {
break;
}

// If checkpoint lag is being enforced, and the pending checkpoint is being
// held back, stop because `commit_hi` will never progress.
if matches!(commit_hi, Some(hi) if hi < *next) {
// If there is a pending checkpoint, no greater than the expected next
// checkpoint, and less than or equal to the inclusive upperbound due to
// checkpoint lag, then the pipeline can do more work immediately (without
// waiting).
//
// Otherwise, if its channels have been closed, we know that it is guaranteed
// not to make any more progress, and we can stop the task.
if pending
.first_key_value()
.is_some_and(|(next, _)| {
*next <= next_checkpoint && commit_hi_inclusive.map_or(true, |hi| *next <= hi)
})
{
poll.reset_immediately();
} else if rx.is_closed() && rx.is_empty() {
break;
}
}
Expand All @@ -365,8 +360,14 @@ pub(super) fn committer<H: Handler + 'static>(
continue;
};

if *next <= next_checkpoint {
poll.reset_immediately();
match (checkpoint_lag, pending.last_key_value()) {
(Some(_), None) => continue,
(Some(lag), Some((last, _))) if last.saturating_sub(lag) <= *next => {
continue;
}
_ => if *next <= next_checkpoint {
poll.reset_immediately();
}
}
}
}
Expand Down

0 comments on commit 9b8c9fb

Please sign in to comment.