Skip to content

Commit

Permalink
feat(chstorage): insert labels too
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Nov 20, 2023
1 parent d106e10 commit 8600fdf
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 9 deletions.
26 changes: 26 additions & 0 deletions internal/chstorage/columns_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,29 @@ func (c *metricColumns) Result() proto.Results {
{Name: "resource", Data: &c.resource},
}
}

type labelsColumns struct {
name *proto.ColLowCardinality[string]
value proto.ColStr
}

func newLabelsColumns() *labelsColumns {
return &labelsColumns{
name: new(proto.ColStr).LowCardinality(),
}
}

func (c *labelsColumns) Input() proto.Input {
input := proto.Input{
{Name: "name", Data: c.name},
{Name: "value", Data: c.value},
}
return input
}

func (c *labelsColumns) Result() proto.Results {
return proto.Results{
{Name: "name", Data: c.name},
{Name: "value", Data: &c.value},
}
}
58 changes: 49 additions & 9 deletions internal/chstorage/inserter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,65 @@ import (
"github.com/go-faster/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"golang.org/x/sync/errgroup"
)

// ConsumeMetrics inserts given metrics.
func (i *Inserter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
c := newMetricColumns()
if err := i.mapMetrics(c, metrics); err != nil {
var (
points = newMetricColumns()

labels = newLabelsColumns()
collectLabels = func(m pcommon.Map) {
m.Range(func(k string, v pcommon.Value) bool {
labels.name.Append(k)
// FIXME(tdakkota): annoying allocations
labels.value.Append(v.AsString())
return true
})
}
)

if err := i.mapMetrics(points, metrics, collectLabels); err != nil {
return errors.Wrap(err, "map metrics")
}

input := c.Input()
if err := i.ch.Do(ctx, ch.Query{
Body: input.Into(i.tables.Points),
Input: input,
}); err != nil {
return errors.Wrap(err, "insert")
{
grp, grpCtx := errgroup.WithContext(ctx)

grp.Go(func() error {
ctx := grpCtx

input := points.Input()
if err := i.ch.Do(ctx, ch.Query{
Body: input.Into(i.tables.Points),
Input: input,
}); err != nil {
return errors.Wrap(err, "insert points")
}
return nil
})
grp.Go(func() error {
ctx := grpCtx

input := labels.Input()
if err := i.ch.Do(ctx, ch.Query{
Body: input.Into(i.tables.Labels),
Input: input,
}); err != nil {
return errors.Wrap(err, "insert labels")
}
return nil
})
if err := grp.Wait(); err != nil {
return err
}
}

return nil
}

func (i *Inserter) mapMetrics(c *metricColumns, metrics pmetric.Metrics) error {
func (i *Inserter) mapMetrics(c *metricColumns, metrics pmetric.Metrics, collectLabels func(attrs pcommon.Map)) error {
var (
addPoints = func(
name string,
Expand All @@ -50,6 +88,7 @@ func (i *Inserter) mapMetrics(c *metricColumns, metrics pmetric.Metrics) error {
return errors.Errorf("unexpected metric %q value type: %v", name, typ)
}

collectLabels(attrs)
c.name.Append(name)
c.ts.Append(ts)
c.value.Append(val)
Expand All @@ -64,6 +103,7 @@ func (i *Inserter) mapMetrics(c *metricColumns, metrics pmetric.Metrics) error {
for i := 0; i < resMetrics.Len(); i++ {
resMetric := resMetrics.At(i)
resAttrs := resMetric.Resource().Attributes()
collectLabels(resAttrs)

scopeMetrics := resMetric.ScopeMetrics()
for i := 0; i < scopeMetrics.Len(); i++ {
Expand Down

0 comments on commit 8600fdf

Please sign in to comment.