Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration options for hardcoded values #111

Merged
merged 3 commits into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cmd/lingo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ func run() error {
// Rabbit MQ: "rabbit://myqueue|rabbit://myexchange"
// NATS: "nats://example.mysubject1|nats://example.mysubject2"
// Kafka: "kafka://my-group?topic=my-topic1|kafka://my-topic2"
MessengerURLs []string `env:"MESSENGER_URLS"`
MessengerURLs []string `env:"MESSENGER_URLS"`
MessengerErrorMaxBackoff time.Duration `env:"MESSENGER_ERROR_MAX_BACKOFF, default=3m"`

AutoscalingInterval time.Duration `env:"AUTOSCALING_INTERVAL, default=3s"`
AutoscalingTimeWindow time.Duration `env:"AUTOSCALING_TIME_WINDOW, default=30s"`

MetricsBindAddress string `env:"METRICS_BIND_ADDRESS, default=:8082"`
HealthProbeBindAddress string `env:"HEALTH_PROBE_BIND_ADDRESS, default=:8081"`
Expand Down Expand Up @@ -184,8 +188,9 @@ func run() error {
if err != nil {
return fmt.Errorf("setting up autoscaler: %w", err)
}
autoscaler.Interval = 3 * time.Second
autoscaler.AverageCount = 10 // 10 * 3 seconds = 30 sec avg
autoscaler.Interval = cfg.AutoscalingInterval
// 10 average count = 30 sec window / 3 sec interval
autoscaler.AverageCount = int(cfg.AutoscalingTimeWindow / cfg.AutoscalingInterval)
autoscaler.LeaderElection = le
autoscaler.Deployments = deploymentManager
autoscaler.ConcurrencyPerReplica = cfg.Concurrency
Expand Down Expand Up @@ -213,6 +218,7 @@ func run() error {
msgURL.requests,
msgURL.responses,
msgURL.maxHandlers,
cfg.MessengerErrorMaxBackoff,
deploymentManager,
endpointManager,
queueManager,
Expand Down
28 changes: 15 additions & 13 deletions pkg/messenger/messager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type Messenger struct {

HTTPC *http.Client

MaxHandlers int
MaxHandlers int
ErrorMaxBackoff time.Duration

requests *pubsub.Subscription
responses *pubsub.Topic
Expand All @@ -42,6 +43,7 @@ func NewMessenger(
requestsURL string,
responsesURL string,
maxHandlers int,
errorMaxBackoff time.Duration,
deployments DeploymentManager,
endpoints EndpointManager,
queues QueueManager,
Expand All @@ -58,13 +60,14 @@ func NewMessenger(
}

return &Messenger{
Deployments: deployments,
Endpoints: endpoints,
Queues: queues,
HTTPC: httpClient,
requests: requests,
responses: responses,
MaxHandlers: maxHandlers,
Deployments: deployments,
Endpoints: endpoints,
Queues: queues,
HTTPC: httpClient,
requests: requests,
responses: responses,
MaxHandlers: maxHandlers,
ErrorMaxBackoff: errorMaxBackoff,
}, nil
}

Expand Down Expand Up @@ -103,7 +106,7 @@ recvLoop:
// * Some request-generation job sending a million malformed requests into a topic.
// (Slow until an admin can intervene)
if consecutiveErrors := m.getConsecutiveErrors(); consecutiveErrors > 0 {
wait := consecutiveErrBackoff(consecutiveErrors)
wait := consecutiveErrBackoff(consecutiveErrors, m.ErrorMaxBackoff)
log.Printf("after %d consecutive errors, waiting %v before processing next message", consecutiveErrors, wait)
time.Sleep(wait)
}
Expand All @@ -118,11 +121,10 @@ recvLoop:
return nil
}

func consecutiveErrBackoff(n int) time.Duration {
func consecutiveErrBackoff(n int, max time.Duration) time.Duration {
d := time.Duration(n) * time.Second
const maxBackoff = 3 * time.Minute
if d > maxBackoff {
return maxBackoff
if d > max {
return max
}
return d
}
Expand Down
1 change: 1 addition & 0 deletions tests/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func TestMain(m *testing.M) {
memRequestsURL,
memResponsesURL,
1000,
time.Minute,
deploymentManager,
endpointManager,
queueManager,
Expand Down
Loading