Skip to content

Commit

Permalink
feat(chdump): add Clickhouse dump reader
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Dec 10, 2024
1 parent 853277a commit 35afe5c
Show file tree
Hide file tree
Showing 6 changed files with 747 additions and 0 deletions.
86 changes: 86 additions & 0 deletions cmd/otelbench/chdump/attrs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package chdump

import (
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/otelstorage"
)

// Attributes is a column reader for attribute sets stored in a
// LowCardinality(String) column. Every dictionary entry of the column
// is decoded into an [otelstorage.Attrs] when the column is decoded.
type Attributes struct {
// index holds the raw dictionary entries of the low-cardinality column.
index *proto.ColBytes
// col is the underlying low-cardinality column; its Index is index.
col *proto.ColLowCardinalityRaw
// hashes maps the hash of a decoded attribute set to its position in index.
hashes map[otelstorage.Hash]int

// values are filled up only when decoding.
values []otelstorage.Attrs
}

// NewAttributes creates a new [Attributes].
//
// The returned column has an empty dictionary, an empty hash map and
// uses UInt64 keys.
func NewAttributes() *Attributes {
	dict := new(proto.ColBytes)
	return &Attributes{
		index: dict,
		col: &proto.ColLowCardinalityRaw{
			Index: dict,
			Key:   proto.KeyUInt64,
		},
		hashes: make(map[otelstorage.Hash]int),
	}
}

// Type returns the column type: a low-cardinality column of strings.
func (a Attributes) Type() proto.ColumnType {
	elem := proto.ColumnTypeString
	return proto.ColumnTypeLowCardinality.Sub(elem)
}

// Rows returns the number of rows reported by the underlying column.
func (a Attributes) Rows() int {
	n := a.col.Rows()
	return n
}

// DecodeColumn reads rows from r into the underlying low-cardinality
// column, then decodes every dictionary entry into an attribute set.
//
// Decoded sets are appended to values and their hashes are recorded
// together with the dictionary position they came from.
func (a *Attributes) DecodeColumn(r *proto.Reader, rows int) error {
	err := a.col.DecodeColumn(r, rows)
	if err != nil {
		return errors.Wrap(err, "col")
	}

	total := a.index.Rows()
	for idx := 0; idx < total; idx++ {
		attrs, err := decodeAttributes(a.index.Row(idx))
		if err != nil {
			return errors.Wrapf(err, "index value %d", idx)
		}
		a.values = append(a.values, attrs)
		a.hashes[attrs.Hash()] = idx
	}
	return nil
}

// Reset clears decoded state and the underlying columns so the reader
// can be reused, and restores the key type to UInt64.
func (a *Attributes) Reset() {
	// Drop decoded state; the values slice keeps its capacity.
	maps.Clear(a.hashes)
	a.values = a.values[:0]

	a.index.Reset()
	a.col.Reset()
	// Assigned after the column reset so the key type always ends up UInt64.
	a.col.Key = proto.KeyUInt64
}

// DecodeState reads column state from r by delegating to the
// underlying low-cardinality column.
func (a *Attributes) DecodeState(r *proto.Reader) error {
	if err := a.col.DecodeState(r); err != nil {
		return err
	}
	return nil
}

// rowIdx returns the dictionary position referenced by row i, reading it
// from the key slice that matches the column's current key type.
//
// It panics if the key type is not one of the four known widths.
func (a *Attributes) rowIdx(i int) int {
	col := a.col
	switch key := col.Key; key {
	case proto.KeyUInt64:
		return int(col.Keys64[i])
	case proto.KeyUInt32:
		return int(col.Keys32[i])
	case proto.KeyUInt16:
		return int(col.Keys16[i])
	case proto.KeyUInt8:
		return int(col.Keys8[i])
	}
	panic("invalid key type")
}

// Row returns the decoded attribute set referenced by row i.
func (a *Attributes) Row(i int) otelstorage.Attrs {
	dictIdx := a.rowIdx(i)
	return a.values[dictIdx]
}
79 changes: 79 additions & 0 deletions cmd/otelbench/chdump/attrs_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package chdump

import (
"github.com/go-faster/jx"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/otelstorage"
)

