-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Embed memorycache, drop leakybucket import
- Loading branch information
Showing
10 changed files
with
265 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package memorycache | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
|
||
"github.com/TykTechnologies/tyk/internal/model" | ||
) | ||
|
||
type Bucket struct { | ||
capacity uint | ||
remaining uint | ||
reset time.Time | ||
rate time.Duration | ||
mutex sync.Mutex | ||
} | ||
|
||
func (b *Bucket) Capacity() uint { | ||
return b.capacity | ||
} | ||
|
||
// Remaining space in the bucket. | ||
func (b *Bucket) Remaining() uint { | ||
return b.remaining | ||
} | ||
|
||
// Reset returns when the bucket will be drained. | ||
func (b *Bucket) Reset() time.Time { | ||
return b.reset | ||
} | ||
|
||
// Add to the bucket. | ||
func (b *Bucket) Add(amount uint) (model.BucketState, error) { | ||
b.mutex.Lock() | ||
defer b.mutex.Unlock() | ||
if time.Now().After(b.reset) { | ||
b.reset = time.Now().Add(b.rate) | ||
b.remaining = b.capacity | ||
} | ||
if amount > b.remaining { | ||
return model.BucketState{Capacity: b.capacity, Remaining: b.remaining, Reset: b.reset}, model.ErrBucketFull | ||
} | ||
b.remaining -= amount | ||
return model.BucketState{Capacity: b.capacity, Remaining: b.remaining, Reset: b.reset}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package memorycache | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
) | ||
|
||
// Cache is a synchronised map of items that auto-expire once stale | ||
type Cache struct { | ||
mutex sync.RWMutex | ||
ttl time.Duration | ||
items map[string]*Item | ||
} | ||
|
||
// NewCache is a helper to create instance of the Cache struct. | ||
// The ctx is used to cancel the TTL map cleanup goroutine. | ||
func NewCache(ctx context.Context, duration time.Duration) *Cache { | ||
cache := &Cache{ | ||
ttl: duration, | ||
items: map[string]*Item{}, | ||
} | ||
go cache.startCleanupTimer(ctx) | ||
return cache | ||
} | ||
|
||
// Set is a thread-safe way to add new items to the map | ||
func (cache *Cache) Set(key string, data *Bucket) { | ||
cache.mutex.Lock() | ||
item := &Item{data: data} | ||
item.touch(cache.ttl) | ||
cache.items[key] = item | ||
cache.mutex.Unlock() | ||
} | ||
|
||
// Get is a thread-safe way to lookup items | ||
// Every lookup, also touches the item, hence extending it's life | ||
func (cache *Cache) Get(key string) (data *Bucket, found bool) { | ||
cache.mutex.Lock() | ||
item, exists := cache.items[key] | ||
if !exists || item.expired() { | ||
data = &Bucket{} | ||
found = false | ||
} else { | ||
item.touch(cache.ttl) | ||
data = item.data | ||
found = true | ||
} | ||
cache.mutex.Unlock() | ||
return | ||
} | ||
|
||
// Count returns the number of items in the cache | ||
// (helpful for tracking memory leaks) | ||
func (cache *Cache) Count() int { | ||
cache.mutex.RLock() | ||
count := len(cache.items) | ||
cache.mutex.RUnlock() | ||
return count | ||
} | ||
|
||
func (cache *Cache) cleanup() { | ||
cache.mutex.Lock() | ||
for key, item := range cache.items { | ||
if item.expired() { | ||
delete(cache.items, key) | ||
} | ||
} | ||
cache.mutex.Unlock() | ||
} | ||
|
||
func (cache *Cache) startCleanupTimer(ctx context.Context) { | ||
interval := cache.ttl | ||
if interval < time.Second { | ||
interval = time.Second | ||
} | ||
|
||
ticker := time.NewTicker(interval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
// fmt.Println("Shutting down cleanup timer:", ctx.Err()) | ||
return | ||
case <-ticker.C: | ||
cache.cleanup() | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package memorycache | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestCache_Shutdown(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
defer cancel() | ||
|
||
cache := NewCache(ctx, time.Minute) | ||
_ = cache.Get | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package memorycache | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
) | ||
|
||
// Item represents a record in the cache map. | ||
type Item struct { | ||
sync.RWMutex | ||
data *Bucket | ||
expires *time.Time | ||
} | ||
|
||
func (item *Item) touch(duration time.Duration) { | ||
item.Lock() | ||
expiration := time.Now().Add(duration) | ||
item.expires = &expiration | ||
item.Unlock() | ||
} | ||
|
||
func (item *Item) expired() bool { | ||
var value bool | ||
item.RLock() | ||
if item.expires == nil { | ||
value = true | ||
} else { | ||
value = item.expires.Before(time.Now()) | ||
} | ||
item.RUnlock() | ||
return value | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package memorycache | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/TykTechnologies/tyk/internal/model" | ||
) | ||
|
||
// BucketStorage is a non thread-safe in-memory leaky bucket factory. | ||
type BucketStorage struct { | ||
buckets *Cache | ||
} | ||
|
||
// New initializes the in-memory bucket store. | ||
func New(ctx context.Context) *BucketStorage { | ||
return &BucketStorage{ | ||
buckets: NewCache(ctx, 10*time.Minute), | ||
} | ||
} | ||
|
||
// Create a bucket. | ||
func (s *BucketStorage) Create(name string, capacity uint, rate time.Duration) (model.Bucket, error) { | ||
b, ok := s.buckets.Get(name) | ||
if ok { | ||
return b, nil | ||
} | ||
|
||
b = &Bucket{ | ||
capacity: capacity, | ||
remaining: capacity, | ||
reset: time.Now().Add(rate), | ||
rate: rate, | ||
} | ||
s.buckets.Set(name, b) | ||
return b, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package model | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
) | ||
|
||
var ( | ||
// ErrBucketFull is returned when the amount requested to add exceeds the remaining space in the bucket. | ||
ErrBucketFull = errors.New("add exceeds free bucket capacity") | ||
) | ||
|
||
// Bucket interface for interacting with leaky buckets: https://en.wikipedia.org/wiki/Leaky_bucket | ||
type Bucket interface { | ||
// Capacity of the bucket. | ||
Capacity() uint | ||
|
||
// Remaining space in the bucket. | ||
Remaining() uint | ||
|
||
// Reset returns when the bucket will be drained. | ||
Reset() time.Time | ||
|
||
// Add to the bucket. Returns bucket state after adding. | ||
Add(uint) (BucketState, error) | ||
} | ||
|
||
// BucketState is a snapshot of a bucket's properties. | ||
type BucketState struct { | ||
Capacity uint | ||
Remaining uint | ||
Reset time.Time | ||
} | ||
|
||
// BucketStorage interface for generating buckets keyed by a string. | ||
type BucketStorage interface { | ||
// Create a bucket with a name, capacity, and rate. | ||
// rate is how long it takes for full capacity to drain. | ||
Create(name string, capacity uint, rate time.Duration) (Bucket, error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
// Package model provides an internal data model for use across the gateway. | ||
package model |