Skip to content

Commit

Permalink
feat: add syncOnlyMode to aggregator (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM authored Aug 22, 2024
1 parent 63be2a4 commit 6251749
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 95 deletions.
194 changes: 100 additions & 94 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,25 @@ func New(
log.Fatalf("error creating ethtxmanager client: %v", err)
}

// Data stream client logs
streamLogConfig := streamlog.Config{
Environment: streamlog.LogEnvironment(cfg.Log.Environment),
Level: cfg.Log.Level,
Outputs: cfg.Log.Outputs,
}
var streamClient *datastreamer.StreamClient

if !cfg.SyncModeOnlyEnabled {
// Data stream client logs
streamLogConfig := streamlog.Config{
Environment: streamlog.LogEnvironment(cfg.Log.Environment),
Level: cfg.Log.Level,
Outputs: cfg.Log.Outputs,
}

log.Init(cfg.Log)
log.Init(cfg.Log)

log.Info("Creating data stream client....")
streamClient, err := datastreamer.NewClientWithLogsConfig(cfg.StreamClient.Server, dataStreamType, streamLogConfig)
if err != nil {
log.Fatalf("failed to create stream client, error: %v", err)
log.Info("Creating data stream client....")
streamClient, err = datastreamer.NewClientWithLogsConfig(cfg.StreamClient.Server, dataStreamType, streamLogConfig)
if err != nil {
log.Fatalf("failed to create stream client, error: %v", err)
}
log.Info("Data stream client created.")
}
log.Info("Data stream client created.")

// Synchonizer logs
syncLogConfig := synclog.Config{
Expand All @@ -157,7 +161,7 @@ func New(
sequencerPrivateKey *ecdsa.PrivateKey
)

if cfg.SettlementBackend == AggLayer {
if !cfg.SyncModeOnlyEnabled && cfg.SettlementBackend == AggLayer {
aggLayerClient = NewAggLayerClient(cfg.AggLayerURL)

sequencerPrivateKey, err = newKeyFromKeystore(cfg.SequencerPrivateKey)
Expand All @@ -184,11 +188,11 @@ func New(
witnessRetrievalChan: make(chan state.DBBatch),
}

log.Infof("MaxWitnessRetrievalWorkers set to %d", cfg.MaxWitnessRetrievalWorkers)

// Set function to handle the batches from the data stream
a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream)
a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg)
if !cfg.SyncModeOnlyEnabled {
a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream)
a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg)
}

return a, nil
}
Expand Down Expand Up @@ -484,106 +488,108 @@ func (a *Aggregator) Start(ctx context.Context) error {
a.ctx = ctx
a.exit = cancel

address := fmt.Sprintf("%s:%d", a.cfg.Host, a.cfg.Port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}

a.srv = grpc.NewServer()
prover.RegisterAggregatorServiceServer(a.srv, a)

healthService := newHealthChecker()
grpchealth.RegisterHealthServer(a.srv, healthService)

// Initial L1 Sync blocking
err = a.l1Syncr.Sync(true)
err := a.l1Syncr.Sync(true)
if err != nil {
log.Fatalf("Failed to synchronize from L1: %v", err)
return err
}

// Get last verified batch number to set the starting point for verifications
lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
return err
}
// Keep syncing L1
go func() {
err := a.l1Syncr.Sync(false)
if err != nil {
log.Fatalf("Failed to synchronize from L1: %v", err)
}
}()

// Cleanup data base
err = a.state.DeleteBatchesOlderThanBatchNumber(ctx, lastVerifiedBatchNumber, nil)
if err != nil {
return err
}
if !a.cfg.SyncModeOnlyEnabled {
address := fmt.Sprintf("%s:%d", a.cfg.Host, a.cfg.Port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}

// Delete ungenerated recursive proofs
err = a.state.DeleteUngeneratedProofs(ctx, nil)
if err != nil {
return fmt.Errorf("failed to initialize proofs cache %w", err)
}
a.srv = grpc.NewServer()
prover.RegisterAggregatorServiceServer(a.srv, a)

accInputHash, err := a.getVerifiedBatchAccInputHash(ctx, lastVerifiedBatchNumber)
if err != nil {
return err
}
healthService := newHealthChecker()
grpchealth.RegisterHealthServer(a.srv, healthService)

log.Infof("Last Verified Batch Number:%v", lastVerifiedBatchNumber)
log.Infof("Starting AccInputHash:%v", accInputHash.String())
// Get last verified batch number to set the starting point for verifications
lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
return err
}

// Store Acc Input Hash of the latest verified batch
dummyDBBatch := state.DBBatch{Batch: state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash}, Datastream: []byte{0}, Witness: []byte{0}}
err = a.state.AddBatch(ctx, &dummyDBBatch, nil)
if err != nil {
return err
}
// Cleanup data base
err = a.state.DeleteBatchesOlderThanBatchNumber(ctx, lastVerifiedBatchNumber, nil)
if err != nil {
return err
}

