Skip to content

Commit

Permalink
feat(chdump): add Clickhouse dump reader
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Dec 9, 2024
1 parent 853277a commit 94e6b6b
Show file tree
Hide file tree
Showing 5 changed files with 516 additions and 0 deletions.
86 changes: 86 additions & 0 deletions cmd/otelbench/chdump/attrs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package chdump

import (
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/otelstorage"
)

// Attributes is a column reader for attribute sets stored in a
// LowCardinality(String) column, where every dictionary entry is a
// JSON-encoded object.
type Attributes struct {
// index holds the LowCardinality dictionary entries; col.Index points to it.
index *proto.ColBytes
// col is the raw LowCardinality column that carries the per-row keys into index.
col *proto.ColLowCardinalityRaw
// hashes maps the hash of each decoded dictionary entry to its position in index.
hashes map[otelstorage.Hash]int

// values are filled up only when decoding.
values []otelstorage.Attrs
}

// NewAttributes creates a new [Attributes].
//
// The returned column shares a single dictionary between the wrapper and the
// underlying raw LowCardinality column, and starts with UInt64 keys.
func NewAttributes() *Attributes {
	index := new(proto.ColBytes)
	return &Attributes{
		index: index,
		col: &proto.ColLowCardinalityRaw{
			Index: index,
			Key:   proto.KeyUInt64,
		},
		hashes: make(map[otelstorage.Hash]int),
	}
}

// Type returns the ClickHouse column type of the attributes column, which is
// LowCardinality(String).
//
// It uses a pointer receiver, like the decoding methods of [Attributes], so
// that the method set stays consistent and calls do not copy the struct.
func (a *Attributes) Type() proto.ColumnType {
	return proto.ColumnTypeLowCardinality.Sub(proto.ColumnTypeString)
}

// Rows returns the number of rows reported by the underlying raw
// LowCardinality column.
//
// It uses a pointer receiver, like the decoding methods of [Attributes], so
// that the method set stays consistent and calls do not copy the struct.
func (a *Attributes) Rows() int {
	return a.col.Rows()
}

// DecodeColumn decodes rows rows of the raw LowCardinality column from r and
// then parses every dictionary entry into attributes.
//
// Parsed entries are appended to a.values in dictionary order, and the hash of
// each entry is recorded in a.hashes together with its dictionary position.
func (a *Attributes) DecodeColumn(r *proto.Reader, rows int) error {
	if err := a.col.DecodeColumn(r, rows); err != nil {
		return errors.Wrap(err, "col")
	}
	for idx, total := 0, a.index.Rows(); idx < total; idx++ {
		attrs, err := decodeAttributes(a.index.Row(idx))
		if err != nil {
			return errors.Wrapf(err, "index value %d", idx)
		}
		a.values = append(a.values, attrs)
		a.hashes[attrs.Hash()] = idx
	}
	return nil
}

// Reset clears all decoded state so the column can be reused for another block.
func (a *Attributes) Reset() {
	// Drop decoded entries; the slice keeps its capacity for reuse.
	maps.Clear(a.hashes)
	a.values = a.values[:0]

	a.index.Reset()
	a.col.Reset()
	// Restore the key width that NewAttributes configures.
	a.col.Key = proto.KeyUInt64
}

// DecodeState reads the column state from r by delegating to the underlying
// raw LowCardinality column.
func (a *Attributes) DecodeState(r *proto.Reader) error {
	return a.col.DecodeState(r)
}

// rowIdx returns the dictionary position referenced by row i, reading it from
// the key slice that matches the column's current key width.
//
// It panics if the column reports a key type other than UInt8/16/32/64.
func (a *Attributes) rowIdx(i int) int {
	raw := a.col
	switch raw.Key {
	case proto.KeyUInt64:
		return int(raw.Keys64[i])
	case proto.KeyUInt32:
		return int(raw.Keys32[i])
	case proto.KeyUInt16:
		return int(raw.Keys16[i])
	case proto.KeyUInt8:
		return int(raw.Keys8[i])
	}
	panic("invalid key type")
}

// Row returns the decoded attributes for row i by resolving the row's
// dictionary position and looking up the parsed entry.
func (a *Attributes) Row(i int) otelstorage.Attrs {
	pos := a.rowIdx(i)
	return a.values[pos]
}
79 changes: 79 additions & 0 deletions cmd/otelbench/chdump/attrs_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package chdump

import (
"github.com/go-faster/jx"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/otelstorage"
)

// decodeAttributes parses a JSON-encoded object into [otelstorage.Attrs].
//
// Empty input yields an empty attribute set and no error. On a decoding error
// the attributes populated so far are returned together with the error.
func decodeAttributes(s []byte) (otelstorage.Attrs, error) {
	attrs := pcommon.NewMap()
	if len(s) > 0 {
		if err := decodeMap(jx.DecodeBytes(s), attrs); err != nil {
			return otelstorage.Attrs(attrs), err
		}
	}
	return otelstorage.Attrs(attrs), nil
}

