Skip to content

Commit

Permalink
Support pganalyze tracestate to set start time of the span (#475)
Browse files Browse the repository at this point in the history
Add the capability for the collector to read a tracestate for pganalyze and use the time specified with `t` as the start time of the tracing span.
  • Loading branch information
keiko713 authored Nov 21, 2023
1 parent fea04b2 commit def6956
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 3 deletions.
38 changes: 35 additions & 3 deletions logs/querysample/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"strings"
"time"

"github.com/pganalyze/collector/state"
Expand Down Expand Up @@ -32,6 +33,39 @@ func urlToSample(server *state.Server, grant state.GrantLogs, sample state.Postg
)
}

func startAndEndTime(traceState trace.TraceState, sample state.PostgresQuerySample) (startTime time.Time, endTime time.Time) {
if pganalyzeState := traceState.Get("pganalyze"); pganalyzeState != "" {
// A pganalyze traceState allows the client to pass the query start time (sent time)
// on the client side, in nano second precision, like pganalyze=t:1697666938.6297212
// If there are multiple values in a pganalzye traceState, they are separated by semicolon
// like pganalyze=t:1697666938.6297212;x=123
for _, part := range strings.Split(strings.TrimSpace(pganalyzeState), ";") {
if strings.Contains(part, ":") {
keyAndValue := strings.SplitN(part, ":", 2)
if strings.TrimSpace(keyAndValue[0]) == "t" {
if start, err := util.TimeFromStr(keyAndValue[1]); err == nil {
startTime = start
// With this, we're adding the query duration to the start time.
// This could result creating inaccurate spans, as the start time passed
// from the client side using tracestate is the time of the query is sent
// from the client to the server.
// This means, we will ignore the network time between the client and the
// server, as well as the machine clock difference between them.
endTime = startTime.Add(time.Duration(sample.RuntimeMs) * time.Millisecond)
return
}
}
}
}
}
// If no start time was found in the tracestate, calculate start and end time based on sample data
duration := time.Duration(sample.RuntimeMs) * time.Millisecond
startTime = sample.OccurredAt.Add(-1 * duration)
endTime = sample.OccurredAt

return
}

