Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chdump): add Clickhouse dump reader #564

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions cmd/otelbench/chdump/attrs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package chdump

import (
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/otelstorage"
)

// Attributes is a column that decodes attribute sets stored in a
// LowCardinality(String) column.
type Attributes struct {
// index holds the dictionary entries; NewAttributes also installs it as
// col.Index.
index *proto.ColBytes
// col is the underlying low-cardinality column that provides per-row keys.
col *proto.ColLowCardinalityRaw
// hashes maps the hash of each decoded dictionary entry to its position
// in index.
hashes map[otelstorage.Hash]int

// values are filled up only when decoding.
values []otelstorage.Attrs
}

// NewAttributes returns an empty [Attributes] ready to be decoded into.
//
// A single dictionary column is shared between the returned value and its
// underlying low-cardinality column, whose key type starts as UInt64.
func NewAttributes() *Attributes {
	dict := new(proto.ColBytes)
	return &Attributes{
		index: dict,
		col: &proto.ColLowCardinalityRaw{
			Index: dict,
			Key:   proto.KeyUInt64,
		},
		hashes: make(map[otelstorage.Hash]int),
	}
}

// Type returns the column type of the attributes column:
// LowCardinality wrapping String.
func (a Attributes) Type() proto.ColumnType {
	const elem = proto.ColumnTypeString
	return proto.ColumnTypeLowCardinality.Sub(elem)
}

// Rows reports the row count of the underlying low-cardinality column.
func (a Attributes) Rows() int {
	n := a.col.Rows()
	return n
}

// DecodeColumn reads rows from r into the underlying column, then decodes
// every dictionary entry into an attribute set.
//
// Each decoded set is appended to a.values and its hash is recorded together
// with the entry's position in the dictionary. The first entry that fails to
// decode aborts the call with an error naming its position.
func (a *Attributes) DecodeColumn(r *proto.Reader, rows int) error {
	if err := a.col.DecodeColumn(r, rows); err != nil {
		return errors.Wrap(err, "col")
	}

	total := a.index.Rows()
	for pos := 0; pos < total; pos++ {
		attrs, err := decodeAttributes(a.index.Row(pos))
		if err != nil {
			return errors.Wrapf(err, "index value %d", pos)
		}
		a.hashes[attrs.Hash()] = pos
		a.values = append(a.values, attrs)
	}
	return nil
}

// Reset drops all decoded state so the column can be decoded into again.
//
// The values slice is truncated rather than reallocated, so its capacity is
// kept, and the key type is set back to UInt64.
func (a *Attributes) Reset() {
	// Underlying columns first.
	a.col.Reset()
	a.index.Reset()

	// Then the derived lookup structures.
	maps.Clear(a.hashes)
	a.values = a.values[:0]

	a.col.Key = proto.KeyUInt64
}

// DecodeState delegates to the underlying low-cardinality column's
// DecodeState, reading from r.
func (a *Attributes) DecodeState(r *proto.Reader) error {
	err := a.col.DecodeState(r)
	return err
}

// rowIdx returns the dictionary position referenced by row i, reading it
// from whichever key slice matches the column's current key type.
//
// It panics if the key type is not one of the four unsigned integer widths.
func (a *Attributes) rowIdx(i int) int {
	keyType := a.col.Key
	if keyType == proto.KeyUInt8 {
		return int(a.col.Keys8[i])
	}
	if keyType == proto.KeyUInt16 {
		return int(a.col.Keys16[i])
	}
	if keyType == proto.KeyUInt32 {
		return int(a.col.Keys32[i])
	}
	if keyType == proto.KeyUInt64 {
		return int(a.col.Keys64[i])
	}
	panic("invalid key type")
}

// Row returns the decoded attribute set for row i by resolving the row's
// key to a position in the decoded values.
func (a *Attributes) Row(i int) otelstorage.Attrs {
	pos := a.rowIdx(i)
	return a.values[pos]
}
79 changes: 79 additions & 0 deletions cmd/otelbench/chdump/attrs_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package chdump

import (
"github.com/go-faster/jx"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/otelstorage"
)

