From acb48ee56577ca8146a05940dad6f76c0b265b57 Mon Sep 17 00:00:00 2001 From: FattiesPatties <63286473+fatima2003@users.noreply.github.com> Date: Wed, 11 Dec 2024 18:14:33 +0400 Subject: [PATCH 1/3] Introduce HandleListPage Helper for Streamlined Pagination (#781) * Introduce HandleListPage which simplifies proccessing Lists and defines itemCallback for individual entries and batchCallback for cumulative batch operations Signed-off-by: fp17 * Refactor path_tidy to use handleListPage instead of processing each item and page in the doTidy methods Signed-off-by: fp17 --------- Signed-off-by: fp17 --- builtin/logical/pki/path_tidy.go | 464 +++++++++++++++---------------- sdk/logical/storage.go | 67 +++++ 2 files changed, 291 insertions(+), 240 deletions(-) diff --git a/builtin/logical/pki/path_tidy.go b/builtin/logical/pki/path_tidy.go index 8241e28e4a..1a96605a4b 100644 --- a/builtin/logical/pki/path_tidy.go +++ b/builtin/logical/pki/path_tidy.go @@ -943,134 +943,127 @@ func (b *backend) doTidyCertStore(ctx context.Context, req *logical.Request, log revokedSafetyBuffer = *config.RevokedSafetyBuffer } - // Total number of certificates + // Total number of certificates in storage var totalSerialCount int - // Number of certificates on current page. This value is <= PageSize. 
- var lenSerials int - - var after string + // Total number of deleted revoked certificates in this tidy call var revokedDeleted uint haveWarned := false - for { - // Fetch the next batch of certificate serials using pagination - serials, err := req.Storage.ListPage(ctx, "certs/", after, config.PageSize) - if err != nil { - return 0, fmt.Errorf("error fetching list of certs: %w", err) - } + // Define item-level callback that processes each certificate entry + itemCallback := func(page int, index int, serial string) (bool, error) { + b.tidyStatusMessage(fmt.Sprintf("Tidying certificate store: checking entry %d of %d on current page; total certs checked: %d", index, config.PageSize, totalSerialCount+index)) + metrics.SetGauge([]string{"secrets", "pki", "tidy", "cert_store_current_entry"}, float32(totalSerialCount+index)) - // If no serials are returned, we've reached the end of the cert list - if len(serials) == 0 { - break + // Check for cancel before continuing + if atomic.CompareAndSwapUint32(b.tidyCancelCAS, 1, 0) { + return false, tidyCancelledError } - lenSerials = len(serials) - after = serials[lenSerials-1] - - for i, serial := range serials { - b.tidyStatusMessage(fmt.Sprintf("Tidying certificate store: checking entry %d of %d on current page; total certs checked: %d", i, lenSerials, totalSerialCount+i)) - metrics.SetGauge([]string{"secrets", "pki", "tidy", "cert_store_current_entry"}, float32(totalSerialCount+i)) + // Check for pause duration to reduce resource consumption + if config.PauseDuration > (0 * time.Second) { + time.Sleep(config.PauseDuration) + } - // Check for cancel before continuing. - if atomic.CompareAndSwapUint32(b.tidyCancelCAS, 1, 0) { - return 0, tidyCancelledError - } + certEntry, err := req.Storage.Get(ctx, "certs/"+serial) + if err != nil { + return false, fmt.Errorf("error fetching certificate %q: %w", serial, err) + } - // Check for pause duration to reduce resource consumption. 
- if config.PauseDuration > (0 * time.Second) { - time.Sleep(config.PauseDuration) + if certEntry == nil { + logger.Warn("certificate entry is nil; tidying up since it is no longer useful for any server operations", "serial", serial) + if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { + return false, fmt.Errorf("error deleting nil entry with serial %s: %w", serial, err) } + b.tidyStatusIncCertStoreCount() + return true, nil + } - certEntry, err := req.Storage.Get(ctx, "certs/"+serial) - if err != nil { - return 0, fmt.Errorf("error fetching certificate %q: %w", serial, err) + if certEntry.Value == nil || len(certEntry.Value) == 0 { + logger.Warn("certificate entry has no value; tidying up since it is no longer useful for any server operations", "serial", serial) + if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { + return false, fmt.Errorf("error deleting entry with nil value with serial %s: %w", serial, err) } + b.tidyStatusIncCertStoreCount() + return true, nil + } - if certEntry == nil { - logger.Warn("certificate entry is nil; tidying up since it is no longer useful for any server operations", "serial", serial) - if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { - return 0, fmt.Errorf("error deleting nil entry with serial %s: %w", serial, err) + cert, err := x509.ParseCertificate(certEntry.Value) + if err != nil { + // only log warning once + if !haveWarned { + msg := "Unable to parse stored certificate. Other invalid certificates may exist; " + if config.InvalidCerts { + msg += "tidying up since it is not usable." + } else { + msg += "tidy by enabling tidy_invalid_certs=true." 
} - b.tidyStatusIncCertStoreCount() - continue + logger.Warn(msg, "serial", serial, "err", err) + haveWarned = true } - if certEntry.Value == nil || len(certEntry.Value) == 0 { - logger.Warn("certificate entry has no value; tidying up since it is no longer useful for any server operations", "serial", serial) + // if tidy_invalid_certs enabled, delete invalid cert. Because + // we're cleaning up revoked certs later by virtue of + // config.InvalidCerts=true, we can skip deleting revoked certs + // here. + if config.InvalidCerts { if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { - return 0, fmt.Errorf("error deleting entry with nil value with serial %s: %w", serial, err) + return false, fmt.Errorf("error deleting invalid certificate %s: %w", serial, err) } b.tidyStatusIncCertStoreCount() - continue } - cert, err := x509.ParseCertificate(certEntry.Value) - if err != nil { - // only log warning once - if !haveWarned { - msg := "Unable to parse stored certificate. Other invalid certificates may exist; " - if config.InvalidCerts { - msg += "tidying up since it is not usable." - } else { - msg += "tidy by enabling tidy_invalid_certs=true." - } - logger.Warn(msg, "serial", serial, "err", err) - haveWarned = true - } + return true, nil + } - // if tidy_invalid_certs enabled, delete invalid cert. Because - // we're cleaning up revoked certs later by virtue of - // config.InvalidCerts=true, we can skip deleting revoked certs - // here. - if config.InvalidCerts { - if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { - return 0, fmt.Errorf("error deleting invalid certificate %s: %w", serial, err) - } - b.tidyStatusIncCertStoreCount() - } + // We could be exclusively looking for invalid certificates; skip + // fetching a known-good revocation entry here if so. This also lets + // us avoid guarding each deletion check below. 
+ if !config.CertStore { + return true, nil + } - continue - } + // Check if a revocation entry exists for this cert; if so, use the + // appropriate entry. + revokedResp, err := req.Storage.Get(ctx, "revoked/"+serial) + if err != nil { + return false, fmt.Errorf("error fetching revocation status of serial %q from storage: %w", serial, err) + } - // We could be exclusively looking for invalid certificates; skip - // fetching a known-good revocation entry here if so. This also lets - // us avoid guarding each deletion check below. - if !config.CertStore { - continue + if revokedResp == nil && time.Since(cert.NotAfter) > config.SafetyBuffer { + if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { + return false, fmt.Errorf("error deleting serial %q from storage: %w", serial, err) } - - // Check if a revocation entry exists for this cert; if so, use the - // appropriate entry. - revokedResp, err := req.Storage.Get(ctx, "revoked/"+serial) - if err != nil { - return 0, fmt.Errorf("error fetching revocation status of serial %q from storage: %w", serial, err) + b.tidyStatusIncCertStoreCount() + } else if revokedResp != nil && time.Since(cert.NotAfter) > revokedSafetyBuffer { + if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { + return false, fmt.Errorf("error deleting serial %q from store when tidying revoked: %w", serial, err) } - - if revokedResp == nil && time.Since(cert.NotAfter) > config.SafetyBuffer { - if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { - return 0, fmt.Errorf("error deleting serial %q from storage: %w", serial, err) - } - b.tidyStatusIncCertStoreCount() - } else if revokedResp != nil && time.Since(cert.NotAfter) > revokedSafetyBuffer { - if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { - return 0, fmt.Errorf("error deleting serial %q from store when tidying revoked: %w", serial, err) - } - // Only tidy revoked certs if requested. 
- if config.RevokedCerts { - if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { - return 0, fmt.Errorf("error deleting serial %q from revoked list: %w", serial, err) - } - revokedDeleted += 1 - b.tidyStatusIncRevokedCertCount() + // Only tidy revoked certs if requested. + if config.RevokedCerts { + if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { + return false, fmt.Errorf("error deleting serial %q from revoked list: %w", serial, err) } - b.tidyStatusIncCertStoreCount() + revokedDeleted++ + b.tidyStatusIncRevokedCertCount() } + b.tidyStatusIncCertStoreCount() } + return true, nil + } + + // Define batch-level callback that updates cumulative count after processing the page + batchCallback := func(page int, entries []string) (bool, error) { + totalSerialCount += len(entries) + return true, nil + } - // Update the cumulative count after processing the page - totalSerialCount += lenSerials + // Use HandleListPage to process paginated results + err := logical.HandleListPage(req.Storage, "certs/", config.PageSize, itemCallback, batchCallback) + if err != nil { + return 0, err } + // Set metrics for total certificates and remaining certificates b.tidyStatusLock.RLock() metrics.SetGauge([]string{"secrets", "pki", "tidy", "cert_store_total_entries"}, float32(totalSerialCount)) metrics.SetGauge([]string{"secrets", "pki", "tidy", "cert_store_total_entries_remaining"}, float32(uint(totalSerialCount)-b.tidyStatus.certStoreDeletedCount)) @@ -1102,157 +1095,152 @@ func (b *backend) doTidyRevocationStore(ctx context.Context, req *logical.Reques // Total number of deleted revoked certificates in this tidy call var revokedDeletedCount int = 0 - var after string var revInfo revocationInfo haveWarned := false rebuildCRL := false fixedIssuers := 0 - for { - revokedSerials, err := req.Storage.ListPage(ctx, "revoked/", after, config.PageSize) - if err != nil { - return false, fmt.Errorf("error fetching list of revoked certs: %w", err) - } + // Define 
item-level callback that processes each revoked cert entry + itemCallback := func(page int, index int, serial string) (bool, error) { + b.tidyStatusMessage(fmt.Sprintf("Tidying revoked certificates: checking certificate %d of %d on current page; total revoked certs checked: %d", index, lenSerials, int(totalRevokedSerialCount)+index)) + metrics.SetGauge([]string{"secrets", "pki", "tidy", "revoked_cert_current_entry"}, float32(int(totalRevokedSerialCount)+index)) - // If no revokedSerials are returned, we've reached the end of the list - if len(revokedSerials) == 0 { - break + // Check for cancel before continuing. + if atomic.CompareAndSwapUint32(b.tidyCancelCAS, 1, 0) { + return false, tidyCancelledError } - lenSerials = len(revokedSerials) - after = revokedSerials[lenSerials-1] + // Check for pause duration to reduce resource consumption. + if config.PauseDuration > (0 * time.Second) { + b.revokeStorageLock.Unlock() + time.Sleep(config.PauseDuration) + b.revokeStorageLock.Lock() + } - for i, serial := range revokedSerials { - b.tidyStatusMessage(fmt.Sprintf("Tidying revoked certificates: checking certificate %d of %d on current page; total revoked certs checked: %d", i, lenSerials, int(totalRevokedSerialCount)+i)) - metrics.SetGauge([]string{"secrets", "pki", "tidy", "revoked_cert_current_entry"}, float32(int(totalRevokedSerialCount)+i)) + revokedEntry, err := req.Storage.Get(ctx, "revoked/"+serial) + if err != nil { + return false, fmt.Errorf("unable to fetch revoked cert with serial %q: %w", serial, err) + } - // Check for cancel before continuing. - if atomic.CompareAndSwapUint32(b.tidyCancelCAS, 1, 0) { - return false, tidyCancelledError + if revokedEntry == nil { + if !haveWarned { + logger.Warn("Revoked entry is nil. Other invalid entries may exist; tidying up since it is no longer useful for any server operations.", "serial", serial) } - - // Check for pause duration to reduce resource consumption. 
- if config.PauseDuration > (0 * time.Second) { - b.revokeStorageLock.Unlock() - time.Sleep(config.PauseDuration) - b.revokeStorageLock.Lock() + if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { + return false, fmt.Errorf("error deleting nil revoked entry with serial %s: %w", serial, err) } + b.tidyStatusIncRevokedCertCount() + revokedDeletedCount += 1 + return true, nil + } - revokedEntry, err := req.Storage.Get(ctx, "revoked/"+serial) - if err != nil { - return false, fmt.Errorf("unable to fetch revoked cert with serial %q: %w", serial, err) + if revokedEntry.Value == nil || len(revokedEntry.Value) == 0 { + if !haveWarned { + logger.Warn("Revoked entry has nil value. Other invalid entries may exist; tidying up since it is no longer useful for any server operations", "serial", serial) } + if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { + return false, fmt.Errorf("error deleting revoked entry with nil value with serial %s: %w", serial, err) + } + b.tidyStatusIncRevokedCertCount() + revokedDeletedCount += 1 + return true, nil + } - if revokedEntry == nil { - if !haveWarned { - logger.Warn("Revoked entry is nil. Other invalid entries may exist; tidying up since it is no longer useful for any server operations.", "serial", serial) - } - if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { - return false, fmt.Errorf("error deleting nil revoked entry with serial %s: %w", serial, err) + err = revokedEntry.DecodeJSON(&revInfo) + if err != nil { + return false, fmt.Errorf("error decoding revocation entry for serial %q: %w", serial, err) + } + + revokedCert, err := x509.ParseCertificate(revInfo.CertificateBytes) + if err != nil { + // only log warning once + if !haveWarned { + msg := "Unable to parse revoked certificate. Other invalid certificates may exist; " + if config.InvalidCerts { + msg += "tidying up since it is not usable." + } else { + msg += "tidy by enabling tidy_invalid_certs=true." 
} - b.tidyStatusIncRevokedCertCount() - revokedDeletedCount += 1 - continue + logger.Warn(msg, "serial", serial, "err", err) + haveWarned = true } - if revokedEntry.Value == nil || len(revokedEntry.Value) == 0 { - if !haveWarned { - logger.Warn("Revoked entry has nil value. Other invalid entries may exist; tidying up since it is no longer useful for any server operations", "serial", serial) - } + // If tidy_invalid_certs enabled, delete invalid revoked cert. + // We know we've already deleted the invalid cert entry via + // doTidyCertStore(...) earlier so don't bother deleting that + // too. + if config.InvalidCerts { if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { - return false, fmt.Errorf("error deleting revoked entry with nil value with serial %s: %w", serial, err) + return false, fmt.Errorf("error deleting invalid revoked certificate %s: %w", serial, err) } b.tidyStatusIncRevokedCertCount() revokedDeletedCount += 1 - continue - } - - err = revokedEntry.DecodeJSON(&revInfo) - if err != nil { - return false, fmt.Errorf("error decoding revocation entry for serial %q: %w", serial, err) } - revokedCert, err := x509.ParseCertificate(revInfo.CertificateBytes) - if err != nil { - // only log warning once - if !haveWarned { - msg := "Unable to parse revoked certificate. Other invalid certificates may exist; " - if config.InvalidCerts { - msg += "tidying up since it is not usable." - } else { - msg += "tidy by enabling tidy_invalid_certs=true." - } - logger.Warn(msg, "serial", serial, "err", err) - haveWarned = true - } + return true, nil + } - // If tidy_invalid_certs enabled, delete invalid revoked cert. - // We know we've already deleted the invalid cert entry via - // doTidyCertStore(...) earlier so don't bother deleting that - // too. 
- if config.InvalidCerts { - if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { - return false, fmt.Errorf("error deleting invalid revoked certificate %s: %w", serial, err) - } - b.tidyStatusIncRevokedCertCount() - revokedDeletedCount += 1 + // Tidy operations over revoked certs should execute prior to + // tidyRevokedCerts as that may remove the entry. If that happens, + // we won't persist the revInfo changes (as it was deleted instead). + var storeCert bool = false + if config.IssuerAssocs { + if !isRevInfoIssuerValid(&revInfo, issuerIDCertMap) { + b.tidyStatusIncMissingIssuerCertCount() + revInfo.CertificateIssuer = issuerID("") + storeCert = true + if associateRevokedCertWithIsssuer(&revInfo, revokedCert, issuerIDCertMap) { + fixedIssuers += 1 } - - continue } + } - // Tidy operations over revoked certs should execute prior to - // tidyRevokedCerts as that may remove the entry. If that happens, - // we won't persist the revInfo changes (as it was deleted instead). - var storeCert bool = false - if config.IssuerAssocs { - if !isRevInfoIssuerValid(&revInfo, issuerIDCertMap) { - b.tidyStatusIncMissingIssuerCertCount() - revInfo.CertificateIssuer = issuerID("") - storeCert = true - if associateRevokedCertWithIsssuer(&revInfo, revokedCert, issuerIDCertMap) { - fixedIssuers += 1 - } - } - } + if config.RevokedCerts { + // Only remove the entries from revoked/ and certs/ if we're + // past its NotAfter value. This is because we use the + // information on revoked/ to build the CRL and the + // information on certs/ for lookup. - if config.RevokedCerts { - // Only remove the entries from revoked/ and certs/ if we're - // past its NotAfter value. This is because we use the - // information on revoked/ to build the CRL and the - // information on certs/ for lookup. 
- - if time.Since(revokedCert.NotAfter) > revokedSafetyBuffer { - if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { - return false, fmt.Errorf("error deleting serial %q from revoked list: %w", serial, err) - } - if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { - return false, fmt.Errorf("error deleting serial %q from store when tidying revoked: %w", serial, err) - } - rebuildCRL = true - storeCert = false - b.tidyStatusIncRevokedCertCount() - revokedDeletedCount += 1 + if time.Since(revokedCert.NotAfter) > revokedSafetyBuffer { + if err := req.Storage.Delete(ctx, "revoked/"+serial); err != nil { + return false, fmt.Errorf("error deleting serial %q from revoked list: %w", serial, err) + } + if err := req.Storage.Delete(ctx, "certs/"+serial); err != nil { + return false, fmt.Errorf("error deleting serial %q from store when tidying revoked: %w", serial, err) } + rebuildCRL = true + storeCert = false + b.tidyStatusIncRevokedCertCount() + revokedDeletedCount += 1 } + } - // If the entry wasn't removed but was otherwise modified, - // go ahead and write it back out. - if storeCert { - revokedEntry, err = logical.StorageEntryJSON("revoked/"+serial, revInfo) - if err != nil { - return false, fmt.Errorf("error building entry to persist changes to serial %v from revoked list: %w", serial, err) - } + // If the entry wasn't removed but was otherwise modified, + // go ahead and write it back out. 
+ if storeCert { + revokedEntry, err = logical.StorageEntryJSON("revoked/"+serial, revInfo) + if err != nil { + return false, fmt.Errorf("error building entry to persist changes to serial %v from revoked list: %w", serial, err) + } - err = req.Storage.Put(ctx, revokedEntry) - if err != nil { - return false, fmt.Errorf("error persisting changes to serial %v from revoked list: %w", serial, err) - } + err = req.Storage.Put(ctx, revokedEntry) + if err != nil { + return false, fmt.Errorf("error persisting changes to serial %v from revoked list: %w", serial, err) } } + return true, nil + } + + // Define batch-level callback for updating cumulative count after processing the page + batchCallback := func(page int, entries []string) (bool, error) { + totalRevokedSerialCount += len(entries) + return true, nil + } - // Update the cumulative count after processing the page - totalRevokedSerialCount += lenSerials + // Use handleListPage to process paginated results + err = logical.HandleListPage(req.Storage, "revoked/", config.PageSize, itemCallback, batchCallback) + if err != nil { + return false, err } totalRevokedSerialCount += int(revokedDeleted) @@ -1473,45 +1461,41 @@ func (b *backend) doTidyAcme(ctx context.Context, req *logical.Request, logger h defer b.acmeAccountLock.Unlock() sc := b.makeStorageContext(ctx, req.Storage) - var after string var thumbprintsCount uint var lenThumbprints int - for { - thumbprints, err := sc.Storage.ListPage(ctx, acmeThumbprintPrefix, after, config.PageSize) + itemCallback := func(page int, index int, thumbprint string) (bool, error) { + b.tidyStatusMessage(fmt.Sprintf("Tidying Acme: checking entry %d of %d on current page; total thumbprints checked: %d", index, lenThumbprints, int(thumbprintsCount)+index)) + + err := b.tidyAcmeAccountByThumbprint(b.acmeState, sc, thumbprint, config.SafetyBuffer, config.AcmeAccountSafetyBuffer) if err != nil { - return err + logger.Warn("error tidying account %v: %v", thumbprint, err.Error()) } - // If no 
thumbprints are returned, we've reached the end of the list - if len(thumbprints) == 0 { - break + // Check for cancel before continuing. + if atomic.CompareAndSwapUint32(b.tidyCancelCAS, 1, 0) { + return false, tidyCancelledError } - lenThumbprints = len(thumbprints) - after = thumbprints[lenThumbprints-1] - - for i, thumbprint := range thumbprints { - b.tidyStatusMessage(fmt.Sprintf("Tidying Acme: checking entry %d of %d on current page; total thumbprints checked: %d", i, lenThumbprints, int(thumbprintsCount)+i)) - - err := b.tidyAcmeAccountByThumbprint(b.acmeState, sc, thumbprint, config.SafetyBuffer, config.AcmeAccountSafetyBuffer) - if err != nil { - logger.Warn("error tidying account %v: %v", thumbprint, err.Error()) - } - - // Check for cancel before continuing. - if atomic.CompareAndSwapUint32(b.tidyCancelCAS, 1, 0) { - return tidyCancelledError - } - - // Check for pause duration to reduce resource consumption. - if config.PauseDuration > (0 * time.Second) { - b.acmeAccountLock.Unlock() // Correct the Lock - time.Sleep(config.PauseDuration) - b.acmeAccountLock.Lock() - } + // Check for pause duration to reduce resource consumption. 
+ if config.PauseDuration > (0 * time.Second) { + b.acmeAccountLock.Unlock() // Correct the Lock + time.Sleep(config.PauseDuration) + b.acmeAccountLock.Lock() } + return true, nil + } + + // Define batch-level callback that updates cumulative count after processing the page + batchCallback := func(page int, entries []string) (bool, error) { - thumbprintsCount += uint(lenThumbprints) + thumbprintsCount += uint(len(entries)) + return true, nil + } + + // Use HandleListPage to process paginated results + err := logical.HandleListPage(req.Storage, acmeThumbprintPrefix, config.PageSize, itemCallback, batchCallback) + if err != nil { + return err } b.tidyStatusLock.Lock() diff --git a/sdk/logical/storage.go b/sdk/logical/storage.go index 3b76bf8176..22da742b60 100644 --- a/sdk/logical/storage.go +++ b/sdk/logical/storage.go @@ -165,3 +165,70 @@ func ClearViewWithLogging(ctx context.Context, view ClearableView, logger hclog. return nil } + +// HandleListPage provides a helper for processing paginated storage lists. +// It supports both item-level and batch-level callbacks for flexibility. +// +// itemCallback: Invoked for each individual entry in the paginated list. +// - Parameters: `page` (page index), `index` (entry index in the page), and `entry` (the storage entry). +// - Return: A boolean `cont` (whether to continue processing) and an `error` if an issue occurs. +// +// batchCallback: Invoked after processing a full batch of entries in the current page. +// - Parameters: `page` (page index) and `entries` (all entries in the current batch). +// - Return: A boolean `cont` (whether to continue processing) and an `error` if an issue occurs. +// +// The callbacks are executed sequentially, with `itemCallback` processing each entry individually, +// followed by `batchCallback` handling the entire batch. 
+func HandleListPage( + storage Storage, + prefix string, + limit int, + itemCallback func(page int, index int, entry string) (cont bool, err error), + batchCallback func(page int, entries []string) (cont bool, err error), +) error { + page := 0 + // after is the pagination cursor; it lives outside the loop so each ListPage call resumes after the previous page + var after string + for { + // Fetch the next page + entries, err := storage.ListPage(context.Background(), prefix, after, limit) + if err != nil { + return err + } + + // Exit if no entries are returned + if len(entries) == 0 { + break + } + + // Process each entry in the page + for index, entry := range entries { + cont, err := itemCallback(page, index, entry) + if err != nil || !cont { + return err + } + } + + // Process the entire batch + cont, err := batchCallback(page, entries) + if err != nil || !cont { + return err + } + + // Stop since all certs have already been processed + if limit <= 0 { + break + } + + // Stop since this is the last page; prevents 1 unnecessary call to ListPage + if len(entries) < limit { + break + } + + // Update after for the next page + after = entries[len(entries)-1] + page++ + } + + return nil +} From 7b6621f598c05c27de6b07b29f1bba2160bf34a2 Mon Sep 17 00:00:00 2001 From: Klaus Kiefer Date: Wed, 11 Dec 2024 15:59:31 +0100 Subject: [PATCH 2/3] Add test for Canonicalize (#790) * add test for Canonicalize Signed-off-by: klaus-sap * add counter for test case Signed-off-by: D023954 Signed-off-by: klaus-sap * sign-off Signed-off-by: klaus-sap --------- Signed-off-by: klaus-sap Signed-off-by: D023954 --- helper/namespace/namespace_test.go | 35 ++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/helper/namespace/namespace_test.go b/helper/namespace/namespace_test.go index fd4c4c2f99..856d642a74 100644 --- a/helper/namespace/namespace_test.go +++ b/helper/namespace/namespace_test.go @@ -7,6 +7,37 @@ import ( "testing" ) +func TestCanonicalize(t *testing.T) { + tcases := []struct { + nsPath string + result string + }{ + { + "", + "", + }, + { + "ns1", + 
"ns1/", + }, + { + "/ns1", + "ns1/", + }, + { + "ns1/ns2", + "ns1/ns2/", + }, + } + + for i, c := range tcases { + result := Canonicalize(c.nsPath) + if result != c.result { + t.Fatalf("bad test case %d: %s != %s", i, result, c.result) + } + } +} + func TestSplitIDFromString(t *testing.T) { tcases := []struct { input string @@ -70,10 +101,10 @@ func TestSplitIDFromString(t *testing.T) { }, } - for _, c := range tcases { + for i, c := range tcases { pre, id := SplitIDFromString(c.input) if pre != c.prefix || id != c.id { - t.Fatalf("bad test case: %s != %s, %s != %s", pre, c.prefix, id, c.id) + t.Fatalf("bad test case %d: %s != %s, %s != %s", i, pre, c.prefix, id, c.id) } } } From be513d8b7ee1cc9211dd1fdae0c8ced2e090c689 Mon Sep 17 00:00:00 2001 From: Jan Martens <44572196+JanMa@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:00:00 +0100 Subject: [PATCH 3/3] Test non-voters persist across leadership changes (#779) * lower autopilot delegate log level Writing a Debug log entry every time we try to fetch or store the non-voters generates a lot of log spam. Lower this to a Trace Signed-off-by: Jan Martens * test non-voters persist across leadership changes Signed-off-by: Jan Martens * fix data race in non-voter handling Ensure that the Custom Promoter works on a copy of the non-voter map and not on a reference. 
Signed-off-by: Jan Martens --------- Signed-off-by: Jan Martens --- physical/raft/raft_autopilot_delegate.go | 6 +- .../raft/raft_autopilot_test.go | 75 ++++++++++++++++++- 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/physical/raft/raft_autopilot_delegate.go b/physical/raft/raft_autopilot_delegate.go index ffcbb821de..1872d7b1ee 100644 --- a/physical/raft/raft_autopilot_delegate.go +++ b/physical/raft/raft_autopilot_delegate.go @@ -64,7 +64,7 @@ func (d *Delegate) AutopilotConfig() *autopilot.Config { MaxTrailingLogs: d.autopilotConfig.MaxTrailingLogs, MinQuorum: d.autopilotConfig.MinQuorum, ServerStabilizationTime: d.autopilotConfig.ServerStabilizationTime, - Ext: d.permanentNonVoters, + Ext: maps.Clone(d.permanentNonVoters), } return config } @@ -310,7 +310,7 @@ func (d *Delegate) NonVoters() []raft.ServerID { func (d *Delegate) StoreNonVoters() error { d.dl.RLock() defer d.dl.RUnlock() - d.logger.Debug("updating non-voters", "non_voters", d.permanentNonVoters) + d.logger.Trace("updating non-voters", "non_voters", d.permanentNonVoters) v, err := json.Marshal(d.permanentNonVoters) if err != nil { return err @@ -332,7 +332,6 @@ func (d *Delegate) FetchNonVoters() error { } if e == nil { - d.logger.Debug("no non-voters") return nil } @@ -346,6 +345,7 @@ func (d *Delegate) FetchNonVoters() error { nV := d.permanentNonVoters d.dl.RUnlock() if !maps.Equal(nV, nonVoters) { + d.logger.Trace("fetched new non-voters", "non_voters", nonVoters) d.dl.Lock() d.permanentNonVoters = nonVoters d.dl.Unlock() diff --git a/vault/external_tests/raft/raft_autopilot_test.go b/vault/external_tests/raft/raft_autopilot_test.go index 718d0adcde..505efa6c5e 100644 --- a/vault/external_tests/raft/raft_autopilot_test.go +++ b/vault/external_tests/raft/raft_autopilot_test.go @@ -412,6 +412,75 @@ func TestRaft_VotersStayVoters(t *testing.T) { require.NoError(t, err) } +// TestRaft_NonVotersStayNonVoters ensures that autopilot doesn't promote a node +// that was marked as a 
non-voter after a leader election. +func TestRaft_NonVotersStayNonVoters(t *testing.T) { + cluster, _ := raftCluster(t, &RaftClusterOpts{ + DisableFollowerJoins: true, + InmemCluster: true, + EnableAutopilot: true, + PhysicalFactoryConfig: map[string]interface{}{ + "performance_multiplier": "5", + "autopilot_reconcile_interval": "300ms", + "autopilot_update_interval": "100ms", + }, + VersionMap: map[int]string{ + 0: version.Version, + 1: version.Version, + 2: version.Version, + 3: version.Version, + }, + NumCores: 4, + }) + defer cluster.Cleanup() + testhelpers.WaitForActiveNode(t, cluster) + + client := cluster.Cores[0].Client + + config, err := client.Sys().RaftAutopilotConfiguration() + require.NoError(t, err) + joinAndStabilizeAndPromote(t, cluster.Cores[1], client, cluster, config, "core-1", 2) + joinAndStabilizeAndPromote(t, cluster.Cores[2], client, cluster, config, "core-2", 3) + + errIfNonVotersExist := func() error { + t.Helper() + resp, err := client.Sys().RaftAutopilotState() + if err != nil { + t.Fatal(err) + } + for k, v := range resp.Servers { + if v.Status == "non-voter" { + return fmt.Errorf("node %q is a non-voter", k) + } + } + return nil + } + + errIfVoter := func() error { + t.Helper() + resp, err := client.Sys().RaftAutopilotState() + if err != nil { + t.Fatal(err) + } + if resp.Servers["core-3"].Status == "voter" { + return fmt.Errorf("node %q is a voter", "core-3") + } + return nil + } + testhelpers.RetryUntil(t, 10*time.Second, errIfNonVotersExist) + + joinAndStabilize(t, cluster.Cores[3], client, cluster, config, "core-3", 4, true) + + // Core0 is the leader, sealing it will both cause an election - and the + // new leader won't have seen any heartbeats initially - and create a "down" + // node that won't be sending heartbeats. 
+ testhelpers.EnsureCoreSealed(t, cluster.Cores[0]) + time.Sleep(config.ServerStabilizationTime + 2*time.Second) + client = cluster.Cores[1].Client + err = errIfVoter() + require.NoError(t, err) +} + // TestRaft_Autopilot_DeadServerCleanup tests that dead servers are correctly // removed by Vault and autopilot when a node stops and a replacement node joins. // The expected behavior is that removing a node from a 3 node cluster wouldn't @@ -499,7 +568,7 @@ func TestRaft_Autopilot_DeadServerCleanup(t *testing.T) { } func joinAndStabilizeAndPromote(t *testing.T, core *vault.TestClusterCore, client *api.Client, cluster *vault.TestCluster, config *api.AutopilotConfig, nodeID string, numServers int) { - joinAndStabilize(t, core, client, cluster, config, nodeID, numServers) + joinAndStabilize(t, core, client, cluster, config, nodeID, numServers, false) // Now that the server is stable, wait for autopilot to reconcile and // promotion to happen. Reconcile interval is 10 seconds. Bound it by @@ -523,9 +592,9 @@ func joinAndStabilizeAndPromote(t *testing.T, core *vault.TestClusterCore, clien } } -func joinAndStabilize(t *testing.T, core *vault.TestClusterCore, client *api.Client, cluster *vault.TestCluster, config *api.AutopilotConfig, nodeID string, numServers int) { +func joinAndStabilize(t *testing.T, core *vault.TestClusterCore, client *api.Client, cluster *vault.TestCluster, config *api.AutopilotConfig, nodeID string, numServers int, nonVoter bool) { t.Helper() - joinAndUnseal(t, core, cluster, false, false) + joinAndUnseal(t, core, cluster, nonVoter, false) time.Sleep(2 * time.Second) state, err := client.Sys().RaftAutopilotState()