func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, logger *util.Logger, grant state.GrantLogs, samples []state.PostgresQuerySample) {
exportCount := 0
for _, sample := range samples {
Expand All @@ -49,9 +83,7 @@ func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l
trace.WithInstrumentationVersion(util.CollectorVersion),
trace.WithSchemaURL(semconv.SchemaURL),
)
duration := -1 * time.Duration(sample.RuntimeMs) * time.Millisecond
startTime := sample.OccurredAt.Add(duration)
endTime := sample.OccurredAt
startTime, endTime := startAndEndTime(trace.SpanContextFromContext(ctx).TraceState(), sample)
_, span := tracer.Start(ctx, otelSpanName, trace.WithTimestamp(startTime))
// See https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/database/
// however note that "db.postgresql.plan" is non-standard.
Expand Down
80 changes: 80 additions & 0 deletions logs/querysample/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package querysample

import (
"testing"
"time"

"github.com/pganalyze/collector/state"
"go.opentelemetry.io/otel/trace"
)

type startAndEndTimeTestPair struct {
testName string
traceState trace.TraceState
sample state.PostgresQuerySample
startTime time.Time
endTime time.Time
}

func TestStartAndEndTime(t *testing.T) {
currentTime := time.Date(2023, time.January, 1, 1, 2, 3, 456*1000*1000, time.UTC)
traceState := trace.TraceState{}
otelTraceState, err := traceState.Insert("ot", "p:8;r:62")
if err != nil {
t.Fatalf("Failed to initialize object: %v", err)
}
pganalyzeTraceStateWithoutT, err := otelTraceState.Insert("pganalyze", "x:foo;y:bar")
if err != nil {
t.Fatalf("Failed to initialize object: %v", err)
}
pganalyzeTraceState, err := otelTraceState.Insert("pganalyze", "t:1697666938.6297212")
if err != nil {
t.Fatalf("Failed to initialize object: %v", err)
}
// 1697666938.6297212 = 2023-10-18 22:08:58.6297212
pganalyzeTime, err := time.Parse("2006-01-02T15:04:05.999999999", "2023-10-18T22:08:58.6297212")
if err != nil {
t.Fatalf("Failed to initialize object: %v", err)
}

var startAndEndTimeTests = []startAndEndTimeTestPair{
{
"No trace state",
trace.TraceState{},
state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
currentTime.Add(-1 * 1000 * time.Millisecond),
currentTime,
},
{
"No pganalyze trace state",
otelTraceState,
state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
currentTime.Add(-1 * 1000 * time.Millisecond),
currentTime,
},
{
"pganalyze trace state without t",
pganalyzeTraceStateWithoutT,
state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
currentTime.Add(-1 * 1000 * time.Millisecond),
currentTime,
},
{
"pganalyze trace state",
pganalyzeTraceState,
state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
pganalyzeTime,
pganalyzeTime.Add(1000 * time.Millisecond),
},
}

for _, pair := range startAndEndTimeTests {
startTime, endTime := startAndEndTime(pair.traceState, pair.sample)
if pair.startTime != startTime {
t.Errorf("For %s: expected startTime to be %v, but was %v\n", pair.testName, pair.startTime, startTime)
}
if pair.endTime != endTime {
t.Errorf("For %s: expected endTime to be %v, but was %v\n", pair.testName, pair.endTime, endTime)
}
}
}
45 changes: 45 additions & 0 deletions util/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package util

import (
"errors"
"strconv"
"strings"
"time"
)

// TimeFromStr returns a time of the given value in string.
// The value is the value of time either as a floating point number of seconds since the Epoch, or it can also be a integer number of seconds.
func TimeFromStr(value string) (time.Time, error) {
IntegerAndDecimal := strings.SplitN(value, ".", 2)
secInStr := IntegerAndDecimal[0]
sec, err := strconv.ParseInt(secInStr, 10, 64)
if err != nil {
return time.Time{}, err
}
if len(IntegerAndDecimal) == 1 {
return time.Unix(sec, 0).UTC(), nil
}
decimalInStr := IntegerAndDecimal[1]
if decimalInStr == "" {
decimalInStr = "0"
}
if len(decimalInStr) > 9 {
// decimal length shouldn't be more than nanoseconds (9)
return time.Time{}, errors.New("decimal length is longer than nanoseconds (9)")
}
nsecInStr := rightPad(decimalInStr, 9, "0")
nsec, err := strconv.ParseInt(nsecInStr, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(sec, nsec).UTC(), nil
}

// rightPad returns the string that is right padded with the given pad string.
func rightPad(str string, length int, pad string) string {
if len(str) >= length {
return str
}
padding := strings.Repeat(pad, length-len(str))
return str + padding
}
63 changes: 63 additions & 0 deletions util/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util_test

import (
"reflect"
"testing"
"time"

"github.com/pganalyze/collector/util"
)

var timeFromStrTests = []struct {
input string
expected time.Time
expectErr bool
}{
{
"1697666938.629721234",
time.Unix(1697666938, 629721234).UTC(),
false,
},
{
"1697666938.629",
time.Unix(1697666938, 629000000).UTC(),
false,
},
{
"1697666938",
time.Unix(1697666938, 0).UTC(),
false,
},
{
"",
time.Time{},
true,
},
{
"not a time",
time.Time{},
true,
},
{
"1697666938.baddecimal",
time.Time{},
true,
},
{
"1697666938.6297212340000", // nsec too long
time.Time{},
true,
},
}

func TestTimeFromStr(t *testing.T) {
for _, test := range timeFromStrTests {
actual, err := util.TimeFromStr(test.input)
if (err != nil) != test.expectErr {
t.Errorf("TimeFromStr(%s): expected err: %t; actual: %s", test.input, test.expectErr, err)
}
if !reflect.DeepEqual(actual, test.expected) {
t.Errorf("TimeFromStr(%s): expected %v; actual %v", test.input, test.expected, actual)
}
}
}

0 comments on commit def6956

Please sign in to comment.