Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add raft example #153

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
247f961
Add a Linux implementation of file locking
mhagger Jul 19, 2023
e2141f7
Fix/suppress some linter warnings
mhagger Jul 18, 2023
7b97b3a
Define `SnapshotStorage` interface
mhagger Mar 8, 2023
84b03dc
Define `KVStore` interface
mhagger Mar 8, 2023
cdd60c2
raftNode: initialize `snapshotStorage` in `newRaftNode()`
mhagger Mar 8, 2023
5034ccf
newRaftNode(): inline part of the goroutine's work
mhagger Mar 8, 2023
c1dd4dc
startRaftNode(): replacement for `newRaftNode()`
mhagger Mar 8, 2023
2a547ec
raftNode.snapdir: remove member
mhagger Mar 8, 2023
0fdf48b
raftNode.id: convert type to `uint64`
mhagger Mar 8, 2023
17981fa
startRaftNode(): take the `SnapshotStorage` as an argument
mhagger Mar 8, 2023
34654f2
kvstore.loadAndApplySnapshot(), applyCommits(): extract methods
mhagger Mar 8, 2023
5be8540
kvstore: separate initialization from startup
mhagger Mar 8, 2023
302d8a5
kvstore.loadSnapshot(): inline method
mhagger Mar 8, 2023
78c9106
FSM: new interface, representing a finite state machine
mhagger Mar 8, 2023
9163a7d
Move more functionality from `kvstore` to `kvfsm`
mhagger Mar 8, 2023
2ea3a54
TestProposeOnCommit(): add some clarifying comments
mhagger Mar 8, 2023
a328306
raftexample_test: introduce `peer` type
mhagger Mar 8, 2023
7b9b2cc
raftexample_test: give each `peer` its own `FSM`
mhagger Mar 9, 2023
1de94b9
kvfsm.applyCommits(): return an error
mhagger Mar 9, 2023
b44de5b
FSM.ProcessCommits(): return an error rather than calling `log.Fatal()`
mhagger Mar 9, 2023
58d3994
FSM.ApplyCommits(): new method
mhagger Mar 9, 2023
4feda00
newKVStore(): don't call `LoadAndApplySnapshot()`
mhagger Mar 9, 2023
ba8d8ca
Make `ProcessCommits()` a method of `raftNode`
mhagger Mar 9, 2023
bb3c651
newRaftNode(): call `LoadAndApplySnapshot()` here
mhagger Mar 9, 2023
73ae94b
LoadAndApplySnapshot(): move method to `raftNode` and make it private
mhagger Mar 9, 2023
c0cac21
raftNode: add a new and better way to tell when the node is done
mhagger Mar 9, 2023
b19e64f
serveHTTPKVAPI(): monitor the raft node using its "done" channel
mhagger Mar 9, 2023
b4c1a55
cluster.Close(): read any error from the node directly
mhagger Mar 9, 2023
d420ab3
TestProposeOnCommit(): read any error from the node directly
mhagger Mar 9, 2023
d181e4a
cluster.Cleanup(): new method, extracted from `Close()`
mhagger Mar 11, 2023
0ae9c56
TestProposeOnCommit(): change test to use `ProcessCommits()`
mhagger Mar 11, 2023
848ee9d
newRaftNode(): don't return `commitC` and `errorC`
mhagger Mar 11, 2023
cd6fbb1
add raft example
Elbehery Feb 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions raftexample/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module go.etcd.io/raft/raftexample

go 1.21.1

require (
github.com/golang/protobuf v1.5.3
go.etcd.io/raft/v3 v3.6.0-alpha.0
go.uber.org/zap v1.26.0
)

require (
github.com/gogo/protobuf v1.3.2 // indirect
go.uber.org/multierr v1.10.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
)
57 changes: 57 additions & 0 deletions raftexample/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/raft/v3 v3.6.0-alpha.0 h1:cMmjAEjCKMGiQPowjSWM43Y5ZnBEeNP8RSYcm3ewtns=
go.etcd.io/raft/v3 v3.6.0-alpha.0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
136 changes: 136 additions & 0 deletions raftexample/httpapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"io"
"log"
"net/http"
"strconv"

"go.etcd.io/raft/v3/raftpb"
)

// KVStore is the interface to the key-value store that is required by
// `httpKVAPI`.
type KVStore interface {
// Propose proposes to set key `k` to value `v` in the key-value
// store. It returns immediately, before the change has
// necessarily taken effect.
Propose(k, v string)

// Lookup looks up the value for key `k` in the current state of
// the store.
Lookup(k string) (string, bool)
}

// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
store KVStore
confChangeC chan<- raftpb.ConfChange
}

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
defer r.Body.Close()
switch r.Method {
case http.MethodPut:
v, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}

h.store.Propose(key, string(v))

// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
case http.MethodGet:
if v, ok := h.store.Lookup(key); ok {
w.Write([]byte(v))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
case http.MethodPost:
url, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on POST (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}

nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}

cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeID,
Context: url,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
nodeID, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on DELETE", http.StatusBadRequest)
return
}

cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeID,
}
h.confChangeC <- cc

// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
default:
w.Header().Set("Allow", http.MethodPut)
w.Header().Add("Allow", http.MethodGet)
w.Header().Add("Allow", http.MethodPost)
w.Header().Add("Allow", http.MethodDelete)
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}

// serveHTTPKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHTTPKVAPI(
kv KVStore, port int, confChangeC chan<- raftpb.ConfChange, done <-chan struct{},
) {
srv := http.Server{
Addr: ":" + strconv.Itoa(port),
Handler: &httpKVAPI{
store: kv,
confChangeC: confChangeC,
},
}
go func() {
if err := srv.ListenAndServe(); err != nil {
log.Fatal(err)
}
}()

// exit when raft goes down
<-done
_ = srv.Close()
}
117 changes: 117 additions & 0 deletions raftexample/kvstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"log"
"strings"
"sync"
)

// a key-value store backed by raft
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
}

type kv struct {
Key string
Val string
}

// newKVStore creates and returns a new `kvstore`. The second return
// value can be used as the finite state machine that is driven by a
// `raftNode`.
func newKVStore(proposeC chan<- string) (*kvstore, kvfsm) {
s := &kvstore{
proposeC: proposeC,
kvStore: make(map[string]string),
}
fsm := kvfsm{
kvs: s,
}
return s, fsm
}

func (s *kvstore) Lookup(key string) (string, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.kvStore[key]
return v, ok
}

func (s *kvstore) Propose(k string, v string) {
var buf strings.Builder
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
log.Fatal(err)
}
s.proposeC <- buf.String()
}

// Set sets a single value. It should only be called by `kvfsm`.
func (s *kvstore) set(k, v string) {
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore[k] = v
}

func (s *kvstore) restoreFromSnapshot(store map[string]string) {
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
}

// kvfsm implements the `FSM` interface for the underlying `*kvstore`.
type kvfsm struct {
kvs *kvstore
}

// RestoreSnapshot restores the current state of the KV store to the
// value encoded in `snapshot`.
func (fsm kvfsm) RestoreSnapshot(snapshot []byte) error {
var store map[string]string
if err := json.Unmarshal(snapshot, &store); err != nil {
return err
}
fsm.kvs.restoreFromSnapshot(store)
return nil
}

func (fsm kvfsm) TakeSnapshot() ([]byte, error) {
fsm.kvs.mu.RLock()
defer fsm.kvs.mu.RUnlock()
return json.Marshal(fsm.kvs.kvStore)
}

// ApplyCommits decodes and applies each of the commits in `commit` to
// the current state, then signals that it is done by closing
// `commit.applyDoneC`.
func (fsm kvfsm) ApplyCommits(commit *commit) error {
for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {
return fmt.Errorf("could not decode message: %w", err)
}
fsm.kvs.set(dataKv.Key, dataKv.Val)
}
close(commit.applyDoneC)
return nil
}
50 changes: 50 additions & 0 deletions raftexample/kvstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"reflect"
"testing"
)

func Test_kvstore_snapshot(t *testing.T) {
tm := map[string]string{"foo": "bar"}
s := &kvstore{kvStore: tm}
fsm := kvfsm{
kvs: s,
}

v, _ := s.Lookup("foo")
if v != "bar" {
t.Fatalf("foo has unexpected value, got %s", v)
}

data, err := fsm.TakeSnapshot()
if err != nil {
t.Fatal(err)
}
s.kvStore = nil

if err := fsm.RestoreSnapshot(data); err != nil {
t.Fatal(err)
}
v, _ = s.Lookup("foo")
if v != "bar" {
t.Fatalf("foo has unexpected value, got %s", v)
}
if !reflect.DeepEqual(s.kvStore, tm) {
t.Fatalf("store expected %+v, got %+v", tm, s.kvStore)
}
}
Loading