[wip] add long tests for vulture to test traceql metrics query range #4444
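This draft adds a long-running test mode to tempo-vulture: every 30 seconds it pushes a batch of spans that carry a run-specific service name, span name, and attribute, records the emitted counts in a SpanTracker, and, once enough batches have been pushed, validates TraceQL searches and TraceQL metrics query-range (rate()) results against the tracked counts. The existing write/read/search tickers are commented out for now while this is a work in progress.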

Draft · wants to merge 1 commit into main
305 changes: 297 additions & 8 deletions cmd/tempo-vulture/main.go
@@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"crypto/tls"
"errors"
"flag"
@@ -12,10 +13,12 @@ import (
"net/url"
"os"
"reflect"
"strconv"
"time"

"github.com/go-test/deep"
jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
zaplogfmt "github.com/jsternberg/zap-logfmt"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
@@ -24,6 +27,9 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/grafana/dskit/user"

testUtil "github.com/grafana/tempo/integration/util"
"github.com/grafana/tempo/pkg/httpclient"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
@@ -60,6 +66,10 @@ type traceMetrics struct {

const (
defaultJaegerGRPCEndpoint = 14250
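// markers stamped onto the test spans; newSpanTracker suffixes the service name,
// span name, and attribute value with a timestamp so each vulture run only matches
// its own data when querying back, e.g.
//   {resource.service.name = "specialVultureServiceName<ts>"}
//   {span:name = "specialVultureSpanName<ts>"}
//   {span.specialVultureAttributeKey = "specialVultureAttributeValue<ts>"}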
specialSpanName = "specialVultureSpanName"
specialServiceName = "specialVultureServiceName"
specialAttributeKey = "specialVultureAttributeKey"
specialAttributeValue = "specialVultureAttributeValue"
)

type vultureConfiguration struct {
@@ -119,17 +129,18 @@ func main() {
}
httpClient := httpclient.New(vultureConfig.tempoQueryURL, vultureConfig.tempoOrgID)

tickerWrite, tickerRead, tickerSearch, err := initTickers(vultureConfig.tempoWriteBackoffDuration, vultureConfig.tempoReadBackoffDuration, vultureConfig.tempoSearchBackoffDuration)
if err != nil {
panic(err)
}
//tickerWrite, tickerRead, tickerSearch, err := initTickers(vultureConfig.tempoWriteBackoffDuration, vultureConfig.tempoReadBackoffDuration, vultureConfig.tempoSearchBackoffDuration)
// if err != nil {
// panic(err)
// }
startTime := time.Now()
r := rand.New(rand.NewSource(startTime.Unix()))
interval := vultureConfig.tempoWriteBackoffDuration
//interval := vultureConfig.tempoWriteBackoffDuration

doWrite(jaegerClient, tickerWrite, interval, vultureConfig, logger)
doRead(httpClient, tickerRead, startTime, interval, r, vultureConfig, logger)
doSearch(httpClient, tickerSearch, startTime, interval, r, vultureConfig, logger)
//doWrite(jaegerClient, tickerWrite, interval, vultureConfig, logger)
//doRead(httpClient, tickerRead, startTime, interval, r, vultureConfig, logger)
//doSearch(httpClient, tickerSearch, startTime, interval, r, vultureConfig, logger)
doLongTests(jaegerClient, httpClient, vultureConfig, *r, logger)

http.Handle(prometheusPath, promhttp.Handler())
log.Fatal(http.ListenAndServe(prometheusListenAddress, nil))
@@ -310,6 +321,284 @@ func doSearch(httpClient httpclient.TempoHTTPClient, tickerSearch *time.Ticker,
}
}

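// SpanTracker keeps the bookkeeping for the long-running test: for every emitted
// batch it records the start timestamp and how many spans carried the special
// span name, the special service name, and the special attribute, so later
// TraceQL searches and metrics queries can be compared against known counts.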
type SpanTracker struct {
count int
timeStamps []int64
spanCountSameName []int
spanCountSameService []int
spanCountSameAttribute []int
spanName string
serviceName string
attributeKey string
attributeValue string
}

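// newSpanTracker returns a tracker whose special service name, span name, and
// attribute value are suffixed with the current time in milliseconds, so each
// vulture run only queries back its own spans.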
func newSpanTracker() *SpanTracker {
timeNow := strconv.Itoa(int(time.Now().UnixMilli()))
return &SpanTracker{
serviceName: specialServiceName + timeNow,
spanName: specialSpanName + timeNow,
attributeKey: specialAttributeKey,
attributeValue: specialAttributeValue + timeNow,
}
}

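// MakeBatch builds a Thrift batch for a single trace under the special service
// name: 1-4 plain spans, 1-4 spans with the special span name, and 1-4 spans
// carrying the special attribute. It records the per-scenario span counts and
// the batch start time.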
func (st *SpanTracker) MakeBatch(r rand.Rand, l *zap.Logger) *thrift.Batch {
// make starting batch with service name
startingSpanCount := r.Intn(4) + 1
batch := testUtil.MakeThriftBatchWithSpanCountResourceAndSpanAttr(startingSpanCount, st.serviceName, "span-name", "vulture", "vulture", "key", st.attributeKey)

// reuse trace id & start time
traceIDHigh := batch.Spans[0].TraceIdHigh
traceIDLow := batch.Spans[0].TraceIdLow
startTime := batch.Spans[0].StartTime

// inject more spans with special name
spanNameSpanCount := r.Intn(4) + 1
for i := 0; i < spanNameSpanCount; i++ {
batch.Spans = append(batch.Spans, &thrift.Span{
TraceIdLow: traceIDLow,
TraceIdHigh: traceIDHigh,
SpanId: rand.Int63(),
ParentSpanId: 0,
OperationName: st.spanName,
Flags: 0,
StartTime: startTime,
Duration: 1,
})
}

// inject more spans with special attribute
spanAttributeSpanCount := r.Intn(4) + 1
for i := 0; i < spanAttributeSpanCount; i++ {
batch.Spans = append(batch.Spans, &thrift.Span{
TraceIdLow: traceIDLow,
TraceIdHigh: traceIDHigh,
SpanId: rand.Int63(),
ParentSpanId: 0,
OperationName: "my operation",
Flags: 0,
StartTime: startTime,
Duration: 1,
Tags: []*thrift.Tag{
{
Key: st.attributeKey,
VStr: &st.attributeValue,
},
},
})
}
st.count++
st.timeStamps = append(st.timeStamps, startTime)
st.spanCountSameName = append(st.spanCountSameName, spanNameSpanCount)
st.spanCountSameAttribute = append(st.spanCountSameAttribute, spanAttributeSpanCount)
st.spanCountSameService = append(st.spanCountSameService, startingSpanCount+spanNameSpanCount+spanAttributeSpanCount)
return batch
}

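// GetSpanCount returns the total number of spans recorded for the given scenario
// ("name", "service" or "attribute") between the start and end positions, inclusive.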
func (st *SpanTracker) GetSpanCount(scenario string, start, end int) int {
tracker := st.spanCountSameName
if scenario == "service" {
tracker = st.spanCountSameService
} else if scenario == "attribute" {
tracker = st.spanCountSameAttribute
}

spanCount := 0
for i := start; i <= end; i++ {
spanCount += tracker[i]
}
return spanCount
}

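// GetSpanCounts returns the per-interval span counts for the given scenario
// between the start and end positions, inclusive.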
func (st *SpanTracker) GetSpanCounts(scenario string, start, end int) []int {
tracker := st.spanCountSameName
if scenario == "service" {
tracker = st.spanCountSameService
} else if scenario == "attribute" {
tracker = st.spanCountSameAttribute
}

spanCounts := make([]int, end-start+1)
for i := start; i <= end; i++ {
spanCounts[i-start] = tracker[i]
}
return spanCounts
}

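// GetTraceCount returns how many pushes between start and end (inclusive) actually
// produced a trace; failed pushes are recorded with zero span counts and skipped.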
func (st *SpanTracker) GetTraceCount(start, end int) int {
count := 0

for i := start; i <= end; i++ {
if st.spanCountSameService[i] != 0 {
count++
}
}
return count
}

func (st *SpanTracker) GetRandomStartEndPosition(r rand.Rand) (int, int) {
// with rhythm we are ingesting spans at a higher latency than before, so allow
// some slack by excluding the last 3 recorded intervals (90 seconds at a 30s ticker)
// and choose a random window of 21 consecutive data points
count := len(st.timeStamps) - 3
start := r.Intn(count - 20) // leaves room for a full window before the excluded tail
end := 20 + start
return start, end // the end is inclusive
}

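// ValidateTraceQLSearches runs one TraceQL search per scenario (service name, span
// name, span attribute) over the chosen window and compares the returned trace and
// span counts against the tracked values, bumping the error metric on mismatch.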
func (st *SpanTracker) ValidateTraceQLSearches(tempoClient httpclient.TempoHTTPClient, startPosition, endPosition int, logger *zap.Logger) error {
serviceQuery := fmt.Sprintf(`{resource.service.name = "%s"}`, st.serviceName)
nameQuery := fmt.Sprintf(`{span:name = "%s"}`, st.spanName)
attributeQuery := fmt.Sprintf(`{span.%s = "%s"}`, st.attributeKey, st.attributeValue)

scenarios := []string{serviceQuery, nameQuery, attributeQuery}

for _, scenario := range scenarios {
queryType := "service"
if scenario == nameQuery {
queryType = "name"
} else if scenario == attributeQuery {
queryType = "attribute"
}

// add some slack to start and end time in the query
start := time.UnixMicro(st.timeStamps[startPosition]).Add(-1 * time.Second).Unix()
end := time.UnixMicro(st.timeStamps[endPosition]).Add(1 * time.Second).Unix()
resp, err := tempoClient.SearchTraceQLWithRangeAndLimit(scenario, start, end, 5000, 100)
if err != nil {
logger.Error("error searching Tempo traceql query", zap.Error(err))
return err
}
pass := true
expectedCount := st.GetTraceCount(startPosition, endPosition)
if len(resp.Traces) != expectedCount {
pass = false
logger.Error("incorrect number of traces returned", zap.String("scenarios", scenario), zap.Int("expected", expectedCount), zap.Int("actual", len(resp.Traces)))
}
actualSpanCount := 0
for _, tr := range resp.Traces {
for _, spanset := range tr.SpanSets {
actualSpanCount += len(spanset.Spans)
}
}
expectedSpanCount := st.GetSpanCount(queryType, startPosition, endPosition)
if actualSpanCount != expectedSpanCount {
pass = false
logger.Error("incorrect number of spans returned", zap.String("scenarios", scenario), zap.Int("expected", expectedSpanCount), zap.Int("actual", actualSpanCount))
// metricTracesErrors.WithLabelValues("notfound_search_attribute").Add(float64(metrics.notFoundSearchAttribute))
}

if !pass {
metricTracesErrors.WithLabelValues("traceql_incorrect_result").Add(float64(1))
}
}
return nil
}

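// ValidateTraceQLMetricsSearches runs a rate() query-range request per scenario and
// checks each returned sample against the tracked span count for that interval
// divided by the step in seconds.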
func (st *SpanTracker) ValidateTraceQLMetricsSearches(tempoClient httpclient.TempoHTTPClient, startPosition, endPosition int, tickerDuration time.Duration, logger *zap.Logger) error {
serviceQuery := fmt.Sprintf(`{resource.service.name = "%s"} | rate()`, st.serviceName)
nameQuery := fmt.Sprintf(`{span:name = "%s"} | rate()`, st.spanName)
attributeQuery := fmt.Sprintf(`{span.%s = "%s"} | rate()`, st.attributeKey, st.attributeValue)

scenarios := []string{serviceQuery, nameQuery, attributeQuery}

for _, scenario := range scenarios {
queryType := "service"
if scenario == nameQuery {
queryType = "name"
} else if scenario == attributeQuery {
queryType = "attribute"
}
spanCounts := st.GetSpanCounts(queryType, startPosition, endPosition)
start := time.UnixMicro(st.timeStamps[startPosition]).Add(-1 * time.Second).Unix()
end := time.UnixMicro(st.timeStamps[endPosition]).Add(1 * time.Second).Unix()
if end > time.Now().Unix() {
end = time.Now().Unix()
}
stepSecond := tickerDuration.Seconds()
step := int64(tickerDuration)

resp, err := tempoClient.SearchQueryRange(scenario, start, end, step)
if err != nil {
logger.Error("error searching Tempo query range query", zap.Error(err), zap.String("query", scenario))
return err
}
// since we emit and record a count every 30 seconds and the step is also 30 seconds,
// each returned sample should equal the tracked span count for that interval divided by the step
pass := true
if len(resp.Series) == 0 {
pass = false
logger.Error("no series returned for query range test", zap.String("scenario", scenario))
} else {
for i, sample := range resp.Series[0].Samples {
if i >= len(spanCounts) {
continue // the query start/end slack can produce extra samples
}
expectedSpanCountRate := float64(spanCounts[i]) / stepSecond
if (sample.Value != 0 && sample.Value != expectedSpanCountRate) || (sample.Value == 0 && spanCounts[i] != 0) {
pass = false
logger.Error("incorrect number of spans returned for query range test", zap.String("scenario", scenario), zap.Float64("expected", expectedSpanCountRate), zap.Float64("actual", sample.Value))
}
}
}
if !pass {
metricTracesErrors.WithLabelValues("metrics_query_incorrect_result").Add(float64(1))
}
}
return nil
}

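// doLongTests pushes a batch of specially named spans every 30 seconds and, once at
// least 30 batches have been pushed, validates TraceQL search results and metrics
// query-range results against the counts recorded in the SpanTracker.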
func doLongTests(jaegerClient util.JaegerClient, tempoClient httpclient.TempoHTTPClient, config vultureConfiguration, r rand.Rand, l *zap.Logger) {

// run every 30 seconds
tickerDuration := 30 * time.Second
ticker := time.NewTicker(tickerDuration)
spanTracker := newSpanTracker()

go func() {
for range ticker.C {
// create a new span tracker every 500 times to clear out old data
if len(spanTracker.timeStamps) >= 500 {
spanTracker = newSpanTracker()
}

// emit traces and keep track of span counts
ctx := user.InjectOrgID(context.Background(), config.tempoOrgID)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
logger.Error("error injecting org id", zap.Error(err))
continue
}

batch := spanTracker.MakeBatch(r, l)
err = jaegerClient.EmitBatch(ctx, batch)
if err != nil {
logger.Error("error pushing batch to Tempo", zap.Error(err))
// don't record the last span count if it failed but still record the timestamps for metrics queries
spanTracker.spanCountSameName[len(spanTracker.spanCountSameName)-1] = 0
spanTracker.spanCountSameService[len(spanTracker.spanCountSameService)-1] = 0
spanTracker.spanCountSameAttribute[len(spanTracker.spanCountSameAttribute)-1] = 0
spanTracker.count--
continue
}
logger.Info("pushed batch to Tempo", zap.Int("count", spanTracker.count))

// only search after at least 30 pushes
if spanTracker.count < 30 {
logger.Info("pushed only", zap.Int("count", spanTracker.count))
continue
}

// choose random start/end for searches (the end position is inclusive)
startPosition, endPosition := spanTracker.GetRandomStartEndPosition(r)
logger.Info("random positions", zap.Int("start", startPosition), zap.Int("end", endPosition))

// traceql
spanTracker.ValidateTraceQLSearches(tempoClient, startPosition, endPosition, l)

// metrics searches
spanTracker.ValidateTraceQLMetricsSearches(tempoClient, startPosition, endPosition, tickerDuration, l)
}
}()
}

func pushMetrics(metrics traceMetrics) {
metricTracesInspected.Add(float64(metrics.requested))
metricTracesErrors.WithLabelValues("incorrectresult").Add(float64(metrics.incorrectResult))
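A minimal, self-contained sketch (not part of the diff) of the arithmetic behind the check in ValidateTraceQLMetricsSearches: with a 30-second ticker and a 30-second query-range step, each returned rate() sample should equal the span count recorded for that interval divided by the step in seconds; intervals whose push failed are tracked as zero.

package main

import "fmt"

func main() {
	const stepSeconds = 30.0
	// span counts tracked for four consecutive 30s intervals; a zero means the
	// push for that interval failed and its count was discarded
	spanCounts := []int{7, 3, 0, 9}

	for i, c := range spanCounts {
		expectedRate := float64(c) / stepSeconds // value each query-range sample should report
		fmt.Printf("interval %d: %d spans -> expected rate %.4f spans/s\n", i, c, expectedRate)
	}
}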
5 changes: 5 additions & 0 deletions cmd/tempo-vulture/mocks.go
@@ -198,3 +198,8 @@ func (m *MockHTTPClient) SetOverrides(limits *userconfigurableoverrides.Limits,
func (m *MockHTTPClient) WithTransport(t http.RoundTripper) {
panic("unimplemented")
}

//nolint:all
func (m *MockHTTPClient) SearchQueryRange(query string, start int64, end int64, step int64) (*tempopb.QueryRangeResponse, error) {
panic("unimplemented")
}