Skip to content

Commit

Permalink
indexer-alt: watermark timestamps and other logging improvements
Browse files Browse the repository at this point in the history
## Description

Track the timestamp associated with a watermark, include it in tracing
messages, and introduce gauges to track the timestamps of the latest
checkpoint gathered and written to the DB.

This change also introduces a couple of other timestamp/watermark
related tracing improvements:

- Measure watermarks gathered and in DB for the sequential pipeline
  (previously just tracked the watermark after they were written to the
  DB). These will usually be the exact same, but may differ if the
  pipeline needs to retry a write.
- Gets rid of the ordering and equality impls for `CommitterWatermark`
  -- these are no longer required.
- Standardise the ordering of watermark traces between the sequential
  and concurrent pipelines, so that they are easier to compare with each
  other (they now share a common prefix).
- Standardised tracing messages during committer/watermark teardown:
  Now in each exit edge, we log the reason for the exit, and then a
  message with the final watermark.

## Test plan

Run `sum_obj_types` and `wal_obj_types` with a consistent range of
`3600` and note that the difference in their timestamps is roughly an
hour (to begin with, it's about 70 minutes):

```
sui$ cargo run -p sui-indexer-alt --release --                                   \
  --database-url 'postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt' \
  indexer --remote-store-url 'https://checkpoints.mainnet.sui.io'                \
  --consistent-range 3600 --pipeline wal_obj_types --pipeline sum_obj_types`
     Running `/Users/ashokmenon/sui/idx-poc/target/release/sui-indexer-alt --database-url 'postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt' indexer --remote-store-url 'https://checkpoints.mainnet.sui.io' --consistent-range 3600 --pipeline wal_obj_types --pipeline sum_obj_types`
2024-10-31T15:16:26.867221Z  INFO sui_indexer_alt::db: Running migrations ...
2024-10-31T15:16:26.889145Z  INFO sui_indexer_alt::db: Migrations complete.
2024-10-31T15:16:27.024406Z  INFO sui_indexer_alt: Skipping pipeline ev_emit_mod
2024-10-31T15:16:27.024413Z  INFO sui_indexer_alt: Skipping pipeline ev_struct_inst
2024-10-31T15:16:27.024414Z  INFO sui_indexer_alt: Skipping pipeline kv_checkpoints
2024-10-31T15:16:27.024414Z  INFO sui_indexer_alt: Skipping pipeline kv_objects
2024-10-31T15:16:27.024415Z  INFO sui_indexer_alt: Skipping pipeline kv_transactions
2024-10-31T15:16:27.024416Z  INFO sui_indexer_alt: Skipping pipeline obj_versions
2024-10-31T15:16:27.024416Z  INFO sui_indexer_alt: Skipping pipeline tx_affected_objects
2024-10-31T15:16:27.024417Z  INFO sui_indexer_alt: Skipping pipeline tx_balance_changes
2024-10-31T15:16:27.024417Z  INFO sui_indexer_alt: Skipping pipeline wal_coin_balances
2024-10-31T15:16:27.033550Z  INFO sui_indexer_alt: Skipping pipeline sum_coin_balances
2024-10-31T15:16:27.033607Z  INFO sui_indexer_alt::pipeline::processor: Starting processor pipeline="wal_obj_types"
2024-10-31T15:16:27.033856Z  INFO sui_indexer_alt::pipeline::concurrent::collector: Starting collector pipeline="wal_obj_types"
2024-10-31T15:16:27.033869Z  INFO sui_indexer_alt::pipeline::concurrent::committer: Starting committer pipeline="wal_obj_types"
2024-10-31T15:16:27.033983Z  INFO sui_indexer_alt: Skipping pipeline sum_packages
2024-10-31T15:16:27.034011Z  INFO sui_indexer_alt::pipeline::processor: Starting processor pipeline="sum_obj_types"
2024-10-31T15:16:27.034183Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Starting watermark pipeline="wal_obj_types" watermark=CommitterWatermark { pipeline: "wal_obj_types", epoch_hi_inclusive: 0, checkpoint_hi_inclusive: 0, tx_hi: 0, timestamp_ms_hi_inclusive: 0 }
2024-10-31T15:16:27.034260Z  INFO sui_indexer_alt::pipeline::sequential::committer: Starting committer pipeline="sum_obj_types" watermark=CommitterWatermark { pipeline: "sum_obj_types", epoch_hi_inclusive: 0, checkpoint_hi_inclusive: 0, tx_hi: 0, timestamp_ms_hi_inclusive: 0 }
2024-10-31T15:16:27.034305Z  INFO sui_indexer_alt: Ingestion range first_checkpoint=0 last_checkpoint=None
2024-10-31T15:16:27.034339Z  INFO sui_indexer_alt::metrics: Starting metrics service on 0.0.0.0:9184
2024-10-31T15:16:27.034351Z  INFO sui_indexer_alt::ingestion::regulator: Starting ingestion regulator
2024-10-31T15:16:27.034355Z  INFO sui_indexer_alt::ingestion::broadcaster: Starting ingestion broadcaster
2024-10-31T15:16:27.541400Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=212 transaction=213 timestamp=2023-04-13 13:28:25.836 UTC updated=true elapsed_ms=5.486208
2024-10-31T15:16:28.043533Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=515 transaction=516 timestamp=2023-04-13 13:36:17.240 UTC updated=true elapsed_ms=5.512666
2024-10-31T15:16:28.538421Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=1017 transaction=1018 timestamp=2023-04-13 13:48:04.584 UTC updated=true elapsed_ms=3.468209
2024-10-31T15:16:29.041788Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=1422 transaction=1423 timestamp=2023-04-13 13:57:11.577 UTC updated=true elapsed_ms=7.184292
2024-10-31T15:16:29.540675Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=1824 transaction=1825 timestamp=2023-04-13 14:05:59.154 UTC updated=true elapsed_ms=5.7144580000000005
2024-10-31T15:16:30.040987Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=2226 transaction=2227 timestamp=2023-04-13 14:14:54.721 UTC updated=true elapsed_ms=5.955416
2024-10-31T15:16:30.539394Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=2630 transaction=2631 timestamp=2023-04-13 14:23:21.636 UTC updated=true elapsed_ms=5.0003340000000005
2024-10-31T15:16:31.035944Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=3142 transaction=3143 timestamp=2023-04-13 14:34:00.192 UTC updated=true elapsed_ms=1.368042
2024-10-31T15:16:31.532346Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=71 transaction=72 timestamp=2023-04-13 13:24:36.662 UTC
2024-10-31T15:16:31.535237Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=3646 transaction=3647 timestamp=2023-04-13 14:45:19.227 UTC updated=true elapsed_ms=0.7080409999999999
2024-10-31T15:16:31.556459Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=103 transaction=104 timestamp=2023-04-13 13:25:35.491 UTC
2024-10-31T15:16:31.605247Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=151 transaction=152 timestamp=2023-04-13 13:27:01.087 UTC
2024-10-31T15:16:31.658676Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=203 transaction=204 timestamp=2023-04-13 13:28:14.431 UTC
2024-10-31T15:16:31.709093Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=251 transaction=252 timestamp=2023-04-13 13:29:41.350 UTC
2024-10-31T15:16:31.774101Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=304 transaction=305 timestamp=2023-04-13 13:31:05.962 UTC
2024-10-31T15:16:31.823847Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=351 transaction=352 timestamp=2023-04-13 13:32:06.748 UTC
2024-10-31T15:16:31.875613Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=404 transaction=405 timestamp=2023-04-13 13:33:26.122 UTC
2024-10-31T15:16:31.923673Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=452 transaction=453 timestamp=2023-04-13 13:34:41.672 UTC
2024-10-31T15:16:31.973523Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=502 transaction=503 timestamp=2023-04-13 13:35:54.245 UTC
2024-10-31T15:16:32.020968Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=553 transaction=554 timestamp=2023-04-13 13:37:12.723 UTC
2024-10-31T15:16:32.034336Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=4147 transaction=4148 timestamp=2023-04-13 14:56:30.470 UTC updated=true elapsed_ms=0.478916
2024-10-31T15:16:32.081115Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=603 transaction=604 timestamp=2023-04-13 13:38:26.226 UTC
2024-10-31T15:16:32.143228Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=657 transaction=658 timestamp=2023-04-13 13:39:32.583 UTC
2024-10-31T15:16:32.201323Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=710 transaction=711 timestamp=2023-04-13 13:40:44.461 UTC
2024-10-31T15:16:32.239715Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=751 transaction=752 timestamp=2023-04-13 13:41:40.955 UTC
2024-10-31T15:16:32.306828Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=803 transaction=804 timestamp=2023-04-13 13:43:00.433 UTC
2024-10-31T15:16:32.351995Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=854 transaction=855 timestamp=2023-04-13 13:44:21.487 UTC
2024-10-31T15:16:32.408044Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=903 transaction=904 timestamp=2023-04-13 13:45:33.468 UTC
2024-10-31T15:16:32.478623Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=963 transaction=964 timestamp=2023-04-13 13:46:48.531 UTC
2024-10-31T15:16:32.517822Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1004 transaction=1005 timestamp=2023-04-13 13:47:43.997 UTC
2024-10-31T15:16:32.540701Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=4550 transaction=4551 timestamp=2023-04-13 15:05:30.690 UTC updated=true elapsed_ms=5.776375
2024-10-31T15:16:32.574600Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1052 transaction=1053 timestamp=2023-04-13 13:48:55.253 UTC
2024-10-31T15:16:32.625726Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1102 transaction=1103 timestamp=2023-04-13 13:49:57.288 UTC
2024-10-31T15:16:32.679135Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1159 transaction=1160 timestamp=2023-04-13 13:51:06.819 UTC
2024-10-31T15:16:32.729858Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1203 transaction=1204 timestamp=2023-04-13 13:52:11.201 UTC
2024-10-31T15:16:32.777044Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1257 transaction=1258 timestamp=2023-04-13 13:53:26.750 UTC
2024-10-31T15:16:32.819603Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1301 transaction=1302 timestamp=2023-04-13 13:54:17.123 UTC
2024-10-31T15:16:32.870028Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1351 transaction=1352 timestamp=2023-04-13 13:55:24.124 UTC
2024-10-31T15:16:32.923059Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1402 transaction=1403 timestamp=2023-04-13 13:56:49.078 UTC
2024-10-31T15:16:32.969159Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1451 transaction=1452 timestamp=2023-04-13 13:57:56.351 UTC
2024-10-31T15:16:33.034831Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Watermark pipeline="wal_obj_types" epoch=0 checkpoint=5055 transaction=5056 timestamp=2023-04-13 15:16:38.038 UTC updated=true elapsed_ms=1.0442500000000001
2024-10-31T15:16:33.036266Z  INFO sui_indexer_alt::pipeline::sequential::committer: Watermark pipeline="sum_obj_types" epoch=0 checkpoint=1501 transaction=1502 timestamp=2023-04-13 13:58:56.227 UTC
^C2024-10-31T15:16:33.091189Z  INFO sui_indexer_alt::pipeline::concurrent::collector: Shutdown received, stopping collector pipeline="wal_obj_types"
2024-10-31T15:16:33.091195Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Shutdown received pipeline="wal_obj_types"
2024-10-31T15:16:33.091201Z  INFO sui_indexer_alt::ingestion::regulator: Shutdown received, stopping regulator
2024-10-31T15:16:33.091213Z  INFO sui_indexer_alt::metrics: Shutdown received, stopping metrics service
2024-10-31T15:16:33.091225Z  INFO sui_indexer_alt::pipeline::concurrent::watermark: Stopping watermark task pipeline="wal_obj_types" watermark=CommitterWatermark { pipeline: "wal_obj_types", epoch_hi_inclusive: 0, checkpoint_hi_inclusive: 5055, tx_hi: 5056, timestamp_ms_hi_inclusive: 1681398998038 }
2024-10-31T15:16:33.091410Z  INFO sui_indexer_alt::pipeline::concurrent::committer: Batches done, stopping committer pipeline="wal_obj_types"
2024-10-31T15:16:33.091721Z  INFO sui_indexer_alt::pipeline::sequential::committer: Shutdown received pipeline="sum_obj_types"
2024-10-31T15:16:33.091725Z  INFO sui_indexer_alt::pipeline::sequential::committer: Stopping committer pipeline="sum_obj_types" watermark=CommitterWatermark { pipeline: "sum_obj_types", epoch_hi_inclusive: 0, checkpoint_hi_inclusive: 1546, tx_hi: 1547, timestamp_ms_hi_inclusive: 1681394391225 }
2024-10-31T15:16:33.093829Z  INFO sui_indexer_alt::pipeline::processor: Shutdown received, stopping processor pipeline="wal_obj_types"
2024-10-31T15:16:33.094014Z  INFO sui_indexer_alt::pipeline::processor: Shutdown received, stopping processor pipeline="sum_obj_types"
2024-10-31T15:16:33.094406Z  INFO sui_indexer_alt::ingestion::broadcaster: Shutdown received, stopping ingestion broadcaster
2024-10-31T15:16:33.094513Z  INFO sui_indexer_alt: Indexing pipeline gracefully shut down
```
  • Loading branch information
amnn committed Nov 1, 2024
1 parent 9b8c9fb commit 6736f1d
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ axum.workspace = true
backoff.workspace = true
bb8 = "0.8.5"
bcs.workspace = true
chrono.workspace = true
clap.workspace = true
diesel = { workspace = true, features = ["chrono"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ CREATE TABLE IF NOT EXISTS watermarks
-- Exclusive upper transaction sequence number bound for this entity's
-- data. Committer updates this field.
tx_hi BIGINT NOT NULL,
-- Inclusive upper timestamp bound (in milliseconds). Committer updates
-- this field once it can guarantee that all checkpoints at or before this
-- timestamp have been written to the database.
timestamp_ms_hi_inclusive BIGINT NOT NULL,
-- Inclusive lower epoch bound for this entity's data. Pruner updates this
-- field when the epoch range exceeds the retention policy.
epoch_lo BIGINT NOT NULL,
Expand All @@ -27,7 +31,7 @@ CREATE TABLE IF NOT EXISTS watermarks
-- some data needs to be dropped. The pruner uses this column to determine
-- whether to prune or wait long enough that all in-flight reads complete
-- or timeout before it acts on an updated watermark.
timestamp_ms BIGINT NOT NULL,
pruner_timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data below this
-- watermark can be immediately pruned.
pruner_hi BIGINT NOT NULL
Expand Down
16 changes: 16 additions & 0 deletions crates/sui-indexer-alt/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ pub struct IndexerMetrics {
pub watermark_epoch: IntGaugeVec,
pub watermark_checkpoint: IntGaugeVec,
pub watermark_transaction: IntGaugeVec,
pub watermark_timestamp_ms: IntGaugeVec,

pub watermark_epoch_in_db: IntGaugeVec,
pub watermark_checkpoint_in_db: IntGaugeVec,
pub watermark_transaction_in_db: IntGaugeVec,
pub watermark_timestamp_in_db_ms: IntGaugeVec,
}

/// Collects information about the database connection pool.
Expand Down Expand Up @@ -344,6 +346,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
"indexer_watermark_timestamp_ms",
"Current timestamp high watermark for this committer, in milliseconds",
&["pipeline"],
registry,
)
.unwrap(),
watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
"indexer_watermark_epoch_in_db",
"Last epoch high watermark this committer wrote to the DB",
Expand All @@ -365,6 +374,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
"indexer_watermark_timestamp_ms_in_db",
"Last timestamp high watermark this committer wrote to the DB, in milliseconds",
&["pipeline"],
registry,
)
.unwrap(),
}
}

Expand Down
42 changes: 15 additions & 27 deletions crates/sui-indexer-alt/src/models/watermarks.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{borrow::Cow, cmp};
use std::borrow::Cow;

use crate::{db::Connection, schema::watermarks};
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use diesel_async::RunQueryDsl;

use crate::{db::Connection, schema::watermarks};

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = watermarks)]
pub struct StoredWatermark {
pub pipeline: String,
pub epoch_hi_inclusive: i64,
pub checkpoint_hi_inclusive: i64,
pub tx_hi: i64,
pub timestamp_ms_hi_inclusive: i64,
pub epoch_lo: i64,
pub reader_lo: i64,
pub timestamp_ms: i64,
pub pruner_timestamp_ms: i64,
pub pruner_hi: i64,
}

Expand All @@ -28,6 +31,7 @@ pub struct CommitterWatermark<'p> {
pub epoch_hi_inclusive: i64,
pub checkpoint_hi_inclusive: i64,
pub tx_hi: i64,
pub timestamp_ms_hi_inclusive: i64,
}

impl CommitterWatermark<'static> {
Expand All @@ -53,9 +57,15 @@ impl<'p> CommitterWatermark<'p> {
epoch_hi_inclusive: 0,
checkpoint_hi_inclusive: 0,
tx_hi: 0,
timestamp_ms_hi_inclusive: 0,
}
}

/// The consensus timestamp associated with this checkpoint.
pub fn timestamp(&self) -> DateTime<Utc> {
DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive).unwrap_or_default()
}

/// Upsert the high watermark as long as it raises the watermark stored in the database.
/// Returns a boolean indicating whether the watermark was actually updated or not.
///
Expand All @@ -81,33 +91,11 @@ impl<'p> From<CommitterWatermark<'p>> for StoredWatermark {
epoch_hi_inclusive: watermark.epoch_hi_inclusive,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
tx_hi: watermark.tx_hi,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
epoch_lo: 0,
reader_lo: 0,
timestamp_ms: 0,
pruner_timestamp_ms: 0,
pruner_hi: 0,
}
}
}

// Ordering for watermarks is driven solely by their checkpoints.

impl PartialEq for CommitterWatermark<'_> {
fn eq(&self, other: &Self) -> bool {
self.checkpoint_hi_inclusive == other.checkpoint_hi_inclusive
}
}

impl Eq for CommitterWatermark<'_> {}

impl Ord for CommitterWatermark<'_> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.checkpoint_hi_inclusive
.cmp(&other.checkpoint_hi_inclusive)
}
}

impl PartialOrd for CommitterWatermark<'_> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
27 changes: 21 additions & 6 deletions crates/sui-indexer-alt/src/pipeline/concurrent/watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(super) fn watermark<H: Handler + 'static>(
loop {
tokio::select! {
_ = cancel.cancelled() => {
info!(pipeline = H::NAME, "Shutdown received, stopping watermark");
info!(pipeline = H::NAME, "Shutdown received");
break;
}

Expand Down Expand Up @@ -165,10 +165,16 @@ pub(super) fn watermark<H: Handler + 'static>(
.with_label_values(&[H::NAME])
.set(watermark.tx_hi);

metrics
.watermark_timestamp_ms
.with_label_values(&[H::NAME])
.set(watermark.timestamp_ms_hi_inclusive);

debug!(
pipeline = H::NAME,
elapsed_ms = elapsed * 1000.0,
watermark = watermark.checkpoint_hi_inclusive,
timestamp = %watermark.timestamp(),
pending = precommitted.len(),
"Gathered watermarks",
);
Expand Down Expand Up @@ -210,27 +216,34 @@ pub(super) fn watermark<H: Handler + 'static>(
.watermark_transaction_in_db
.with_label_values(&[H::NAME])
.set(watermark.tx_hi);

metrics
.watermark_timestamp_in_db_ms
.with_label_values(&[H::NAME])
.set(watermark.timestamp_ms_hi_inclusive);
}

if watermark.checkpoint_hi_inclusive > next_loud_watermark_update {
next_loud_watermark_update += LOUD_WATERMARK_UPDATE_INTERVAL;
info!(
pipeline = H::NAME,
elapsed_ms = elapsed * 1000.0,
updated,
epoch = watermark.epoch_hi_inclusive,
checkpoint = watermark.checkpoint_hi_inclusive,
transaction = watermark.tx_hi,
timestamp = %watermark.timestamp(),
updated,
elapsed_ms = elapsed * 1000.0,
"Watermark",
);
} else {
debug!(
pipeline = H::NAME,
elapsed_ms = elapsed * 1000.0,
updated,
epoch = watermark.epoch_hi_inclusive,
checkpoint = watermark.checkpoint_hi_inclusive,
transaction = watermark.tx_hi,
timestamp = %watermark.timestamp(),
updated,
elapsed_ms = elapsed * 1000.0,
"Watermark",
);
}
Expand All @@ -239,7 +252,7 @@ pub(super) fn watermark<H: Handler + 'static>(
}

if rx.is_closed() && rx.is_empty() {
info!(pipeline = H::NAME, ?watermark, "Committer closed channel, stopping watermark task");
info!(pipeline = H::NAME, "Committer closed channel");
break;
}
}
Expand All @@ -259,5 +272,7 @@ pub(super) fn watermark<H: Handler + 'static>(
}
}
}

info!(pipeline = H::NAME, ?watermark, "Stopping watermark task");
})
}
9 changes: 8 additions & 1 deletion crates/sui-indexer-alt/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,20 @@ enum Break {
}

impl<P: Processor> Indexed<P> {
fn new(epoch: u64, cp_sequence_number: u64, tx_hi: u64, values: Vec<P::Value>) -> Self {
fn new(
epoch: u64,
cp_sequence_number: u64,
tx_hi: u64,
timestamp_ms: u64,
values: Vec<P::Value>,
) -> Self {
Self {
watermark: CommitterWatermark {
pipeline: P::NAME.into(),
epoch_hi_inclusive: epoch as i64,
checkpoint_hi_inclusive: cp_sequence_number as i64,
tx_hi: tx_hi as i64,
timestamp_ms_hi_inclusive: timestamp_ms as i64,
},
values,
}
Expand Down
13 changes: 10 additions & 3 deletions crates/sui-indexer-alt/src/pipeline/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub(super) fn processor<P: Processor + 'static>(
let epoch = checkpoint.checkpoint_summary.epoch;
let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
let tx_hi = checkpoint.checkpoint_summary.network_total_transactions;
let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;

debug!(
pipeline = P::NAME,
Expand All @@ -95,9 +96,15 @@ pub(super) fn processor<P: Processor + 'static>(
.with_label_values(&[P::NAME])
.inc_by(values.len() as u64);

tx.send(Indexed::new(epoch, cp_sequence_number, tx_hi, values))
.await
.map_err(|_| Break::Cancel)?;
tx.send(Indexed::new(
epoch,
cp_sequence_number,
tx_hi,
timestamp_ms,
values,
))
.await
.map_err(|_| Break::Cancel)?;

Ok(())
}
Expand Down
Loading

0 comments on commit 6736f1d

Please sign in to comment.