// decodeAttributes parses s as a JSON object into an attribute set.
//
// Empty input yields an empty set and no error. On a decode error the
// set built so far is returned together with the error.
func decodeAttributes(s []byte) (otelstorage.Attrs, error) {
	attrs := pcommon.NewMap()

	var err error
	if len(s) > 0 {
		err = decodeMap(jx.DecodeBytes(s), attrs)
	}
	return otelstorage.Attrs(attrs), err
}

// decodeValue reads the next JSON value from d and stores it in val.
//
// Strings, booleans, arrays and objects map to the corresponding value
// kinds. Numbers are stored as int when the literal is an integer and as
// double otherwise. A JSON null leaves val empty. Any other token is
// handed to the decoder's Skip.
func decodeValue(d *jx.Decoder, val pcommon.Value) error {
	switch tt := d.Next(); tt {
	case jx.Object:
		return decodeMap(d, val.SetEmptyMap())

	case jx.Array:
		items := val.SetEmptySlice()
		return d.Arr(func(d *jx.Decoder) error {
			return decodeValue(d, items.AppendEmpty())
		})

	case jx.String:
		str, err := d.Str()
		if err != nil {
			return err
		}
		val.SetStr(str)
		return nil

	case jx.Number:
		num, err := d.Num()
		if err != nil {
			return err
		}
		if !num.IsInt() {
			f, err := num.Float64()
			if err != nil {
				return err
			}
			val.SetDouble(f)
			return nil
		}
		i, err := num.Int64()
		if err != nil {
			return err
		}
		val.SetInt(i)
		return nil

	case jx.Bool:
		b, err := d.Bool()
		if err != nil {
			return err
		}
		val.SetBool(b)
		return nil

	case jx.Null:
		// Consume the token; the value is intentionally left empty.
		return d.Null()

	default:
		return d.Skip()
	}
}

// decodeMap reads a JSON object from d, putting one entry into m per
// object field and decoding the field value into it.
func decodeMap(d *jx.Decoder, m pcommon.Map) error {
	putField := func(d *jx.Decoder, key string) error {
		return decodeValue(d, m.PutEmpty(key))
	}
	return d.Obj(putField)
}
167 changes: 167 additions & 0 deletions cmd/otelbench/chdump/chdump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Package chdump provides utilities to read ClickHouse data dumps created by oteldb.
package chdump

import (
"archive/tar"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"strings"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/klauspost/compress/zstd"
)

// ReadOptions defines options for [Read].
//
// Each callback is invoked once per decoded block of the corresponding
// table. A nil callback causes that table to be skipped.
type ReadOptions struct {
// OnTraces receives blocks of the "traces_spans" table.
OnTraces func(*Spans) error
// OnPoints receives blocks of the "metrics_points" table.
OnPoints func(*Points) error
// OnExpHistograms receives blocks of the "metrics_exp_histograms" table.
OnExpHistograms func(*ExpHistograms) error
// OnLogs receives blocks of the "logs" table.
OnLogs func(*Logs) error
}

// ReadTar reads a dump from the tar archive stream r, dispatching
// decoded tables to the callbacks in opts.
func ReadTar(r io.Reader, opts ReadOptions) error {
	return readDump(&tarIter{tr: tar.NewReader(r)}, opts)
}

// ReadDir reads a dump from the unpacked directory dir, dispatching
// decoded tables to the callbacks in opts.
func ReadDir(dir string, opts ReadOptions) error {
	list, err := os.ReadDir(dir)
	if err != nil {
		return errors.Wrap(err, "read dir")
	}
	return readDump(&dirIter{dir: dir, entries: list}, opts)
}

