diff --git a/pkg/producer/producer.go b/pkg/producer/producer.go index 1c6ab40..75b86b6 100644 --- a/pkg/producer/producer.go +++ b/pkg/producer/producer.go @@ -161,8 +161,11 @@ func (d *DripProgramProducer) BackfillProgramOwnedAccounts(ctx context.Context, func (d *DripProgramProducer) pollTransactions(ctx context.Context) { logrus.Info("polling transactions") ticker := time.NewTicker(TXPOLLFREQUENCY) + if err := d.backfillTransactions(ctx); err != nil { + logrus.WithError(err).Error("failed to backfill") + } for { - if err := d.backfillCheckpointSlot(ctx); err != nil { + if err := d.processFromLastCheckpointSlot(ctx); err != nil { logrus.WithError(err).Error("failed to produce block with retry, skipping...") } select { @@ -176,7 +179,7 @@ func (d *DripProgramProducer) pollTransactions(ctx context.Context) { } } -func (d *DripProgramProducer) backfillCheckpointSlot(ctx context.Context) error { +func (d *DripProgramProducer) processFromLastCheckpointSlot(ctx context.Context) error { checkpoint := d.txProcessingCheckpointRepo.GetLatestTransactionCheckpoint(ctx) var untilSignature solana.Signature minSlot := uint64(0) @@ -185,11 +188,11 @@ func (d *DripProgramProducer) backfillCheckpointSlot(ctx context.Context) error minSlot = checkpoint.Slot } log := logrus.WithField("minSlot", minSlot).WithField("untilSignature", untilSignature.String()) - log.Info("starting backfill") + log.Info("starting processing") defer func() { - log.Info("done backfill") + log.Info("done processing") }() - txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), untilSignature, &minSlot) + txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), untilSignature, solana.Signature{}, &minSlot) if err != nil { log.WithError(err).Error("failed to GetSignaturesForAddress") return err @@ -219,6 +222,52 @@ func (d *DripProgramProducer) backfillCheckpointSlot(ctx context.Context) error return nil } +func (d *DripProgramProducer) backfillTransactions(ctx context.Context) error { + logrus.Info("starting backfill") + defer func() { + logrus.Info("done backfill") + }() + before := solana.Signature{} + txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), solana.Signature{}, before, nil) + if err != nil { + logrus.WithError(err).Error("failed to GetSignaturesForAddress") + return err + } + for len(txSignatures) > 0 { + log := logrus.WithField("before", before.String()) + log.WithField("len(txSignatures)", len(txSignatures)) + log.Info("got signatures") + for i := range lo.Reverse(txSignatures) { + txSignature := txSignatures[i] + tx, err := d.client.GetTransaction(ctx, txSignature.Signature.String()) + if err != nil { + log.WithError(err).Error("failed to GetTransaction") + return err + } + log.WithField("transactionSignature", txSignature.Signature.String()).Info("pushing tx to queue...") + if err := d.AddItemToTransactionUpdate(ctx, txSignature.Signature.String(), *tx); err != nil { + log.WithError(err).Error("failed to insert data into queue...") + return err + } else { + log.WithField("transactionSignature", txSignature).Info("pushed tx to queue...") + } + log.Info("updating checkpoint...") + if err := d.txProcessingCheckpointRepo.UpsertTransactionProcessingCheckpoint(ctx, txSignature.Slot, txSignature.Signature.String()); err != nil { + log.WithError(err).Error("failed to insert metadata...") + return err + } + } + before = txSignatures[0].Signature + txSignatures, err = d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), solana.Signature{}, before, nil) + if err != nil { + log.WithError(err).Error("failed to GetSignaturesForAddress") + return err + } + } + + return nil +} + func (d *DripProgramProducer) AddItemToAccountUpdateQueueCallback(ctx context.Context, programId string) func(string, []byte) error { return func(account string, data []byte) error { priority := int32(3) diff --git a/pkg/service/clients/solana/client.go b/pkg/service/clients/solana/client.go index 2f10eb1..69a7caf 100644 --- a/pkg/service/clients/solana/client.go +++ b/pkg/service/clients/solana/client.go @@ -48,7 +48,7 @@ type Solana interface { GetNetwork() config.Network GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error) - GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) + GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, before solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) GetTransaction(ctx context.Context, signature string) (*rpc.GetTransactionResult, error) } @@ -65,9 +65,10 @@ type impl struct { client *rpc.Client } -func (s impl) GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) { +func (s impl) GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, before solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) { return s.client.GetSignaturesForAddressWithOpts(ctx, solana.MustPublicKeyFromBase58(pubkey), &rpc.GetSignaturesForAddressOpts{ Until: until, + Before: before, Commitment: rpc.CommitmentFinalized, MinContextSlot: minSlot, }) diff --git a/pkg/service/clients/solana/mock.go b/pkg/service/clients/solana/mock.go index b5ab642..37a172c 100644 --- a/pkg/service/clients/solana/mock.go +++ b/pkg/service/clients/solana/mock.go @@ -142,18 +142,18 @@ func (mr *MockSolanaMockRecorder) GetProgramAccounts(arg0, arg1 interface{}) *go } // GetSignaturesForAddress mocks base method. -func (m *MockSolana) GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) { +func (m *MockSolana) GetSignaturesForAddress(ctx context.Context, pubkey string, until, before solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetSignaturesForAddress", ctx, pubkey, until, minSlot) + ret := m.ctrl.Call(m, "GetSignaturesForAddress", ctx, pubkey, until, before, minSlot) ret0, _ := ret[0].([]*rpc.TransactionSignature) ret1, _ := ret[1].(error) return ret0, ret1 } // GetSignaturesForAddress indicates an expected call of GetSignaturesForAddress. -func (mr *MockSolanaMockRecorder) GetSignaturesForAddress(ctx, pubkey, until, minSlot interface{}) *gomock.Call { +func (mr *MockSolanaMockRecorder) GetSignaturesForAddress(ctx, pubkey, until, before, minSlot interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSignaturesForAddress", reflect.TypeOf((*MockSolana)(nil).GetSignaturesForAddress), ctx, pubkey, until, minSlot) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSignaturesForAddress", reflect.TypeOf((*MockSolana)(nil).GetSignaturesForAddress), ctx, pubkey, until, before, minSlot) } // GetTokenMetadataAccount mocks base method.