diff --git a/backup.go b/backup.go index aa0333916..6d4cb6239 100644 --- a/backup.go +++ b/backup.go @@ -21,7 +21,10 @@ import ( "bytes" "context" "encoding/binary" + "fmt" "io" + "strconv" + "strings" "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" @@ -199,6 +202,25 @@ func (l *KVLoader) Finish() error { return l.throttle.Finish() } +// IDFromString returns the ID corresponding to the input string. +// Requires that s was produced by a previous call to ID.String. +func IDFromString(s string) (uint64, error) { + s = strings.TrimPrefix(s, "0x") + id, err := strconv.ParseUint(s, 16, 64) + return id, err +} + +const badgerKeySep = "/" + +func decodeBadgerKeyForDocID(key []byte) (uint64, error) { + components := strings.SplitN(string(key), badgerKeySep, 4) + if id, err := IDFromString(components[0]); err != nil { + return 0, fmt.Errorf("parsing docid failed: %v %w", components[0], err) + } else { + return id, nil + } +} + // Load reads a protobuf-encoded list of all entries from a reader and writes // them to the database. This can be used to restore the database from a backup // made by calling DB.Backup(). If more complex logic is needed to restore a badger @@ -252,3 +274,62 @@ func (db *DB) Load(r io.Reader, maxPendingWrites int) error { db.orc.txnMark.Done(db.orc.nextTxnTs - 1) return nil } + +// LoadForRange reads a protobuf-encoded list of all entries from a reader and writes +// them to the database if the docID is within the appropriate range . This can be used +// to restore the database from a backup // made by calling DB.Backup(). If more complex logic is needed to restore a badger +// backup, the KVLoader interface should be used instead. +// +// DB.LoadForRange() should be called on a database that is not running any other +// concurrent transactions while it is running. +func (db *DB) LoadForRange(r io.Reader, maxPendingWrites int, minID uint64, maxID uint64) error { + br := bufio.NewReaderSize(r, 16<<10) + unmarshalBuf := make([]byte, 1<<10) + + ldr := db.NewKVLoader(maxPendingWrites) + for { + var sz uint64 + err := binary.Read(br, binary.LittleEndian, &sz) + if err == io.EOF { + break + } else if err != nil { + return err + } + + if cap(unmarshalBuf) < int(sz) { + unmarshalBuf = make([]byte, sz) + } + + if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil { + return err + } + + list := &pb.KVList{} + if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil { + return err + } + + for _, kv := range list.Kv { + if id, err := decodeBadgerKeyForDocID(kv.Key); err != nil { + return err + } else if id < minID || id > maxID { + continue + } + if err := ldr.Set(kv); err != nil { + return err + } + + // Update nextTxnTs, memtable stores this + // timestamp in badger head when flushed. + if kv.Version >= db.orc.nextTxnTs { + db.orc.nextTxnTs = kv.Version + 1 + } + } + } + + if err := ldr.Finish(); err != nil { + return err + } + db.orc.txnMark.Done(db.orc.nextTxnTs - 1) + return nil +}