Skip to content

Commit

Permalink
Merge pull request #141 from kaleido-io/rps-config
Browse files Browse the repository at this point in the history
Adding support for leaky bucket throttling (rps + burst) in ffresty client
  • Loading branch information
peterbroadhurst authored Jun 7, 2024
2 parents f0084b0 + 802321f commit c214085
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 9 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
gitlab.com/hfuss/mux-prometheus v0.0.5
golang.org/x/crypto v0.18.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gotest.tools v2.2.0+incompatible
)
Expand Down
14 changes: 13 additions & 1 deletion pkg/ffresty/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -67,6 +67,14 @@ const (
HTTPIdleTimeout = "idleTimeout"
// HTTPMaxIdleConns the max number of idle connections to hold pooled
HTTPMaxIdleConns = "maxIdleConns"
// HTTPThrottleRequestsPerSecond The average rate at which requests are allowed to pass through over time. Default to RPS
// requests over the limit will be blocked using a buffered channel
// the blocked time period is not counted in request timeout
HTTPThrottleRequestsPerSecond = "throttle.requestsPerSecond"

// HTTPThrottleBurst The maximum number of requests that can be made in a short period of time before the RPS throttling kicks in.
HTTPThrottleBurst = "throttle.burst"

// HTTPMaxConnsPerHost the max number of concurrent connections
HTTPMaxConnsPerHost = "maxConnsPerHost"
// HTTPConnectionTimeout the connection timeout for new connections
Expand Down Expand Up @@ -94,6 +102,8 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime)
conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex)
conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout)
conf.AddKnownKey(HTTPThrottleRequestsPerSecond)
conf.AddKnownKey(HTTPThrottleBurst)
conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout)
conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns)
conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost)
Expand All @@ -120,6 +130,8 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) {
RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)),
RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)),
RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex),
ThrottleRequestsPerSecond: conf.GetInt(HTTPThrottleRequestsPerSecond),
ThrottleBurst: conf.GetInt(HTTPThrottleBurst),
HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)),
HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)),
HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns),
Expand Down
24 changes: 24 additions & 0 deletions pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/metric"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

type retryCtxKey struct{}
Expand All @@ -53,6 +54,7 @@ type Config struct {
}

var (
rateLimiter *rate.Limiter
metricsManager metric.MetricsManager
onErrorHooks []resty.ErrorHook
onSuccessHooks []resty.SuccessHook
Expand All @@ -69,6 +71,8 @@ type HTTPConfig struct {
HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"`
AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"`
AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"`
ThrottleRequestsPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"`
ThrottleBurst int `ffstruct:"RESTConfig" json:"burst,omitempty"`
Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"`
RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"`
RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"`
Expand Down Expand Up @@ -172,6 +176,17 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client
return NewWithConfig(ctx, *ffrestyConfig), nil
}

func getRateLimiter(rps, burst int) *rate.Limiter {
if rps != 0 { // if rps is not set no need for a rate limiter
rpsLimiter := rate.Limit(rps)
if burst == 0 {
burst = rps
}
return rate.NewLimiter(rpsLimiter, burst)
}
return nil
}

// New creates a new Resty client, using static configuration (from the config file)
// from a given section in the static configuration
//
Expand Down Expand Up @@ -210,6 +225,8 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client = resty.NewWithClient(httpClient)
}

rateLimiter = getRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst)

url := strings.TrimSuffix(ffrestyConfig.URL, "/")
if url != "" {
client.SetBaseURL(url)
Expand All @@ -223,6 +240,13 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout))

client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error {
if rateLimiter != nil {
// Wait for permission to proceed with the request
err := rateLimiter.Wait(req.Context())
if err != nil {
return err
}
}
rCtx := req.Context()
rc := rCtx.Value(retryCtxKey{})
if rc == nil {
Expand Down
157 changes: 149 additions & 8 deletions pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -42,6 +42,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftls"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/metric"
"golang.org/x/time/rate"

"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -90,6 +91,148 @@ func TestRequestOK(t *testing.T) {
assert.Equal(t, 1, httpmock.GetTotalCallCount())
}

func TestRequestWithRateLimiter(t *testing.T) {
rps := 5
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds

customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPThrottleRequestsPerSecond, rps)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
requestChan := make(chan bool, expectedNumberOfRequest)
startTime := time.Now()
for i := 0; i < expectedNumberOfRequest; i++ {
go func() {
resp, err := c.R().Get("/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())
requestChan <- true
}()
}
count := 0
for {
<-requestChan
count++
if count == expectedNumberOfRequest {
break

}
}

duration := time.Since(startTime)
assert.GreaterOrEqual(t, duration, 3*time.Second)
assert.LessOrEqual(t, duration, 4*time.Second)
assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRequestWithRateLimiterHighBurst(t *testing.T) {
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds

customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPThrottleRequestsPerSecond, 0)
utConf.Set(HTTPThrottleBurst, expectedNumberOfRequest)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
requestChan := make(chan bool, expectedNumberOfRequest)
startTime := time.Now()
for i := 0; i < expectedNumberOfRequest; i++ {
go func() {
resp, err := c.R().Get("/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())
requestChan <- true
}()
}
count := 0
for {
<-requestChan
count++
if count == expectedNumberOfRequest {
break

}
}

duration := time.Since(startTime)
assert.Less(t, duration, 1*time.Second)
assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRateLimiterFailure(t *testing.T) {
customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPConfigRetryEnabled, true)

utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
rateLimiter = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default
resp, err := c.R().Get("/test")
assert.Error(t, err)
assert.Regexp(t, "exceeds", err)
assert.Nil(t, resp)
rateLimiter = nil // reset limiter
}

func TestRequestRetry(t *testing.T) {

ctx := context.Background()
Expand Down Expand Up @@ -529,17 +672,15 @@ func TestEnableClientMetrics(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

}



func TestEnableClientMetricsIdempotent(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")
_ = EnableClientMetrics(ctx, mr)
_ = EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)
}
Expand All @@ -549,17 +690,17 @@ func TestHooks(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

onErrorCount := 0
onSuccessCount := 0

customOnError := func(req *resty.Request, err error){
customOnError := func(req *resty.Request, err error) {
onErrorCount++
}

customOnSuccess := func(c *resty.Client, resp *resty.Response){
customOnSuccess := func(c *resty.Client, resp *resty.Response) {
onSuccessCount++
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/i18n/en_base_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ var (
ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType)
ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType)
ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType)
ConfigGlobalThrottleRPS = ffc("config.global.throttle.requestsPerSecond", "The average rate at which requests are allowed to pass through over time.", IntType)
ConfigGlobalThrottleBurst = ffc("config.global.throttle.burst", "The maximum number of requests that can be made in a short period of time before the throttling kicks in.", IntType)
ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType)
ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType)
ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType)
Expand Down
2 changes: 2 additions & 0 deletions pkg/i18n/en_base_field_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ var (
RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call")
RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection")
RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool")
RESTConfigThrottleRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Requests per second")
RESTConfigThrottleBurst = ffm("RESTConfig.burst", "Burst")
RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host")
RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool")
RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool")
Expand Down

0 comments on commit c214085

Please sign in to comment.