Skip to content

Commit

Permalink
use counters for msgs/bytes sent/recv
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Lloyd <[email protected]>
  • Loading branch information
Caleb Lloyd committed Mar 6, 2024
1 parent d65a28b commit 0154a95
Showing 1 changed file with 86 additions and 54 deletions.
140 changes: 86 additions & 54 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type statzDescs struct {
// Account scope metrics
accCount *prometheus.Desc
accConnCount *prometheus.Desc
accTotalConnCount *prometheus.Desc
accLeafCount *prometheus.Desc
accSubCount *prometheus.Desc
accBytesSent *prometheus.Desc
Expand Down Expand Up @@ -156,17 +157,22 @@ type StatzCollector struct {
noReplies *prometheus.CounterVec
}

type accountStatByServer struct {
totalConnCount float64
bytesSent float64
bytesRecv float64
msgsSent float64
msgsRecv float64
}

type accountStats struct {
accountID string

connCount float64
subCount float64
leafCount float64

bytesSent float64
bytesRecv float64
msgsSent float64
msgsRecv float64
byServer map[string]*accountStatByServer

jetstreamEnabled float64
jetstreamMemoryUsed float64
Expand Down Expand Up @@ -237,7 +243,7 @@ func (sc *StatzCollector) buildDescs() {

// A unlabelled description for the up/down
sc.natsUp = prometheus.NewDesc(prometheus.BuildFQName("nats", "core", "nats_up"),
"1 if connected to NATS, 0 otherwise. A gauge.", nil, sc.constLabels)
"1 if connected to NATS, 0 otherwise. A counter.", nil, sc.constLabels)

sc.descs.Info = newPromDesc("info", "General Server information Summary gauge", sc.serverInfoLabels)
sc.descs.Start = newPromDesc("start_time", "Server start time gauge", sc.serverLabels)
Expand All @@ -246,30 +252,30 @@ func (sc *StatzCollector) buildDescs() {
sc.descs.Cores = newPromDesc("core_count", "Machine cores gauge", sc.serverLabels)
sc.descs.CPU = newPromDesc("cpu_percentage", "Server cpu utilization gauge", sc.serverLabels)
sc.descs.Connections = newPromDesc("connection_count", "Current number of client connections gauge", sc.serverLabels)
sc.descs.TotalConnections = newPromDesc("total_connection_count", "Total number of client connections serviced gauge", sc.serverLabels)
sc.descs.TotalConnections = newPromDesc("total_connection_count", "Total number of client connections serviced counter", sc.serverLabels)
sc.descs.ActiveAccounts = newPromDesc("active_account_count", "Number of active accounts gauge", sc.serverLabels)
sc.descs.NumSubs = newPromDesc("subs_count", "Current number of subscriptions gauge", sc.serverLabels)
sc.descs.SentMsgs = newPromDesc("sent_msgs_count", "Number of messages sent gauge", sc.serverLabels)
sc.descs.SentBytes = newPromDesc("sent_bytes", "Number of messages sent gauge", sc.serverLabels)
sc.descs.RecvMsgs = newPromDesc("recv_msgs_count", "Number of messages received gauge", sc.serverLabels)
sc.descs.RecvBytes = newPromDesc("recv_bytes", "Number of messages received gauge", sc.serverLabels)
sc.descs.SentMsgs = newPromDesc("sent_msgs_count", "Number of messages sent counter", sc.serverLabels)
sc.descs.SentBytes = newPromDesc("sent_bytes", "Number of bytes sent counter", sc.serverLabels)
sc.descs.RecvMsgs = newPromDesc("recv_msgs_count", "Number of messages received counter", sc.serverLabels)
sc.descs.RecvBytes = newPromDesc("recv_bytes", "Number of bytes received counter", sc.serverLabels)
sc.descs.SlowConsumers = newPromDesc("slow_consumer_count", "Number of slow consumers gauge", sc.serverLabels)
sc.descs.RTT = newPromDesc("rtt_nanoseconds", "RTT in nanoseconds gauge", sc.serverLabels)
sc.descs.Routes = newPromDesc("route_count", "Number of active routes gauge", sc.serverLabels)
sc.descs.Gateways = newPromDesc("gateway_count", "Number of active gateways gauge", sc.serverLabels)

// Routes
sc.descs.RouteSentMsgs = newPromDesc("route_sent_msg_count", "Number of messages sent over the route gauge", sc.routeLabels)
sc.descs.RouteSentBytes = newPromDesc("route_sent_bytes", "Number of bytes sent over the route gauge", sc.routeLabels)
sc.descs.RouteRecvMsgs = newPromDesc("route_recv_msg_count", "Number of messages received over the route gauge", sc.routeLabels)
sc.descs.RouteRecvBytes = newPromDesc("route_recv_bytes", "Number of bytes received over the route gauge", sc.routeLabels)
sc.descs.RouteSentMsgs = newPromDesc("route_sent_msg_count", "Number of messages sent over the route counter", sc.routeLabels)
sc.descs.RouteSentBytes = newPromDesc("route_sent_bytes", "Number of bytes sent over the route counter", sc.routeLabels)
sc.descs.RouteRecvMsgs = newPromDesc("route_recv_msg_count", "Number of messages received over the route counter", sc.routeLabels)
sc.descs.RouteRecvBytes = newPromDesc("route_recv_bytes", "Number of bytes received over the route counter", sc.routeLabels)
sc.descs.RoutePending = newPromDesc("route_pending_bytes", "Number of bytes pending in the route gauge", sc.routeLabels)

// Gateways
sc.descs.GatewaySentMsgs = newPromDesc("gateway_sent_msgs_count", "Number of messages sent over the gateway gauge", sc.gatewayLabels)
sc.descs.GatewaySentBytes = newPromDesc("gateway_sent_bytes", "Number of messages sent over the gateway gauge", sc.gatewayLabels)
sc.descs.GatewayRecvMsgs = newPromDesc("gateway_recv_msg_count", "Number of messages sent over the gateway gauge", sc.gatewayLabels)
sc.descs.GatewayRecvBytes = newPromDesc("gateway_recv_bytes", "Number of messages sent over the gateway gauge", sc.gatewayLabels)
sc.descs.GatewaySentMsgs = newPromDesc("gateway_sent_msgs_count", "Number of messages sent over the gateway counter", sc.gatewayLabels)
sc.descs.GatewaySentBytes = newPromDesc("gateway_sent_bytes", "Number of messages sent over the gateway counter", sc.gatewayLabels)
sc.descs.GatewayRecvMsgs = newPromDesc("gateway_recv_msg_count", "Number of messages sent over the gateway counter", sc.gatewayLabels)
sc.descs.GatewayRecvBytes = newPromDesc("gateway_recv_bytes", "Number of messages sent over the gateway counter", sc.gatewayLabels)
sc.descs.GatewayNumInbound = newPromDesc("gateway_inbound_msg_count", "Number inbound messages through the gateway gauge", sc.gatewayLabels)

// Jetstream Info
Expand Down Expand Up @@ -314,10 +320,11 @@ func (sc *StatzCollector) buildDescs() {
sc.descs.accLeafCount = newPromDesc("account_leaf_count", "The number of leafnode connections to this account", accLabel)
sc.descs.accSubCount = newPromDesc("account_sub_count", "The number of subscriptions on this account", accLabel)

sc.descs.accBytesSent = newPromDesc("account_bytes_sent", "The number of bytes sent on this account", accLabel)
sc.descs.accBytesRecv = newPromDesc("account_bytes_recv", "The number of bytes received on this account", accLabel)
sc.descs.accMsgsSent = newPromDesc("account_msgs_sent", "The number of messages sent on this account", accLabel)
sc.descs.accMsgsRecv = newPromDesc("account_msgs_recv", "The number of messages received on this account", accLabel)
sc.descs.accTotalConnCount = newPromDesc("account_total_conn_count", "Total number of client connections serviced for this account", append(accLabel, "server"))
sc.descs.accBytesSent = newPromDesc("account_bytes_sent", "The number of bytes sent on this account", append(accLabel, "server"))
sc.descs.accBytesRecv = newPromDesc("account_bytes_recv", "The number of bytes received on this account", append(accLabel, "server"))
sc.descs.accMsgsSent = newPromDesc("account_msgs_sent", "The number of messages sent on this account", append(accLabel, "server"))
sc.descs.accMsgsRecv = newPromDesc("account_msgs_recv", "The number of messages received on this account", append(accLabel, "server"))

sc.descs.accJetstreamEnabled = newPromDesc("account_jetstream_enabled", "Whether JetStream is enabled or not for this account", accLabel)
sc.descs.accJetstreamMemoryUsed = newPromDesc("account_jetstream_memory_used", "The number of bytes used by JetStream memory", accLabel)
Expand Down Expand Up @@ -565,15 +572,24 @@ func (sc *StatzCollector) pollAccountInfo() error {
for accID, acc := range accs {
sts := accountStats{accountID: accID}

sts.leafCount = float64(acc.LeafNodes)
sts.subCount = float64(acc.NumSubs)
sts.connCount = float64(acc.Conns)
sts.bytesSent = float64(acc.Sent.Bytes)
sts.bytesRecv = float64(acc.Received.Bytes)
sts.msgsSent = float64(acc.Sent.Msgs)
sts.msgsRecv = float64(acc.Received.Msgs)
// gauge metrics get aggregated counts
sts.leafCount = float64(acc.Aggregated.LeafNodes)
sts.subCount = float64(acc.Aggregated.NumSubs)
sts.connCount = float64(acc.Aggregated.Conns)

// counters get mapped by server
sts.byServer = make(map[string]*accountStatByServer, len(acc.ByServer))
for serverID, serverAccStat := range acc.ByServer {
sts.byServer[serverID] = &accountStatByServer{
totalConnCount: float64(serverAccStat.TotalConns),
bytesSent: float64(serverAccStat.Sent.Bytes),
bytesRecv: float64(serverAccStat.Received.Bytes),
msgsSent: float64(serverAccStat.Sent.Msgs),
msgsRecv: float64(serverAccStat.Received.Msgs),
}
}

accStats[acc.Account] = sts
accStats[accID] = sts
}
jsInfos := sc.getJSInfos(nc)
for accID, jsInfo := range jsInfos {
Expand Down Expand Up @@ -692,7 +708,12 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe
return jsAccInfos
}

func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*server.AccountStat, error) {
type accStat struct {
Aggregated *server.AccountStat
ByServer map[string]*server.AccountStat
}

func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*accStat, error) {
req := &server.AccountStatzOptions{
IncludeUnused: true,
}
Expand Down Expand Up @@ -729,20 +750,27 @@ func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*server.Account
}
}

accStatz := make(map[string]*server.AccountStat)
accStats := make(map[string]*accStat)
for _, statz := range res {
for _, acc := range statz.Accounts {
accInfo, ok := accStatz[acc.Account]
accInfo, ok := accStats[acc.Account]
if !ok {
accStatz[acc.Account] = acc
accCopy := *acc
accStats[acc.Account] = &accStat{
Aggregated: &accCopy,
ByServer: map[string]*server.AccountStat{
statz.ID: acc,
},
}
continue
}
mergeAccountStats(accInfo, acc)
accStatz[acc.Account] = acc

mergeAccountStats(acc, accInfo.Aggregated)
accInfo.ByServer[statz.ID] = acc
}
}

return accStatz, nil
return accStats, nil
}

func mergeStreamDetails(from, to *server.AccountDetail) {
Expand Down Expand Up @@ -936,13 +964,13 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
metrics.newGaugeMetric(sc.descs.Cores, float64(sm.Stats.Cores), labels)
metrics.newGaugeMetric(sc.descs.CPU, sm.Stats.CPU, labels)
metrics.newGaugeMetric(sc.descs.Connections, float64(sm.Stats.Connections), labels)
metrics.newGaugeMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels)
metrics.newCounterMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels)
metrics.newGaugeMetric(sc.descs.ActiveAccounts, float64(sm.Stats.ActiveAccounts), labels)
metrics.newGaugeMetric(sc.descs.NumSubs, float64(sm.Stats.NumSubs), labels)
metrics.newGaugeMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels)
metrics.newGaugeMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels)
metrics.newGaugeMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels)
metrics.newGaugeMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels)
metrics.newCounterMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels)
metrics.newCounterMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels)
metrics.newCounterMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels)
metrics.newCounterMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels)
metrics.newGaugeMetric(sc.descs.SlowConsumers, float64(sm.Stats.SlowConsumers), labels)
metrics.newGaugeMetric(sc.descs.RTT, float64(sc.rtts[sm.Server.ID]), labels)
metrics.newGaugeMetric(sc.descs.Routes, float64(len(sm.Stats.Routes)), labels)
Expand Down Expand Up @@ -1015,19 +1043,19 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
}
for _, rs := range sm.Stats.Routes {
labels = sc.routeLabelValues(sm, rs)
metrics.newGaugeMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels)
metrics.newGaugeMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels)
metrics.newGaugeMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels)
metrics.newGaugeMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels)
metrics.newCounterMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels)
metrics.newCounterMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels)
metrics.newCounterMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels)
metrics.newCounterMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels)
metrics.newGaugeMetric(sc.descs.RoutePending, float64(rs.Pending), labels)
}

