-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloadbalancer.go
125 lines (101 loc) · 2.84 KB
/
loadbalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
	"io"
	"log"
	"net/http"
	"sync/atomic"
	"time"

	"github.com/jonboulle/clockwork"
)
// TODO: refactor to separate NodeManager file

// NodeManager is the set of node operations the LoadBalancer depends on:
// choosing a node, enforcing per-node limits, clearing the limit counters,
// and probing node health. Because isRateLimitExceeded is unexported, the
// interface can only be implemented inside this package.
type NodeManager interface {
// GetNextNode returns the next node to try. Callers must handle a nil
// result; ServeHTTP treats nil as "skip this attempt".
GetNextNode() *Node
// isRateLimitExceeded reports whether node must be skipped for a request
// whose body is bodyLen bytes. The SafeNodeManager implementation also
// counts the request against the node when it returns false.
isRateLimitExceeded(node *Node, bodyLen int) bool
// ResetLimits clears the per-node usage counters.
ResetLimits()
// CheckHealth refreshes the health state of the nodes.
CheckHealth()
}
// SafeNodeManager implements NodeManager over a fixed slice of nodes. Its
// methods touch the cursor and the per-node counters through sync/atomic.
type SafeNodeManager struct {
// nodes is the pool supplied to NewSafeNodeManager.
nodes []*Node
// next is the round-robin cursor that GetNextNode advances atomically.
next int32
// clock is used by StartPeriodicTasks to schedule ResetLimits and
// CheckHealth.
clock clockwork.Clock
}
// LoadBalancer is an HTTP handler (see ServeHTTP) that forwards each request
// to a node picked by its NodeManager.
type LoadBalancer struct {
// manager picks nodes and enforces their rate limits.
manager NodeManager
}
// NewSafeNodeManager builds a SafeNodeManager over nodes that schedules its
// timers on clock. The round-robin cursor starts at zero. The nodes slice is
// stored as given, not copied.
func NewSafeNodeManager(nodes []*Node, clock clockwork.Clock) *SafeNodeManager {
	mgr := new(SafeNodeManager) // zero value already has next == 0
	mgr.nodes = nodes
	mgr.clock = clock
	return mgr
}
// GetNextNode advances the round-robin cursor by one and returns the node at
// the new position. It returns nil when the pool is empty or when the selected
// node is not marked healthy; only one node is inspected per call, so callers
// that want to try the others must call it again.
func (m *SafeNodeManager) GetNextNode() *Node {
	count := len(m.nodes)
	if count == 0 {
		// Nothing to pick from; also avoids a modulo-by-zero panic below.
		return nil
	}

	// The cursor is a signed counter that eventually wraps negative. Taking
	// the modulo on its unsigned value keeps the index inside [0, count).
	idx := uint32(atomic.AddInt32(&m.next, 1)) % uint32(count)
	node := m.nodes[idx]

	// Healthy is written with atomic stores by CheckHealth, so read it the
	// same way.
	if atomic.LoadUint32(&node.Healthy) == 1 {
		return node
	}

	log.Printf("Skipping unhealthy node %d (%s)", node.ID, node.URL)
	return nil
}
// isRateLimitExceeded reports whether sending a request with a body of
// bodyLen bytes to node would go over the node's request-count or body-size
// limit. When it returns false the request is counted against both of the
// node's counters; when it returns true the counters are left unchanged.
//
// A negative bodyLen (an unknown Content-Length) is counted as zero bytes.
//
// NOTE(review): the check and the increments are separate atomic operations,
// so concurrent callers can overshoot a limit slightly.
func (m *SafeNodeManager) isRateLimitExceeded(node *Node, bodyLen int) bool {
	// Converting a negative length to uint64 would produce a huge value that
	// either rejects the request or decrements BodyCount; clamp it instead.
	var body uint64
	if bodyLen > 0 {
		body = uint64(bodyLen)
	}

	reqCount := atomic.LoadUint32(&node.ReqCount)
	bodyCount := atomic.LoadUint64(&node.BodyCount)

	// bodyCount+body > BodyLimit, rearranged so the arithmetic cannot wrap.
	bodyOver := body > node.BodyLimit || bodyCount > node.BodyLimit-body

	if reqCount >= node.ReqLimit || bodyOver {
		log.Printf("Rate limit hit for node %d (%s) - RPM: %d/%d, BodyLimit: %d, RequestBody: %d\n",
			node.ID, node.URL, reqCount, node.ReqLimit, node.BodyLimit, bodyLen)
		return true
	}

	atomic.AddUint32(&node.ReqCount, 1)
	atomic.AddUint64(&node.BodyCount, body)
	return false
}
// ResetLimits zeroes the request and body counters of every node in the pool
// using atomic stores.
func (m *SafeNodeManager) ResetLimits() {
	for i := 0; i < len(m.nodes); i++ {
		current := m.nodes[i]
		atomic.StoreUint32(&current.ReqCount, 0)
		atomic.StoreUint64(&current.BodyCount, 0)
	}
}
// CheckHealth probes every node's "<URL>/health" endpoint concurrently and
// records the outcome in the node's Healthy flag: 1 when the probe returns
// 200 OK, 0 on any error or other status. It starts one goroutine per node
// and returns without waiting for them; each probe is bounded by a timeout so
// an unresponsive node cannot leave its goroutine blocked forever.
func (m *SafeNodeManager) CheckHealth() {
	// Upper bound for a single probe (connect + response + body).
	const healthCheckTimeout = 5 * time.Second

	// A nil Transport means http.DefaultTransport, so connections are still
	// pooled across calls even though the Client value is created here.
	client := &http.Client{Timeout: healthCheckTimeout}

	for _, node := range m.nodes {
		go func(n *Node) {
			resp, err := client.Get(n.URL + "/health")
			if err != nil {
				// On error the response, if any, already has its body closed.
				atomic.StoreUint32(&n.Healthy, 0)
				log.Printf("Node %d (%s) is unhealthy\n", n.ID, n.URL)
				return
			}
			defer resp.Body.Close()
			// Drain the body so the underlying connection can be reused.
			// Best effort: a read error here does not change the verdict.
			_, _ = io.Copy(io.Discard, resp.Body)

			if resp.StatusCode != http.StatusOK {
				atomic.StoreUint32(&n.Healthy, 0)
				log.Printf("Node %d (%s) is unhealthy\n", n.ID, n.URL)
				return
			}

			atomic.StoreUint32(&n.Healthy, 1)
			log.Printf("Node %d (%s) is healthy\n", n.ID, n.URL)
		}(node)
	}
}
// NewLoadBalancer returns a LoadBalancer that delegates node selection and
// rate limiting to manager.
func NewLoadBalancer(manager NodeManager) *LoadBalancer {
	balancer := new(LoadBalancer)
	balancer.manager = manager
	return balancer
}
// ServeHTTP forwards r to the first node that the manager both hands out and
// does not rate-limit, and replies 503 Service Unavailable when no such node
// is found.
//
// With a *SafeNodeManager it makes one attempt per node in the pool. With any
// other NodeManager the pool size is not visible through the interface, so a
// single attempt is made instead of panicking on a failed type assertion.
func (lb *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// ContentLength is -1 when the size is unknown; count that as zero bytes
	// rather than passing a negative size to the limiter.
	bodyLen := int(r.ContentLength)
	if bodyLen < 0 {
		bodyLen = 0
	}

	attempts := 1
	if sm, ok := lb.manager.(*SafeNodeManager); ok {
		attempts = len(sm.nodes)
	}

	for i := 0; i < attempts; i++ {
		node := lb.manager.GetNextNode()
		if node == nil || lb.manager.isRateLimitExceeded(node, bodyLen) {
			continue
		}

		// The counters are updated atomically elsewhere, so read them
		// atomically for the log line as well.
		log.Printf("Forwarding request to node %d (%s)\n - RPM: %d/%d, BPM: %d/%d\n",
			node.ID, node.URL,
			atomic.LoadUint32(&node.ReqCount), node.ReqLimit,
			atomic.LoadUint64(&node.BodyCount), node.BodyLimit)
		node.ReverseProxy.ServeHTTP(w, r)
		return
	}

	http.Error(w, "No available node", http.StatusServiceUnavailable)
}
// StartPeriodicTasks schedules ResetLimits to run every minute and
// CheckHealth to run every 30 seconds on the manager's clock.
//
// AfterFunc fires only once, so each callback re-arms its own timer before
// running the task; without that the limits would be reset and the nodes
// probed a single time. The first run of each task happens one interval after
// this call. There is no way to stop the tasks once started.
func (m *SafeNodeManager) StartPeriodicTasks() {
	// repeat runs task every interval by chaining one-shot timers.
	repeat := func(interval time.Duration, task func()) {
		var tick func()
		tick = func() {
			// Re-arm first so the cadence does not depend on how long the
			// task takes.
			m.clock.AfterFunc(interval, tick)
			task()
		}
		m.clock.AfterFunc(interval, tick)
	}

	repeat(1*time.Minute, m.ResetLimits)
	repeat(30*time.Second, m.CheckHealth)
}