-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathcontroller.go
127 lines (105 loc) · 2.79 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package raft
import (
"context"
"fmt"
"io"
"sync"
"github.com/shaj13/raft/internal/membership"
"github.com/shaj13/raft/internal/raftengine"
"github.com/shaj13/raft/internal/raftpb"
"github.com/shaj13/raft/internal/storage"
"github.com/shaj13/raft/internal/transport"
etcdraftpb "go.etcd.io/etcd/raft/v3/raftpb"
)
type controller struct {
node *Node
engine raftengine.Engine
pool membership.Pool
storage storage.Storage
}
func (c *controller) Join(ctx context.Context, gid uint64, m *raftpb.Member) (*raftpb.JoinResponse, error) {
var err error
if _, ok := c.node.GetMemebr(m.ID); !ok {
err = c.node.AddMember(ctx, m)
} else {
err = c.node.UpdateMember(ctx, m)
}
if err != nil {
return nil, err
}
resp := &raftpb.JoinResponse{
ID: m.ID,
Members: c.pool.Snapshot(),
}
return resp, nil
}
func (c *controller) Push(ctx context.Context, gid uint64, m etcdraftpb.Message) error {
return c.engine.Push(m)
}
func (c *controller) PromoteMember(ctx context.Context, gid uint64, m raftpb.Member) error {
return c.node.promoteMember(ctx, m.ID, true)
}
func (c *controller) SnapshotWriter(gid, term, index uint64) (io.WriteCloser, error) {
return c.storage.Snapshotter().Writer(term, index)
}
func (c *controller) SnapshotReader(gid, term uint64, index uint64) (io.ReadCloser, error) {
return c.storage.Snapshotter().Reader(term, index)
}
type router struct {
mu sync.Mutex
ctrls map[uint64]transport.Controller
}
func (r *router) add(gid uint64, ctrl transport.Controller) {
r.mu.Lock()
defer r.mu.Unlock()
r.ctrls[gid] = ctrl
}
func (r *router) remove(gid uint64) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.ctrls, gid)
}
func (r *router) get(gid uint64) (transport.Controller, error) {
r.mu.Lock()
defer r.mu.Unlock()
ctrl, ok := r.ctrls[gid]
if !ok {
return nil, fmt.Errorf("raft: unknown group id %x", gid)
}
return ctrl, nil
}
func (r *router) Join(ctx context.Context, gid uint64, m *raftpb.Member) (*raftpb.JoinResponse, error) {
ctrl, err := r.get(gid)
if err != nil {
return nil, err
}
return ctrl.Join(ctx, gid, m)
}
func (r *router) Push(ctx context.Context, gid uint64, m etcdraftpb.Message) error {
ctrl, err := r.get(gid)
if err != nil {
return err
}
return ctrl.Push(ctx, gid, m)
}
func (r *router) PromoteMember(ctx context.Context, gid uint64, m raftpb.Member) error {
ctrl, err := r.get(gid)
if err != nil {
return err
}
return ctrl.PromoteMember(ctx, gid, m)
}
func (r *router) SnapshotWriter(gid, term, index uint64) (io.WriteCloser, error) {
ctrl, err := r.get(gid)
if err != nil {
return nil, err
}
return ctrl.SnapshotWriter(gid, term, index)
}
func (r *router) SnapshotReader(gid, term, index uint64) (io.ReadCloser, error) {
ctrl, err := r.get(gid)
if err != nil {
return nil, err
}
return ctrl.SnapshotReader(gid, term, index)
}