Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load backup with restricted range #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"strconv"
"strings"

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
Expand Down Expand Up @@ -199,6 +202,25 @@ func (l *KVLoader) Finish() error {
return l.throttle.Finish()
}

// IDFromString returns the ID corresponding to the input string.
// Requires that s was produced by a previous call to ID.String.
func IDFromString(s string) (uint64, error) {
s = strings.TrimPrefix(s, "0x")
id, err := strconv.ParseUint(s, 16, 64)
return id, err
}

const badgerKeySep = "/"

func decodeBadgerKeyForDocID(key []byte) (uint64, error) {
components := strings.SplitN(string(key), badgerKeySep, 4)
if id, err := IDFromString(components[0]); err != nil {
return 0, fmt.Errorf("parsing docid failed: %v %w", components[0], err)
} else {
return id, nil
}
}

// Load reads a protobuf-encoded list of all entries from a reader and writes
// them to the database. This can be used to restore the database from a backup
// made by calling DB.Backup(). If more complex logic is needed to restore a badger
Expand Down Expand Up @@ -252,3 +274,62 @@ func (db *DB) Load(r io.Reader, maxPendingWrites int) error {
db.orc.txnMark.Done(db.orc.nextTxnTs - 1)
return nil
}

// LoadForRange reads a protobuf-encoded list of all entries from a reader and writes
// them to the database if the docID is within the appropriate range . This can be used
// to restore the database from a backup // made by calling DB.Backup(). If more complex logic is needed to restore a badger
// backup, the KVLoader interface should be used instead.
//
// DB.LoadForRange() should be called on a database that is not running any other
// concurrent transactions while it is running.
func (db *DB) LoadForRange(r io.Reader, maxPendingWrites int, minID uint64, maxID uint64) error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass in configDocRanges (docs.RangeSlice) instead of minID and maxID?

br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)

ldr := db.NewKVLoader(maxPendingWrites)
for {
var sz uint64
err := binary.Read(br, binary.LittleEndian, &sz)
if err == io.EOF {
break
} else if err != nil {
return err
}

if cap(unmarshalBuf) < int(sz) {
unmarshalBuf = make([]byte, sz)
}

if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
return err
}

list := &pb.KVList{}
if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil {
return err
}

for _, kv := range list.Kv {
if id, err := decodeBadgerKeyForDocID(kv.Key); err != nil {
return err
} else if id < minID || id > maxID {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment, can we use !configDocRanges.Contains(id) ?

continue
}
if err := ldr.Set(kv); err != nil {
return err
}

// Update nextTxnTs, memtable stores this
// timestamp in badger head when flushed.
if kv.Version >= db.orc.nextTxnTs {
db.orc.nextTxnTs = kv.Version + 1
}
}
}

if err := ldr.Finish(); err != nil {
return err
}
db.orc.txnMark.Done(db.orc.nextTxnTs - 1)
return nil
}