Skip to content

Commit

Permalink
Add write-to-storage part
Browse files Browse the repository at this point in the history
  • Loading branch information
harshitbansal05 committed Feb 5, 2023
1 parent c6fc268 commit 0b64f0f
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 7 deletions.
82 changes: 80 additions & 2 deletions kv/raftstore/peer_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type PeerStorage struct {
}

// NewPeerStorage get the persist raftState from engines and return a peer storage
// MY: Tries to get RaftLocalState, RaftApplyState from engines. If not present, then intializes them with a default value of 5 for all keys.
func NewPeerStorage(engines *engine_util.Engines, region *metapb.Region, regionSched chan<- worker.Task, tag string) (*PeerStorage, error) {
log.Debugf("%s creating storage for %s", tag, region.String())
raftState, err := meta.InitRaftLocalState(engines.Raft, region)
Expand Down Expand Up @@ -86,6 +87,7 @@ func (ps *PeerStorage) InitialState() (eraftpb.HardState, eraftpb.ConfState, err
return *raftState.HardState, util.ConfStateFromRegion(ps.region), nil
}

// Fetches entries in the given range from RaftDB storage.
func (ps *PeerStorage) Entries(low, high uint64) ([]eraftpb.Entry, error) {
if err := ps.checkRange(low, high); err != nil || low == high {
return nil, err
Expand Down Expand Up @@ -126,6 +128,7 @@ func (ps *PeerStorage) Entries(low, high uint64) ([]eraftpb.Entry, error) {
return nil, raft.ErrUnavailable
}

// Returns term for a given entry, if present in storage. Entry might be compacted due to applying or out of bounds due to idx > lastIndex.
func (ps *PeerStorage) Term(idx uint64) (uint64, error) {
if idx == ps.truncatedIndex() {
return ps.truncatedTerm(), nil
Expand All @@ -151,6 +154,8 @@ func (ps *PeerStorage) FirstIndex() (uint64, error) {
return ps.truncatedIndex() + 1, nil
}

// Tries to return a snapshot reading from a channel, which would be filled by RegionTaskGen runner. If snapshot is not generated, it tries till
// a maximum count of 5.
func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error) {
var snapshot eraftpb.Snapshot
if ps.snapState.StateType == snap.SnapState_Generating {
Expand Down Expand Up @@ -230,6 +235,7 @@ func (ps *PeerStorage) AppliedIndex() uint64 {
return ps.applyState.AppliedIndex
}

// Validates snapshot for staleness, corruptness, etc.
func (ps *PeerStorage) validateSnap(snap *eraftpb.Snapshot) bool {
idx := snap.GetMetadata().GetIndex()
if idx < ps.truncatedIndex() {
Expand Down Expand Up @@ -308,6 +314,33 @@ func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatc
// never be committed
func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error {
// Your Code Here (2B).
storage_li, err := ps.LastIndex()
if err != nil {
return err
}
if len(entries) > 0 {
entries_fi := entries[0].Index
if storage_li+1 < entries_fi {
log.Panicf("[PeerStorage.Append] last index of raft log in stable storage "+
"is %d + 1 < unstable index %d", storage_li, entries_fi)
}
// delete log entries that will never be committed
if storage_li >= entries_fi {
log.Infof("[PeerStorage.Append] delete log entries in range [%d, %d] from raftDB",
entries_fi, storage_li)
for i := entries_fi; i < storage_li+1; i++ {
raftWB.DeleteMeta(meta.RaftLogKey(ps.region.Id, i))
}
}
// Append the given entries to the raft log
log.Infof("[PeerStorage.Append] append log entries in range [%d, %d] into raftDB",
entries[0].Index, entries[len(entries)-1].Index)
for _, e := range entries {
raftWB.SetMeta(meta.RaftLogKey(ps.region.Id, e.Index), &e)
}
ps.raftState.LastIndex = entries[len(entries)-1].Index
ps.raftState.LastTerm = entries[len(entries)-1].Term
}
return nil
}

Expand All @@ -323,15 +356,60 @@ func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_ut
// and send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
// and ps.clearExtraData to delete stale data
// Your Code Here (2C).
return nil, nil
if ps.isInitialized() {
if err := ps.clearMeta(kvWB, raftWB); err != nil {
return nil, err
}
ps.clearExtraData(snapData.Region)
}
// update states
ps.raftState.LastIndex = snapshot.Metadata.Index
ps.raftState.LastTerm = snapshot.Metadata.Term
ps.applyState.AppliedIndex = snapshot.Metadata.Index
ps.applyState.TruncatedState.Index = snapshot.Metadata.Index
ps.applyState.TruncatedState.Term = snapshot.Metadata.Term
ps.snapState.StateType = snap.SnapState_Applying
kvWB.SetMeta(meta.ApplyStateKey(snapData.Region.GetId()), ps.applyState)
// send RegionTaskApply and wait
ch := make(chan bool, 1)
ps.regionSched <- &runner.RegionTaskApply{
RegionId: ps.region.GetId(),
Notifier: ch,
SnapMeta: snapshot.Metadata,
StartKey: snapData.Region.GetStartKey(),
EndKey: snapData.Region.GetEndKey(),
}
result := &ApplySnapResult{
PrevRegion: ps.region,
Region: snapData.Region,
}
meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal)
return result, nil
}

// Save memory states to disk.
// Do not modify ready in this function, this is a requirement to advance the ready object properly later.
func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) {
// Hint: you may call `Append()` and `ApplySnapshot()` in this function
// Your Code Here (2B/2C).
return nil, nil
raftWB := &engine_util.WriteBatch{}
var result *ApplySnapResult
var err error
// apply snapshot
if !raft.IsEmptySnap(&ready.Snapshot) {
kvWb := &engine_util.WriteBatch{}
result, err = ps.ApplySnapshot(&ready.Snapshot, kvWb, raftWB)
kvWb.WriteToDB(ps.Engines.Kv)
}
// store raft log
ps.Append(ready.Entries, raftWB)
// store RaftLocalState
if !raft.IsEmptyHardState(ready.HardState) {
ps.raftState.HardState = &ready.HardState
}
raftWB.SetMeta(meta.RaftStateKey(ps.region.GetId()), ps.raftState)
raftWB.WriteToDB(ps.Engines.Raft)
return result, err
}

func (ps *PeerStorage) ClearData() {
Expand Down
30 changes: 25 additions & 5 deletions kv/raftstore/snap/snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func NewApplyOptions(db *badger.DB, region *metapb.Region) *ApplyOptions {

// `Snapshot` is an interface for snapshot.
// It's used in these scenarios:
// 1. build local snapshot
// 2. read local snapshot and then replicate it to remote raftstores
// 3. receive snapshot from remote raftstore and write it to local storage
// 4. apply snapshot
// 5. snapshot gc
// 1. build local snapshot
// 2. read local snapshot and then replicate it to remote raftstores
// 3. receive snapshot from remote raftstore and write it to local storage
// 4. apply snapshot
// 5. snapshot gc
type Snapshot interface {
io.Reader
io.Writer
Expand Down Expand Up @@ -144,6 +144,7 @@ func retryDeleteSnapshot(deleter SnapshotDeleter, key SnapKey, snap Snapshot) bo
return false
}

// Gets CF, Size, Checksum from cFFiles, and returns them.
func genSnapshotMeta(cfFiles []*CFFile) (*rspb.SnapshotMeta, error) {
cfMetas := make([]*rspb.SnapshotCFFile, 0, len(engine_util.CFs))
for _, cfFile := range cfFiles {
Expand Down Expand Up @@ -235,6 +236,9 @@ type Snap struct {
holdTmpFiles bool
}

// Does the following steps:
// 1. Path of CF file: dir/{gen/rev}_<region_id>_<term>_<index>_{default/write/lock}.sst
// 2. Path of meta file: dir/{gen/rev}.meta
func NewSnap(dir string, key SnapKey, sizeTrack *int64, isSending, toBuild bool,
deleter SnapshotDeleter) (*Snap, error) {
if !util.DirExists(dir) {
Expand Down Expand Up @@ -295,6 +299,8 @@ func NewSnap(dir string, key SnapKey, sizeTrack *int64, isSending, toBuild bool,
return s, nil
}

// If s exists, returns. Else creates .tmp meta and CF files. Sets s.holdTmpFiles to true.
// Sets MetaFile.File to the tmp file. Sets cfFile.SstWriter to the .tmp CF file.
func NewSnapForBuilding(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error) {
s, err := NewSnap(dir, key, sizeTrack, true, true, deleter)
if err != nil {
Expand All @@ -307,6 +313,7 @@ func NewSnapForBuilding(dir string, key SnapKey, sizeTrack *int64, deleter Snaps
return s, nil
}

// If s does not exists, return. If it does, opens the files at CFFile.Path, and sets it to CFFile.File
func NewSnapForSending(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error) {
s, err := NewSnap(dir, key, sizeTrack, true, false, deleter)
if err != nil {
Expand All @@ -328,6 +335,8 @@ func NewSnapForSending(dir string, key SnapKey, sizeTrack *int64, deleter Snapsh
return s, nil
}

// If s does exists, return. Else creates .tmp meta and CF files. Sets s.holdTmpFiles to true.
// Sets MetaFile.File and CFFile.File to the tmp file.
func NewSnapForReceiving(dir string, key SnapKey, snapshotMeta *rspb.SnapshotMeta,
sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error) {
s, err := NewSnap(dir, key, sizeTrack, false, false, deleter)
Expand Down Expand Up @@ -366,6 +375,8 @@ func NewSnapForApplying(dir string, key SnapKey, sizeTrack *int64, deleter Snaps
return NewSnap(dir, key, sizeTrack, false, false, deleter)
}

// If s exists, returns. Else creates .tmp meta and CF files. Sets s.holdTmpFiles to true.
// Sets MetaFile.File to the tmp file. Sets cfFile.SstWriter to the .tmp CF file.
func (s *Snap) initForBuilding() error {
if s.Exists() {
return nil
Expand All @@ -386,6 +397,7 @@ func (s *Snap) initForBuilding() error {
return nil
}

// Unmarshals snapshotMeta from s.MetaFile.Path.
func (s *Snap) readSnapshotMeta() (*rspb.SnapshotMeta, error) {
fi, err := os.Stat(s.MetaFile.Path)
if err != nil {
Expand All @@ -409,6 +421,7 @@ func (s *Snap) readSnapshotMeta() (*rspb.SnapshotMeta, error) {
return snapshotMeta, nil
}

// Reverse of genSnapshotMeta(). Sets the Size and Checksum attributes of CFFile after reading from snapshotMeta.
func (s *Snap) setSnapshotMeta(snapshotMeta *rspb.SnapshotMeta) error {
if len(snapshotMeta.CfFiles) != len(s.CFFiles) {
return errors.Errorf("invalid CF number of snapshot meta, expect %d, got %d",
Expand All @@ -433,6 +446,7 @@ func (s *Snap) setSnapshotMeta(snapshotMeta *rspb.SnapshotMeta) error {
return nil
}

// Calls readSnapshotMeta and then setSnapshotMeta.
func (s *Snap) loadSnapMeta() error {
snapshotMeta, err := s.readSnapshotMeta()
if err != nil {
Expand Down Expand Up @@ -467,6 +481,7 @@ func (s *Snap) validate() error {
return nil
}

// Cloes sstWriter, and renames CF file at tmpPath to Path.
func (s *Snap) saveCFFiles() error {
for _, cfFile := range s.CFFiles {
if cfFile.KVCount > 0 {
Expand Down Expand Up @@ -503,6 +518,7 @@ func (s *Snap) saveCFFiles() error {
return nil
}

// Reads s.MetaFile.Meta and writes it to MetaFile.File (which is at the tmp file path). Sets s.holdTmpFiles to false.
func (s *Snap) saveMetaFile() error {
bin, err := s.MetaFile.Meta.Marshal()
if err != nil {
Expand All @@ -520,6 +536,8 @@ func (s *Snap) saveMetaFile() error {
return nil
}

// If s exists, returns. Builds the snapshot by using builder.build function. Calls s.saveCFFiles() and then calls genSnapshotMeta to generate
// metadata from the Cf files. Sets the generated meta to s.MetaFile.Meta and and then calls s.saveMetaFile().
func (s *Snap) Build(dbSnap *badger.Txn, region *metapb.Region, snapData *rspb.RaftSnapshotData, stat *SnapStatistics, deleter SnapshotDeleter) error {
if s.Exists() {
err := s.validate()
Expand Down Expand Up @@ -582,6 +600,7 @@ func (s *Snap) Exists() bool {
return util.FileExists(s.MetaFile.Path)
}

// Deletes snapshot files, taking into consideration the tmp files.
func (s *Snap) Delete() {
log.Debugf("deleting %s", s.Path())
for _, cfFile := range s.CFFiles {
Expand Down Expand Up @@ -626,6 +645,7 @@ func (s *Snap) TotalSize() (total uint64) {
return
}

// Combination of both saveCFFiles and saveMetaFile.
func (s *Snap) Save() error {
log.Debugf("saving to %s", s.MetaFile.Path)
for _, cfFile := range s.CFFiles {
Expand Down
1 change: 1 addition & 0 deletions kv/raftstore/snap/snap_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func newSnapBuilder(cfFiles []*CFFile, dbSnap *badger.Txn, region *metapb.Region
}
}

// For a region, Adds entries to the snapshots (CF wise) from the raft entries.
func (b *snapBuilder) build() error {
defer b.txn.Discard()
startKey, endKey := b.region.StartKey, b.region.EndKey
Expand Down
6 changes: 6 additions & 0 deletions kv/raftstore/snap/snap_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewSnapManager(path string) *SnapManager {
return new(SnapManagerBuilder).Build(path)
}

// Creates base directory (if not exists). Loops over the files in this directory: If a tmp file, removes it. If a .sst file, adds it's size to sm.snapSize.
func (sm *SnapManager) Init() error {
fi, err := os.Stat(sm.base)
if os.IsNotExist(err) {
Expand Down Expand Up @@ -91,6 +92,8 @@ func (sm *SnapManager) Init() error {
return nil
}

// Loops over the meta files in base directory; Constructs the SnapKey from the file's name. If the SnapKey key does not exist in th sm.registry, adds
// it to the result. Lastly, sorts the result and returns.
func (sm *SnapManager) ListIdleSnap() ([]SnapKeyWithSending, error) {
fis, err := ioutil.ReadDir(sm.base)
if err != nil {
Expand Down Expand Up @@ -163,6 +166,7 @@ func (sm *SnapManager) GetTotalSnapSize() uint64 {
return uint64(atomic.LoadInt64(sm.snapSize))
}

// If the total snapshot size exceeds the max size, deletes old idle snaps. Then, calls NewSnapForBuilding().
func (sm *SnapManager) GetSnapshotForBuilding(key SnapKey) (Snapshot, error) {
if sm.GetTotalSnapSize() > sm.MaxTotalSize {
err := sm.deleteOldIdleSnaps()
Expand All @@ -173,6 +177,8 @@ func (sm *SnapManager) GetSnapshotForBuilding(key SnapKey) (Snapshot, error) {
return NewSnapForBuilding(sm.base, key, sm.snapSize, sm)
}

// Calls sm.GetSnapshotForSending() for idle snaps; filters sending snaps from them; Calls sm.GetSnapshotForSending(); Sorts them by the modification time;
// Calls sm.DeleteSnapshot() for them till the total suize exceeds the max size.
func (sm *SnapManager) deleteOldIdleSnaps() error {
idleSnaps, err := sm.ListIdleSnap()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions kv/storage/raft_storage/raft_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func newRaftClient(config *config.Config) *RaftClient {
}
}

// Gets connection object for an address
func (c *RaftClient) getConn(addr string, regionID uint64) (*raftConn, error) {
c.RLock()
conn, ok := c.conns[addr]
Expand All @@ -91,6 +92,7 @@ func (c *RaftClient) getConn(addr string, regionID uint64) (*raftConn, error) {
return newConn, nil
}

// Sends a msg using the connection object for the provided addr
func (c *RaftClient) Send(storeID uint64, addr string, msg *raft_serverpb.RaftMessage) error {
conn, err := c.getConn(addr, msg.GetRegionId())
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions kv/storage/raft_storage/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newResolverRunner(schedulerClient scheduler_client.Client) *resolverRunner
}
}

// Gets store's address from the scheduler client
func (r *resolverRunner) getAddr(id uint64) (string, error) {
if sa, ok := r.storeAddrs[id]; ok {
if time.Since(sa.lastUpdate).Seconds() < storeAddressRefreshSeconds {
Expand Down
3 changes: 3 additions & 0 deletions kv/storage/raft_storage/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ func NewServerTransport(raftClient *RaftClient, snapScheduler chan<- worker.Task
}
}

// Get the store from the message and then send a message to that raft store
func (t *ServerTransport) Send(msg *raft_serverpb.RaftMessage) error {
storeID := msg.GetToPeer().GetStoreId()
t.SendStore(storeID, msg)
return nil
}

// Send a message to a raft store
func (t *ServerTransport) SendStore(storeID uint64, msg *raft_serverpb.RaftMessage) {
addr := t.raftClient.GetAddr(storeID)
if addr != "" {
Expand All @@ -47,6 +49,7 @@ func (t *ServerTransport) SendStore(storeID uint64, msg *raft_serverpb.RaftMessa
t.Resolve(storeID, msg)
}

// Pushes a task to the worker to resolve the address of a raft store
func (t *ServerTransport) Resolve(storeID uint64, msg *raft_serverpb.RaftMessage) {
callback := func(addr string, err error) {
// clear resolving
Expand Down

0 comments on commit 0b64f0f

Please sign in to comment.