// readDump reads every entry yielded by iter and dispatches table dumps
// to the matching callback in opts.
//
// An entry is a table dump when its name is "<table>.bin.zstd". Entries
// without that suffix, or naming an unknown table, are closed and skipped.
// Iteration ends without error when iter reports io.EOF; any other
// iterator error is returned wrapped. Errors from reading an entry are
// wrapped with the entry name.
func readDump(iter dumpIter, opts ReadOptions) error {
	const dumpSuffix = ".bin.zstd"

	for {
		name, file, err := iter.Next()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return errors.Wrap(err, "next")
		}
		if err := func() error {
			// The closure scopes the deferred Close to a single entry.
			defer func() {
				_ = file.Close()
			}()

			// TrimSuffix alone would leave a suffix-less name unchanged, so an
			// entry literally named e.g. "logs" would be decoded as a dump.
			if !strings.HasSuffix(name, dumpSuffix) {
				return nil
			}

			switch strings.TrimSuffix(name, dumpSuffix) {
			case "traces_spans":
				return readBlock(file, NewSpans(), opts.OnTraces)
			case "metrics_points":
				return readBlock(file, NewPoints(), opts.OnPoints)
			case "metrics_exp_histograms":
				return readBlock(file, NewExpHistograms(), opts.OnExpHistograms)
			case "logs":
				return readBlock(file, NewLogs(), opts.OnLogs)
			}

			return nil
		}(); err != nil {
			return errors.Wrapf(err, "read %q", name)
		}
	}
}

// dumpIter iterates over the entries of a dump.
type dumpIter interface {
// Next returns the name and contents of the next entry.
// It returns io.EOF when there are no more entries.
// The caller is responsible for closing the returned reader.
Next() (string, io.ReadCloser, error)
}

// tarIter is a dumpIter over the entries of a tar archive.
type tarIter struct {
	tr *tar.Reader
}

// Next returns the cleaned name and contents of the next non-directory
// entry in the archive.
//
// Directory entries carry no data and are skipped, matching dirIter.
// The returned reader reads from the shared tar reader and is only valid
// until the following call to Next; closing it is a no-op. Errors from
// the tar reader, including io.EOF at the end of the archive, are
// returned unchanged.
func (t *tarIter) Next() (string, io.ReadCloser, error) {
	for {
		h, err := t.tr.Next()
		if err != nil {
			return "", nil, err
		}
		if h.Typeflag == tar.TypeDir {
			continue
		}
		return path.Clean(h.Name), io.NopCloser(t.tr), nil
	}
}

// dirIter is a dumpIter over the non-directory entries of an unpacked
// dump directory.
type dirIter struct {
	dir     string
	entries []fs.DirEntry
	idx     int
}

// Next opens the next non-directory entry and returns its cleaned name
// together with the open file.
//
// It returns io.EOF once all entries are consumed. If opening an entry
// fails, the error is returned and the position is not advanced.
func (d *dirIter) Next() (string, io.ReadCloser, error) {
	for ; d.idx < len(d.entries); d.idx++ {
		entry := d.entries[d.idx]
		if entry.IsDir() {
			continue
		}

		file, err := os.Open(filepath.Join(d.dir, entry.Name()))
		if err != nil {
			return "", nil, err
		}

		// Advance past the entry being returned.
		d.idx++
		return filepath.Clean(entry.Name()), file, nil
	}
	return "", nil, io.EOF
}

// protoVersion is the protocol version passed to the block decoder when
// reading dumps.
const protoVersion = 51902

// readBlock decodes a zstd-compressed stream of blocks from r into cols
// and invokes cb after every successfully decoded block.
//
// If cb is nil, nothing is read and nil is returned. cols is reset before
// each block, so cb must not rely on cols after it returns. Decoding stops
// without error when the decoder reports io.EOF; other decode errors are
// wrapped with the zero-based block index, and callback errors are wrapped
// with "callback".
func readBlock[
	TableColumns interface {
		Reset()
		Result() proto.Results
	},
](
	r io.Reader,
	cols TableColumns,
	cb func(TableColumns) error,
) error {
	if cb == nil {
		return nil
	}

	dec, err := zstd.NewReader(r)
	if err != nil {
		return errors.Wrap(err, "open zstd dump")
	}
	defer dec.Close()

	reader := proto.NewReader(dec)
	results := cols.Result()

	var block proto.Block
	for n := 0; ; n++ {
		cols.Reset()

		err := block.DecodeBlock(reader, protoVersion, results)
		switch {
		case err == nil:
			// Block decoded; hand it to the callback below.
		case errors.Is(err, io.EOF):
			return nil
		default:
			return errors.Wrapf(err, "decode block %d", n)
		}

		if err := cb(cols); err != nil {
			return errors.Wrap(err, "callback")
		}
	}
}
Loading

0 comments on commit 35afe5c

Please sign in to comment.