Skip to content

Commit

Permalink
feat(chstorage): query histograms too
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Nov 28, 2023
1 parent a7321ff commit 49d026d
Showing 1 changed file with 169 additions and 6 deletions.
175 changes: 169 additions & 6 deletions internal/chstorage/querier_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ package chstorage
import (
"context"
"fmt"
"maps"
"math"
"strconv"
"strings"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -183,6 +188,7 @@ type seriesKey struct {
name string
attributes string
resource string
bucketKey [2]string
}

func (p *promQuerier) selectSeries(ctx context.Context, hints *storage.SelectHints, matchers ...*labels.Matcher) (_ storage.SeriesSet, rerr error) {
Expand Down Expand Up @@ -265,8 +271,9 @@ func (p *promQuerier) selectSeries(ctx context.Context, hints *storage.SelectHin
}

var (
points []storage.Series
histSeries []storage.Series
points []storage.Series
histSeries []storage.Series
expHistSeries []storage.Series
)
grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() error {
Expand All @@ -287,23 +294,39 @@ func (p *promQuerier) selectSeries(ctx context.Context, hints *storage.SelectHin
grp.Go(func() error {
ctx := grpCtx

query, err := buildQuery(p.tables.ExpHistograms)
query, err := buildQuery(p.tables.Histograms)
if err != nil {
return err
}

result, err := p.queryExpHistrograms(ctx, query)
result, err := p.queryHistograms(ctx, query)
if err != nil {
return errors.Wrap(err, "query histrograms")
return errors.Wrap(err, "query histograms")
}
histSeries = result
return nil
})
grp.Go(func() error {
ctx := grpCtx

query, err := buildQuery(p.tables.ExpHistograms)
if err != nil {
return err
}

result, err := p.queryExpHistograms(ctx, query)
if err != nil {
return errors.Wrap(err, "query exponential histograms")
}
expHistSeries = result
return nil
})
if err := grp.Wait(); err != nil {
return nil, err
}

points = append(points, histSeries...)
points = append(points, expHistSeries...)
return newSeriesSet(points), nil
}

Expand Down Expand Up @@ -376,7 +399,147 @@ func (p *promQuerier) queryPoints(ctx context.Context, query string) ([]storage.
return result, nil
}

func (p *promQuerier) queryExpHistrograms(ctx context.Context, query string) ([]storage.Series, error) {
func (p *promQuerier) queryHistograms(ctx context.Context, query string) ([]storage.Series, error) {
type seriesWithLabels struct {
series *series[pointData]
labels map[string]string
}
type histogramSample struct {
timestamp int64
rawAttributes, rawResource string
attributes, resource map[string]string
flags pmetric.DataPointFlags
}

var (
set = map[seriesKey]seriesWithLabels{}
addSample = func(
name string,
val float64,
sample histogramSample,
bucketKey [2]string,
) {
key := seriesKey{
name: name,
attributes: sample.rawAttributes,
resource: sample.rawResource,
bucketKey: bucketKey,
}
s, ok := set[key]
if !ok {
s = seriesWithLabels{
series: &series[pointData]{},
labels: map[string]string{},
}
set[key] = s
}

if sample.flags.NoRecordedValue() {
val = math.Float64frombits(value.StaleNaN)
}
s.series.data.values = append(s.series.data.values, val)
s.series.ts = append(s.series.ts, sample.timestamp)

s.labels["__name__"] = name
maps.Copy(s.labels, sample.attributes)
maps.Copy(s.labels, sample.resource)
if key := bucketKey[0]; key != "" {
s.labels[key] = bucketKey[1]
}
}
c = newHistogramColumns()
)
if err := p.ch.Do(ctx, ch.Query{
Body: query,
Result: c.Result(),
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < c.timestamp.Rows(); i++ {
name := c.name.Row(i)
timestamp := c.timestamp.Row(i)
count := c.count.Row(i)
sum := c.sum.Row(i)
_min := c.min.Row(i)
_max := c.max.Row(i)
bucketCounts := c.bucketCounts.Row(i)
explicitBounds := c.explicitBounds.Row(i)
flags := pmetric.DataPointFlags(c.flags.Row(i))
rawAttributes := c.attributes.Row(i)
rawResource := c.resource.Row(i)

var (
resource = map[string]string{}
attributes = map[string]string{}
)
if err := parseLabels(rawResource, resource); err != nil {
return errors.Wrap(err, "parse resource")
}
if err := parseLabels(rawAttributes, attributes); err != nil {
return errors.Wrap(err, "parse attributes")
}
sample := histogramSample{
timestamp: timestamp.UnixMilli(),
rawAttributes: rawAttributes,
rawResource: rawResource,
attributes: attributes,
resource: resource,
flags: flags,
}

if sum.Set {
addSample(name+"_sum", sum.Value, sample, [2]string{})
}
if _min.Set {
addSample(name+"_min", _min.Value, sample, [2]string{})
}
if _max.Set {
addSample(name+"_max", _max.Value, sample, [2]string{})
}
addSample(name+"_count", float64(count), sample, [2]string{})

var cumCount uint64
for i := 0; i < min(len(bucketCounts), len(explicitBounds)); i++ {
bound := explicitBounds[i]
cumCount += bucketCounts[i]

// Generate series with "_bucket" suffix and "le" label.
addSample("_bucket", float64(cumCount), sample, [2]string{
"le",
strconv.FormatFloat(bound, 'f', -1, 64),
})
}

{
// Generate series with "_bucket" suffix and "le" label.
addSample("_bucket", float64(count), sample, [2]string{
"le",
"+Inf",
})
}
}
return nil
},
}); err != nil {
return nil, errors.Wrap(err, "do query")
}

var (
result = make([]storage.Series, 0, len(set))
lb labels.ScratchBuilder
)
for _, s := range set {
lb.Reset()
for key, value := range s.labels {
lb.Add(key, value)
}
lb.Sort()
s.series.labels = lb.Labels()
result = append(result, s.series)
}

return result, nil
}

func (p *promQuerier) queryExpHistograms(ctx context.Context, query string) ([]storage.Series, error) {
type seriesWithLabels struct {
series *series[expHistData]
labels map[string]string
Expand Down

0 comments on commit 49d026d

Please sign in to comment.