Skip to content

Commit

Permalink
Merge pull request onflow#6803 from onflow/leo/db-ops-writes
Browse files Browse the repository at this point in the history
Extract uncurried functions from storage writes
  • Loading branch information
zhangchiqing authored Dec 18, 2024
2 parents ee4c9e4 + 36ae5ce commit 45950a7
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 77 deletions.
35 changes: 2 additions & 33 deletions ledger/complete/wal/checkpoint_v6_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/onflow/flow-go/ledger/complete/mtrie/node"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
utilsio "github.com/onflow/flow-go/utils/io"
"github.com/onflow/flow-go/utils/merr"
)

const subtrieLevel = 4
Expand Down Expand Up @@ -708,39 +709,7 @@ func decodeSubtrieCount(encoded []byte) (uint16, error) {
return binary.BigEndian.Uint16(encoded), nil
}

// closeAndMergeError close the closable and merge the closeErr with the given err into a multierror
// Note: when using this function in a defer function, don't use as below:
// func XXX() (
//
// err error,
// ) {
// def func() {
// // bad, because the definition of err might get overwritten
// err = closeAndMergeError(closable, err)
// }()
//
// Better to use as below:
// func XXX() (
//
// errToReturn error,
// ) {
// def func() {
// // good, because the error to returned is only updated here, and guaranteed to be returned
// errToReturn = closeAndMergeError(closable, errToReturn)
// }()
func closeAndMergeError(closable io.Closer, err error) error {
var merr *multierror.Error
if err != nil {
merr = multierror.Append(merr, err)
}

closeError := closable.Close()
if closeError != nil {
merr = multierror.Append(merr, closeError)
}

return merr.ErrorOrNil()
}
var closeAndMergeError = merr.CloseAndMergeError

// withFile opens the file at the given path, and calls the given function with the opened file.
// it handles closing the file and evicting the file from Linux page cache.
Expand Down
150 changes: 150 additions & 0 deletions storage/operation/prefix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//nolint:golint,unused
package operation

import (
"encoding/binary"
"fmt"

"github.com/onflow/flow-go/model/flow"
)

const (

// codes for special database markers
// codeMax = 1 // deprecated
codeDBType = 2 // specifies a database type

// codes for views with special meaning
codeSafetyData = 10 // safety data for hotstuff state
codeLivenessData = 11 // liveness data for hotstuff state

// codes for fields associated with the root state
codeSporkID = 13
codeProtocolVersion = 14
codeEpochCommitSafetyThreshold = 15
codeSporkRootBlockHeight = 16

// code for heights with special meaning
codeFinalizedHeight = 20 // latest finalized block height
codeSealedHeight = 21 // latest sealed block height
codeClusterHeight = 22 // latest finalized height on cluster
codeExecutedBlock = 23 // latest executed block with max height
codeFinalizedRootHeight = 24 // the height of the highest finalized block contained in the root snapshot
codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received
codeEpochFirstHeight = 26 // the height of the first block in a given epoch
codeSealedRootHeight = 27 // the height of the highest sealed block contained in the root snapshot

// codes for single entity storage
codeHeader = 30
_ = 31 // DEPRECATED: 31 was used for identities before epochs
codeGuarantee = 32
codeSeal = 33
codeTransaction = 34
codeCollection = 35
codeExecutionResult = 36
codeResultApproval = 37
codeChunk = 38
codeExecutionReceiptMeta = 39 // NOTE: prior to Mainnet25, this erroneously had the same value as codeExecutionResult (36)

// codes for indexing single identifier by identifier/integer
codeHeightToBlock = 40 // index mapping height to block ID
codeBlockIDToLatestSealID = 41 // index mapping a block its last payload seal
codeClusterBlockToRefBlock = 42 // index cluster block ID to reference block ID
codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs
codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID
codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID
codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID
codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID

// codes for indexing multiple identifiers by identifier
codeBlockChildren = 50 // index mapping block ID to children blocks
_ = 51 // DEPRECATED: 51 was used for identity indexes before epochs
codePayloadGuarantees = 52 // index mapping block ID to payload guarantees
codePayloadSeals = 53 // index mapping block ID to payload seals
codeCollectionBlock = 54 // index mapping collection ID to block ID
codeOwnBlockReceipt = 55 // index mapping block ID to execution receipt ID for execution nodes
_ = 56 // DEPRECATED: 56 was used for block->epoch status prior to Dynamic Protocol State in Mainnet25
codePayloadReceipts = 57 // index mapping block ID to payload receipts
codePayloadResults = 58 // index mapping block ID to payload results
codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts
codePayloadProtocolStateID = 60 // index mapping block ID to payload protocol state ID

// codes related to protocol level information
codeEpochSetup = 61 // EpochSetup service event, keyed by ID
codeEpochCommit = 62 // EpochCommit service event, keyed by ID
codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter
codeDKGStarted = 64 // flag that the DKG for an epoch has been started
codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state)
codeVersionBeacon = 67 // flag for storing version beacons
codeEpochProtocolState = 68
codeProtocolKVStore = 69

