Skip to content

Commit

Permalink
fix: api rate limiter and circuit breaker bug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Jan 9, 2019
1 parent c1fa330 commit 1fd641e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 26 deletions.
16 changes: 16 additions & 0 deletions cmd/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"flag"
"fmt"
"net/http"
"time"

"github.com/fagongzi/util/format"
"github.com/labstack/echo"
md "github.com/labstack/echo/middleware"
)
Expand All @@ -19,6 +21,20 @@ func main() {
server := echo.New()
server.Use(md.Logger())

server.GET("/fail", func(c echo.Context) error {
sleep := c.QueryParam("sleep")
if sleep != "" {
time.Sleep(time.Second * time.Duration(format.MustParseStrInt(sleep)))
}

code := c.QueryParam("code")
if code != "" {
return c.String(format.MustParseStrInt(code), "OK")
}

return c.String(http.StatusOK, "OK")
})

server.GET("/check", func(c echo.Context) error {
return c.String(http.StatusOK, "OK")
})
Expand Down
38 changes: 24 additions & 14 deletions pkg/proxy/dispatcher_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,14 @@ func (r *dispatcher) addAPI(api *metapb.API) error {
return errAPIExists
}

r.apis[api.ID] = newAPIRuntime(api, r.tw)
a := newAPIRuntime(api, r.tw, r.refreshQPS(api.MaxQPS))
r.apis[api.ID] = a
r.sortAPIs()

if a.cb != nil {
r.addAnalysis(api.ID, a.cb)
}

log.Infof("api <%d> added, data <%s>",
api.ID,
api.String())
Expand All @@ -341,8 +346,14 @@ func (r *dispatcher) updateAPI(api *metapb.API) error {
return errAPINotFound
}

rt.activeQPS = r.refreshQPS(api.MaxQPS)
rt.updateMeta(api)
r.sortAPIs()

if rt.cb != nil {
r.addAnalysis(rt.meta.ID, rt.meta.CircuitBreaker)
}

log.Infof("api <%d> updated, data <%s>",
api.ID,
api.String())
Expand Down Expand Up @@ -403,20 +414,22 @@ func (r *dispatcher) sortAPIs() {

func (r *dispatcher) refreshAllQPS() {
for _, svr := range r.servers {
qps := r.refreshQPS(svr.meta)
svr.activeQPS = r.refreshQPS(svr.meta.MaxQPS)
svr.updateMeta(svr.meta)
svr.meta.MaxQPS = qps
r.addAnalysis(svr.meta.ID, svr.meta.CircuitBreaker)
r.addToCheck(svr)
}

for _, api := range r.apis {
api.activeQPS = r.refreshQPS(api.meta.MaxQPS)
api.updateMeta(api.meta)
}
}

func (r *dispatcher) refreshQPS(svr *metapb.Server) (originQPS int64) {
originQPS = svr.MaxQPS
func (r *dispatcher) refreshQPS(value int64) int64 {
if len(r.proxies) > 0 {
svr.MaxQPS = svr.MaxQPS / int64(len(r.proxies))
return value / int64(len(r.proxies))
}
return
return value
}

func (r *dispatcher) addServer(svr *metapb.Server) error {
Expand All @@ -427,10 +440,7 @@ func (r *dispatcher) addServer(svr *metapb.Server) error {
return errServerExists
}

qps := r.refreshQPS(svr)

rt := newServerRuntime(svr, r.tw)
svr.MaxQPS = qps
rt := newServerRuntime(svr, r.tw, r.refreshQPS(svr.MaxQPS))
r.servers[svr.ID] = rt

r.addAnalysis(rt.meta.ID, rt.meta.CircuitBreaker)
Expand All @@ -452,9 +462,9 @@ func (r *dispatcher) updateServer(meta *metapb.Server) error {
return errServerNotFound
}

qps := r.refreshQPS(meta)
rt.activeQPS = r.refreshQPS(meta.MaxQPS)
rt.updateMeta(meta)
meta.MaxQPS = qps

r.addAnalysis(rt.meta.ID, rt.meta.CircuitBreaker)
r.addToCheck(rt)

Expand Down
35 changes: 23 additions & 12 deletions pkg/proxy/dispatcher_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ func (c *clusterRuntime) selectServer(req *fasthttp.Request) uint64 {
type abstractSupportProtectedRuntime struct {
sync.RWMutex

id uint64
tw *goetty.TimeoutWheel
limiter *rate.Limiter
circuit metapb.CircuitStatus
cb *metapb.CircuitBreaker
barrier *util.RateBarrier
id uint64
tw *goetty.TimeoutWheel
activeQPS int64
limiter *rate.Limiter
circuit metapb.CircuitStatus
cb *metapb.CircuitBreaker
barrier *util.RateBarrier
}

func (s *abstractSupportProtectedRuntime) getCircuitStatus() metapb.CircuitStatus {
Expand Down Expand Up @@ -147,28 +148,32 @@ type serverRuntime struct {
useCheckDuration time.Duration
}

func newServerRuntime(meta *metapb.Server, tw *goetty.TimeoutWheel) *serverRuntime {
func newServerRuntime(meta *metapb.Server, tw *goetty.TimeoutWheel, activeQPS int64) *serverRuntime {
rt := &serverRuntime{}
rt.tw = tw
rt.activeQPS = activeQPS
rt.updateMeta(meta)
return rt
}

func (s *serverRuntime) clone() *serverRuntime {
meta := &metapb.Server{}
pbutil.MustUnmarshal(meta, pbutil.MustMarshal(s.meta))
return newServerRuntime(meta, s.tw)
return newServerRuntime(meta, s.tw, s.activeQPS)
}

func (s *serverRuntime) updateMeta(meta *metapb.Server) {
s.heathTimeout.Stop()
activeQPS := s.activeQPS
tw := s.tw

*s = serverRuntime{}
s.tw = tw
s.activeQPS = activeQPS
s.meta = meta
s.id = meta.ID
s.cb = meta.CircuitBreaker
s.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(meta.MaxQPS)), int(meta.MaxQPS))
s.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(s.activeQPS)), int(s.activeQPS))
s.status = metapb.Down
s.circuit = metapb.Open
if s.cb != nil {
Expand Down Expand Up @@ -322,10 +327,11 @@ type apiRuntime struct {
parsedRenderObjects []*renderObject
}

func newAPIRuntime(meta *metapb.API, tw *goetty.TimeoutWheel) *apiRuntime {
func newAPIRuntime(meta *metapb.API, tw *goetty.TimeoutWheel, activeQPS int64) *apiRuntime {
ar := &apiRuntime{
meta: meta,
}
ar.activeQPS = activeQPS
ar.tw = tw
ar.init()

Expand All @@ -335,12 +341,17 @@ func newAPIRuntime(meta *metapb.API, tw *goetty.TimeoutWheel) *apiRuntime {
func (a *apiRuntime) clone() *apiRuntime {
meta := &metapb.API{}
pbutil.MustUnmarshal(meta, pbutil.MustMarshal(a.meta))
return newAPIRuntime(meta, a.tw)
return newAPIRuntime(meta, a.tw, a.activeQPS)
}

func (a *apiRuntime) updateMeta(meta *metapb.API) {
tw := a.tw
activeQPS := a.activeQPS

*a = apiRuntime{}
a.meta = meta
a.tw = tw
a.activeQPS = activeQPS
a.init()
}

Expand Down Expand Up @@ -413,7 +424,7 @@ func (a *apiRuntime) init() {
a.barrier = util.NewRateBarrier(int(a.cb.HalfTrafficRate))
}
if a.meta.MaxQPS > 0 {
a.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(a.meta.MaxQPS)), int(a.meta.MaxQPS))
a.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(a.activeQPS)), int(a.activeQPS))
}

return
Expand Down

0 comments on commit 1fd641e

Please sign in to comment.