From 35afe5c925e0da834e19b5e2b0a5cbc6e7a768de Mon Sep 17 00:00:00 2001 From: tdakkota Date: Mon, 9 Dec 2024 17:28:07 +0300 Subject: [PATCH] feat(chdump): add Clickhouse dump reader --- cmd/otelbench/chdump/attrs.go | 86 +++++++++++++++ cmd/otelbench/chdump/attrs_json.go | 79 ++++++++++++++ cmd/otelbench/chdump/chdump.go | 167 ++++++++++++++++++++++++++++ cmd/otelbench/chdump/logs.go | 108 +++++++++++++++++++ cmd/otelbench/chdump/metrics.go | 168 +++++++++++++++++++++++++++++ cmd/otelbench/chdump/traces.go | 139 ++++++++++++++++++++++++ 6 files changed, 747 insertions(+) create mode 100644 cmd/otelbench/chdump/attrs.go create mode 100644 cmd/otelbench/chdump/attrs_json.go create mode 100644 cmd/otelbench/chdump/chdump.go create mode 100644 cmd/otelbench/chdump/logs.go create mode 100644 cmd/otelbench/chdump/metrics.go create mode 100644 cmd/otelbench/chdump/traces.go diff --git a/cmd/otelbench/chdump/attrs.go b/cmd/otelbench/chdump/attrs.go new file mode 100644 index 00000000..1be43a43 --- /dev/null +++ b/cmd/otelbench/chdump/attrs.go @@ -0,0 +1,86 @@ +package chdump + +import ( + "github.com/ClickHouse/ch-go/proto" + "github.com/go-faster/errors" + "golang.org/x/exp/maps" + + "github.com/go-faster/oteldb/internal/otelstorage" +) + +type Attributes struct { + index *proto.ColBytes + col *proto.ColLowCardinalityRaw + hashes map[otelstorage.Hash]int + + // values are filled up only when decoding. + values []otelstorage.Attrs +} + +// NewAttributes creates a new [Attributes]. +func NewAttributes() *Attributes { + ac := &Attributes{ + index: new(proto.ColBytes), + hashes: map[otelstorage.Hash]int{}, + } + ac.col = &proto.ColLowCardinalityRaw{ + Index: ac.index, + Key: proto.KeyUInt64, + } + return ac +} + +func (a Attributes) Type() proto.ColumnType { + return proto.ColumnTypeLowCardinality.Sub(proto.ColumnTypeString) +} + +func (a Attributes) Rows() int { + return a.col.Rows() +} + +func (a *Attributes) DecodeColumn(r *proto.Reader, rows int) error { + if err := a.col.DecodeColumn(r, rows); err != nil { + return errors.Wrap(err, "col") + } + for i := 0; i < a.index.Rows(); i++ { + v := a.index.Row(i) + m, err := decodeAttributes(v) + if err != nil { + return errors.Wrapf(err, "index value %d", i) + } + a.hashes[m.Hash()] = i + a.values = append(a.values, m) + } + return nil +} + +func (a *Attributes) Reset() { + a.col.Reset() + a.index.Reset() + a.values = a.values[:0] + maps.Clear(a.hashes) + a.col.Key = proto.KeyUInt64 +} + +func (a *Attributes) DecodeState(r *proto.Reader) error { + return a.col.DecodeState(r) +} + +func (a *Attributes) rowIdx(i int) int { + switch a.col.Key { + case proto.KeyUInt8: + return int(a.col.Keys8[i]) + case proto.KeyUInt16: + return int(a.col.Keys16[i]) + case proto.KeyUInt32: + return int(a.col.Keys32[i]) + case proto.KeyUInt64: + return int(a.col.Keys64[i]) + default: + panic("invalid key type") + } +} + +func (a *Attributes) Row(i int) otelstorage.Attrs { + return a.values[a.rowIdx(i)] +} diff --git a/cmd/otelbench/chdump/attrs_json.go b/cmd/otelbench/chdump/attrs_json.go new file mode 100644 index 00000000..fa122153 --- /dev/null +++ b/cmd/otelbench/chdump/attrs_json.go @@ -0,0 +1,79 @@ +package chdump + +import ( + "github.com/go-faster/jx" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/go-faster/oteldb/internal/otelstorage" +) + +func decodeAttributes(s []byte) (otelstorage.Attrs, error) { + result := pcommon.NewMap() + if len(s) == 0 { + return otelstorage.Attrs(result), nil + } + err := decodeMap(jx.DecodeBytes(s), result) + return otelstorage.Attrs(result), err +} + +func decodeValue(d *jx.Decoder, val pcommon.Value) error { + switch d.Next() { + case jx.String: + v, err := d.Str() + if err != nil { + return err + } + val.SetStr(v) + return nil + case jx.Number: + n, err := d.Num() + if err != nil { + return err + } + if n.IsInt() { + v, err := n.Int64() + if err != nil { + return err + } + val.SetInt(v) + } else { + v, err := n.Float64() + if err != nil { + return err + } + val.SetDouble(v) + } + return nil + case jx.Null: + if err := d.Null(); err != nil { + return err + } + // Do nothing, keep value empty. + return nil + case jx.Bool: + v, err := d.Bool() + if err != nil { + return err + } + val.SetBool(v) + return nil + case jx.Array: + s := val.SetEmptySlice() + return d.Arr(func(d *jx.Decoder) error { + rval := s.AppendEmpty() + return decodeValue(d, rval) + }) + case jx.Object: + m := val.SetEmptyMap() + return decodeMap(d, m) + default: + return d.Skip() + } +} + +func decodeMap(d *jx.Decoder, m pcommon.Map) error { + return d.Obj(func(d *jx.Decoder, key string) error { + rval := m.PutEmpty(key) + return decodeValue(d, rval) + }) +} diff --git a/cmd/otelbench/chdump/chdump.go b/cmd/otelbench/chdump/chdump.go new file mode 100644 index 00000000..d87bc0ca --- /dev/null +++ b/cmd/otelbench/chdump/chdump.go @@ -0,0 +1,167 @@ +// Package chdump provides utilities to read ClickHouse data dumps created by oteldb. +package chdump + +import ( + "archive/tar" + "io" + "io/fs" + "os" + "path" + "path/filepath" + "strings" + + "github.com/ClickHouse/ch-go/proto" + "github.com/go-faster/errors" + "github.com/klauspost/compress/zstd" +) + +// ReadOptions defines options for [Read]. +type ReadOptions struct { + OnTraces func(*Spans) error + OnPoints func(*Points) error + OnExpHistograms func(*ExpHistograms) error + OnLogs func(*Logs) error +} + +// ReadTar reads dump from given tar file. +func ReadTar(r io.Reader, opts ReadOptions) error { + tr := tar.NewReader(r) + iter := &tarIter{tr: tr} + return readDump(iter, opts) +} + +// ReadDir reads dump from unpacked directory. +func ReadDir(dir string, opts ReadOptions) error { + entries, err := os.ReadDir(dir) + if err != nil { + return errors.Wrap(err, "read dir") + } + iter := &dirIter{dir: dir, entries: entries} + return readDump(iter, opts) +} + +// readDump reads dump from given tar file. +func readDump(iter dumpIter, opts ReadOptions) error { + for { + name, file, err := iter.Next() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return errors.Wrap(err, "next") + } + if err := func() error { + defer func() { + _ = file.Close() + }() + + table := strings.TrimSuffix(name, ".bin.zstd") + switch table { + case "traces_spans": + return readBlock(file, NewSpans(), opts.OnTraces) + case "metrics_points": + return readBlock(file, NewPoints(), opts.OnPoints) + case "metrics_exp_histograms": + return readBlock(file, NewExpHistograms(), opts.OnExpHistograms) + case "logs": + return readBlock(file, NewLogs(), opts.OnLogs) + } + + return nil + }(); err != nil { + return errors.Wrapf(err, "read %q", name) + } + } +} + +type dumpIter interface { + Next() (string, io.ReadCloser, error) +} + +type tarIter struct { + tr *tar.Reader +} + +func (t *tarIter) Next() (string, io.ReadCloser, error) { + h, err := t.tr.Next() + if err != nil { + return "", nil, err + } + name := path.Clean(h.Name) + return name, io.NopCloser(t.tr), nil +} + +type dirIter struct { + dir string + entries []fs.DirEntry + idx int +} + +func (d *dirIter) Next() (string, io.ReadCloser, error) { + for { + if d.idx >= len(d.entries) { + return "", nil, io.EOF + } + + entry := d.entries[d.idx] + if entry.IsDir() { + d.idx++ + continue + } + + f, err := os.Open(filepath.Join(d.dir, entry.Name())) + if err != nil { + return "", nil, err + } + name := filepath.Clean(entry.Name()) + + d.idx++ + return name, f, nil + } +} + +const protoVersion = 51902 + +func readBlock[ + TableColumns interface { + Reset() + Result() proto.Results + }, +]( + r io.Reader, + cols TableColumns, + cb func(TableColumns) error, +) error { + if cb == nil { + return nil + } + + zr, err := zstd.NewReader(r) + if err != nil { + return errors.Wrap(err, "open zstd dump") + } + defer zr.Close() + + var ( + chr = proto.NewReader(zr) + block = &proto.Block{} + + blockIdx int + results = cols.Result() + ) + for { + cols.Reset() + + if err := block.DecodeBlock(chr, protoVersion, results); err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return errors.Wrapf(err, "decode block %d", blockIdx) + } + + if err := cb(cols); err != nil { + return errors.Wrap(err, "callback") + } + blockIdx++ + } +} diff --git a/cmd/otelbench/chdump/logs.go b/cmd/otelbench/chdump/logs.go new file mode 100644 index 00000000..18295265 --- /dev/null +++ b/cmd/otelbench/chdump/logs.go @@ -0,0 +1,108 @@ +package chdump + +import ( + "iter" + "slices" + + "github.com/ClickHouse/ch-go/proto" + + "github.com/go-faster/oteldb/internal/otelstorage" +) + +// Logs is a parsed dump of logs. +type Logs struct { + ServiceInstanceID *proto.ColLowCardinality[string] + ServiceName *proto.ColLowCardinality[string] + ServiceNamespace *proto.ColLowCardinality[string] + + Timestamp *proto.ColDateTime64 + + SeverityNumber *proto.ColUInt8 + SeverityText *proto.ColLowCardinality[string] + + TraceFlags *proto.ColUInt8 + TraceID *proto.ColRawOf[otelstorage.TraceID] + SpanID *proto.ColRawOf[otelstorage.SpanID] + + Body *proto.ColStr + + Attributes *Attributes + Resource *Attributes + + Scope *Attributes + ScopeName *proto.ColLowCardinality[string] + ScopeVersion *proto.ColLowCardinality[string] +} + +// NewLogs creates a new [Logs]. +func NewLogs() *Logs { + c := &Logs{ + ServiceInstanceID: new(proto.ColStr).LowCardinality(), + ServiceName: new(proto.ColStr).LowCardinality(), + ServiceNamespace: new(proto.ColStr).LowCardinality(), + + Timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano), + + SeverityText: new(proto.ColStr).LowCardinality(), + SeverityNumber: new(proto.ColUInt8), + + TraceFlags: new(proto.ColUInt8), + TraceID: new(proto.ColRawOf[otelstorage.TraceID]), + SpanID: new(proto.ColRawOf[otelstorage.SpanID]), + + Body: new(proto.ColStr), + + Attributes: NewAttributes(), + Resource: NewAttributes(), + + Scope: NewAttributes(), + ScopeName: new(proto.ColStr).LowCardinality(), + ScopeVersion: new(proto.ColStr).LowCardinality(), + } + + return c +} + +// Result returns a [Logs] result. +func (c *Logs) Result() proto.Results { + return slices.Collect(c.columns()) +} + +// Reset resets [Logs] data. +func (c *Logs) Reset() { + for col := range c.columns() { + col.Data.Reset() + } +} + +func (c *Logs) columns() iter.Seq[proto.ResultColumn] { + return func(yield func(proto.ResultColumn) bool) { + for _, col := range []proto.ResultColumn{ + {Name: "service_instance_id", Data: c.ServiceInstanceID}, + {Name: "service_name", Data: c.ServiceName}, + {Name: "service_namespace", Data: c.ServiceNamespace}, + + {Name: "timestamp", Data: c.Timestamp}, + + {Name: "severity_text", Data: c.SeverityText}, + {Name: "severity_number", Data: c.SeverityNumber}, + + {Name: "trace_id", Data: c.TraceID}, + {Name: "span_id", Data: c.SpanID}, + {Name: "trace_flags", Data: c.TraceFlags}, + + {Name: "body", Data: c.Body}, + + {Name: "attribute", Data: c.Attributes}, + {Name: "resource", Data: c.Resource}, + + {Name: "scope", Data: c.Scope}, + {Name: "scope_name", Data: c.ScopeName}, + {Name: "scope_version", Data: c.ScopeVersion}, + } { + if !yield(col) { + return + } + } + } +} diff --git a/cmd/otelbench/chdump/metrics.go b/cmd/otelbench/chdump/metrics.go new file mode 100644 index 00000000..3e0c610a --- /dev/null +++ b/cmd/otelbench/chdump/metrics.go @@ -0,0 +1,168 @@ +package chdump + +import ( + "iter" + "slices" + + "github.com/ClickHouse/ch-go/proto" +) + +// Points is a parsed dump of metric points. +type Points struct { + Name *proto.ColLowCardinality[string] + NameNormalized *proto.ColLowCardinality[string] + Timestamp *proto.ColDateTime64 + + Mapping *proto.ColEnum8 + Value *proto.ColFloat64 + + Flags *proto.ColUInt8 + Attributes *Attributes + Resource *Attributes + Scope *Attributes +} + +// NewPoints creates a new [Points]. +func NewPoints() *Points { + c := &Points{ + Name: new(proto.ColStr).LowCardinality(), + NameNormalized: new(proto.ColStr).LowCardinality(), + Timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano), + + Mapping: new(proto.ColEnum8), + Value: new(proto.ColFloat64), + + Flags: new(proto.ColUInt8), + Attributes: NewAttributes(), + Resource: NewAttributes(), + Scope: NewAttributes(), + } + + return c +} + +// Result returns a [Points] result. +func (c *Points) Result() proto.Results { + return slices.Collect(c.columns()) +} + +// Reset resets [Points] data. +func (c *Points) Reset() { + for col := range c.columns() { + col.Data.Reset() + } +} + +func (c *Points) columns() iter.Seq[proto.ResultColumn] { + return func(yield func(proto.ResultColumn) bool) { + for _, col := range []proto.ResultColumn{ + {Name: "name", Data: c.Name}, + {Name: "name_normalized", Data: c.NameNormalized}, + {Name: "timestamp", Data: c.Timestamp}, + + {Name: "mapping", Data: c.Mapping}, + {Name: "value", Data: c.Value}, + + {Name: "flags", Data: c.Flags}, + {Name: "attribute", Data: c.Attributes}, + {Name: "resource", Data: c.Resource}, + {Name: "scope", Data: c.Scope}, + } { + if !yield(col) { + return + } + } + } +} + +// ExpHistograms is a parsed dump of metric ExpHistograms. +type ExpHistograms struct { + Name *proto.ColLowCardinality[string] + NameNormalized *proto.ColLowCardinality[string] + Timestamp *proto.ColDateTime64 + + Count *proto.ColUInt64 + Sum *proto.ColNullable[float64] + Min *proto.ColNullable[float64] + Max *proto.ColNullable[float64] + Scale *proto.ColInt32 + ZeroCount *proto.ColUInt64 + PositiveOffset *proto.ColInt32 + PositiveBucketCounts *proto.ColArr[uint64] + NegativeOffset *proto.ColInt32 + NegativeBucketCounts *proto.ColArr[uint64] + + Flags *proto.ColUInt8 + Attributes *Attributes + Scope *Attributes + Resource *Attributes +} + +// NewExpHistograms creates a new [ExpHistograms]. +func NewExpHistograms() *ExpHistograms { + c := &ExpHistograms{ + Name: new(proto.ColStr).LowCardinality(), + NameNormalized: new(proto.ColStr).LowCardinality(), + Timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano), + + Count: new(proto.ColUInt64), + Sum: new(proto.ColFloat64).Nullable(), + Min: new(proto.ColFloat64).Nullable(), + Max: new(proto.ColFloat64).Nullable(), + Scale: new(proto.ColInt32), + ZeroCount: new(proto.ColUInt64), + PositiveOffset: new(proto.ColInt32), + PositiveBucketCounts: new(proto.ColUInt64).Array(), + NegativeOffset: new(proto.ColInt32), + NegativeBucketCounts: new(proto.ColUInt64).Array(), + + Flags: new(proto.ColUInt8), + Scope: NewAttributes(), + Attributes: NewAttributes(), + Resource: NewAttributes(), + } + + return c +} + +// Result returns a [ExpHistograms] result. +func (c *ExpHistograms) Result() proto.Results { + return slices.Collect(c.columns()) +} + +// Reset resets [ExpHistograms] data. +func (c *ExpHistograms) Reset() { + for col := range c.columns() { + col.Data.Reset() + } +} + +func (c *ExpHistograms) columns() iter.Seq[proto.ResultColumn] { + return func(yield func(proto.ResultColumn) bool) { + for _, col := range []proto.ResultColumn{ + {Name: "name", Data: c.Name}, + {Name: "name_normalized", Data: c.NameNormalized}, + {Name: "timestamp", Data: c.Timestamp}, + + {Name: "exp_histogram_count", Data: c.Count}, + {Name: "exp_histogram_sum", Data: c.Sum}, + {Name: "exp_histogram_min", Data: c.Min}, + {Name: "exp_histogram_max", Data: c.Max}, + {Name: "exp_histogram_scale", Data: c.Scale}, + {Name: "exp_histogram_zerocount", Data: c.ZeroCount}, + {Name: "exp_histogram_positive_offset", Data: c.PositiveOffset}, + {Name: "exp_histogram_positive_bucket_counts", Data: c.PositiveBucketCounts}, + {Name: "exp_histogram_negative_offset", Data: c.NegativeOffset}, + {Name: "exp_histogram_negative_bucket_counts", Data: c.NegativeBucketCounts}, + + {Name: "flags", Data: c.Flags}, + {Name: "attribute", Data: c.Attributes}, + {Name: "scope", Data: c.Scope}, + {Name: "resource", Data: c.Resource}, + } { + if !yield(col) { + return + } + } + } +} diff --git a/cmd/otelbench/chdump/traces.go b/cmd/otelbench/chdump/traces.go new file mode 100644 index 00000000..5910a6ab --- /dev/null +++ b/cmd/otelbench/chdump/traces.go @@ -0,0 +1,139 @@ +package chdump + +import ( + "iter" + "slices" + "time" + + "github.com/ClickHouse/ch-go/proto" + + "github.com/go-faster/oteldb/internal/otelstorage" +) + +// Spans is a parsed dump of traces_spans. +type Spans struct { + ServiceInstanceID *proto.ColLowCardinality[string] + ServiceName *proto.ColLowCardinality[string] + ServiceNamespace *proto.ColLowCardinality[string] + + TraceID *proto.ColRawOf[otelstorage.TraceID] + SpanID *proto.ColRawOf[otelstorage.SpanID] + TraceState *proto.ColStr + ParentSpanID *proto.ColRawOf[otelstorage.SpanID] + Name *proto.ColLowCardinality[string] + Kind *proto.ColEnum8 + Start *proto.ColDateTime64 + End *proto.ColDateTime64 + StatusCode *proto.ColUInt8 + StatusMessage *proto.ColLowCardinality[string] + BatchID *proto.ColUUID + + Attributes *Attributes + Resource *Attributes + + Scope *Attributes + ScopeName *proto.ColLowCardinality[string] + ScopeVersion *proto.ColLowCardinality[string] + + EventsNames *proto.ColArr[string] + EventsTimestamps *proto.ColArr[time.Time] + EventsAttributes *proto.ColArr[[]byte] + + LinksTraceIDs *proto.ColArr[otelstorage.TraceID] + LinksSpanIDs *proto.ColArr[otelstorage.SpanID] + LinksTracestates *proto.ColArr[string] + LinksAttributes *proto.ColArr[[]byte] +} + +// NewSpans creates a new [Spans]. +func NewSpans() *Spans { + c := &Spans{ + ServiceInstanceID: new(proto.ColStr).LowCardinality(), + ServiceName: new(proto.ColStr).LowCardinality(), + ServiceNamespace: new(proto.ColStr).LowCardinality(), + + TraceID: new(proto.ColRawOf[otelstorage.TraceID]), + SpanID: new(proto.ColRawOf[otelstorage.SpanID]), + TraceState: new(proto.ColStr), + ParentSpanID: new(proto.ColRawOf[otelstorage.SpanID]), + Name: new(proto.ColStr).LowCardinality(), + Kind: new(proto.ColEnum8), + Start: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano), + End: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano), + StatusCode: new(proto.ColUInt8), + StatusMessage: new(proto.ColStr).LowCardinality(), + BatchID: new(proto.ColUUID), + + Attributes: NewAttributes(), + Resource: NewAttributes(), + + Scope: NewAttributes(), + ScopeName: new(proto.ColStr).LowCardinality(), + ScopeVersion: new(proto.ColStr).LowCardinality(), + + EventsNames: new(proto.ColStr).Array(), + EventsTimestamps: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano).Array(), + EventsAttributes: new(proto.ColBytes).Array(), + + LinksTraceIDs: proto.NewArray(&proto.ColRawOf[otelstorage.TraceID]{}), + LinksSpanIDs: proto.NewArray(&proto.ColRawOf[otelstorage.SpanID]{}), + LinksTracestates: new(proto.ColStr).Array(), + LinksAttributes: new(proto.ColBytes).Array(), + } + + return c +} + +// Result returns a [Spans] result. +func (c *Spans) Result() proto.Results { + return slices.Collect(c.columns()) +} + +// Reset resets [Spans] data. +func (c *Spans) Reset() { + for col := range c.columns() { + col.Data.Reset() + } +} + +func (c *Spans) columns() iter.Seq[proto.ResultColumn] { + return func(yield func(proto.ResultColumn) bool) { + for _, col := range []proto.ResultColumn{ + {Name: "service_instance_id", Data: c.ServiceInstanceID}, + {Name: "service_name", Data: c.ServiceName}, + {Name: "service_namespace", Data: c.ServiceNamespace}, + + {Name: "trace_id", Data: c.TraceID}, + {Name: "span_id", Data: c.SpanID}, + {Name: "trace_state", Data: c.TraceState}, + {Name: "parent_span_id", Data: c.ParentSpanID}, + {Name: "name", Data: c.Name}, + {Name: "kind", Data: c.Kind}, + {Name: "start", Data: c.Start}, + {Name: "end", Data: c.End}, + {Name: "status_code", Data: c.StatusCode}, + {Name: "status_message", Data: c.StatusMessage}, + {Name: "batch_id", Data: c.BatchID}, + + {Name: "attribute", Data: c.Attributes}, + {Name: "resource", Data: c.Resource}, + + {Name: "scope", Data: c.Scope}, + {Name: "scope_name", Data: c.ScopeName}, + {Name: "scope_version", Data: c.ScopeVersion}, + + {Name: "events_timestamps", Data: c.EventsTimestamps}, + {Name: "events_names", Data: c.EventsNames}, + {Name: "events_attributes", Data: c.EventsAttributes}, + + {Name: "links_trace_ids", Data: c.LinksTraceIDs}, + {Name: "links_span_ids", Data: c.LinksSpanIDs}, + {Name: "links_tracestates", Data: c.LinksTracestates}, + {Name: "links_attributes", Data: c.LinksAttributes}, + } { + if !yield(col) { + return + } + } + } +}