Skip to content

Commit

Permalink
feat(chtracker): retrive reports concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Dec 7, 2024
1 parent 2cb91bd commit 8f5dbb3
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions cmd/otelbench/chtracker/chtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chtracker

import (
"context"
"fmt"
"net/http"
"sync"
"time"
Expand All @@ -16,6 +17,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/tempoapi"
)
Expand Down Expand Up @@ -99,13 +101,35 @@ func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, Tracke
t.queriesMux.Lock()
defer t.queriesMux.Unlock()

for _, tq := range t.queries {
reports, err := t.retrieveReports(ctx, tq)
if err != nil {
return errors.Wrapf(err, "retrieve reports for %q", tq.TraceID)
grp, grpCtx := errgroup.WithContext(ctx)
type retrivalResult struct {
Reports []QueryReport
Err error
}
queries := make([]retrivalResult, len(t.queries))
for i, tq := range t.queries {
i, tq := i, tq
grp.Go(func() error {
r, err := t.retrieveReports(grpCtx, tq)
if err != nil {
err = errors.Wrapf(err, "retrieve reports for %q", tq.TraceID)
}
queries[i] = retrivalResult{Reports: r, Err: err}
return nil
})
}
if err := grp.Wait(); err != nil {
return errors.Wrap(err, "retrieve reports")
}

for i, result := range queries {
tq := t.queries[i]

if result.Err != nil {
fmt.Printf("Failed to retrieve reports for %q: %s\n", tq.TraceID, result.Err)
}

if err := cb(ctx, tq, reports); err != nil {
if err := cb(ctx, tq, result.Reports); err != nil {
return errors.Wrap(err, "report callback")
}
}
Expand Down

0 comments on commit 8f5dbb3

Please sign in to comment.