a.resetVerifyProofTime()
// Delete ungenerated recursive proofs
err = a.state.DeleteUngeneratedProofs(ctx, nil)
if err != nil {
return fmt.Errorf("failed to initialize proofs cache %w", err)
}

accInputHash, err := a.getVerifiedBatchAccInputHash(ctx, lastVerifiedBatchNumber)
if err != nil {
return err
}

go a.cleanupLockedProofs()
go a.sendFinalProof()
go a.ethTxManager.Start()
log.Infof("Last Verified Batch Number:%v", lastVerifiedBatchNumber)
log.Infof("Starting AccInputHash:%v", accInputHash.String())

// Keep syncing L1
go func() {
err := a.l1Syncr.Sync(false)
// Store Acc Input Hash of the latest verified batch
dummyDBBatch := state.DBBatch{Batch: state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash}, Datastream: []byte{0}, Witness: []byte{0}}
err = a.state.AddBatch(ctx, &dummyDBBatch, nil)
if err != nil {
log.Fatalf("Failed to synchronize from L1: %v", err)
return err
}
}()

// Witness retrieval workers
for i := 0; i < a.cfg.MaxWitnessRetrievalWorkers; i++ {
go a.retrieveWitness()
}
a.resetVerifyProofTime()

// Start stream client
err = a.streamClient.Start()
if err != nil {
log.Fatalf("failed to start stream client, error: %v", err)
}
go a.cleanupLockedProofs()
go a.sendFinalProof()
go a.ethTxManager.Start()

bookMark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: lastVerifiedBatchNumber + 1,
}
// Witness retrieval workers
for i := 0; i < a.cfg.MaxWitnessRetrievalWorkers; i++ {
go a.retrieveWitness()
}

marshalledBookMark, err := proto.Marshal(bookMark)
if err != nil {
log.Fatalf("failed to marshal bookmark: %v", err)
}
// Start stream client
err = a.streamClient.Start()
if err != nil {
log.Fatalf("failed to start stream client, error: %v", err)
}

err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark)
if err != nil {
log.Fatalf("failed to connect to data stream: %v", err)
}
bookMark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: lastVerifiedBatchNumber + 1,
}

// A this point everything is ready, so start serving
go func() {
log.Infof("Server listening on port %d", a.cfg.Port)
if err := a.srv.Serve(lis); err != nil {
a.exit()
log.Fatalf("Failed to serve: %v", err)
marshalledBookMark, err := proto.Marshal(bookMark)
if err != nil {
log.Fatalf("failed to marshal bookmark: %v", err)
}
}()

err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark)
if err != nil {
log.Fatalf("failed to connect to data stream: %v", err)
}

// A this point everything is ready, so start serving
go func() {
log.Infof("Server listening on port %d", a.cfg.Port)
if err := a.srv.Serve(lis); err != nil {
a.exit()
log.Fatalf("Failed to serve: %v", err)
}
}()
}

<-ctx.Done()
return ctx.Err()
Expand Down
4 changes: 4 additions & 0 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ type Config struct {

// MaxWitnessRetrievalWorkers is the maximum number of workers that will be used to retrieve the witness
MaxWitnessRetrievalWorkers int `mapstructure:"MaxWitnessRetrievalWorkers"`

// SyncModeOnlyEnabled is a flag to enable the sync mode only
// In this mode the aggregator will only sync from L1 and will not generate or read the data stream
SyncModeOnlyEnabled bool `mapstructure:"SyncModeOnlyEnabled"`
}

// StreamClientCfg contains the data streamer's configuration properties
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ SettlementBackend = "l1"
AggLayerTxTimeout = "5m"
AggLayerURL = ""
MaxWitnessRetrievalWorkers = 2
SyncModeOnlyEnabled = false
SequencerPrivateKey = {}
[Aggregator.DB]
Name = "aggregator_db"
Expand Down
4 changes: 3 additions & 1 deletion test/config/test.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ UseL1BatchData = true
SettlementBackend = "l1"
AggLayerTxTimeout = "5m"
AggLayerURL = ""
SequencerPrivateKey = {}
MaxWitnessRetrievalWorkers = 2
SyncModeOnlyEnabled = false
UseFullWitness = false
SequencerPrivateKey = {}
[Aggregator.DB]
Name = "aggregator_db"
User = "aggregator_user"
Expand Down
3 changes: 3 additions & 0 deletions test/config/test.kurtosis_template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ UseFullWitness = false
SettlementBackend = "l1"
AggLayerTxTimeout = "5m"
AggLayerURL = ""
MaxWitnessRetrievalWorkers = 2
SyncModeOnlyEnabled = false
UseFullWitness = false
SequencerPrivateKey = {}
[Aggregator.DB]
Name = "aggregator_db"
Expand Down

0 comments on commit 6251749

Please sign in to comment.