diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs index 0c55873c3ab41..d1e9ed8e7ea6d 100644 --- a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs @@ -116,7 +116,7 @@ pub(super) fn committer( // because of checkpoint lag. // // TODO(amnn): Test this (depends on migrations and tempdb) - let commit_hi = match (checkpoint_lag, pending.last_key_value()) { + let commit_hi_inclusive = match (checkpoint_lag, pending.last_key_value()) { (Some(lag), None) => { debug!(pipeline = H::NAME, lag, "No pending checkpoints"); if rx.is_closed() && rx.is_empty() { @@ -157,7 +157,7 @@ pub(super) fn committer( break; }; - if matches!(commit_hi, Some(hi) if hi < *entry.key()) { + if matches!(commit_hi_inclusive, Some(hi) if hi < *entry.key()) { break; } @@ -319,26 +319,21 @@ pub(super) fn committer( batch_rows = 0; attempt = 0; - // Keep going if we might get more rows from the processor. - if !rx.is_closed() || !rx.is_empty() { - continue; - } - - // If there are no more pending checkpoints, stop processing, because there are - // no more coming. - let Some((next, _)) = pending.first_key_value() else { - break; - }; - - // If there is a gap between the next checkpoint and the pending checkpoint, - // stop because there are no more coming. - if next_checkpoint < *next { - break; - } - - // If checkpoint lag is being enforced, and the pending checkpoint is being - // held back, stop because `commit_hi` will never progress. - if matches!(commit_hi, Some(hi) if hi < *next) { + // If there is a pending checkpoint, no greater than the expected next + // checkpoint, and less than or equal to the inclusive upperbound due to + // checkpoint lag, then the pipeline can do more work immediately (without + // waiting). + // + // Otherwise, if its channels have been closed, we know that it is guaranteed + // not to make any more progress, and we can stop the task. + if pending + .first_key_value() + .is_some_and(|(next, _)| { + *next <= next_checkpoint && commit_hi_inclusive.map_or(true, |hi| *next <= hi) + }) + { + poll.reset_immediately(); + } else if rx.is_closed() && rx.is_empty() { break; } } @@ -365,8 +360,14 @@ pub(super) fn committer( continue; }; - if *next <= next_checkpoint { - poll.reset_immediately(); + match (checkpoint_lag, pending.last_key_value()) { + (Some(_), None) => continue, + (Some(lag), Some((last, _))) if last.saturating_sub(lag) <= *next => { + continue; + } + _ => if *next <= next_checkpoint { + poll.reset_immediately(); + } } } }