Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: more logs for l1infotreesyncer #262

Merged
merged 9 commits into from
Jan 13, 2025
4 changes: 2 additions & 2 deletions .github/workflows/test-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ jobs:
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
path: "kurtosis-cdk"
ref: "v0.2.19"
path: kurtosis-cdk
ref: v0.2.25

- name: Setup Bats and bats libs
uses: bats-core/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion bridgesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestBridgeEventE2E(t *testing.T) {
dbPathReorg := path.Join(t.TempDir(), "file::memory:?cache=shared")

client, setup := helpers.SimulatedBackend(t, nil, 0)
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg}, reorgdetector.L1)
require.NoError(t, err)

go rd.Start(ctx) //nolint:errcheck
Expand Down
7 changes: 4 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,9 @@ func newState(c *config.Config, l2ChainID uint64, sqlDB *pgxpool.Pool) *state.St
func newReorgDetector(
cfg *reorgdetector.Config,
client *ethclient.Client,
network reorgdetector.Network,
) *reorgdetector.ReorgDetector {
rd, err := reorgdetector.New(client, *cfg)
rd, err := reorgdetector.New(client, *cfg, network)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -600,7 +601,7 @@ func runReorgDetectorL1IfNeeded(
components) {
return nil, nil
}
rd := newReorgDetector(cfg, l1Client)
rd := newReorgDetector(cfg, l1Client, reorgdetector.L1)

errChan := make(chan error)
go func() {
Expand All @@ -622,7 +623,7 @@ func runReorgDetectorL2IfNeeded(
if !isNeeded([]string{cdkcommon.AGGORACLE, cdkcommon.RPC, cdkcommon.AGGSENDER}, components) {
return nil, nil
}
rd := newReorgDetector(cfg, l2Client)
rd := newReorgDetector(cfg, l2Client, reorgdetector.L2)

errChan := make(chan error)
go func() {
Expand Down
56 changes: 34 additions & 22 deletions crates/cdk/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use regex::Regex;
use reqwest::blocking::get;
use std::env;
use std::fs::File;
use std::io::Write;
use std::io::{self, Write};
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use serde_json::Value;

fn main() {
let _ = build_versions();
Expand Down Expand Up @@ -55,45 +56,56 @@ fn main() {
}

// build_versions retrieves the versions from the Starlark file and embeds them in the binary.
fn build_versions() -> std::io::Result<()> {
// Retrieve the contents of the file from the URL
fn build_versions() -> io::Result<()> {
// URL of the Starlark file
let url = "https://raw.githubusercontent.com/0xPolygon/kurtosis-cdk/refs/heads/main/input_parser.star";

// Download the file content
let response = get(url).expect("Failed to send request");
let content = response.text().expect("Failed to read response text");

// Write the contents to a file
let out_dir = std::env::var("OUT_DIR").unwrap();
let dest_path = Path::new(&out_dir).join("input_parser.star");
let mut file = File::create(&dest_path)?;
file.write_all(content.as_bytes())?;

// Get the corresponding lines from the contents of the starlark file
let versions = content
// Extract the relevant lines (skip the first 30 lines, take the next 15)
let raw_versions = content
.lines()
.skip(30)
.take(15)
.collect::<Vec<&str>>()
.join("\n");

// Replace the string DEFAULT_IMAGES = from the versions string
let versions = versions.replace("DEFAULT_IMAGES = ", "");
// Remove the declaration `DEFAULT_IMAGES = `
let raw_versions = raw_versions.replace("DEFAULT_IMAGES = ", "");

// Clean up the content by removing comments and unnecessary spaces
let re_comments = Regex::new(r"#.*$").unwrap(); // Regex to remove comments
let re_trailing_commas = Regex::new(r",(\s*})").unwrap(); // Regex to fix trailing commas

let cleaned_versions = raw_versions
.lines()
.map(|line| re_comments.replace_all(line, "").trim().to_string()) // Remove comments and trim spaces
.filter(|line| !line.is_empty()) // Filter out empty lines
.collect::<Vec<_>>()
.join("\n");

// Remove all comments to the end of the line using a regexp
let re = Regex::new(r"\s#\s.*\n").unwrap();
let versions = re.replace_all(&versions, "");
// Replace the trailing comma on the last line
let versions = versions.replace(", }", " }");
// Fix improperly placed trailing commas
let cleaned_versions = re_trailing_commas.replace_all(&cleaned_versions, "$1");

// The versions string is a JSON object we can parse
let versions_json: serde_json::Value = serde_json::from_str(&versions).unwrap();
// Attempt to parse the cleaned content as JSON
let versions_json: Value = match serde_json::from_str(&cleaned_versions) {
Ok(json) => json,
Err(e) => {
eprintln!("Failed to parse JSON: {}", e); // Print the error
eprintln!("Input string was: {}", cleaned_versions); // Print the input causing the error
return Err(io::Error::new(io::ErrorKind::InvalidData, "JSON parsing failed"));
}
};

// Write the versions to a file
// Define the output file path for the JSON
let dest_path = Path::new(".").join("versions.json");
let mut file = File::create(&dest_path)?;
file.write_all(
format!(
"{}\n",
serde_json::to_string_pretty(&versions_json).unwrap()
serde_json::to_string_pretty(&versions_json).unwrap() // Pretty-print JSON to the file
)
.as_bytes(),
)?;
Expand Down
4 changes: 2 additions & 2 deletions l1infotreesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestWithReorgs(t *testing.T) {

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)

rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)}, reorgdetector.L1)
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))

Expand Down Expand Up @@ -278,7 +278,7 @@ func TestStressAndReorgs(t *testing.T) {

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)

rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, reorgdetector.L1)
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))