// code for ComputationResult upload status storage
// NOTE: for now only GCP uploader is supported. When other uploader (AWS e.g.) needs to
// be supported, we will need to define new code.
codeComputationResults = 66

// job queue consumers and producers
codeJobConsumerProcessed = 70
codeJobQueue = 71
codeJobQueuePointer = 72

// legacy codes (should be cleaned up)
codeChunkDataPack = 100
codeCommit = 101
codeEvent = 102
codeExecutionStateInteractions = 103
codeTransactionResult = 104
codeFinalizedCluster = 105
codeServiceEvent = 106
codeTransactionResultIndex = 107
codeLightTransactionResult = 108
codeLightTransactionResultIndex = 109
codeTransactionResultErrorMessage = 110
codeTransactionResultErrorMessageIndex = 111
codeIndexCollection = 200
codeIndexExecutionResultByBlock = 202
codeIndexCollectionByTransaction = 203
codeIndexResultApprovalByChunk = 204

// TEMPORARY codes
blockedNodeIDs = 205 // manual override for adding node IDs to list of ejected nodes, applies to networking layer only

// internal failure information that should be preserved across restarts
codeExecutionFork = 254
codeEpochEmergencyFallbackTriggered = 255
)

func MakePrefix(code byte, keys ...interface{}) []byte {
prefix := make([]byte, 1)
prefix[0] = code
for _, key := range keys {
prefix = append(prefix, KeyPartToBytes(key)...)
}
return prefix
}

func KeyPartToBytes(v interface{}) []byte {
switch i := v.(type) {
case uint8:
return []byte{i}
case uint32:
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
case uint64:
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
return b
case string:
return []byte(i)
case flow.Role:
return []byte{byte(i)}
case flow.Identifier:
return i[:]
case flow.ChainID:
return []byte(i)
default:
panic(fmt.Sprintf("unsupported type to convert (%T)", v))
}
}
29 changes: 20 additions & 9 deletions storage/operation/reads.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/merr"
)

// CheckFunc is a function that checks if the value should be read and decoded.
Expand Down Expand Up @@ -51,7 +52,7 @@ func IterateKeysByPrefixRange(r storage.Reader, startPrefix []byte, endPrefix []
// IterateKeys will iterate over all entries in the database, where the key starts with a prefixes in
// the range [startPrefix, endPrefix] (both inclusive).
// No errors expected during normal operations.
func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) error {
func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) (errToReturn error) {
if len(startPrefix) == 0 {
return fmt.Errorf("startPrefix prefix is empty")
}
Expand All @@ -69,7 +70,9 @@ func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFun
if err != nil {
return fmt.Errorf("can not create iterator: %w", err)
}
defer it.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(it, errToReturn)
}()