// decodeValue reads the next JSON value from d and stores it into val.
//
// Strings and booleans are stored as-is. Numbers are stored as Int when jx
// reports them as integers and as Double otherwise. Null leaves val untouched.
// Arrays and objects are decoded recursively into a slice or map value.
// Any other token is handed to d.Skip.
func decodeValue(d *jx.Decoder, val pcommon.Value) error {
	switch d.Next() {
	case jx.Null:
		// Consume the token and keep the value empty.
		return d.Null()

	case jx.Bool:
		b, err := d.Bool()
		if err != nil {
			return err
		}
		val.SetBool(b)
		return nil

	case jx.String:
		str, err := d.Str()
		if err != nil {
			return err
		}
		val.SetStr(str)
		return nil

	case jx.Number:
		num, err := d.Num()
		if err != nil {
			return err
		}
		if !num.IsInt() {
			f, err := num.Float64()
			if err != nil {
				return err
			}
			val.SetDouble(f)
			return nil
		}
		i, err := num.Int64()
		if err != nil {
			return err
		}
		val.SetInt(i)
		return nil

	case jx.Array:
		// The slice is created before decoding, so an empty array still
		// produces a slice-typed value.
		items := val.SetEmptySlice()
		return d.Arr(func(d *jx.Decoder) error {
			return decodeValue(d, items.AppendEmpty())
		})

	case jx.Object:
		return decodeMap(d, val.SetEmptyMap())

	default:
		return d.Skip()
	}
}

// decodeMap reads a JSON object from d and stores every field into m,
// decoding each field value with decodeValue.
func decodeMap(d *jx.Decoder, m pcommon.Map) error {
	putField := func(d *jx.Decoder, key string) error {
		return decodeValue(d, m.PutEmpty(key))
	}
	return d.Obj(putField)
}
104 changes: 104 additions & 0 deletions cmd/otelbench/chdump/chdump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Package chdump provides utilities to read ClickHouse data dumps created by oteldb.
package chdump

import (
"archive/tar"
"io"
"os"
"path"
"strings"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/klauspost/compress/zstd"
)

// ReadOptions defines options for [Read].
//
// A nil callback means the corresponding table is not decoded. The value
// passed to a callback is reset and reused for the next block, so callbacks
// should not retain it across calls.
type ReadOptions struct {
// OnTraces is called for every decoded block of the "traces_spans" table.
OnTraces func(*Spans) error
// OnMetrics func(*Metrics) error

// OnLogs is called for every decoded block of the "logs" table.
OnLogs func(*Logs) error
}

// Read reads dump from given tar file.
func Read(tarFile string, opts ReadOptions) error {
ar, err := os.Open(tarFile)

Check failure on line 25 in cmd/otelbench/chdump/chdump.go

View workflow job for this annotation

GitHub Actions / lint / run

G304: Potential file inclusion via variable (gosec)
if err != nil {
return errors.Wrap(err, "open archive")
}
defer func() {
_ = ar.Close()
}()

tr := tar.NewReader(ar)
for {
h, err := tr.Next()
switch {
case err == io.EOF:
return nil
case err != nil:
return err
}

name := path.Clean(h.Name)
table := strings.TrimSuffix(name, ".bin.zstd")
switch table {
case "logs":
if err := readBlock(tr, NewLogs(), opts.OnLogs); err != nil {
return errors.Wrapf(err, "read %q", h.Name)
}
case "traces_spans":
if err := readBlock(tr, NewSpans(), opts.OnTraces); err != nil {
return errors.Wrapf(err, "read %q", h.Name)
}
case "metrics_points":
case "metrics_exp_histograms":
}
}
}

// protoVersion is the protocol version passed to proto.Block.DecodeBlock when
// decoding dump blocks.
// NOTE(review): presumably this has to match the revision the dump was written with; confirm against the dump writer.
const protoVersion = 51902

// readBlock decodes a zstd-compressed stream of ClickHouse blocks from r into
// cols and invokes cb after every decoded block.
//
// If cb is nil, nothing is read and nil is returned. The same cols value is
// reset before each block and handed to cb, so its contents are only valid
// until the next block is decoded. Decoding stops with nil when the decoder
// reports io.EOF; any other decode error is wrapped with the index of the
// block being decoded, and callback errors are wrapped with "callback".
func readBlock[
	TableColumns interface {
		Reset()
		Result() proto.Results
	},
](
	r io.Reader,
	cols TableColumns,
	cb func(TableColumns) error,
) error {
	// Nobody to deliver blocks to: leave the reader untouched.
	if cb == nil {
		return nil
	}

	dec, err := zstd.NewReader(r)
	if err != nil {
		return errors.Wrap(err, "open zstd dump")
	}
	defer dec.Close()

	reader := proto.NewReader(dec)
	results := cols.Result()
	block := new(proto.Block)

	for blockIdx := 0; ; blockIdx++ {
		cols.Reset()

		err := block.DecodeBlock(reader, protoVersion, results)
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return errors.Wrapf(err, "decode block %d", blockIdx)
		}

		if err := cb(cols); err != nil {
			return errors.Wrap(err, "callback")
		}
	}
}
108 changes: 108 additions & 0 deletions cmd/otelbench/chdump/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package chdump

