From 49d026d56670ea7230c2575b5078728c6960dfce Mon Sep 17 00:00:00 2001 From: tdakkota Date: Tue, 28 Nov 2023 04:52:20 +0300 Subject: [PATCH] feat(chstorage): query histograms too --- internal/chstorage/querier_metrics.go | 175 +++++++++++++++++++++++++- 1 file changed, 169 insertions(+), 6 deletions(-) diff --git a/internal/chstorage/querier_metrics.go b/internal/chstorage/querier_metrics.go index e453f77c..741f4c75 100644 --- a/internal/chstorage/querier_metrics.go +++ b/internal/chstorage/querier_metrics.go @@ -3,6 +3,9 @@ package chstorage import ( "context" "fmt" + "maps" + "math" + "strconv" "strings" "time" @@ -10,9 +13,11 @@ import ( "github.com/ClickHouse/ch-go/proto" "github.com/go-faster/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" @@ -183,6 +188,7 @@ type seriesKey struct { name string attributes string resource string + bucketKey [2]string } func (p *promQuerier) selectSeries(ctx context.Context, hints *storage.SelectHints, matchers ...*labels.Matcher) (_ storage.SeriesSet, rerr error) { @@ -265,8 +271,9 @@ func (p *promQuerier) selectSeries(ctx context.Context, hints *storage.SelectHin } var ( - points []storage.Series - histSeries []storage.Series + points []storage.Series + histSeries []storage.Series + expHistSeries []storage.Series ) grp, grpCtx := errgroup.WithContext(ctx) grp.Go(func() error { @@ -287,23 +294,39 @@ func (p *promQuerier) selectSeries(ctx context.Context, hints *storage.SelectHin grp.Go(func() error { ctx := grpCtx - query, err := buildQuery(p.tables.ExpHistograms) + query, err := buildQuery(p.tables.Histograms) if err != nil { return err } - result, err := p.queryExpHistrograms(ctx, query) + result, err := p.queryHistograms(ctx, query) if err != nil { - return errors.Wrap(err, "query histrograms") + return errors.Wrap(err, "query histograms") } histSeries = result return nil }) + grp.Go(func() error { + ctx := grpCtx + + query, err := buildQuery(p.tables.ExpHistograms) + if err != nil { + return err + } + + result, err := p.queryExpHistograms(ctx, query) + if err != nil { + return errors.Wrap(err, "query exponential histograms") + } + expHistSeries = result + return nil + }) if err := grp.Wait(); err != nil { return nil, err } points = append(points, histSeries...) + points = append(points, expHistSeries...) return newSeriesSet(points), nil } @@ -376,7 +399,147 @@ func (p *promQuerier) queryPoints(ctx context.Context, query string) ([]storage. return result, nil } -func (p *promQuerier) queryExpHistrograms(ctx context.Context, query string) ([]storage.Series, error) { +func (p *promQuerier) queryHistograms(ctx context.Context, query string) ([]storage.Series, error) { + type seriesWithLabels struct { + series *series[pointData] + labels map[string]string + } + type histogramSample struct { + timestamp int64 + rawAttributes, rawResource string + attributes, resource map[string]string + flags pmetric.DataPointFlags + } + + var ( + set = map[seriesKey]seriesWithLabels{} + addSample = func( + name string, + val float64, + sample histogramSample, + bucketKey [2]string, + ) { + key := seriesKey{ + name: name, + attributes: sample.rawAttributes, + resource: sample.rawResource, + bucketKey: bucketKey, + } + s, ok := set[key] + if !ok { + s = seriesWithLabels{ + series: &series[pointData]{}, + labels: map[string]string{}, + } + set[key] = s + } + + if sample.flags.NoRecordedValue() { + val = math.Float64frombits(value.StaleNaN) + } + s.series.data.values = append(s.series.data.values, val) + s.series.ts = append(s.series.ts, sample.timestamp) + + s.labels["__name__"] = name + maps.Copy(s.labels, sample.attributes) + maps.Copy(s.labels, sample.resource) + if key := bucketKey[0]; key != "" { + s.labels[key] = bucketKey[1] + } + } + c = newHistogramColumns() + ) + if err := p.ch.Do(ctx, ch.Query{ + Body: query, + Result: c.Result(), + OnResult: func(ctx context.Context, block proto.Block) error { + for i := 0; i < c.timestamp.Rows(); i++ { + name := c.name.Row(i) + timestamp := c.timestamp.Row(i) + count := c.count.Row(i) + sum := c.sum.Row(i) + _min := c.min.Row(i) + _max := c.max.Row(i) + bucketCounts := c.bucketCounts.Row(i) + explicitBounds := c.explicitBounds.Row(i) + flags := pmetric.DataPointFlags(c.flags.Row(i)) + rawAttributes := c.attributes.Row(i) + rawResource := c.resource.Row(i) + + var ( + resource = map[string]string{} + attributes = map[string]string{} + ) + if err := parseLabels(rawResource, resource); err != nil { + return errors.Wrap(err, "parse resource") + } + if err := parseLabels(rawAttributes, attributes); err != nil { + return errors.Wrap(err, "parse attributes") + } + sample := histogramSample{ + timestamp: timestamp.UnixMilli(), + rawAttributes: rawAttributes, + rawResource: rawResource, + attributes: attributes, + resource: resource, + flags: flags, + } + + if sum.Set { + addSample(name+"_sum", sum.Value, sample, [2]string{}) + } + if _min.Set { + addSample(name+"_min", _min.Value, sample, [2]string{}) + } + if _max.Set { + addSample(name+"_max", _max.Value, sample, [2]string{}) + } + addSample(name+"_count", float64(count), sample, [2]string{}) + + var cumCount uint64 + for i := 0; i < min(len(bucketCounts), len(explicitBounds)); i++ { + bound := explicitBounds[i] + cumCount += bucketCounts[i] + + // Generate series with "_bucket" suffix and "le" label. + addSample("_bucket", float64(cumCount), sample, [2]string{ + "le", + strconv.FormatFloat(bound, 'f', -1, 64), + }) + } + + { + // Generate series with "_bucket" suffix and "le" label. + addSample("_bucket", float64(count), sample, [2]string{ + "le", + "+Inf", + }) + } + } + return nil + }, + }); err != nil { + return nil, errors.Wrap(err, "do query") + } + + var ( + result = make([]storage.Series, 0, len(set)) + lb labels.ScratchBuilder + ) + for _, s := range set { + lb.Reset() + for key, value := range s.labels { + lb.Add(key, value) + } + lb.Sort() + s.series.labels = lb.Labels() + result = append(result, s.series) + } + + return result, nil +} + +func (p *promQuerier) queryExpHistograms(ctx context.Context, query string) ([]storage.Series, error) { type seriesWithLabels struct { series *series[expHistData] labels map[string]string