From 8f5dbb3787b194d805a715c0b9d9c75691428fda Mon Sep 17 00:00:00 2001 From: tdakkota Date: Sat, 7 Dec 2024 16:33:09 +0300 Subject: [PATCH] feat(chtracker): retrive reports concurrently --- cmd/otelbench/chtracker/chtracker.go | 34 ++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/cmd/otelbench/chtracker/chtracker.go b/cmd/otelbench/chtracker/chtracker.go index e3d0d664..5f16422a 100644 --- a/cmd/otelbench/chtracker/chtracker.go +++ b/cmd/otelbench/chtracker/chtracker.go @@ -3,6 +3,7 @@ package chtracker import ( "context" + "fmt" "net/http" "sync" "time" @@ -16,6 +17,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" "github.com/go-faster/oteldb/internal/tempoapi" ) @@ -99,13 +101,35 @@ func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, Tracke t.queriesMux.Lock() defer t.queriesMux.Unlock() - for _, tq := range t.queries { - reports, err := t.retrieveReports(ctx, tq) - if err != nil { - return errors.Wrapf(err, "retrieve reports for %q", tq.TraceID) + grp, grpCtx := errgroup.WithContext(ctx) + type retrivalResult struct { + Reports []QueryReport + Err error + } + queries := make([]retrivalResult, len(t.queries)) + for i, tq := range t.queries { + i, tq := i, tq + grp.Go(func() error { + r, err := t.retrieveReports(grpCtx, tq) + if err != nil { + err = errors.Wrapf(err, "retrieve reports for %q", tq.TraceID) + } + queries[i] = retrivalResult{Reports: r, Err: err} + return nil + }) + } + if err := grp.Wait(); err != nil { + return errors.Wrap(err, "retrieve reports") + } + + for i, result := range queries { + tq := t.queries[i] + + if result.Err != nil { + fmt.Printf("Failed to retrieve reports for %q: %s\n", tq.TraceID, result.Err) } - if err := cb(ctx, tq, reports); err != nil { + if err := cb(ctx, tq, result.Reports); err != nil { return errors.Wrap(err, "report callback") } }