Skip to content

Commit

Permalink
Merge branch 'main' of ../private
Browse files Browse the repository at this point in the history
  • Loading branch information
lunar-devops committed Aug 28, 2024
2 parents 0317ab6 + 1e24119 commit 8109d70
Show file tree
Hide file tree
Showing 43 changed files with 758 additions and 335 deletions.
31 changes: 31 additions & 0 deletions proxy/integration-tests/features/flows_shared_state.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
@secondaryTests
Feature: Lunar Proxy - rate limit
Background: Starts the Proxy
Given API Provider is up
And Lunar Proxy env var `LUNAR_STREAMS_ENABLED` set to `true`
And Lunar Proxy env var `REDIS_URL` set to `redis://lunar-redis:6379`
And First Shared Lunar Proxy is up
And Second Shared Lunar Proxy is up

@flakey
Scenario: When basic rate limit flow is loaded and 2 proxies are used, they should share state
Given Redis is up

When Basic rate limit flow created for httpbinmock/* with 10 requests per 1 seconds
And flow file is saved on lunar-proxy-pro-1
And flow file is saved on lunar-proxy-pro-2
And resource file is saved on lunar-proxy-pro-1
And resource file is saved on lunar-proxy-pro-2

And load_flows command is run on lunar-proxy-pro-1
And load_flows command is run on lunar-proxy-pro-2

And next epoch-based 1 seconds window arrives

And 8 requests are sent in parallel to httpbinmock /headers through first Shared Lunar Proxy
Then 8 requests returning with status 200 and 0 with 429

# Second proxy runs in shares memory state and its state at this stage is 8
When 3 requests are sent to httpbinmock /headers through second Shared Lunar Proxy
# 10 with 200 and 1 with 429 since second proxy started from 8
Then 10 requests returning with status 200 and 1 with 429
12 changes: 11 additions & 1 deletion proxy/integration-tests/features/steps/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Might be handled later.

import os
import asyncio
from behave import when
from behave.api.async_step import async_run_until_complete
from typing import Any
Expand Down Expand Up @@ -62,7 +63,7 @@ async def step_impl(context: Any, container_name: str):
print(f"resource yaml:\n{resource_yaml}")
await write_resource_file(
filename="resource_quota.yaml",
resource_yaml=flow_yaml,
resource_yaml=resource_yaml,
container_name=container_name,
)

Expand All @@ -71,3 +72,12 @@ async def step_impl(context: Any, container_name: str):
@async_run_until_complete
async def step_impl(_: Any):
assert os.system("docker exec lunar-proxy load_flows") == 0


@when("load_flows command is run on {container_name}")
@async_run_until_complete
async def step_impl(_: Any, container_name: str):
for _ in range(10):
if os.system(f"docker exec {container_name} load_flows") == 0:
return True
await asyncio.sleep(1)
76 changes: 76 additions & 0 deletions proxy/src/libs/toolkit-core/context-manager/context_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package contextmanager

import (
"context"
"lunar/toolkit-core/clock"
"sync"

"github.com/rs/zerolog/log"
)

// Manager is the singleton that holds the context and clock
type ContextManager struct {
ctx context.Context
clock clock.Clock
}

var (
instance *ContextManager
once sync.Once
)

// Get returns the singleton instance of ContextManager
func Get() *ContextManager {
if instance == nil {
initContextManager(context.Background(), clock.NewRealClock())
}
return instance
}

// WithContext returns a new instance of ContextManager with the provided context
func (m *ContextManager) WithContext(ctx context.Context) *ContextManager {
m.ctx = ctx
return instance
}

// WithClock returns a new instance of ContextManager with the provided clock
func (m *ContextManager) SetRealClock() *ContextManager {
m.clock = clock.NewRealClock()
return instance
}

// SetMockClock returns a new instance of ContextManager with a mock clock
func (m *ContextManager) SetMockClock() *ContextManager {
m.clock = clock.NewMockClock()
return instance
}

// GetContext returns the context held by the Manager
func (m *ContextManager) GetContext() context.Context {
return m.ctx
}

// GetClock returns the current time held by the Manager
func (m *ContextManager) GetClock() clock.Clock {
return m.clock
}

// GetMockClock returns the mock time held by the Manager
func (m *ContextManager) GetMockClock() *clock.MockClock {
mock, ok := m.clock.(*clock.MockClock)
if !ok {
log.Error().Msg("Clock is not a mock clock")
return nil
}
return mock
}

// initContextManager initializes the singleton instance of ContextManager
func initContextManager(ctx context.Context, clk clock.Clock) {
once.Do(func() {
instance = &ContextManager{
ctx: ctx,
clock: clk,
}
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package contextmanager

import (
"context"
"lunar/toolkit-core/clock"
"testing"

"github.com/stretchr/testify/require"
)

func TestInitContextManager(t *testing.T) {
ctx := context.Background()
clk := clock.NewRealClock()

initContextManager(ctx, clk)

require.NotNil(t, instance)
require.Equal(t, ctx, instance.ctx)
require.Equal(t, clk, instance.clock)
}

func TestGetContextManager(t *testing.T) {
result := Get()
require.NotNil(t, result)
require.Equal(t, instance, result)
}

func TestGetContext(t *testing.T) {
ctx := context.Background()
result := Get().GetContext()
require.Equal(t, ctx, result)
}

func TestGetClock(t *testing.T) {
clk := clock.NewRealClock()
result := Get().GetClock()

require.Equal(t, clk, result)
}

func TestWithMockClock(t *testing.T) {
result := Get().SetMockClock()
require.IsType(t, &clock.MockClock{}, result.clock)
}

func TestWithRealClock(t *testing.T) {
result := Get().SetRealClock()
require.IsType(t, &clock.RealClock{}, result.clock)
}

func TestWithContext(t *testing.T) {
ctx := context.Background()
result := Get().WithContext(ctx)

require.Equal(t, ctx, result.GetContext())
}
7 changes: 3 additions & 4 deletions proxy/src/libs/toolkit-core/otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package otel

import (
"context"
contextmanager "lunar/toolkit-core/context-manager"
"os"
"time"

Expand All @@ -27,10 +28,8 @@ const (

// Initializes an OTLP exporter, and configures the corresponding trace and
// metric providers.
func InitProvider(
ctx context.Context,
serviceName string,
) func() {
func InitProvider(serviceName string) func() {
ctx := contextmanager.Get().GetContext()
resource, err := resource.New(ctx,
resource.WithFromEnv(),
resource.WithProcess(),
Expand Down
45 changes: 43 additions & 2 deletions proxy/src/libs/toolkit-core/redis/keys.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis

import (
"encoding/json"
"errors"
"fmt"
"regexp"
Expand All @@ -19,6 +20,29 @@ type KeyPart struct {
useForHashing bool
}

func (kp KeyPart) MarshalJSON() ([]byte, error) {
return json.Marshal([]interface{}{kp.part, kp.useForHashing})
}

func (kp *KeyPart) UnmarshalJSON(data []byte) error {
var aux []interface{}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}

if len(aux) > 0 {
if part, ok := aux[0].(string); ok {
kp.part = part
}
if len(aux) > 1 {
if useForHashing, ok := aux[1].(bool); ok {
kp.useForHashing = useForHashing
}
}
}
return nil
}

type (
Key []KeyPart
LunarKey = string
Expand All @@ -31,8 +55,8 @@ var ErrKeyIsNil = errors.New("key is nil")

var hashtagPattern = regexp.MustCompile(`\{([^{}]+)\}`)

func (keyPart KeyPart) Part() string {
return keyPart.part
func (kp KeyPart) Part() string {
return kp.part
}

func UnhashedKeyPart(part string) KeyPart {
Expand Down Expand Up @@ -138,3 +162,20 @@ func ExtractHashTagFromRawKey(rawKey string) (HashtagExtraction, error) {

return HashtagExtraction{Found: false, Hashtag: ""}, nil
}

func MarshalKey(key Key) (string, error) {
marshalled, err := json.Marshal(key)
if err != nil {
return "", err
}
return string(marshalled), nil
}

func UnmarshalKey(marshalledKey string) (Key, error) {
var key Key
err := json.Unmarshal([]byte(marshalledKey), &key)
if err != nil {
return nil, err
}
return key, nil
}
51 changes: 51 additions & 0 deletions proxy/src/libs/toolkit-core/redis/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,54 @@ func TestItReturnsErrorIfMoreThanOneHashtagFound(t *testing.T) {
_, err := redis.ExtractHashTagFromRawKey(rawKey)
assert.NotNil(t, err)
}

func TestMarshalKey(t *testing.T) {
key := redis.NewKey().
Append(redis.UnhashedKeyPart("lunar")).
Append(redis.HashedKeyPart("policyA")).
Append(redis.UnhashedKeyPart("counter"))

marshalled, err := redis.MarshalKey(key)
assert.Nil(t, err)
assert.Equal(t, `[["lunar",false],["policyA",true],["counter",false]]`, marshalled)
}

func TestMarshalKeyEmptyKey(t *testing.T) {
key := redis.NewKey()

marshalled, err := redis.MarshalKey(key)
assert.Nil(t, err)
assert.Equal(t, `[]`, marshalled)
}

func TestUnmarshalKey(t *testing.T) {
key := redis.NewKey().
Append(redis.UnhashedKeyPart("lunar")).
Append(redis.HashedKeyPart("policyA")).
Append(redis.UnhashedKeyPart("counter"))

marshalledKey, err := redis.MarshalKey(key)
assert.Nil(t, err)

key, err = redis.UnmarshalKey(marshalledKey)
assert.Nil(t, err)
expectedKey := redis.NewKey().
Append(redis.UnhashedKeyPart("lunar")).
Append(redis.HashedKeyPart("policyA")).
Append(redis.UnhashedKeyPart("counter"))
assert.Equal(t, expectedKey, key)
}

func TestUnmarshalKeyEmptyKey(t *testing.T) {
marshalledKey := `[]`
key, err := redis.UnmarshalKey(marshalledKey)
assert.Nil(t, err)
expectedKey := redis.NewKey()
assert.Equal(t, expectedKey, key)
}

func TestUnmarshalKeyInvalidJSON(t *testing.T) {
marshalledKey := `[["lunar",false],["policyA",true],["counter",false]`
_, err := redis.UnmarshalKey(marshalledKey)
assert.NotNil(t, err)
}
Loading

0 comments on commit 8109d70

Please sign in to comment.