for it.First(); it.Valid(); it.Next() {
item := it.IterItem()
Expand Down Expand Up @@ -130,7 +133,7 @@ func TraverseByPrefix(r storage.Reader, prefix []byte, iterFunc IterationFunc, o
// When this returned function is executed (and only then), it will write into the `keyExists` whether
// the key exists.
// No errors are expected during normal operation.
func KeyExists(r storage.Reader, key []byte) (bool, error) {
func KeyExists(r storage.Reader, key []byte) (exist bool, errToReturn error) {
_, closer, err := r.Get(key)
if err != nil {
// the key does not exist in the database
Expand All @@ -140,7 +143,9 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) {
// exception while checking for the key
return false, irrecoverable.NewExceptionf("could not load data: %w", err)
}
defer closer.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
}()

// the key does exist in the database
return true, nil
Expand All @@ -153,13 +158,15 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) {
// - storage.ErrNotFound if the key does not exist in the database
// - generic error in case of unexpected failure from the database layer, or failure
// to decode an existing database value
func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error {
func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) (errToReturn error) {
val, closer, err := r.Get(key)
if err != nil {
return err
}

defer closer.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
}()

err = msgpack.Unmarshal(val, entity)
if err != nil {
Expand All @@ -172,7 +179,7 @@ func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error {
// keys with the format prefix` + `height` (where "+" denotes concatenation of binary strings). The height
// is encoded as Big-Endian (entries with numerically smaller height have lexicographically smaller key).
// The function finds the *highest* key with the given prefix and height equal to or below the given height.
func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) error {
func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) (errToReturn error) {
if len(prefix) == 0 {
return fmt.Errorf("prefix must not be empty")
}
Expand All @@ -182,7 +189,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64
if err != nil {
return fmt.Errorf("can not create iterator: %w", err)
}
defer it.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(it, errToReturn)
}()

var highestKey []byte

Expand All @@ -203,7 +212,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64
return err
}

defer closer.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
}()

err = msgpack.Unmarshal(val, entity)
if err != nil {
Expand Down
64 changes: 29 additions & 35 deletions storage/operation/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,61 +10,55 @@ import (
"github.com/onflow/flow-go/storage"
)

// Upsert will encode the given entity using msgpack and will insert the resulting
// UpsertByKey will encode the given entity using msgpack and will insert the resulting
// binary data under the provided key.
// If the key already exists, the value will be overwritten.
// Error returns:
// - generic error in case of unexpected failure from the database layer or
// encoding failure.
func Upsert(key []byte, val interface{}) func(storage.Writer) error {
return func(w storage.Writer) error {
value, err := msgpack.Marshal(val)
if err != nil {
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
}

err = w.Set(key, value)
if err != nil {
return irrecoverable.NewExceptionf("failed to store data: %w", err)
}
func UpsertByKey(w storage.Writer, key []byte, val interface{}) error {
value, err := msgpack.Marshal(val)
if err != nil {
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
}

return nil
err = w.Set(key, value)
if err != nil {
return irrecoverable.NewExceptionf("failed to store data: %w", err)
}

return nil
}

// Remove removes the entity with the given key, if it exists. If it doesn't
// RemoveByKey removes the entity with the given key, if it exists. If it doesn't
// exist, this is a no-op.
// Error returns:
// * generic error in case of unexpected database error
func Remove(key []byte) func(storage.Writer) error {
return func(w storage.Writer) error {
err := w.Delete(key)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
func RemoveByKey(w storage.Writer, key []byte) error {
err := w.Delete(key)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
}

// RemoveByPrefix removes all keys with the given prefix
// RemoveByKeyPrefix removes all keys with the given prefix
// Error returns:
// * generic error in case of unexpected database error
func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error {
return RemoveByRange(reader, key, key)
func RemoveByKeyPrefix(reader storage.Reader, w storage.Writer, key []byte) error {
return RemoveByKeyRange(reader, w, key, key)
}

// RemoveByRange removes all keys with a prefix that falls within the range [start, end], both inclusive.
// RemoveByKeyRange removes all keys with a prefix that falls within the range [start, end], both inclusive.
// It returns error if endPrefix < startPrefix
// no other errors are expected during normal operation
func RemoveByRange(reader storage.Reader, startPrefix []byte, endPrefix []byte) func(storage.Writer) error {
return func(w storage.Writer) error {
if bytes.Compare(startPrefix, endPrefix) > 0 {
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
}
err := w.DeleteByRange(reader, startPrefix, endPrefix)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
func RemoveByKeyRange(reader storage.Reader, w storage.Writer, startPrefix []byte, endPrefix []byte) error {
if bytes.Compare(startPrefix, endPrefix) > 0 {
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
}
err := w.DeleteByRange(reader, startPrefix, endPrefix)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
}
Loading

0 comments on commit 45950a7

Please sign in to comment.