-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrics.go
99 lines (82 loc) · 2.5 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package flow
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
)
// Metrics is a set of named int counters guarded by a read/write lock, so it can be used to safely
// count things in flow from concurrently running handlers.
// Flow creates an empty metrics object and puts it into the context. Consumers (user-provided handlers) can
// retrieve it directly from the context with metrics := ctx.Value(flow.MetricsContextKey).(*flow.Metrics).
// The provided GetMetrics performs the same lookup, but returns a fresh empty Metrics instead of panicking
// when the context does not carry one.
// Flow also has a helper method Metrics() to retrieve metrics from ctx.
type Metrics struct {
// startTime is set by NewMetrics; String reports the time elapsed since it.
startTime time.Time
// userLock guards every access to userData.
userLock sync.RWMutex
// userData maps a metric name to its current value.
userData map[string]int
}
// contextKey is the type of the keys flow uses in a context. Being a distinct unexported type,
// its values never compare equal to plain strings or to keys declared by other packages.
type contextKey string

const (
	// MetricsContextKey is the ctx key under which the *Metrics object is stored.
	MetricsContextKey contextKey = "metrics"

	// CidContextKey is the ctx key of the concurrent ID. It doesn't do anything magical, it is just a
	// special-case metric set by flow internally: flow sets cid to indicate which of the parallel
	// handlers is processing a given subset of the data.
	CidContextKey contextKey = "cid"
)
// NewMetrics creates an empty, ready-to-use Metrics for collecting any counts/metrics.
// The start time is set to the moment of the call and the counter map is allocated up front.
func NewMetrics() *Metrics {
	m := new(Metrics)
	m.startTime = time.Now()
	m.userData = make(map[string]int)
	return m
}
// GetMetrics returns the *Metrics stored in ctx under MetricsContextKey.
// If ctx has no value for that key, holds a value of another type, or holds a nil *Metrics,
// a fresh NewMetrics() is returned instead, so the result is always non-nil and safe to use.
// The fallback object is not attached to ctx: each such call yields a new, independent Metrics.
func GetMetrics(ctx context.Context) *Metrics {
	// a typed nil pointer passes the type assertion, hence the explicit nil check
	if m, ok := ctx.Value(MetricsContextKey).(*Metrics); ok && m != nil {
		return m
	}
	return NewMetrics()
}
// Add increments the value stored under key by delta and returns the new value.
// A key that has not been used before starts from zero; a negative delta decrements.
// The update happens under the write lock. Add also works on a zero-value Metrics:
// the counter map is allocated on the first write.
func (m *Metrics) Add(key string, delta int) int {
	m.userLock.Lock()
	defer m.userLock.Unlock()

	if m.userData == nil {
		// assigning to a nil map panics, so allocate it lazily
		m.userData = make(map[string]int)
	}
	m.userData[key] += delta
	return m.userData[key]
}
// Inc adds one to the value stored under key and returns the new value.
// It is a shorthand for Add with a delta of 1.
func (m *Metrics) Inc(key string) int {
	const step = 1
	return m.Add(key, step)
}
// Set stores val under key, replacing whatever value the key had before.
// The write happens under the write lock. Set also works on a zero-value Metrics:
// the counter map is allocated on the first write.
func (m *Metrics) Set(key string, val int) {
	m.userLock.Lock()
	defer m.userLock.Unlock()

	if m.userData == nil {
		// assigning to a nil map panics, so allocate it lazily
		m.userData = make(map[string]int)
	}
	m.userData[key] = val
}
// Get returns the value stored under key, or zero if the key has never been set.
// The lookup happens under the read lock and never adds the key to the map.
func (m *Metrics) Get(key string) int {
	m.userLock.RLock()
	defer m.userLock.RUnlock()

	if val, found := m.userData[key]; found {
		return val
	}
	return 0
}
// String renders the metrics as "<duration> [k1:v1, k2:v2, ...]".
// The duration is the time elapsed since the start time and is measured before the read lock is taken.
// Pairs are listed in ascending key order. When there are no counters the bracketed part is
// omitted and the result is the duration followed by a single space.
func (m *Metrics) String() string {
	elapsed := time.Since(m.startTime)

	m.userLock.RLock()
	defer m.userLock.RUnlock()

	// map iteration order is random, so collect and sort the keys first
	names := make([]string, 0, len(m.userData))
	for name := range m.userData {
		names = append(names, name)
	}
	sort.Strings(names)

	pairs := make([]string, 0, len(names))
	for _, name := range names {
		pairs = append(pairs, fmt.Sprintf("%s:%d", name, m.userData[name]))
	}

	var counters string
	if len(pairs) != 0 {
		counters = "[" + strings.Join(pairs, ", ") + "]"
	}
	return fmt.Sprintf("%v %s", elapsed, counters)
}