Skip to content

Commit

Permalink
Add transactions support
Browse files Browse the repository at this point in the history
  • Loading branch information
hbansal committed Jul 2, 2024
1 parent 0b64f0f commit 11962ec
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 6 deletions.
175 changes: 172 additions & 3 deletions kv/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package server

import (
"context"
"fmt"

"github.com/pingcap-incubator/tinykv/kv/coprocessor"
"github.com/pingcap-incubator/tinykv/kv/storage"
"github.com/pingcap-incubator/tinykv/kv/storage/raft_storage"
"github.com/pingcap-incubator/tinykv/kv/transaction/latches"
"github.com/pingcap-incubator/tinykv/kv/transaction/mvcc"
coppb "github.com/pingcap-incubator/tinykv/proto/pkg/coprocessor"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/pingcap-incubator/tinykv/proto/pkg/tinykvpb"
Expand Down Expand Up @@ -50,17 +52,184 @@ func (server *Server) Snapshot(stream tinykvpb.TinyKv_SnapshotServer) error {
// Transactional API.
func (server *Server) KvGet(_ context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
// Your Code Here (4B).
return nil, nil
resp := &kvrpcpb.GetResponse{}
reader, err := server.storage.Reader(req.Context)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
defer reader.Close()
txn := mvcc.NewMvccTxn(reader, req.Version)
lock, err := txn.GetLock(req.Key)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
if lock != nil && req.Version >= lock.Ts {
resp.Error = &kvrpcpb.KeyError{
Locked: &kvrpcpb.LockInfo{
PrimaryLock: lock.Primary,
LockVersion: lock.Ts,
Key: req.Key,
LockTtl: lock.Ttl,
}}
return resp, nil
}
value, err := txn.GetValue(req.Key)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
resp.Value = value
if value == nil {
resp.NotFound = true
}
return resp, nil
}

func (server *Server) KvPrewrite(_ context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
// Your Code Here (4B).
return nil, nil
resp := &kvrpcpb.PrewriteResponse{}
if len(req.Mutations) == 0 {
return resp, nil
}
reader, err := server.storage.Reader(req.Context)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
defer reader.Close()
txn := mvcc.NewMvccTxn(reader, req.StartVersion)
var keyErrors []*kvrpcpb.KeyError
for _, m := range req.Mutations {
write, ts, err := txn.MostRecentWrite(m.Key)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
if write != nil && req.StartVersion <= ts {
keyErrors = append(keyErrors, &kvrpcpb.KeyError{
Conflict: &kvrpcpb.WriteConflict{
StartTs: req.StartVersion,
ConflictTs: ts,
Key: m.Key,
Primary: req.PrimaryLock,
}})
continue
}
lock, err := txn.GetLock(m.Key)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
if lock != nil && lock.Ts != req.StartVersion {
keyErrors = append(keyErrors, &kvrpcpb.KeyError{
Locked: &kvrpcpb.LockInfo{
PrimaryLock: lock.Primary,
LockVersion: lock.Ts,
Key: m.Key,
LockTtl: lock.Ttl,
}})
}
var kind mvcc.WriteKind
switch m.Op {
case kvrpcpb.Op_Put:
kind = mvcc.WriteKindPut
txn.PutValue(m.Key, m.Value)
case kvrpcpb.Op_Del:
kind = mvcc.WriteKindDelete
txn.DeleteValue(m.Key)
default:
return nil, fmt.Errorf("Invalid Op")
}
txn.PutLock(m.Key, &mvcc.Lock{
Primary: req.PrimaryLock,
Ts: req.StartVersion,
Ttl: req.LockTtl,
Kind: kind})
}
if len(keyErrors) > 0 {
resp.Errors = keyErrors
return resp, nil
}
err = server.storage.Write(req.Context, txn.Writes())
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
return resp, nil
}

func (server *Server) KvCommit(_ context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
// Your Code Here (4B).
return nil, nil
resp := &kvrpcpb.CommitResponse{}
if len(req.Keys) == 0 {
return resp, nil
}
reader, err := server.storage.Reader(req.Context)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
defer reader.Close()
txn := mvcc.NewMvccTxn(reader, req.StartVersion)
server.Latches.WaitForLatches(req.Keys)
defer server.Latches.ReleaseLatches(req.Keys)
for _, key := range req.Keys {
lock, err := txn.GetLock(key)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
if lock == nil {
return resp, nil
}
if lock.Ts != req.StartVersion {
resp.Error = &kvrpcpb.KeyError{Retryable: "true"}
return resp, nil
}
txn.PutWrite(key, req.CommitVersion, &mvcc.Write{
StartTS: req.StartVersion,
Kind: lock.Kind,
})
txn.DeleteLock(key)
}
err = server.storage.Write(req.Context, txn.Writes())
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
return resp, nil
}

func (server *Server) KvScan(_ context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
Expand Down
121 changes: 118 additions & 3 deletions kv/transaction/mvcc/transaction.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package mvcc

import (
"bytes"
"encoding/binary"

"github.com/pingcap-incubator/tinykv/kv/storage"
"github.com/pingcap-incubator/tinykv/kv/util/codec"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/pingcap-incubator/tinykv/scheduler/pkg/tsoutil"
)
Expand Down Expand Up @@ -41,54 +43,167 @@ func (txn *MvccTxn) Writes() []storage.Modify {
// PutWrite records a write at key and ts.
func (txn *MvccTxn) PutWrite(key []byte, ts uint64, write *Write) {
// Your Code Here (4A).
modify := storage.Modify{
Data: storage.Put{
Key: EncodeKey(key, ts),
Cf: engine_util.CfWrite,
Value: write.ToBytes(),
},
}
txn.writes = append(txn.writes, modify)
}

// GetLock returns a lock if key is locked. It will return (nil, nil) if there is no lock on key, and (nil, err)
// if an error occurs during lookup.
func (txn *MvccTxn) GetLock(key []byte) (*Lock, error) {
// Your Code Here (4A).
return nil, nil
value, err := txn.Reader.GetCF(engine_util.CfLock, key)
if err != nil {
return nil, err
}
if value == nil {
return nil, nil
}
lock, err := ParseLock(value)
if err != nil {
return nil, err
}
return lock, nil
}

// PutLock adds a key/lock to this transaction.
func (txn *MvccTxn) PutLock(key []byte, lock *Lock) {
// Your Code Here (4A).
modify := storage.Modify{
Data: storage.Put{
Key: key,
Cf: engine_util.CfLock,
Value: lock.ToBytes(),
},
}
txn.writes = append(txn.writes, modify)
}

// DeleteLock adds a delete lock to this transaction.
func (txn *MvccTxn) DeleteLock(key []byte) {
// Your Code Here (4A).
modify := storage.Modify{
Data: storage.Delete{
Key: key,
Cf: engine_util.CfLock,
},
}
txn.writes = append(txn.writes, modify)
}

// GetValue finds the value for key, valid at the start timestamp of this transaction.
// I.e., the most recent value committed before the start of this transaction.
func (txn *MvccTxn) GetValue(key []byte) ([]byte, error) {
// Your Code Here (4A).
return nil, nil
iter := txn.Reader.IterCF(engine_util.CfWrite)
defer iter.Close()
iter.Seek(EncodeKey(key, txn.StartTS))
if !iter.Valid() {
return nil, nil
}
item := iter.Item()
gotKey := item.KeyCopy(nil)
userKey := DecodeUserKey(gotKey)
if !bytes.Equal(key, userKey) {
return nil, nil
}
value, err := item.ValueCopy(nil)
if err != nil {
return nil, err
}
write, err := ParseWrite(value)
if err != nil {
return nil, err
}
if write.Kind == WriteKindDelete {
return nil, nil
}
return txn.Reader.GetCF(engine_util.CfDefault, EncodeKey(key, write.StartTS))
}

// PutValue adds a key/value write to this transaction.
func (txn *MvccTxn) PutValue(key []byte, value []byte) {
// Your Code Here (4A).
modify := storage.Modify{
Data: storage.Put{
Key: EncodeKey(key, txn.StartTS),
Cf: engine_util.CfDefault,
Value: value,
},
}
txn.writes = append(txn.writes, modify)
}

// DeleteValue removes a key/value pair in this transaction.
func (txn *MvccTxn) DeleteValue(key []byte) {
// Your Code Here (4A).
modify := storage.Modify{
Data: storage.Delete{
Key: EncodeKey(key, txn.StartTS),
Cf: engine_util.CfDefault,
},
}
txn.writes = append(txn.writes, modify)
}

// CurrentWrite searches for a write with this transaction's start timestamp. It returns a Write from the DB and that
// write's commit timestamp, or an error.
func (txn *MvccTxn) CurrentWrite(key []byte) (*Write, uint64, error) {
// Your Code Here (4A).
iter := txn.Reader.IterCF(engine_util.CfWrite)
defer iter.Close()
for iter.Seek(EncodeKey(key, ^uint64(0))); iter.Valid(); iter.Next() {
item := iter.Item()
gotKey := item.KeyCopy(nil)
userKey := DecodeUserKey(gotKey)
if !bytes.Equal(key, userKey) {
return nil, 0, nil
}
value, err := item.ValueCopy(nil)
if err != nil {
return nil, 0, err
}
write, err := ParseWrite(value)
if err != nil {
return nil, 0, err
}
if write.StartTS == txn.StartTS {
return write, decodeTimestamp(gotKey), nil
}
}
return nil, 0, nil
}

// MostRecentWrite finds the most recent write with the given key. It returns a Write from the DB and that
// write's commit timestamp, or an error.
func (txn *MvccTxn) MostRecentWrite(key []byte) (*Write, uint64, error) {
// Your Code Here (4A).
return nil, 0, nil
iter := txn.Reader.IterCF(engine_util.CfWrite)
defer iter.Close()
iter.Seek(EncodeKey(key, ^uint64(0)))
if !iter.Valid() {
return nil, 0, nil
}
item := iter.Item()
gotKey := item.KeyCopy(nil)
userKey := DecodeUserKey(gotKey)
if !bytes.Equal(key, userKey) {
return nil, 0, nil
}
value, err := item.ValueCopy(nil)
if err != nil {
return nil, 0, err
}
write, err := ParseWrite(value)
if err != nil {
return nil, 0, err
}
return write, decodeTimestamp(gotKey), nil
}

// EncodeKey encodes a user key and appends an encoded timestamp to a key. Keys and timestamps are encoded so that
Expand Down

0 comments on commit 11962ec

Please sign in to comment.