Skip to content

Commit

Permalink
Embed memorycache, drop leakybucket import
Browse files Browse the repository at this point in the history
  • Loading branch information
Tit Petric committed Jan 22, 2025
1 parent 68a2deb commit a6fa99d
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 10 deletions.
11 changes: 4 additions & 7 deletions gateway/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"github.com/sirupsen/logrus"

"github.com/TykTechnologies/drl"
"github.com/TykTechnologies/leakybucket"
"github.com/TykTechnologies/leakybucket/memorycache"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/internal/httputil"
"github.com/TykTechnologies/tyk/internal/memorycache"
"github.com/TykTechnologies/tyk/internal/model"
"github.com/TykTechnologies/tyk/internal/rate"
"github.com/TykTechnologies/tyk/internal/rate/limiter"
"github.com/TykTechnologies/tyk/internal/redis"
Expand Down Expand Up @@ -53,14 +53,11 @@ type SessionLimiter struct {
ctx context.Context
drlManager *drl.DRL
config *config.Config
bucketStore leakybucket.Storage
bucketStore model.BucketStorage
limiterStorage redis.UniversalClient
smoothing *rate.Smoothing
}

// Encourage reuse in NewSessionLimiter.
var sessionLimiterBucketStore = memorycache.New()

// NewSessionLimiter initializes the session limiter.
//
// The session limiter initializes the storage required for rate limiters.
Expand All @@ -73,7 +70,7 @@ func NewSessionLimiter(ctx context.Context, conf *config.Config, drlManager *drl
ctx: ctx,
drlManager: drlManager,
config: conf,
bucketStore: sessionLimiterBucketStore,
bucketStore: memorycache.New(ctx),
}

log.Infof("[RATELIMIT] %s", conf.RateLimit.String())
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/TykTechnologies/goverify v0.0.0-20220808203004-1486f89e7708
github.com/TykTechnologies/graphql-go-tools v1.6.2-0.20241212110213-7724a3b64bb2
github.com/TykTechnologies/graphql-translator v0.0.0-20240319092712-4ba87e4c06ff
github.com/TykTechnologies/leakybucket v0.0.0-20170301023702-71692c943e3c
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/openid2go v0.1.2
github.com/TykTechnologies/storage v1.2.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ github.com/TykTechnologies/graphql-translator v0.0.0-20240319092712-4ba87e4c06ff
github.com/TykTechnologies/graphql-translator v0.0.0-20240319092712-4ba87e4c06ff/go.mod h1:K3KhGG9CmvXv1lJhKZpnLb1tC8N1oIzXTunQsc6N9og=
github.com/TykTechnologies/kin-openapi v0.90.0 h1:kHw0mtANwIpmlU6eCeeCgRMa52EiPPhEZPuHc3lKawo=
github.com/TykTechnologies/kin-openapi v0.90.0/go.mod h1:pkzuiceujHvAuDu3bTD/AD5OacuP4eMfrz9QhJlMvdQ=
github.com/TykTechnologies/leakybucket v0.0.0-20170301023702-71692c943e3c h1:j6fd0Fz1R4oSWOmcooGjrdahqrML+btQ+PfEJw8SzbA=
github.com/TykTechnologies/leakybucket v0.0.0-20170301023702-71692c943e3c/go.mod h1:GnHUbsQx+ysI10osPhUdTmsxcE7ef64cVp38Fdyd7e0=
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 h1:T5NWziFusj8au5nxAqMMh/bZyX9CAyYnBkaMSsfH6BA=
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632/go.mod h1:UsPYgOFBpNzDXLEti7MKOwHLpVSqdzuNGkVFPspQmnQ=
github.com/TykTechnologies/openid2go v0.1.2 h1:WXctksOahA/epTVVvbn9iNUuMXKRr0ksrF4dY9KW8o8=
Expand Down
45 changes: 45 additions & 0 deletions internal/memorycache/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package memorycache

import (
"sync"
"time"

"github.com/TykTechnologies/tyk/internal/model"
)

type Bucket struct {
capacity uint
remaining uint
reset time.Time
rate time.Duration
mutex sync.Mutex
}

func (b *Bucket) Capacity() uint {
return b.capacity
}

// Remaining space in the bucket.
func (b *Bucket) Remaining() uint {
return b.remaining
}

// Reset returns when the bucket will be drained.
func (b *Bucket) Reset() time.Time {
return b.reset
}

// Add to the bucket.
func (b *Bucket) Add(amount uint) (model.BucketState, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
if time.Now().After(b.reset) {
b.reset = time.Now().Add(b.rate)
b.remaining = b.capacity
}
if amount > b.remaining {
return model.BucketState{Capacity: b.capacity, Remaining: b.remaining, Reset: b.reset}, model.ErrBucketFull
}
b.remaining -= amount
return model.BucketState{Capacity: b.capacity, Remaining: b.remaining, Reset: b.reset}, nil
}
90 changes: 90 additions & 0 deletions internal/memorycache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package memorycache

