Feat: scrape hot threads CPU consumption percentage #533

Open
wants to merge 3 commits into
base: master
246 changes: 246 additions & 0 deletions collector/hot_threads.go
@@ -0,0 +1,246 @@
// Copyright 2021 The Prometheus Authors
Contributor

Suggested change
- // Copyright 2021 The Prometheus Authors
+ // Copyright 2022 The Prometheus Authors

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
	"fmt"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
Comment on lines +17 to +18
Contributor

Suggested change
- "github.com/go-kit/kit/log"
- "github.com/go-kit/kit/log/level"
+ "github.com/go-kit/log"
+ "github.com/go-kit/log/level"

	"github.com/prometheus/client_golang/prometheus"
	"io/ioutil"
	"net/http"
Contributor

These imports are not formatted properly. The stdlib imports should come first in alphabetical order, followed by an empty line, followed by the other imports in alphabetical order. I believe go fmt, or one of the other Go tools, should do that automatically.

	"net/url"
	"os"
	"path"
	"regexp"
	"strconv"
	"strings"
)

var (
	defaultHotThreadsLabels = []string{"node", "thread_name", "thread_id"}

	defaultHotThreadsLabelValues = func(HotThreads string) []string {
		return []string{
			HotThreads,
		}
	}
	NODE_OUTPUT_SEPERATOR = ":::"
Contributor

I'm not sure why all of this data is being parsed by regex. It seems error-prone. I think some tests would be very useful.
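
As a rough illustration of the kind of test this comment asks for, the sketch below pulls the CPU percentage out of a single line and checks it against a small table of inputs. The pattern, the helper name `parseCPUPercent`, and the sample lines are all assumptions made for the example; they are not taken from this PR, and the sample lines only approximate what a hot threads response might look like.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// cpuPercentRe is an illustrative pattern (not the one from the PR): a
// decimal number immediately followed by '%'. It is compiled once at
// package level rather than on every call.
var cpuPercentRe = regexp.MustCompile(`([0-9]+(?:\.[0-9]+)?)%`)

// parseCPUPercent is a hypothetical helper. It returns the first percentage
// found in line, or false when the line contains none.
func parseCPUPercent(line string) (float64, bool) {
	m := cpuPercentRe.FindStringSubmatch(line)
	if m == nil {
		return 0, false
	}
	v, err := strconv.ParseFloat(m[1], 64)
	if err != nil {
		return 0, false
	}
	return v, true
}

func main() {
	// Assumed sample lines; real output may differ.
	cases := []struct {
		in   string
		want float64
		ok   bool
	}{
		{"78.5% (392.5ms out of 500ms) cpu usage by thread 'elasticsearch[node-1][write][T#2]'", 78.5, true},
		{"3% cpu usage by thread 'process reaper'", 3, true},
		{"::: {node-1}{abc}", 0, false},
	}
	for _, c := range cases {
		got, ok := parseCPUPercent(c.in)
		fmt.Printf("got=%v ok=%v pass=%v\n", got, ok, got == c.want && ok == c.ok)
	}
}
```

In a real change the table would live in a `_test.go` file and use captured responses from an actual cluster as fixtures.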

	HOT_THREADS_OP_REGEX = `^?([0-9]*[.])?[0-9]+%.*`
	CPU_PERCENTAGE_REGEX = `^?([0-9]*[.])?[0-9]+%`
)

// HotThreads information struct
type HotThreads struct {
	logger log.Logger
	url    *url.URL

	HotThreadsMetrics        HotThreadsMetric
	HotThreadsFailureMetrics HotThreadsStepFailureMetric

	jsonParseFailures prometheus.Counter
}

type HotThreadsMetric struct {
	Type   prometheus.ValueType
	Desc   *prometheus.Desc
	Value  func(HotThreadsExp float64) float64
	Labels func(HotThreadsDataNode string, HotThreadsName, HotThreadsId string) []string
}

type HotThreadsStepFailureMetric struct {
	Type   prometheus.ValueType
	Desc   *prometheus.Desc
	Value  func(HotThreadsExp int64) float64
	Labels func(HotThreadsIndex string, HotThreadsPolicy string, action string, step string) []string
}

func getEnv(key, defaultVal string) string {
Contributor

I don't think env vars should be read in a collector. All configuration and values should be passed in.

	value, exists := os.LookupEnv(key)
	if !exists {
		value = defaultVal
	}
	return value
}

// NewHotThreadsExplain defines HotThreads Prometheus metrics
func NewHotThreads(logger log.Logger, url *url.URL) *HotThreads {
	return &HotThreads{
		logger: logger,
		url:    url,

		HotThreadsMetrics: HotThreadsMetric{
			Type: prometheus.GaugeValue,
			Desc: prometheus.NewDesc(
				prometheus.BuildFQName(namespace, "hot_threads", "cpu_usage_percentage"),
				"Hot Threads cpu usage on data nodes",
				defaultHotThreadsLabels, nil,
			),
			Value: func(HotThreadsCpuPercentage float64) float64 {
				return float64(HotThreadsCpuPercentage)
			},
			Labels: func(HotThreadsDataNode string, HotThreadsName, HotThreadsId string) []string {
				return []string{HotThreadsDataNode, HotThreadsName, HotThreadsId}
			},
		},

		jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Name: prometheus.BuildFQName(namespace, "hot_threads", "json_parse_failures"),
			Help: "Number of errors while parsing JSON.",
		}),
	}
}

// Describe HotThreads
func (s *HotThreads) Describe(ch chan<- *prometheus.Desc) {
	ch <- s.jsonParseFailures.Desc()
	ch <- s.HotThreadsMetrics.Desc
}

func (s *HotThreads) getAndParseURL(u *url.URL, hotThreads *[]HotThreadsRsp) error {
	res, err := http.Get(u.String())
	if err != nil {
		return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
			u.Scheme, u.Hostname(), u.Port(), u.Path, err)
	}

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		_ = level.Warn(s.logger).Log(
			"msg", "failed to get resp body",
			"err", err,
		)
	}

	defer func() {
		err = res.Body.Close()
		if err != nil {
			_ = level.Warn(s.logger).Log(
				"msg", "failed to close http.Client",
				"err", err,
			)
		}
	}()

	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
	}

	sb := string(body)
	hotThreadsNodeOp := strings.Split(string(sb), NODE_OUTPUT_SEPERATOR)
Contributor

Why is sb being converted to a string again? It was already converted to a string on the line above.


	for _, nodeData := range hotThreadsNodeOp {
		nodeName := strings.Trim(strings.Split(nodeData, "}")[0], " {")

		hotThreadsOpRegex := regexp.MustCompile(HOT_THREADS_OP_REGEX)
		allHotThreads := hotThreadsOpRegex.FindAllString(nodeData, -1)
		cpuPercentageRegex := regexp.MustCompile(CPU_PERCENTAGE_REGEX)

		for _, v := range allHotThreads {
Contributor

The logic here seems like some complex parsing. I think it would be best for this to be in a separate function with good unit tests. The signature of the function and name should provide a clear definition of what the functionality is.

Is there not an alternative to this parsing? What format is the data in?
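
A minimal sketch of what such a separate function could look like, assuming the four thread-name patterns listed in the code comments of this PR are representative. `parseThreadName` is a hypothetical name, and the bounds checks are an addition of this sketch, not behavior of the PR code.

```go
package main

import (
	"fmt"
	"strings"
)

// parseThreadName is a hypothetical helper that reduces a long thread name
// from the hot threads output to a short name and a thread id. Names that
// match none of the known patterns are returned unchanged with an empty id.
func parseThreadName(long string) (name, id string) {
	long = strings.Trim(long, " '")
	switch {
	case strings.Contains(long, "keepAlive"):
		// e.g. elasticsearch[keepAlive/7.0.1]
		return "keepAlive", ""
	case strings.Contains(long, "Lucene Merge Thread"):
		// e.g. elasticsearch[node][[geonames][0]: Lucene Merge Thread #12]
		if i := strings.LastIndex(long, "#"); i >= 0 {
			id = strings.TrimRight(long[i+1:], "]")
		}
		return "merge", id
	}
	// e.g. elasticsearch[node][write][T#2]
	parts := strings.Split(long, "][")
	if len(parts) < 3 {
		return long, "" // e.g. "process reaper"
	}
	return parts[1], strings.Trim(parts[2], "T#]")
}

func main() {
	inputs := []string{
		"process reaper",
		"elasticsearch[keepAlive/7.0.1]",
		"elasticsearch[dragoneye-es-managed-data-6][refresh][T#3]",
		"elasticsearch[elasticsearch-data-0][[geonames][0]: Lucene Merge Thread #12]",
	}
	for _, in := range inputs {
		name, id := parseThreadName(in)
		fmt.Printf("%q -> name=%q id=%q\n", in, name, id)
	}
}
```

Because the function takes a string and returns two strings, it can be unit tested without an HTTP server or a logger, which is the main point of pulling it out of the request loop.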

			cpu := string(cpuPercentageRegex.FindString(v))
			cpu = strings.Trim(cpu, "%")
			threadName := ""
			threadId := ""
			data := strings.Split(v, "usage by thread")
			if len(data) > 1 {
				// longThreadName would be one of these string patterns -
				// "process reaper"
				// "elasticsearch[keepAlive/7.0.1]"
				// "elasticsearch[dragoneye-es-managed-data-6][refresh][T#3]"
				// "elasticsearch[elasticsearch-data-0][[geonames][0]: Lucene Merge Thread #12]"

				longThreadName := data[1]
				threadName = longThreadName
				threadId = ""
				// does not contain "[]" or ":" with exception of elasticsearch[keepAlive/7.0.1]
				if strings.Contains(longThreadName, "[") || strings.Contains(longThreadName, ":") {
					if strings.Contains(longThreadName, "keepAlive") {
						threadName = "keepAlive"
						threadId = ""
					} else {
						if strings.Contains(longThreadName, "Lucene Merge Thread") {
							// lucene merge thread like - elasticsearch[elasticsearch-data-0][[geonames][0]: Lucene Merge Thread #12]
							thread := strings.Trim(strings.Split(longThreadName, ":")[1], "[]'")
							threadName = "merge"
							threadId = strings.Split(thread, "#")[1]
						} else {
							// search, write, refresh, transport_worker etc. like - elasticsearch[elasticsearch-data-0][write][T#2]
							threadName = strings.Trim(strings.Split(longThreadName, "][")[1], "[]'")
							threadId = strings.Trim((strings.Split(longThreadName, "][")[2]), "T#[]'")
						}
					}
				}
			}
			cpuPercentage := 0.0
			cpuPercentage, err := strconv.ParseFloat(cpu, 64)
			if err != nil {
				_ = level.Warn(s.logger).Log(
					"msg", "error parsing cpu percentage",
					"info", err,
				)
			}
			t := &HotThreadsRsp{CpuPercentage: cpuPercentage, Node: nodeName, ThreadName: threadName, ThreadId: threadId}
			*hotThreads = append(*hotThreads, *t)
Contributor

Why are all of these pointer dereferences needed?
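
For comparison, a sketch of the same accumulation without pointers: the function builds a slice of values and returns it, so there is no `*[]HotThreadsRsp` out-parameter and no `&T{...}` followed by `*t`. The types `hotThread` and `sample` and the function `collectThreads` are hypothetical stand-ins so the example is self-contained; they are not the PR's types.

```go
package main

import "fmt"

// hotThread is a stand-in for the PR's HotThreadsRsp, redeclared here only
// so that the sketch compiles on its own.
type hotThread struct {
	Node          string
	ThreadName    string
	CPUPercentage float64
}

// sample is a hypothetical already-parsed (name, percentage) pair.
type sample struct {
	name string
	pct  float64
}

// collectThreads returns the result slice by value. Appending struct values
// directly avoids both the out-parameter and the temporary pointer.
func collectThreads(node string, samples []sample) []hotThread {
	threads := make([]hotThread, 0, len(samples))
	for _, s := range samples {
		threads = append(threads, hotThread{
			Node:          node,
			ThreadName:    s.name,
			CPUPercentage: s.pct,
		})
	}
	return threads
}

func main() {
	threads := collectThreads("node-1", []sample{{"write", 78.5}, {"merge", 3}})
	for _, t := range threads {
		fmt.Printf("%s %s %.1f\n", t.Node, t.ThreadName, t.CPUPercentage)
	}
}
```

A slice header is small, so returning it alongside an `error` costs little and keeps the call site to a plain `threads, err := ...` assignment.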

		}
	}

	return nil
}

func (s *HotThreads) fetchAndDecodeHotThreads() ([]HotThreadsRsp, error) {

	u := *s.url
	u.Path = path.Join(u.Path, "/_nodes/hot_threads")

	var MAX_HOT_THREADS_COUNT = getEnv("MAX_HOT_THREADS_COUNT", "3")
	var HOT_THREADS_SECOND_SAMPLING_INTERVAL = getEnv("HOT_THREADS_SECOND_SAMPLING_INTERVAL", "500ms")

	q := u.Query()
	q.Set("threads", MAX_HOT_THREADS_COUNT)
	q.Set("interval", HOT_THREADS_SECOND_SAMPLING_INTERVAL)
	u.RawQuery = q.Encode()
	u.RawPath = q.Encode()
	var ifr []HotThreadsRsp
	err := s.getAndParseURL(&u, &ifr)

	if err != nil {
		return ifr, err
	}
	return ifr, err
}

// Collect gets cluster hot threads metric values
func (s *HotThreads) Collect(ch chan<- prometheus.Metric) {

	defer func() {
		ch <- s.jsonParseFailures
	}()

	ir, err := s.fetchAndDecodeHotThreads()
	if err != nil {
		_ = level.Warn(s.logger).Log(
			"msg", "failed to fetch and decode HotThreads stats",
			"err", err,
		)
		return
	}

	for _, t := range ir {
		ch <- prometheus.MustNewConstMetric(
			s.HotThreadsMetrics.Desc,
			s.HotThreadsMetrics.Type,
			s.HotThreadsMetrics.Value(t.CpuPercentage),
			s.HotThreadsMetrics.Labels(t.Node, t.ThreadName, t.ThreadId)...,
		)
	}
}
21 changes: 21 additions & 0 deletions collector/hot_threads_response.go
@@ -0,0 +1,21 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

// _nodes/hot_threads response
Contributor

This does not need to be a separate file.

type HotThreadsRsp struct {
	Node          string  `json:"node"`
	ThreadName    string  `json:"thread_name"`
	ThreadId      string  `json:"thread_id"`
	CpuPercentage float64 `json:"cpu_percentage"`
}
2 changes: 2 additions & 0 deletions go.sum
@@ -66,6 +66,7 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
Contributor

I don't think there are any go.mod changes so I don't believe that there should be any go.sum changes either.

github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
@@ -76,6 +77,7 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
6 changes: 6 additions & 0 deletions main.go
@@ -85,6 +85,9 @@ func main() {
		esExportSnapshots = kingpin.Flag("es.snapshots",
			"Export stats for the cluster snapshots.").
			Default("false").Bool()
		esExportHotThreads = kingpin.Flag("es.hot_threads",
			"Export stats for hot threads on data nodes.").
			Default("false").Envar("ES_HOT_THREADS").Bool()
		esExportSLM = kingpin.Flag("es.slm",
			"Export stats for SLM snapshots.").
			Default("false").Bool()
@@ -213,6 +216,9 @@ func main() {
		prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
	}

	if *esExportHotThreads {
		prometheus.MustRegister(collector.NewHotThreads(logger, esURL))
	}
	// create a http server
	server := &http.Server{}
