From 1acf68d7779ebf3ab3a4a627e738947de4ce9fac Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 6 Dec 2024 11:35:19 +0300 Subject: [PATCH 1/6] feat(logqlbench): add label queries --- cmd/otelbench/logqlbench/logqlbench.go | 13 +- cmd/otelbench/logqlbench/queries.go | 395 +++++++++++++++++++++---- cmd/otelbench/logqlbench/send.go | 87 +----- 3 files changed, 343 insertions(+), 152 deletions(-) diff --git a/cmd/otelbench/logqlbench/logqlbench.go b/cmd/otelbench/logqlbench/logqlbench.go index 0ea91774..462d190b 100644 --- a/cmd/otelbench/logqlbench/logqlbench.go +++ b/cmd/otelbench/logqlbench/logqlbench.go @@ -40,6 +40,7 @@ type LogQLBenchmark struct { client *lokiapi.Client start time.Time end time.Time + limit int } // Setup setups benchmark using given flags. @@ -53,6 +54,7 @@ func (p *LogQLBenchmark) Setup(cmd *cobra.Command) error { if p.end, err = lokihandler.ParseTimestamp(p.EndTime, time.Time{}); err != nil { return errors.Wrap(err, "parse end time") } + p.limit = 1000 p.tracker, err = chtracker.Setup[Query](ctx, "logql", p.TrackerOptions) if err != nil { @@ -131,12 +133,13 @@ func (p *LogQLBenchmark) Run(ctx context.Context) error { var reports []LogQLReportQuery if err := p.tracker.Report(ctx, func(ctx context.Context, tq chtracker.TrackedQuery[Query], queries []chtracker.QueryReport) error { + header := tq.Meta.Header() reports = append(reports, LogQLReportQuery{ - ID: tq.Meta.ID, - Title: tq.Meta.Title, - Description: tq.Meta.Description, - Query: tq.Meta.Query, - Matchers: tq.Meta.Match, + ID: header.ID, + Title: header.Title, + Description: header.Description, + Query: tq.Meta.Query(), + Matchers: tq.Meta.Matchers(), DurationNanos: tq.Duration.Nanoseconds(), Queries: queries, Timeout: tq.Timeout, diff --git a/cmd/otelbench/logqlbench/queries.go b/cmd/otelbench/logqlbench/queries.go index 469cd412..fd49686d 100644 --- a/cmd/otelbench/logqlbench/queries.go +++ b/cmd/otelbench/logqlbench/queries.go @@ -8,39 +8,311 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/yaml" + "github.com/go-faster/oteldb/internal/lokiapi" "github.com/go-faster/oteldb/internal/lokihandler" ) -// ConfigQuery defines LogQL query parameters. -type ConfigQuery struct { - Title string `yaml:"title,omitempty"` - Description string `yaml:"description,omitempty"` - Start string `yaml:"start,omitempty"` - End string `yaml:"end,omitempty"` - Step time.Duration `yaml:"step,omitempty"` - Query string `yaml:"query,omitempty"` - Match []string `yaml:"match,omitempty"` +// Query is a benchmarked query. +type Query interface { + Header() QueryHeader + Query() string + Matchers() []string + Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error } -// Query is a benchmarked query. -type Query struct { - ID int - Type string +var _ = []Query{ + &InstantQuery{}, + &RangeQuery{}, + &SeriesQuery{}, + &LabelsQuery{}, + &LabelValuesQuery{}, +} + +// QueryHeader is common for all queries. +type QueryHeader struct { + ID int `yaml:"-"` + + Title string `yaml:"title,omitempty"` + Description string `yaml:"description,omitempty"` +} + +// InstantQuery is an instant (`/loki/api/v1/query`) query. +type InstantQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + LogQL string `yaml:"query,omitempty"` +} + +// Header returns the query header. +func (q *InstantQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *InstantQuery) Query() string { + return q.LogQL +} + +// Matchers returns selectors for the query. +func (q *InstantQuery) Matchers() []string { + return nil +} + +// Execute executes the instant query. +func (q *InstantQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + + resp, err := client.Query(ctx, lokiapi.QueryParams{ + Time: toLokiTimestamp(start), + Query: q.LogQL, + Limit: lokiapi.NewOptInt(p.limit), + }) + if err != nil { + return errors.Wrap(err, "query") + } + + if isEmptyQueryResponse(resp.Data) && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +// RangeQuery is a range (`/loki/api/v1/query_range`) query. +type RangeQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` + Step time.Duration `yaml:"step,omitempty"` + LogQL string `yaml:"query,omitempty"` +} + +// Header returns the query header. +func (q *RangeQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *RangeQuery) Query() string { + return q.LogQL +} + +// Matchers returns selectors for the query. +func (q *RangeQuery) Matchers() []string { + return nil +} + +// Execute executes the range query. +func (q *RangeQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } + + resp, err := client.QueryRange(ctx, lokiapi.QueryRangeParams{ + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + Query: q.LogQL, + Step: toLokiDuration(q.Step), + Limit: lokiapi.NewOptInt(p.limit), + }) + if err != nil { + return errors.Wrap(err, "query range") + } + + if isEmptyQueryResponse(resp.Data) && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +func isEmptyQueryResponse(data lokiapi.QueryResponseData) bool { + switch typ := data.Type; typ { + case lokiapi.StreamsResultQueryResponseData: + streams := data.StreamsResult + return len(streams.Result) == 0 + case lokiapi.ScalarResultQueryResponseData: + return false + case lokiapi.VectorResultQueryResponseData: + vector := data.VectorResult + return len(vector.Result) == 0 + case lokiapi.MatrixResultQueryResponseData: + matrix := data.MatrixResult + return len(matrix.Result) == 0 + default: + return true + } +} + +// SeriesQuery is a series (`/loki/api/v1/series`) query. +type SeriesQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` + Match []string `yaml:"match,omitempty"` +} + +// Header returns the query header. +func (q *SeriesQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *SeriesQuery) Query() string { + return "" +} + +// Matchers returns selectors for the query. +func (q *SeriesQuery) Matchers() []string { + return q.Match +} + +// Execute executes the series query. +func (q *SeriesQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } - Title string - Description string - Start time.Time - End time.Time - Step time.Duration - Query string - Match []string + resp, err := client.Series(ctx, lokiapi.SeriesParams{ + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + Match: q.Match, + }) + if err != nil { + return errors.Wrap(err, "query series") + } + + if len(resp.Data) == 0 && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +// LabelsQuery is a labels (`/loki/api/v1/labels`) query. +type LabelsQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` +} + +// Header returns the query header. +func (q *LabelsQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *LabelsQuery) Query() string { + return "" +} + +// Matchers returns selectors for the query. +func (q *LabelsQuery) Matchers() []string { + return nil +} + +// Execute executes the labels query. +func (q *LabelsQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } + + resp, err := client.Labels(ctx, lokiapi.LabelsParams{ + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + }) + if err != nil { + return errors.Wrap(err, "query labels") + } + + if len(resp.Data) == 0 && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +// LabelValuesQuery is a label values (`/loki/api/v1/label_values`) query. +type LabelValuesQuery struct { + QueryHeader `yaml:"header,inline"` + + Name string `yaml:"name"` + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` + Match string `yaml:"match,omitempty"` +} + +// Header returns the query header. +func (q *LabelValuesQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *LabelValuesQuery) Query() string { + return q.Name +} + +// Matchers returns selectors for the query. +func (q *LabelValuesQuery) Matchers() []string { + return []string{q.Match} +} + +// Execute executes the label values query. +func (q *LabelValuesQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } + var matcher lokiapi.OptString + if q.Match != "" { + matcher.SetTo(q.Match) + } + + resp, err := client.LabelValues(ctx, lokiapi.LabelValuesParams{ + Name: q.Name, + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + Query: matcher, + }) + if err != nil { + return errors.Wrap(err, "query label values") + } + + if len(resp.Data) == 0 && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil } // Input defines queries config. type Input struct { - Instant []ConfigQuery `yaml:"instant"` - Range []ConfigQuery `yaml:"range"` - Series []ConfigQuery `yaml:"series"` + Instant []InstantQuery `yaml:"instant"` + Range []RangeQuery `yaml:"range"` + Series []SeriesQuery `yaml:"series"` + Labels []LabelsQuery `yaml:"labels"` + LabelValues []LabelValuesQuery `yaml:"label_values"` } func (p *LogQLBenchmark) each(ctx context.Context, fn func(ctx context.Context, q Query) error) error { @@ -54,58 +326,59 @@ func (p *LogQLBenchmark) each(ctx context.Context, fn func(ctx context.Context, return errors.Wrap(err, "unmarshal input") } - var id int - mapQuery := func(typ string, cq ConfigQuery) (Query, error) { - q := Query{ - Type: typ, - ID: id, - Title: cq.Title, - Description: cq.Description, - Step: cq.Step, - Query: cq.Query, + var ( + id int + nextID = func() (r int) { + r = id + id++ + return } + ) - var err error - q.Start, err = lokihandler.ParseTimestamp(cq.Start, p.start) - if err != nil { - return q, errors.Wrap(err, "parse start") - } - q.End, err = lokihandler.ParseTimestamp(cq.End, p.end) - if err != nil { - return q, errors.Wrap(err, "parse end") - } + for i := range input.Instant { + q := &input.Instant[i] + q.ID = nextID() - id++ - q.ID = id - return q, nil + if err := fn(ctx, q); err != nil { + return errors.Wrapf(err, "instant query %d: %q", i, q.Query()) + } } - for _, cq := range input.Instant { - q, err := mapQuery("instant", cq) - if err != nil { - return err - } + for i := range input.Range { + q := &input.Range[i] + q.ID = nextID() + if err := fn(ctx, q); err != nil { - return errors.Wrap(err, "callback") + return errors.Wrapf(err, "range query %d: %q", i, q.Query()) } } - for _, cq := range input.Range { - q, err := mapQuery("range", cq) - if err != nil { - return err - } + + for i := range input.Series { + q := &input.Series[i] + q.ID = nextID() + if err := fn(ctx, q); err != nil { - return errors.Wrap(err, "callback") + return errors.Wrapf(err, "series query %d: %#v", i, q.Matchers()) } } - for _, cq := range input.Series { - q, err := mapQuery("series", cq) - if err != nil { - return err + + for i := range input.Labels { + q := &input.Labels[i] + q.ID = nextID() + + if err := fn(ctx, q); err != nil { + return errors.Wrapf(err, "labels query %d", i) } + } + + for i := range input.LabelValues { + q := &input.LabelValues[i] + q.ID = nextID() + if err := fn(ctx, q); err != nil { - return errors.Wrap(err, "callback") + return errors.Wrapf(err, "label values query %d: %q", i, q.Query()) } } + return nil } diff --git a/cmd/otelbench/logqlbench/send.go b/cmd/otelbench/logqlbench/send.go index 7f43c56e..f0918698 100644 --- a/cmd/otelbench/logqlbench/send.go +++ b/cmd/otelbench/logqlbench/send.go @@ -2,12 +2,9 @@ package logqlbench import ( "context" - "fmt" "strconv" "time" - "github.com/go-faster/errors" - "github.com/go-faster/oteldb/internal/lokiapi" ) @@ -33,87 +30,5 @@ func (p *LogQLBenchmark) send(ctx context.Context, q Query) error { ctx, cancel := context.WithTimeout(ctx, p.RequestTimeout) defer cancel() - isEmpty := func(data lokiapi.QueryResponseData) bool { - switch typ := data.Type; typ { - case lokiapi.StreamsResultQueryResponseData: - streams := data.StreamsResult - return len(streams.Result) == 0 - case lokiapi.ScalarResultQueryResponseData: - return false - case lokiapi.VectorResultQueryResponseData: - vector := data.VectorResult - return len(vector.Result) == 0 - case lokiapi.MatrixResultQueryResponseData: - matrix := data.MatrixResult - return len(matrix.Result) == 0 - default: - return true - } - } - - const limit = 1000 - switch q.Type { - case "instant": - queryInfo := fmt.Sprintf("instant %q (start: %s, limit: %d)", - q.Query, - q.Start, limit, - ) - - resp, err := p.client.Query(ctx, lokiapi.QueryParams{ - Query: q.Query, - Time: toLokiTimestamp(q.Start), - Limit: lokiapi.NewOptInt(limit), - }) - if err != nil { - return errors.Wrapf(err, "send %s", queryInfo) - } - - if isEmpty(resp.Data) && !p.AllowEmpty { - return errors.Errorf("unexpected empty data: %s", queryInfo) - } - return nil - case "range": - queryInfo := fmt.Sprintf("range %q (start: %s, end: %s, step: %s, limit: %d)", - q.Query, - q.Start, q.End, q.Step, - limit, - ) - - resp, err := p.client.QueryRange(ctx, lokiapi.QueryRangeParams{ - Query: q.Query, - Start: toLokiTimestamp(q.Start), - End: toLokiTimestamp(q.End), - Step: toLokiDuration(q.Step), - Limit: lokiapi.NewOptInt(limit), - }) - if err != nil { - return errors.Wrapf(err, "send %s", queryInfo) - } - - if isEmpty(resp.Data) && !p.AllowEmpty { - return errors.Errorf("unexpected empty data: %s", queryInfo) - } - return nil - case "series": - queryInfo := fmt.Sprintf("series %v (start: %s, end: %s)", - q.Match, - q.Start, q.End, - ) - - resp, err := p.client.Series(ctx, lokiapi.SeriesParams{ - Start: toLokiTimestamp(q.Start), - End: toLokiTimestamp(q.End), - Match: q.Match, - }) - if err != nil { - return errors.Wrapf(err, "send %s", queryInfo) - } - - if len(resp.Data) == 0 && !p.AllowEmpty { - return errors.Errorf("unexpected empty data: %s", queryInfo) - } - return nil - default: - return errors.Errorf("unknown query type %q", q.Type) - } + return q.Execute(ctx, p.client, p) } From 365247d3722222068fe8845226a44f9df35af96d Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 6 Dec 2024 15:58:24 +0300 Subject: [PATCH 2/6] fix(otelbench): add query type to report --- cmd/otelbench/logql_analyze.go | 1 + cmd/otelbench/logqlbench/logqlbench.go | 1 + cmd/otelbench/logqlbench/queries.go | 37 ++++++++++++++++++++++++++ cmd/otelbench/logqlbench/result.go | 1 + 4 files changed, 40 insertions(+) diff --git a/cmd/otelbench/logql_analyze.go b/cmd/otelbench/logql_analyze.go index e875bafa..d1611887 100644 --- a/cmd/otelbench/logql_analyze.go +++ b/cmd/otelbench/logql_analyze.go @@ -89,6 +89,7 @@ func (a LogQLAnalyze) renderBenchstat(report logqlbench.LogQLReport, w io.Writer Name: bytes.Join( [][]byte{ []byte(`LogQL`), + []byte(q.Type), name, }, []byte{'/'}, diff --git a/cmd/otelbench/logqlbench/logqlbench.go b/cmd/otelbench/logqlbench/logqlbench.go index 462d190b..48c774cb 100644 --- a/cmd/otelbench/logqlbench/logqlbench.go +++ b/cmd/otelbench/logqlbench/logqlbench.go @@ -136,6 +136,7 @@ func (p *LogQLBenchmark) Run(ctx context.Context) error { header := tq.Meta.Header() reports = append(reports, LogQLReportQuery{ ID: header.ID, + Type: string(tq.Meta.Type()), Title: header.Title, Description: header.Description, Query: tq.Meta.Query(), diff --git a/cmd/otelbench/logqlbench/queries.go b/cmd/otelbench/logqlbench/queries.go index fd49686d..5a8284b2 100644 --- a/cmd/otelbench/logqlbench/queries.go +++ b/cmd/otelbench/logqlbench/queries.go @@ -14,12 +14,24 @@ import ( // Query is a benchmarked query. type Query interface { + Type() QueryType Header() QueryHeader Query() string Matchers() []string Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error } +// QueryType is a type of query. +type QueryType string + +const ( + InstantQueryType QueryType = "instant" + RangeQueryType QueryType = "range" + SeriesQueryType QueryType = "series" + LabelsQueryType QueryType = "labels" + LabelValuesQueryType QueryType = "label_values" +) + var _ = []Query{ &InstantQuery{}, &RangeQuery{}, @@ -44,6 +56,11 @@ type InstantQuery struct { LogQL string `yaml:"query,omitempty"` } +// Type returns the query type. +func (q *InstantQuery) Type() QueryType { + return InstantQueryType +} + // Header returns the query header. func (q *InstantQuery) Header() QueryHeader { return q.QueryHeader @@ -91,6 +108,11 @@ type RangeQuery struct { LogQL string `yaml:"query,omitempty"` } +// Type returns the query type. +func (q *RangeQuery) Type() QueryType { + return RangeQueryType +} + // Header returns the query header. func (q *RangeQuery) Header() QueryHeader { return q.QueryHeader @@ -161,6 +183,11 @@ type SeriesQuery struct { Match []string `yaml:"match,omitempty"` } +// Type returns the query type. +func (q *SeriesQuery) Type() QueryType { + return SeriesQueryType +} + // Header returns the query header. func (q *SeriesQuery) Header() QueryHeader { return q.QueryHeader @@ -210,6 +237,11 @@ type LabelsQuery struct { End string `yaml:"end,omitempty"` } +// Type returns the query type. +func (q *LabelsQuery) Type() QueryType { + return LabelsQueryType +} + // Header returns the query header. func (q *LabelsQuery) Header() QueryHeader { return q.QueryHeader @@ -260,6 +292,11 @@ type LabelValuesQuery struct { Match string `yaml:"match,omitempty"` } +// Type returns the query type. +func (q *LabelValuesQuery) Type() QueryType { + return LabelValuesQueryType +} + // Header returns the query header. func (q *LabelValuesQuery) Header() QueryHeader { return q.QueryHeader diff --git a/cmd/otelbench/logqlbench/result.go b/cmd/otelbench/logqlbench/result.go index 12e0b492..06e2393e 100644 --- a/cmd/otelbench/logqlbench/result.go +++ b/cmd/otelbench/logqlbench/result.go @@ -8,6 +8,7 @@ type LogQLReport struct { type LogQLReportQuery struct { ID int `yaml:"id,omitempty"` + Type string `yaml:"type,omitempty"` Query string `yaml:"query,omitempty"` Title string `yaml:"title,omitempty"` Description string `yaml:"description,omitempty"` From 2cb91bd6b8265a082909dfea05463cae0cceb0be Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 6 Dec 2024 15:58:48 +0300 Subject: [PATCH 3/6] chore(ch-log-bench-read): add more benchmark queries --- dev/local/ch-log-bench-read/testdata/logql.yml | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/dev/local/ch-log-bench-read/testdata/logql.yml b/dev/local/ch-log-bench-read/testdata/logql.yml index ad0654ae..8370c219 100644 --- a/dev/local/ch-log-bench-read/testdata/logql.yml +++ b/dev/local/ch-log-bench-read/testdata/logql.yml @@ -1,7 +1,21 @@ range: - - title: Lookup by materialzied attribute + - title: Lookup by materialized attribute query: |- {level!="DEBUG", service_name="go-faster.oteldb"} - title: Lookup by regular attribute query: |- {operationName="SearchTagValuesV2"} +series: + - title: Lookup by materialized attribute + match: + - |- + { level!="DEBUG", service_name="go-faster.oteldb" } + - title: Lookup by regular attribute + match: + - |- + { operationName="SearchTagValuesV2" } +labels: + - title: Lookup labels +label_values: + - title: Lookup by materialized attribute + name: service_name From 8f5dbb3787b194d805a715c0b9d9c75691428fda Mon Sep 17 00:00:00 2001 From: tdakkota Date: Sat, 7 Dec 2024 16:33:09 +0300 Subject: [PATCH 4/6] feat(chtracker): retrive reports concurrently --- cmd/otelbench/chtracker/chtracker.go | 34 ++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/cmd/otelbench/chtracker/chtracker.go b/cmd/otelbench/chtracker/chtracker.go index e3d0d664..5f16422a 100644 --- a/cmd/otelbench/chtracker/chtracker.go +++ b/cmd/otelbench/chtracker/chtracker.go @@ -3,6 +3,7 @@ package chtracker import ( "context" + "fmt" "net/http" "sync" "time" @@ -16,6 +17,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" "github.com/go-faster/oteldb/internal/tempoapi" ) @@ -99,13 +101,35 @@ func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, Tracke t.queriesMux.Lock() defer t.queriesMux.Unlock() - for _, tq := range t.queries { - reports, err := t.retrieveReports(ctx, tq) - if err != nil { - return errors.Wrapf(err, "retrieve reports for %q", tq.TraceID) + grp, grpCtx := errgroup.WithContext(ctx) + type retrivalResult struct { + Reports []QueryReport + Err error + } + queries := make([]retrivalResult, len(t.queries)) + for i, tq := range t.queries { + i, tq := i, tq + grp.Go(func() error { + r, err := t.retrieveReports(grpCtx, tq) + if err != nil { + err = errors.Wrapf(err, "retrieve reports for %q", tq.TraceID) + } + queries[i] = retrivalResult{Reports: r, Err: err} + return nil + }) + } + if err := grp.Wait(); err != nil { + return errors.Wrap(err, "retrieve reports") + } + + for i, result := range queries { + tq := t.queries[i] + + if result.Err != nil { + fmt.Printf("Failed to retrieve reports for %q: %s\n", tq.TraceID, result.Err) } - if err := cb(ctx, tq, reports); err != nil { + if err := cb(ctx, tq, result.Reports); err != nil { return errors.Wrap(err, "report callback") } } From f93e8f8544077ae743a3354c276113f7df9d9aa6 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Sun, 8 Dec 2024 11:05:52 +0300 Subject: [PATCH 5/6] fix(otelbench): properly handle lack of reports --- cmd/otelbench/chtracker/chtracker.go | 9 ++------- cmd/otelbench/logqlbench/logqlbench.go | 12 +++++++++++- cmd/otelbench/logqlbench/result.go | 1 + cmd/otelbench/promql_bench.go | 6 +++++- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/cmd/otelbench/chtracker/chtracker.go b/cmd/otelbench/chtracker/chtracker.go index 5f16422a..aa2497be 100644 --- a/cmd/otelbench/chtracker/chtracker.go +++ b/cmd/otelbench/chtracker/chtracker.go @@ -3,7 +3,6 @@ package chtracker import ( "context" - "fmt" "net/http" "sync" "time" @@ -93,7 +92,7 @@ func (t *Tracker[Q]) Track(ctx context.Context, meta Q, cb func(context.Context, } // Report iterates over tracked queries. -func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, TrackedQuery[Q], []QueryReport) error) error { +func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, TrackedQuery[Q], []QueryReport, error) error) error { if err := t.Flush(ctx); err != nil { return err } @@ -125,11 +124,7 @@ func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, Tracke for i, result := range queries { tq := t.queries[i] - if result.Err != nil { - fmt.Printf("Failed to retrieve reports for %q: %s\n", tq.TraceID, result.Err) - } - - if err := cb(ctx, tq, result.Reports); err != nil { + if err := cb(ctx, tq, result.Reports, result.Err); err != nil { return errors.Wrap(err, "report callback") } } diff --git a/cmd/otelbench/logqlbench/logqlbench.go b/cmd/otelbench/logqlbench/logqlbench.go index 48c774cb..62c9cf93 100644 --- a/cmd/otelbench/logqlbench/logqlbench.go +++ b/cmd/otelbench/logqlbench/logqlbench.go @@ -132,7 +132,16 @@ func (p *LogQLBenchmark) Run(ctx context.Context) error { var reports []LogQLReportQuery if err := p.tracker.Report(ctx, - func(ctx context.Context, tq chtracker.TrackedQuery[Query], queries []chtracker.QueryReport) error { + func(ctx context.Context, tq chtracker.TrackedQuery[Query], queries []chtracker.QueryReport, retriveErr error) error { + var errMsg string + if retriveErr != nil { + if errors.Is(retriveErr, context.DeadlineExceeded) { + errMsg = "no queries" + } else { + errMsg = retriveErr.Error() + } + } + header := tq.Meta.Header() reports = append(reports, LogQLReportQuery{ ID: header.ID, @@ -144,6 +153,7 @@ func (p *LogQLBenchmark) Run(ctx context.Context) error { DurationNanos: tq.Duration.Nanoseconds(), Queries: queries, Timeout: tq.Timeout, + ReportError: errMsg, }) return nil }, diff --git a/cmd/otelbench/logqlbench/result.go b/cmd/otelbench/logqlbench/result.go index 06e2393e..1a07b36a 100644 --- a/cmd/otelbench/logqlbench/result.go +++ b/cmd/otelbench/logqlbench/result.go @@ -16,4 +16,5 @@ type LogQLReportQuery struct { Matchers []string `yaml:"matchers,omitempty"` Queries []chtracker.QueryReport `yaml:"queries,omitempty"` Timeout bool `yaml:"timeout,omitempty"` + ReportError string `yaml:"report_error,omitempty"` } diff --git a/cmd/otelbench/promql_bench.go b/cmd/otelbench/promql_bench.go index 01d2a8df..0fc38df3 100644 --- a/cmd/otelbench/promql_bench.go +++ b/cmd/otelbench/promql_bench.go @@ -419,7 +419,11 @@ func (p *PromQL) Run(ctx context.Context) error { var reports []PromQLReportQuery if err := p.tracker.Report(ctx, - func(ctx context.Context, tq chtracker.TrackedQuery[promQLQuery], queries []chtracker.QueryReport) error { + func(ctx context.Context, tq chtracker.TrackedQuery[promQLQuery], queries []chtracker.QueryReport, retriveErr error) error { + if retriveErr != nil { + return retriveErr + } + entry := PromQLReportQuery{ ID: tq.Meta.ID, DurationNanos: tq.Duration.Nanoseconds(), From d7ea5016874671d7e891acc412417e2184a4206f Mon Sep 17 00:00:00 2001 From: tdakkota Date: Sun, 8 Dec 2024 11:06:13 +0300 Subject: [PATCH 6/6] fix(otelbench): note that query is failed due to timeout --- cmd/otelbench/logql_analyze.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/otelbench/logql_analyze.go b/cmd/otelbench/logql_analyze.go index d1611887..c083a212 100644 --- a/cmd/otelbench/logql_analyze.go +++ b/cmd/otelbench/logql_analyze.go @@ -57,7 +57,11 @@ func (a LogQLAnalyze) renderPretty(report logqlbench.LogQLReport, w io.Writer) e d := time.Duration(nanos) * time.Nanosecond return d.Round(time.Millisecond / 20).String() } - fmt.Fprintln(&buf, " duration:", formatNanos(q.DurationNanos)) + fmt.Fprint(&buf, " duration:", formatNanos(q.DurationNanos)) + if q.Timeout { + fmt.Fprint(&buf, " (timeout)") + } + fmt.Fprintln(&buf) if len(q.Queries) > 0 { fmt.Fprintln(&buf, " sql queries:", len(q.Queries)) @@ -81,6 +85,10 @@ func (a LogQLAnalyze) renderPretty(report logqlbench.LogQLReport, w io.Writer) e func (a LogQLAnalyze) renderBenchstat(report logqlbench.LogQLReport, w io.Writer) error { var recs []benchfmt.Result for _, q := range report.Queries { + if q.ReportError != "" { + continue + } + name := normalizeBenchName(q.Title) if len(name) == 0 { name = fmt.Appendf(name, "Query%d", q.ID)