import (
"context"
"sync"
"time"
)

// Cache is a synchronised map of items that auto-expire once stale
type Cache struct {
mutex sync.RWMutex
ttl time.Duration
items map[string]*Item
}

// NewCache is a helper to create instance of the Cache struct.
// The ctx is used to cancel the TTL map cleanup goroutine.
func NewCache(ctx context.Context, duration time.Duration) *Cache {
cache := &Cache{
ttl: duration,
items: map[string]*Item{},
}
go cache.startCleanupTimer(ctx)
return cache
}

// Set is a thread-safe way to add new items to the map
func (cache *Cache) Set(key string, data *Bucket) {
cache.mutex.Lock()
item := &Item{data: data}
item.touch(cache.ttl)
cache.items[key] = item
cache.mutex.Unlock()
}

// Get is a thread-safe way to lookup items
// Every lookup, also touches the item, hence extending it's life
func (cache *Cache) Get(key string) (data *Bucket, found bool) {
cache.mutex.Lock()
item, exists := cache.items[key]
if !exists || item.expired() {
data = &Bucket{}
found = false
} else {
item.touch(cache.ttl)
data = item.data
found = true
}
cache.mutex.Unlock()
return
}

// Count returns the number of items in the cache
// (helpful for tracking memory leaks)
func (cache *Cache) Count() int {
cache.mutex.RLock()
count := len(cache.items)
cache.mutex.RUnlock()
return count
}

func (cache *Cache) cleanup() {
cache.mutex.Lock()
for key, item := range cache.items {
if item.expired() {
delete(cache.items, key)
}
}
cache.mutex.Unlock()
}

func (cache *Cache) startCleanupTimer(ctx context.Context) {
interval := cache.ttl
if interval < time.Second {
interval = time.Second
}

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
// fmt.Println("Shutting down cleanup timer:", ctx.Err())
return
case <-ticker.C:
cache.cleanup()
}
}
}
15 changes: 15 additions & 0 deletions internal/memorycache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package memorycache

import (
"context"
"testing"
"time"
)

func TestCache_Shutdown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

cache := NewCache(ctx, time.Minute)
_ = cache.Get
}
32 changes: 32 additions & 0 deletions internal/memorycache/item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package memorycache

import (
"sync"
"time"
)

// Item represents a record in the cache map.
type Item struct {
sync.RWMutex
data *Bucket
expires *time.Time
}

func (item *Item) touch(duration time.Duration) {
item.Lock()
expiration := time.Now().Add(duration)
item.expires = &expiration
item.Unlock()
}

func (item *Item) expired() bool {
var value bool
item.RLock()
if item.expires == nil {
value = true
} else {
value = item.expires.Before(time.Now())
}
item.RUnlock()
return value
}
37 changes: 37 additions & 0 deletions internal/memorycache/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package memorycache

import (
"context"
"time"

"github.com/TykTechnologies/tyk/internal/model"
)

// BucketStorage is a non thread-safe in-memory leaky bucket factory.
type BucketStorage struct {
buckets *Cache
}

// New initializes the in-memory bucket store.
func New(ctx context.Context) *BucketStorage {
return &BucketStorage{
buckets: NewCache(ctx, 10*time.Minute),
}
}

// Create a bucket.
func (s *BucketStorage) Create(name string, capacity uint, rate time.Duration) (model.Bucket, error) {
b, ok := s.buckets.Get(name)
if ok {
return b, nil
}

b = &Bucket{
capacity: capacity,
remaining: capacity,
reset: time.Now().Add(rate),
rate: rate,
}
s.buckets.Set(name, b)
return b, nil
}
40 changes: 40 additions & 0 deletions internal/model/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package model

import (
"errors"
"time"
)

var (
// ErrBucketFull is returned when the amount requested to add exceeds the remaining space in the bucket.
ErrBucketFull = errors.New("add exceeds free bucket capacity")
)

// Bucket interface for interacting with leaky buckets: https://en.wikipedia.org/wiki/Leaky_bucket
type Bucket interface {
// Capacity of the bucket.
Capacity() uint

// Remaining space in the bucket.
Remaining() uint

// Reset returns when the bucket will be drained.
Reset() time.Time

// Add to the bucket. Returns bucket state after adding.
Add(uint) (BucketState, error)
}

// BucketState is a snapshot of a bucket's properties.
type BucketState struct {
Capacity uint
Remaining uint
Reset time.Time
}

// BucketStorage interface for generating buckets keyed by a string.
type BucketStorage interface {
// Create a bucket with a name, capacity, and rate.
// rate is how long it takes for full capacity to drain.
Create(name string, capacity uint, rate time.Duration) (Bucket, error)
}
2 changes: 2 additions & 0 deletions internal/model/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package model provides an internal data model for use across the gateway.
package model

0 comments on commit a6fa99d

Please sign in to comment.