for _, gw := range sm.Stats.Gateways {
labels = sc.gatewayLabelValues(sm, gw)
metrics.newGaugeMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels)
metrics.newGaugeMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels)
metrics.newGaugeMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), labels)
metrics.newGaugeMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels)
metrics.newCounterMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels)
metrics.newCounterMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels)
metrics.newCounterMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), labels)
metrics.newCounterMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels)
metrics.newGaugeMetric(sc.descs.GatewayNumInbound, float64(gw.NumInbound), labels)
}
}
Expand All @@ -1042,10 +1070,14 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
metrics.newGaugeMetric(sc.descs.accLeafCount, stat.leafCount, id)
metrics.newGaugeMetric(sc.descs.accSubCount, stat.subCount, id)

metrics.newCounterMetric(sc.descs.accBytesSent, stat.bytesSent, id)
metrics.newCounterMetric(sc.descs.accBytesRecv, stat.bytesRecv, id)
metrics.newCounterMetric(sc.descs.accMsgsSent, stat.msgsSent, id)
metrics.newCounterMetric(sc.descs.accMsgsRecv, stat.msgsRecv, id)
for serverID, serverAccStats := range stat.byServer {
byServerLabels := append(id, serverID)
metrics.newCounterMetric(sc.descs.accTotalConnCount, serverAccStats.totalConnCount, byServerLabels)
metrics.newCounterMetric(sc.descs.accBytesSent, serverAccStats.bytesSent, byServerLabels)
metrics.newCounterMetric(sc.descs.accBytesRecv, serverAccStats.bytesRecv, byServerLabels)
metrics.newCounterMetric(sc.descs.accMsgsSent, serverAccStats.msgsSent, byServerLabels)
metrics.newCounterMetric(sc.descs.accMsgsRecv, serverAccStats.msgsRecv, byServerLabels)
}

metrics.newGaugeMetric(sc.descs.accJetstreamEnabled, stat.jetstreamEnabled, id)
metrics.newGaugeMetric(sc.descs.accJetstreamMemoryUsed, stat.jetstreamMemoryUsed, id)
Expand Down

0 comments on commit 0154a95

Please sign in to comment.