// decodeAttributes parses s as a JSON object into an attribute set.
//
// Empty input yields an empty set and no error. On a decoding error the
// set built so far is returned along with the error.
func decodeAttributes(s []byte) (otelstorage.Attrs, error) {
	attrs := pcommon.NewMap()
	if len(s) > 0 {
		if err := decodeMap(jx.DecodeBytes(s), attrs); err != nil {
			return otelstorage.Attrs(attrs), err
		}
	}
	return otelstorage.Attrs(attrs), nil
}

// decodeValue reads the next JSON value from d and stores it into val.
//
// Strings, booleans, arrays and objects map to the matching pcommon kinds.
// Numbers become Int when the token is an integer and Double otherwise.
// A JSON null is consumed and val is left untouched. Any other token is
// handed to d.Skip.
func decodeValue(d *jx.Decoder, val pcommon.Value) error {
	switch d.Next() {
	case jx.Null:
		// Consume the token but keep the value empty.
		return d.Null()
	case jx.Bool:
		b, err := d.Bool()
		if err != nil {
			return err
		}
		val.SetBool(b)
		return nil
	case jx.String:
		str, err := d.Str()
		if err != nil {
			return err
		}
		val.SetStr(str)
		return nil
	case jx.Number:
		num, err := d.Num()
		if err != nil {
			return err
		}
		if !num.IsInt() {
			f, err := num.Float64()
			if err != nil {
				return err
			}
			val.SetDouble(f)
			return nil
		}
		n, err := num.Int64()
		if err != nil {
			return err
		}
		val.SetInt(n)
		return nil
	case jx.Object:
		return decodeMap(d, val.SetEmptyMap())
	case jx.Array:
		elems := val.SetEmptySlice()
		return d.Arr(func(d *jx.Decoder) error {
			return decodeValue(d, elems.AppendEmpty())
		})
	default:
		return d.Skip()
	}
}

// decodeMap reads a JSON object from d, adding one entry to m per field and
// decoding each field value with decodeValue.
func decodeMap(d *jx.Decoder, m pcommon.Map) error {
	onField := func(d *jx.Decoder, key string) error {
		return decodeValue(d, m.PutEmpty(key))
	}
	return d.Obj(onField)
}
167 changes: 167 additions & 0 deletions cmd/otelbench/chdump/chdump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Package chdump provides utilities to read ClickHouse data dumps created by oteldb.
package chdump

import (
"archive/tar"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"strings"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/klauspost/compress/zstd"
)

// ReadOptions defines the callbacks used by [ReadTar] and [ReadDir].
//
// Each callback is invoked once per decoded block of its table. A nil
// callback causes that table to be skipped without being decoded. The
// columns value handed to a callback is reset and reused for the next block.
type ReadOptions struct {
// OnTraces receives blocks of the "traces_spans" table.
OnTraces func(*Spans) error
// OnPoints receives blocks of the "metrics_points" table.
OnPoints func(*Points) error
// OnExpHistograms receives blocks of the "metrics_exp_histograms" table.
OnExpHistograms func(*ExpHistograms) error
// OnLogs receives blocks of the "logs" table.
OnLogs func(*Logs) error
}

// ReadTar reads a dump from the tar archive stream r, dispatching decoded
// blocks to the callbacks in opts.
func ReadTar(r io.Reader, opts ReadOptions) error {
	return readDump(&tarIter{tr: tar.NewReader(r)}, opts)
}

// ReadDir reads a dump from the files directly inside the unpacked
// directory dir, dispatching decoded blocks to the callbacks in opts.
//
// It fails with a "read dir" error if dir cannot be listed.
func ReadDir(dir string, opts ReadOptions) error {
	listing, err := os.ReadDir(dir)
	if err != nil {
		return errors.Wrap(err, "read dir")
	}
	return readDump(&dirIter{dir: dir, entries: listing}, opts)
}