import (
"iter"
"slices"

"github.com/ClickHouse/ch-go/proto"

"github.com/go-faster/oteldb/internal/otelstorage"
)

// Logs is a parsed dump of logs.
//
// Each field holds one column of the dumped table; the columns method defines
// the column name and the order used for every field.
type Logs struct {
// Service identity columns.
ServiceInstanceID *proto.ColLowCardinality[string]
ServiceName *proto.ColLowCardinality[string]
ServiceNamespace *proto.ColLowCardinality[string]

// Timestamp is created with nanosecond precision by NewLogs.
Timestamp *proto.ColDateTime64

SeverityNumber *proto.ColUInt8
SeverityText *proto.ColLowCardinality[string]

// Trace correlation columns.
TraceFlags *proto.ColUInt8
TraceID *proto.ColRawOf[otelstorage.TraceID]
SpanID *proto.ColRawOf[otelstorage.SpanID]

Body *proto.ColStr

// Attributes and Resource are read from the "attribute" and "resource" columns.
Attributes *Attributes
Resource *Attributes

// Scope is read from the "scope" column.
Scope *Attributes
ScopeName *proto.ColLowCardinality[string]
ScopeVersion *proto.ColLowCardinality[string]
}

// NewLogs creates a new [Logs] with every column allocated and ready for
// decoding.
func NewLogs() *Logs {
	return &Logs{
		// Service identity.
		ServiceInstanceID: new(proto.ColStr).LowCardinality(),
		ServiceName:       new(proto.ColStr).LowCardinality(),
		ServiceNamespace:  new(proto.ColStr).LowCardinality(),

		// Record time, stored with nanosecond precision.
		Timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),

		// Severity.
		SeverityNumber: new(proto.ColUInt8),
		SeverityText:   new(proto.ColStr).LowCardinality(),

		// Trace correlation.
		TraceFlags: new(proto.ColUInt8),
		TraceID:    new(proto.ColRawOf[otelstorage.TraceID]),
		SpanID:     new(proto.ColRawOf[otelstorage.SpanID]),

		Body: new(proto.ColStr),

		// Attribute sets.
		Attributes: NewAttributes(),
		Resource:   NewAttributes(),

		// Instrumentation scope.
		Scope:        NewAttributes(),
		ScopeName:    new(proto.ColStr).LowCardinality(),
		ScopeVersion: new(proto.ColStr).LowCardinality(),
	}
}

// Result returns the [Logs] columns as a [proto.Results] list, in the order
// produced by columns.
func (c *Logs) Result() proto.Results {
	cols := c.columns()
	return slices.Collect(cols)
}

// Reset resets the data of every [Logs] column so the value can be reused
// for decoding another block.
func (c *Logs) Reset() {
	for column := range c.columns() {
		column.Data.Reset()
	}
}

// columns returns an iterator over the named result columns of [Logs].
//
// The column table is built each time the returned iterator is run, and the
// yield order is the order of the table below.
func (c *Logs) columns() iter.Seq[proto.ResultColumn] {
	return func(yield func(proto.ResultColumn) bool) {
		table := []proto.ResultColumn{
			{Name: "service_instance_id", Data: c.ServiceInstanceID},
			{Name: "service_name", Data: c.ServiceName},
			{Name: "service_namespace", Data: c.ServiceNamespace},

			{Name: "timestamp", Data: c.Timestamp},

			{Name: "severity_text", Data: c.SeverityText},
			{Name: "severity_number", Data: c.SeverityNumber},

			{Name: "trace_id", Data: c.TraceID},
			{Name: "span_id", Data: c.SpanID},
			{Name: "trace_flags", Data: c.TraceFlags},

			{Name: "body", Data: c.Body},

			{Name: "attribute", Data: c.Attributes},
			{Name: "resource", Data: c.Resource},

			{Name: "scope", Data: c.Scope},
			{Name: "scope_name", Data: c.ScopeName},
			{Name: "scope_version", Data: c.ScopeVersion},
		}
		for i := range table {
			// Stop as soon as the consumer asks for no more columns.
			if !yield(table[i]) {
				return
			}
		}
	}
}
Loading

0 comments on commit 94e6b6b

Please sign in to comment.