diff --git a/cmd/backend/backend.go b/cmd/backend/backend.go index 0ffacf93..49362b0d 100644 --- a/cmd/backend/backend.go +++ b/cmd/backend/backend.go @@ -4,7 +4,9 @@ import ( "flag" "fmt" "net/http" + "time" + "github.com/fagongzi/util/format" "github.com/labstack/echo" md "github.com/labstack/echo/middleware" ) @@ -19,6 +21,20 @@ func main() { server := echo.New() server.Use(md.Logger()) + server.GET("/fail", func(c echo.Context) error { + sleep := c.QueryParam("sleep") + if sleep != "" { + time.Sleep(time.Second * time.Duration(format.MustParseStrInt(sleep))) + } + + code := c.QueryParam("code") + if code != "" { + return c.String(format.MustParseStrInt(code), "OK") + } + + return c.String(http.StatusOK, "OK") + }) + server.GET("/check", func(c echo.Context) error { return c.String(http.StatusOK, "OK") }) diff --git a/pkg/proxy/dispatcher_meta.go b/pkg/proxy/dispatcher_meta.go index 388cf0e5..b254b43f 100644 --- a/pkg/proxy/dispatcher_meta.go +++ b/pkg/proxy/dispatcher_meta.go @@ -322,9 +322,14 @@ func (r *dispatcher) addAPI(api *metapb.API) error { return errAPIExists } - r.apis[api.ID] = newAPIRuntime(api, r.tw) + a := newAPIRuntime(api, r.tw, r.refreshQPS(api.MaxQPS)) + r.apis[api.ID] = a r.sortAPIs() + if a.cb != nil { + r.addAnalysis(api.ID, a.cb) + } + log.Infof("api <%d> added, data <%s>", api.ID, api.String()) @@ -341,8 +346,14 @@ func (r *dispatcher) updateAPI(api *metapb.API) error { return errAPINotFound } + rt.activeQPS = r.refreshQPS(api.MaxQPS) rt.updateMeta(api) r.sortAPIs() + + if rt.cb != nil { + r.addAnalysis(rt.meta.ID, rt.meta.CircuitBreaker) + } + log.Infof("api <%d> updated, data <%s>", api.ID, api.String()) @@ -403,20 +414,22 @@ func (r *dispatcher) sortAPIs() { func (r *dispatcher) refreshAllQPS() { for _, svr := range r.servers { - qps := r.refreshQPS(svr.meta) + svr.activeQPS = r.refreshQPS(svr.meta.MaxQPS) svr.updateMeta(svr.meta) - svr.meta.MaxQPS = qps - r.addAnalysis(svr.meta.ID, svr.meta.CircuitBreaker) r.addToCheck(svr) } + + for _, api := range r.apis { + api.activeQPS = r.refreshQPS(api.meta.MaxQPS) + api.updateMeta(api.meta) + } } -func (r *dispatcher) refreshQPS(svr *metapb.Server) (originQPS int64) { - originQPS = svr.MaxQPS +func (r *dispatcher) refreshQPS(value int64) int64 { if len(r.proxies) > 0 { - svr.MaxQPS = svr.MaxQPS / int64(len(r.proxies)) + return value / int64(len(r.proxies)) } - return + return value } func (r *dispatcher) addServer(svr *metapb.Server) error { @@ -427,10 +440,7 @@ func (r *dispatcher) addServer(svr *metapb.Server) error { return errServerExists } - qps := r.refreshQPS(svr) - - rt := newServerRuntime(svr, r.tw) - svr.MaxQPS = qps + rt := newServerRuntime(svr, r.tw, r.refreshQPS(svr.MaxQPS)) r.servers[svr.ID] = rt r.addAnalysis(rt.meta.ID, rt.meta.CircuitBreaker) @@ -452,9 +462,9 @@ func (r *dispatcher) updateServer(meta *metapb.Server) error { return errServerNotFound } - qps := r.refreshQPS(meta) + rt.activeQPS = r.refreshQPS(meta.MaxQPS) rt.updateMeta(meta) - meta.MaxQPS = qps + r.addAnalysis(rt.meta.ID, rt.meta.CircuitBreaker) r.addToCheck(rt) diff --git a/pkg/proxy/dispatcher_runtime.go b/pkg/proxy/dispatcher_runtime.go index b228df77..5f03e620 100644 --- a/pkg/proxy/dispatcher_runtime.go +++ b/pkg/proxy/dispatcher_runtime.go @@ -85,12 +85,13 @@ func (c *clusterRuntime) selectServer(req *fasthttp.Request) uint64 { type abstractSupportProtectedRuntime struct { sync.RWMutex - id uint64 - tw *goetty.TimeoutWheel - limiter *rate.Limiter - circuit metapb.CircuitStatus - cb *metapb.CircuitBreaker - barrier *util.RateBarrier + id uint64 + tw *goetty.TimeoutWheel + activeQPS int64 + limiter *rate.Limiter + circuit metapb.CircuitStatus + cb *metapb.CircuitBreaker + barrier *util.RateBarrier } func (s *abstractSupportProtectedRuntime) getCircuitStatus() metapb.CircuitStatus { @@ -147,9 +148,10 @@ type serverRuntime struct { useCheckDuration time.Duration } -func newServerRuntime(meta *metapb.Server, tw *goetty.TimeoutWheel) *serverRuntime { +func newServerRuntime(meta *metapb.Server, tw *goetty.TimeoutWheel, activeQPS int64) *serverRuntime { rt := &serverRuntime{} rt.tw = tw + rt.activeQPS = activeQPS rt.updateMeta(meta) return rt } @@ -157,18 +159,21 @@ func newServerRuntime(meta *metapb.Server, tw *goetty.TimeoutWheel) *serverRunti func (s *serverRuntime) clone() *serverRuntime { meta := &metapb.Server{} pbutil.MustUnmarshal(meta, pbutil.MustMarshal(s.meta)) - return newServerRuntime(meta, s.tw) + return newServerRuntime(meta, s.tw, s.activeQPS) } func (s *serverRuntime) updateMeta(meta *metapb.Server) { s.heathTimeout.Stop() + activeQPS := s.activeQPS tw := s.tw + *s = serverRuntime{} s.tw = tw + s.activeQPS = activeQPS s.meta = meta s.id = meta.ID s.cb = meta.CircuitBreaker - s.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(meta.MaxQPS)), int(meta.MaxQPS)) + s.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(s.activeQPS)), int(s.activeQPS)) s.status = metapb.Down s.circuit = metapb.Open if s.cb != nil { @@ -322,10 +327,11 @@ type apiRuntime struct { parsedRenderObjects []*renderObject } -func newAPIRuntime(meta *metapb.API, tw *goetty.TimeoutWheel) *apiRuntime { +func newAPIRuntime(meta *metapb.API, tw *goetty.TimeoutWheel, activeQPS int64) *apiRuntime { ar := &apiRuntime{ meta: meta, } + ar.activeQPS = activeQPS ar.tw = tw ar.init() @@ -335,12 +341,17 @@ func newAPIRuntime(meta *metapb.API, tw *goetty.TimeoutWheel) *apiRuntime { func (a *apiRuntime) clone() *apiRuntime { meta := &metapb.API{} pbutil.MustUnmarshal(meta, pbutil.MustMarshal(a.meta)) - return newAPIRuntime(meta, a.tw) + return newAPIRuntime(meta, a.tw, a.activeQPS) } func (a *apiRuntime) updateMeta(meta *metapb.API) { + tw := a.tw + activeQPS := a.activeQPS + *a = apiRuntime{} a.meta = meta + a.tw = tw + a.activeQPS = activeQPS a.init() } @@ -413,7 +424,7 @@ func (a *apiRuntime) init() { a.barrier = util.NewRateBarrier(int(a.cb.HalfTrafficRate)) } if a.meta.MaxQPS > 0 { - a.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(a.meta.MaxQPS)), int(a.meta.MaxQPS)) + a.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(a.activeQPS)), int(a.activeQPS)) } return