From c234ffa8740c6a7d7527d781ad0de9d9053c26f9 Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Fri, 1 Dec 2023 12:53:09 +0300 Subject: [PATCH] fix(logqlengine): adapt to OTEL --- internal/logql/logqlengine/engine_test.go | 103 +++++++++++++++++---- internal/logql/logqlengine/label_set.go | 4 +- internal/logql/logqlengine/otel_adapter.go | 9 +- 3 files changed, 94 insertions(+), 22 deletions(-) diff --git a/internal/logql/logqlengine/engine_test.go b/internal/logql/logqlengine/engine_test.go index b0aa2ebf..d0ff0f3f 100644 --- a/internal/logql/logqlengine/engine_test.go +++ b/internal/logql/logqlengine/engine_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/go-faster/jx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" @@ -56,13 +57,88 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta for _, l := range m.lines { ts = ts.Add(step) - records = append(records, logstorage.Record{ + body := l.line + rec := logstorage.Record{ Timestamp: otelstorage.NewTimestampFromTime(ts), Body: l.line, Attrs: otelstorage.Attrs(l.attrs), ScopeAttrs: otelstorage.Attrs(scopeAttrs), ResourceAttrs: otelstorage.Attrs(resAttrs), - }) + } + if rec.Attrs == otelstorage.Attrs(pcommon.Map{}) { + rec.Attrs = otelstorage.Attrs(pcommon.NewMap()) + } + if dec := jx.DecodeStr(body); dec.Next() == jx.Object { + rec.Body = "" + if err := dec.Obj(func(d *jx.Decoder, key string) error { + switch key { + case logstorage.LabelBody: + v, err := d.Str() + if err != nil { + return err + } + rec.Body = v + return nil + case logstorage.LabelTraceID: + v, err := d.Str() + if err != nil { + return err + } + traceID, err := otelstorage.ParseTraceID(v) + if err != nil { + return err + } + rec.TraceID = traceID + return nil + default: + switch d.Next() { + case jx.String: + v, err := d.Str() + if err != nil { + return err + } + rec.Attrs.AsMap().PutStr(key, v) + return nil + case jx.Bool: + v, err := d.Bool() + if err != nil { + return err + } + rec.Attrs.AsMap().PutBool(key, v) + return nil + case jx.Number: + v, err := d.Num() + if err != nil { + return err + } + if v.IsInt() { + n, err := v.Int64() + if err != nil { + return err + } + rec.Attrs.AsMap().PutInt(key, n) + } else { + n, err := v.Float64() + if err != nil { + return err + } + rec.Attrs.AsMap().PutDouble(key, n) + } + return nil + default: + v, err := d.Raw() + if err != nil { + return err + } + rec.Attrs.AsMap().PutStr(key, string(v)) + return nil + } + } + }); err != nil { + return nil, err + } + } + records = append(records, rec) } return iterators.Slice(records), nil @@ -128,20 +204,7 @@ func TestEngineEvalStream(t *testing.T) { { `{resource="test"}`, inputLines, - []resultLine{ - { - `{"id": 1, "foo": "4m", "bar": "1s", "baz": "1kb"}`, - map[string]string{}, - }, - { - `{"id": 2, "foo": "5m", "bar": "2s", "baz": "1mb"}`, - map[string]string{}, - }, - { - `{"id": 3, "foo": "6m", "bar": "3s", "baz": "1gb"}`, - map[string]string{}, - }, - }, + resultLines, false, }, { @@ -428,7 +491,13 @@ func TestEngineEvalStream(t *testing.T) { line: e.line, labels: e.labels, } - assert.Equal(t, result, tt.wantData[i]) + wanna := tt.wantData[i] + if jx.Valid([]byte(wanna.line)) { + assert.JSONEq(t, wanna.line, result.line) + } else { + assert.Equal(t, wanna.line, result.line) + } + assert.Equal(t, wanna.labels, result.labels) } }) } diff --git a/internal/logql/logqlengine/label_set.go b/internal/logql/logqlengine/label_set.go index aa1572a2..120fb6f9 100644 --- a/internal/logql/logqlengine/label_set.go +++ b/internal/logql/logqlengine/label_set.go @@ -88,7 +88,9 @@ func (l *LabelSet) SetFromRecord(record logstorage.Record) { if severity := record.SeverityNumber; severity != plog.SeverityNumberUnspecified { l.Set(logstorage.LabelSeverity, pcommon.NewValueStr(severity.String())) } - l.Set(logstorage.LabelBody, pcommon.NewValueStr(record.Body)) + if body := record.Body; body != "" { + l.Set(logstorage.LabelBody, pcommon.NewValueStr(body)) + } l.SetAttrs(record.Attrs, record.ScopeAttrs, record.ResourceAttrs) } diff --git a/internal/logql/logqlengine/otel_adapter.go b/internal/logql/logqlengine/otel_adapter.go index e5434d51..601e3ae1 100644 --- a/internal/logql/logqlengine/otel_adapter.go +++ b/internal/logql/logqlengine/otel_adapter.go @@ -12,10 +12,11 @@ func LineFromRecord(record logstorage.Record) string { // Create JSON object from record. e := &jx.Encoder{} e.Obj(func(e *jx.Encoder) { - e.Field(logstorage.LabelBody, func(e *jx.Encoder) { - e.Str(record.Body) - }) - + if len(record.Body) != 0 { + e.Field(logstorage.LabelBody, func(e *jx.Encoder) { + e.Str(record.Body) + }) + } if m := record.Attrs.AsMap(); m != (pcommon.Map{}) { record.Attrs.AsMap().Range(func(k string, v pcommon.Value) bool { e.Field(k, func(e *jx.Encoder) {