Expand Down
5 changes: 5 additions & 0 deletions l1infotreesync/migrations/l1infotreesync0003.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +migrate Down
ALTER TABLE block DROP COLUMN hash;

-- +migrate Up
ALTER TABLE block ADD COLUMN hash VARCHAR;
7 changes: 7 additions & 0 deletions l1infotreesync/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ var mig001 string
//go:embed l1infotreesync0002.sql
var mig002 string

//go:embed l1infotreesync0003.sql
var mig003 string

func RunMigrations(dbPath string) error {
migrations := []types.Migration{
{
Expand All @@ -29,6 +32,10 @@ func RunMigrations(dbPath string) error {
ID: "l1infotreesync0002",
SQL: mig002,
},
{
ID: "l1infotreesync0003",
SQL: mig003,
},
}
for _, tm := range treeMigrations.Migrations {
migrations = append(migrations, types.Migration{
Expand Down
38 changes: 24 additions & 14 deletions l1infotreesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type processor struct {
rollupExitTree *tree.UpdatableTree
halted bool
haltedReason string
log *log.Logger
}

// UpdateL1InfoTree representation of the UpdateL1InfoTree event
Expand Down Expand Up @@ -149,6 +150,7 @@ func newProcessor(dbPath string) (*processor, error) {
db: db,
l1InfoTree: tree.NewAppendOnlyTree(db, migrations.L1InfoTreePrefix),
rollupExitTree: tree.NewUpdatableTree(db, migrations.RollupExitTreePrefix),
log: log.WithFields("processor", "l1infotreesync"),
}, nil
}

Expand Down Expand Up @@ -176,7 +178,7 @@ func (p *processor) GetLatestInfoUntilBlock(ctx context.Context, blockNum uint64
}
defer func() {
if err := tx.Rollback(); err != nil {
log.Warnf("error rolling back tx: %v", err)
p.log.Warnf("error rolling back tx: %v", err)
}
}()

Expand Down Expand Up @@ -233,6 +235,8 @@ func (p *processor) getLastProcessedBlockWithTx(tx db.Querier) (uint64, error) {
// Reorg triggers a purge and reset process on the processor to leaf it on a state
// as if the last block processed was firstReorgedBlock-1
func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
p.log.Infof("reorging to block %d", firstReorgedBlock)

tx, err := db.NewTx(ctx, p.db)
if err != nil {
return err
Expand Down Expand Up @@ -266,6 +270,9 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
if err := tx.Commit(); err != nil {
return err
}

p.log.Infof("reorged to block %d, %d rows affected", firstReorgedBlock, rowsAffected)

if rowsAffected > 0 {
p.halted = false
p.haltedReason = ""
Expand All @@ -278,7 +285,7 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
// and updates the last processed block (can be called without events for that purpose)
func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
if p.halted {
log.Errorf("processor is halted due to: %s", p.haltedReason)
p.log.Errorf("processor is halted due to: %s", p.haltedReason)
return sync.ErrInconsistentState
}
tx, err := db.NewTx(ctx, p.db)
Expand All @@ -289,12 +296,12 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
defer func() {
if shouldRollback {
if errRllbck := tx.Rollback(); errRllbck != nil {
log.Errorf("error while rolling back tx %v", errRllbck)
p.log.Errorf("error while rolling back tx %v", errRllbck)
}
}
}()

if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, block.Num); err != nil {
if _, err := tx.Exec(`INSERT INTO block (num, hash) VALUES ($1, $2)`, block.Num, block.Hash.String()); err != nil {
return fmt.Errorf("insert Block. err: %w", err)
}

Expand Down Expand Up @@ -340,10 +347,13 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
if err != nil {
return fmt.Errorf("AddLeaf(%s). err: %w", info.String(), err)
}
log.Infof("inserted L1InfoTreeLeaf %s", info.String())
p.log.Infof("inserted L1InfoTreeLeaf %s", info.String())
l1InfoLeavesAdded++
}
if event.UpdateL1InfoTreeV2 != nil {
p.log.Infof("handle UpdateL1InfoTreeV2 event. Block: %d, block hash: %s. Event root: %s. Event leaf count: %d.",
block.Num, block.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(), event.UpdateL1InfoTreeV2.LeafCount)

root, err := p.l1InfoTree.GetLastRoot(tx)
if err != nil {
return fmt.Errorf("GetLastRoot(). err: %w", err)
Expand All @@ -355,33 +365,33 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
if root.Hash != event.UpdateL1InfoTreeV2.CurrentL1InfoRoot || root.Index+1 != event.UpdateL1InfoTreeV2.LeafCount {
errStr := fmt.Sprintf(
"failed to check UpdateL1InfoTreeV2. Root: %s vs event:%s. "+
"Index: : %d vs event.LeafCount:%d. Happened on block %d",
root.Hash, common.Bytes2Hex(event.UpdateL1InfoTreeV2.CurrentL1InfoRoot[:]),
"Index: %d vs event.LeafCount: %d. Happened on block %d. Block hash: %s.",
root.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(),
root.Index, event.UpdateL1InfoTreeV2.LeafCount,
block.Num,
block.Num, block.Hash.String(),
)
log.Error(errStr)
p.log.Error(errStr)
p.haltedReason = errStr
p.halted = true
return sync.ErrInconsistentState
}
}
if event.VerifyBatches != nil {
log.Debugf("handle VerifyBatches event %s", event.VerifyBatches.String())
p.log.Debugf("handle VerifyBatches event %s", event.VerifyBatches.String())
err = p.processVerifyBatches(tx, block.Num, event.VerifyBatches)
if err != nil {
err = fmt.Errorf("processVerifyBatches. err: %w", err)
log.Errorf("error processing VerifyBatches: %v", err)
p.log.Errorf("error processing VerifyBatches: %v", err)
return err
}
}

if event.InitL1InfoRootMap != nil {
log.Debugf("handle InitL1InfoRootMap event %s", event.InitL1InfoRootMap.String())
p.log.Debugf("handle InitL1InfoRootMap event %s", event.InitL1InfoRootMap.String())
err = processEventInitL1InfoRootMap(tx, block.Num, event.InitL1InfoRootMap)
if err != nil {
err = fmt.Errorf("initL1InfoRootMap. Err: %w", err)
log.Errorf("error processing InitL1InfoRootMap: %v", err)
p.log.Errorf("error processing InitL1InfoRootMap: %v", err)
return err
}
}
Expand All @@ -392,7 +402,7 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
}
shouldRollback = false

log.Infof("block %d processed with %d events", block.Num, len(block.Events))
p.log.Infof("block %d processed with %d events", block.Num, len(block.Events))
return nil
}

Expand Down
18 changes: 17 additions & 1 deletion reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ import (
"golang.org/x/sync/errgroup"
)

type Network string

const (
L1 Network = "l1"
L2 Network = "l2"
)

func (n Network) String() string {
return string(n)
}

type EthClient interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
Expand All @@ -34,9 +45,11 @@ type ReorgDetector struct {

subscriptionsLock sync.RWMutex
subscriptions map[string]*Subscription

log *log.Logger
}

func New(client EthClient, cfg Config) (*ReorgDetector, error) {
func New(client EthClient, cfg Config, network Network) (*ReorgDetector, error) {
err := migrations.RunMigrations(cfg.DBPath)
if err != nil {
return nil, err
Expand All @@ -52,6 +65,7 @@ func New(client EthClient, cfg Config) (*ReorgDetector, error) {
checkReorgInterval: cfg.GetCheckReorgsInterval(),
trackedBlocks: make(map[string]*headersList),
subscriptions: make(map[string]*Subscription),
log: log.WithFields("reorg-detector", network.String()),
}, nil
}

Expand Down Expand Up @@ -122,6 +136,8 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
errGroup errgroup.Group
)

rd.log.Infof("Checking reorgs in tracked blocks up to block %d", lastFinalisedBlock.Number.Uint64())

subscriberIDs := rd.getSubscriberIDs()

for _, id := range subscriberIDs {
Expand Down
3 changes: 3 additions & 0 deletions reorgdetector/reorgdetector_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func (rd *ReorgDetector) saveTrackedBlock(id string, b header) error {
hdrs.add(b)
}
rd.trackedBlocksLock.Unlock()

rd.log.Debugf("Tracking block %d for subscriber %s", b.Num, id)

return meddler.Insert(rd.db, "tracked_block", &headerWithSubscriberID{
SubscriberID: id,
Num: b.Num,
Expand Down
2 changes: 2 additions & 0 deletions reorgdetector/reorgdetector_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) {
sub, ok := rd.subscriptions[id]
rd.subscriptionsLock.RUnlock()

rd.log.Infof("Reorg detected for subscriber %s at block %d", id, startingBlock.Num)

if ok {
sub.ReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
Expand Down
Loading
Loading