
Add worker pool for log pushers #1499

Open · wants to merge 3 commits into main
Conversation

@jefchien (Contributor) commented Jan 13, 2025

Description of the issue

One of the bottlenecks for agent throughput is that each target (log group/stream) has a single-threaded pusher that blocks until it has successfully sent the request or has run out of retries. This limits the number of requests that can be made and caps the size and rate of growth of the tailed log file.

Description of changes

By supporting concurrency in the sender, the agent is able to prepare and send batches more efficiently.

  • Moved pusher.go into a pusher package and split the queue/batch functionality from the sender.
  • Added a simple WorkerPool that queues up tasks, which are picked up by the first available worker.
  • Added a Concurrency field. If configured, the CloudWatch Logs output will use a shared pool of workers to send PutLogEvents (PLE) requests.
  • Changed perEventHeaderBytes from 200 back to 26. The value was arbitrarily changed as part of Add New Metrics For Where Customer Are Using The Agent #913 and does not match the PLE specification it was meant to be taken from.

The maximum batch size is 1,048,576 bytes. This size is calculated as the sum of all event messages in UTF-8, plus 26 bytes for each log event.
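The batch accounting described above can be sketched as follows. The constants come from the PLE limits quoted in the description; the helper names (`eventSize`, `fits`) are illustrative, not the PR's actual API.

```go
package main

import "fmt"

const (
	maxBatchSizeBytes   = 1048576 // PutLogEvents limit: 1 MiB per request
	perEventHeaderBytes = 26      // fixed per-event overhead defined by the PLE API
)

// eventSize returns the bytes an event contributes to a batch:
// the UTF-8 length of the message plus the fixed 26-byte header.
func eventSize(message string) int {
	return len(message) + perEventHeaderBytes
}

// fits reports whether adding the event would keep the batch within the limit.
func fits(currentBatchBytes int, message string) bool {
	return currentBatchBytes+eventSize(message) <= maxBatchSizeBytes
}

func main() {
	fmt.Println(eventSize("hello"))     // 5 + 26 = 31
	fmt.Println(fits(1048550, "hello")) // false: would exceed 1,048,576
}
```

With a 200-byte header, batches would fill far earlier than the API actually requires, which is why reverting to 26 matters for throughput.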

License

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Tests

Added unit tests.

One example from the unit tests for comparison is TestPusher. With 50 ms of latency and 100,000 events, the sender pool configuration with 5 workers took about one-third of the time of the single-threaded sender.

    --- PASS: TestPusher/WithSender (0.67s)
    --- PASS: TestPusher/WithSenderPool (0.23s)

Benchmarks in progress.

Requirements

Before committing the code, please complete the following steps.

  1. Run make fmt and make fmt-sh
  2. Run make lint

@jefchien jefchien requested a review from a team as a code owner January 13, 2025 15:20
Split up queue/batch from sender.
translator/tocwconfig/sampleConfig/log_filter.conf (outdated, resolved)
For each configured target (log group/stream), the output plugin maintains a queue for log events that it batches.
Once each batch is full or the flush interval is reached, the current batch is sent using the PutLogEvents API to Amazon CloudWatch.

When concurrency is enabled, the pusher uses a shared worker pool to allow multiple concurrent sends.
Contributor:

why isn't this default behavior? Default of like min(4, # of cores)

Are we considering it experimental until we get some real world feedback?

@jefchien (Author) replied Jan 13, 2025:

Yeah, that's the current plan.

    workerCount atomic.Int32
    wg          sync.WaitGroup
    stopCh      chan struct{}
    stopped     atomic.Bool
Contributor:

my initial thought is mixing atomics and channels is kind of weird

Author:

Do you mean the stopped and the stopCh? I think you're right. I can probably remove the atomic.Bool and just use the channel. I initially just had stopped and didn't remove it once I added the channel.
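The simplification discussed here can be sketched as follows: once shutdown is signaled by closing `stopCh`, a non-blocking receive makes the stopped state observable from any goroutine, so the separate `atomic.Bool` is redundant. The type and method names are hypothetical.

```go
package main

import "fmt"

// stopper signals shutdown by closing a channel. A closed channel is
// observable by every goroutine, replacing a separate atomic.Bool flag.
type stopper struct {
	stopCh chan struct{}
}

func newStopper() *stopper {
	return &stopper{stopCh: make(chan struct{})}
}

// Stop signals shutdown; safe to call once.
func (s *stopper) Stop() {
	close(s.stopCh)
}

// Stopped reports whether Stop has been called, via a non-blocking receive.
func (s *stopper) Stopped() bool {
	select {
	case <-s.stopCh:
		return true
	default:
		return false
	}
}

func main() {
	s := newStopper()
	fmt.Println(s.Stopped()) // false
	s.Stop()
	fmt.Println(s.Stopped()) // true
}
```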

    client := c.createClient(logThrottleRetryer)
    agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
    agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)
    if containerInsightsRegexp.MatchString(t.Group) {
Contributor:

I thought container insights doesn't send through cloudwatch output plugin? The agent only uses emf?

Contributor:

Oh I see you just moved it from createClient()...

Author:

Historically, we had functionality where if the log group matched the regex (^/aws/.*containerinsights/.*/(performance|prometheus)$), then we'd count it as container insights. I don't think this path is used anymore, but we can verify that and clean it up in a separate PR.
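The historical matching behavior described in the reply can be demonstrated with the regex quoted above; the sample log group names are made up for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// containerInsightsRegexp is the historical pattern quoted in the review:
// log groups matching it were counted as Container Insights traffic.
var containerInsightsRegexp = regexp.MustCompile(`^/aws/.*containerinsights/.*/(performance|prometheus)$`)

func main() {
	fmt.Println(containerInsightsRegexp.MatchString("/aws/containerinsights/my-cluster/performance")) // true
	fmt.Println(containerInsightsRegexp.MatchString("/aws/lambda/my-function"))                       // false
}
```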

        go p.worker()
    }

    func (p *workerPool) worker() {
Contributor:

super nit: worker() func doesn't have a comment

Author:

I'll add one. I need to clean up the comments.

    )

    type Queue interface {
        AddEvent(e logs.LogEvent)
Contributor:

Can you provide an example of when we would add a blocking event vs non blocking?

Author:

This is also legacy functionality. The AddNonBlockingEvent was used for EMF logs. We've moved that over to OTEL, but don't know if there are any customers using TOML to still send EMF logs this way.
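The difference between the two adds can be sketched with a buffered channel: a blocking add applies backpressure to the producer, while a non-blocking add drops the event when the queue is full. The `eventQueue` type and `LogEvent` struct here are simplified stand-ins, not the agent's actual types.

```go
package main

import "fmt"

type LogEvent struct{ Message string }

type eventQueue struct {
	ch chan LogEvent
}

// AddEvent blocks until the queue has room, applying backpressure to the
// caller (e.g. the file tailer).
func (q *eventQueue) AddEvent(e LogEvent) {
	q.ch <- e
}

// AddNonBlockingEvent enqueues if there is room and otherwise drops the
// event, so latency-sensitive producers (historically, EMF) never stall.
func (q *eventQueue) AddNonBlockingEvent(e LogEvent) bool {
	select {
	case q.ch <- e:
		return true
	default:
		return false
	}
}

func main() {
	q := &eventQueue{ch: make(chan LogEvent, 1)}
	q.AddEvent(LogEvent{"first"})                          // fills the buffer
	fmt.Println(q.AddNonBlockingEvent(LogEvent{"second"})) // false: queue full
}
```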

    })

    if err == nil {
        m.logger.Debugf("successfully created log stream %v", t.Stream)
Contributor:

shouldn't we log the error instead?

Author:

The error is nil here, so there's nothing to log.

    "concurrency": {
        "description": "The number of concurrent workers available for cloudwatch logs export",
        "type": "integer",
        "minimum": 1
Contributor:

should we define a max?

Author:

We didn't for the X-Ray concurrency field:

    "concurrency": {
        "description": "Maximum number of concurrent calls to AWS X-Ray to upload documents",
        "type": "integer",
        "minimum": 1
    },