// readDump drains iter and decodes every entry whose name, with the
// ".bin.zstd" suffix removed, matches a known table, passing decoded blocks
// to the matching callback in opts.
//
// Entries with any other name are closed and skipped. Iteration ends with a
// nil error once iter reports io.EOF; any other iterator error is wrapped
// and returned.
func readDump(iter dumpIter, opts ReadOptions) error {
	// handle decodes a single entry and always closes it afterwards.
	handle := func(name string, file io.ReadCloser) error {
		defer func() {
			_ = file.Close()
		}()

		switch strings.TrimSuffix(name, ".bin.zstd") {
		case "traces_spans":
			return readBlock(file, NewSpans(), opts.OnTraces)
		case "metrics_points":
			return readBlock(file, NewPoints(), opts.OnPoints)
		case "metrics_exp_histograms":
			return readBlock(file, NewExpHistograms(), opts.OnExpHistograms)
		case "logs":
			return readBlock(file, NewLogs(), opts.OnLogs)
		default:
			return nil
		}
	}

	for {
		name, file, err := iter.Next()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return errors.Wrap(err, "next")
		}

		if err := handle(name, file); err != nil {
			return errors.Wrapf(err, "read %q", name)
		}
	}
}

// dumpIter yields the entries of a dump one at a time.
//
// Next returns the entry name and a reader for its contents. readDump closes
// every reader it receives and treats an error matching io.EOF as the end of
// the dump.
type dumpIter interface {
Next() (string, io.ReadCloser, error)
}

// tarIter iterates over the entries of a tar archive.
type tarIter struct {
	tr *tar.Reader
}

// Next advances to the next archive entry and returns its cleaned name
// together with a reader over the entry's contents.
//
// The returned reader is the shared tar reader wrapped in a no-op closer,
// so closing it does not affect the archive. Errors from the tar reader,
// including io.EOF at the end of the archive, are returned unchanged.
func (t *tarIter) Next() (string, io.ReadCloser, error) {
	hdr, err := t.tr.Next()
	if err != nil {
		return "", nil, err
	}
	return path.Clean(hdr.Name), io.NopCloser(t.tr), nil
}

// dirIter iterates over the regular entries of a directory listing.
type dirIter struct {
	// dir is the directory the entries were listed from.
	dir string
	// entries is the listing being iterated.
	entries []fs.DirEntry
	// idx is the position of the next entry to examine.
	idx int
}

// Next opens the next non-directory entry and returns its cleaned name and
// the opened file.
//
// Directory entries are skipped. When the listing is exhausted, io.EOF is
// returned. If opening a file fails, the error is returned and the cursor is
// not advanced past that entry.
func (d *dirIter) Next() (string, io.ReadCloser, error) {
	for ; d.idx < len(d.entries); d.idx++ {
		entry := d.entries[d.idx]
		if entry.IsDir() {
			continue
		}

		file, err := os.Open(filepath.Join(d.dir, entry.Name()))
		if err != nil {
			return "", nil, err
		}

		// Step past the entry being returned; the loop's post statement
		// does not run on this path.
		d.idx++
		return filepath.Clean(entry.Name()), file, nil
	}
	return "", nil, io.EOF
}

// protoVersion is the protocol version passed to proto.Block.DecodeBlock
// when reading dump blocks.
//
// NOTE(review): the reason for this specific value is not stated here;
// presumably it selects which optional block fields the decoder expects —
// confirm against the ch-go protocol feature revisions.
const protoVersion = 51902

// readBlock decodes a zstd-compressed stream of blocks from r into cols and
// invokes cb after each successfully decoded block.
//
// A nil cb makes the call a no-op: nothing is read from r. cols is reset
// before every block, so cb always sees only the rows of the current block.
// Decoding stops with a nil error when the decoder reports io.EOF. Decode
// failures are wrapped with the zero-based block number and callback
// failures are wrapped with "callback".
func readBlock[
	TableColumns interface {
		Reset()
		Result() proto.Results
	},
](
	r io.Reader,
	cols TableColumns,
	cb func(TableColumns) error,
) error {
	if cb == nil {
		return nil
	}

	dec, err := zstd.NewReader(r)
	if err != nil {
		return errors.Wrap(err, "open zstd dump")
	}
	defer dec.Close()

	reader := proto.NewReader(dec)
	results := cols.Result()
	var block proto.Block

	for blockIdx := 0; ; blockIdx++ {
		cols.Reset()

		err := block.DecodeBlock(reader, protoVersion, results)
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return errors.Wrapf(err, "decode block %d", blockIdx)
		}

		if err := cb(cols); err != nil {
			return errors.Wrap(err, "callback")
		}
	}
}
Loading
Loading