Skip to content

Commit

Permalink
feat: backfill all transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
sol-mocha committed Apr 22, 2023
1 parent a1a7e7a commit dbf337b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
59 changes: 54 additions & 5 deletions pkg/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,11 @@ func (d *DripProgramProducer) BackfillProgramOwnedAccounts(ctx context.Context,
func (d *DripProgramProducer) pollTransactions(ctx context.Context) {
logrus.Info("polling transactions")
ticker := time.NewTicker(TXPOLLFREQUENCY)
if err := d.backfillTransactions(ctx); err != nil {
logrus.WithError(err).Error("failed to backfill")
}
for {
if err := d.backfillCheckpointSlot(ctx); err != nil {
if err := d.processFromLastCheckpointSlot(ctx); err != nil {
logrus.WithError(err).Error("failed to produce block with retry, skipping...")
}
select {
Expand All @@ -176,7 +179,7 @@ func (d *DripProgramProducer) pollTransactions(ctx context.Context) {
}
}

func (d *DripProgramProducer) backfillCheckpointSlot(ctx context.Context) error {
func (d *DripProgramProducer) processFromLastCheckpointSlot(ctx context.Context) error {
checkpoint := d.txProcessingCheckpointRepo.GetLatestTransactionCheckpoint(ctx)
var untilSignature solana.Signature
minSlot := uint64(0)
Expand All @@ -185,11 +188,11 @@ func (d *DripProgramProducer) backfillCheckpointSlot(ctx context.Context) error
minSlot = checkpoint.Slot
}
log := logrus.WithField("minSlot", minSlot).WithField("untilSignature", untilSignature.String())
log.Info("starting backfill")
log.Info("starting processing")
defer func() {
log.Info("done backfill")
log.Info("done processing")
}()
txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), untilSignature, &minSlot)
txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), untilSignature, solana.Signature{}, &minSlot)
if err != nil {
log.WithError(err).Error("failed to GetSignaturesForAddress")
return err
Expand Down Expand Up @@ -219,6 +222,52 @@ func (d *DripProgramProducer) backfillCheckpointSlot(ctx context.Context) error
return nil
}

func (d *DripProgramProducer) backfillTransactions(ctx context.Context) error {
logrus.Info("starting backfill")
defer func() {
logrus.Info("done backfill")
}()
before := solana.Signature{}
txSignatures, err := d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), solana.Signature{}, before, nil)
if err != nil {
logrus.WithError(err).Error("failed to GetSignaturesForAddress")
return err
}
for len(txSignatures) > 0 {
log := logrus.WithField("before", before.String())
log.WithField("len(txSignatures)", len(txSignatures))
log.Info("got signatures")
for i := range lo.Reverse(txSignatures) {
txSignature := txSignatures[i]
tx, err := d.client.GetTransaction(ctx, txSignature.Signature.String())
if err != nil {
log.WithError(err).Error("failed to GetTransaction")
return err
}
log.WithField("transactionSignature", txSignature.Signature.String()).Info("pushing tx to queue...")
if err := d.AddItemToTransactionUpdate(ctx, txSignature.Signature.String(), *tx); err != nil {
log.WithError(err).Error("failed to insert data into queue...")
return err
} else {
log.WithField("transactionSignature", txSignature).Info("pushed tx to queue...")
}
log.Info("updating checkpoint...")
if err := d.txProcessingCheckpointRepo.UpsertTransactionProcessingCheckpoint(ctx, txSignature.Slot, txSignature.Signature.String()); err != nil {
log.WithError(err).Error("failed to insert metadata...")
return err
}
}
before = txSignatures[0].Signature
txSignatures, err = d.client.GetSignaturesForAddress(ctx, drip.ProgramID.String(), solana.Signature{}, before, nil)
if err != nil {
log.WithError(err).Error("failed to GetSignaturesForAddress")
return err
}
}

return nil
}

func (d *DripProgramProducer) AddItemToAccountUpdateQueueCallback(ctx context.Context, programId string) func(string, []byte) error {
return func(account string, data []byte) error {
priority := int32(3)
Expand Down
5 changes: 3 additions & 2 deletions pkg/service/clients/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Solana interface {
GetNetwork() config.Network

GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error)
GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error)
GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, before solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error)
GetTransaction(ctx context.Context, signature string) (*rpc.GetTransactionResult, error)
}

Expand All @@ -65,9 +65,10 @@ type impl struct {
client *rpc.Client
}

func (s impl) GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) {
func (s impl) GetSignaturesForAddress(ctx context.Context, pubkey string, until solana.Signature, before solana.Signature, minSlot *uint64) ([]*rpc.TransactionSignature, error) {
return s.client.GetSignaturesForAddressWithOpts(ctx, solana.MustPublicKeyFromBase58(pubkey), &rpc.GetSignaturesForAddressOpts{
Until: until,
Before: before,
Commitment: rpc.CommitmentFinalized,
MinContextSlot: minSlot,
})
Expand Down
8 changes: 4 additions & 4 deletions pkg/service/clients/solana/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit dbf337b

Please sign in to comment.