Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 22, 2023
1 parent f921292 commit b16fb47
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 33 deletions.
4 changes: 1 addition & 3 deletions _examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ go 1.20

replace github.com/centrifugal/centrifuge => ../

replace github.com/centrifugal/protocol => ../../protocol

require (
github.com/FZambia/tarantool v0.2.2
github.com/centrifugal/centrifuge v0.8.2
github.com/centrifugal/protocol v0.10.0
github.com/centrifugal/protocol v0.10.1-0.20231021200749-cfcfeb7e5916
github.com/cristalhq/jwt/v3 v3.0.0
github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5
github.com/gin-contrib/sessions v0.0.3
Expand Down
2 changes: 2 additions & 0 deletions _examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/bradleypeabody/gorilla-sessions-memcache v0.0.0-20181103040241-659414
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/centrifugal/protocol v0.10.1-0.20231021200749-cfcfeb7e5916 h1:bzd1LAWZQlpKYRwVEOJ9V4XwcjUda4EzTMOHS0Ir0Uw=
github.com/centrifugal/protocol v0.10.1-0.20231021200749-cfcfeb7e5916/go.mod h1:Tq5I1mBpLHkLxNM9gfb3Gth+sTE2kKU5hH3cVgmVs9s=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
Expand Down
10 changes: 4 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec
var frameType protocol.FrameType
defer func() {
channelGroup := "_"
if metricChannel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesReceived {
if metricChannel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForTransportMessagesReceived {
channelGroup = c.node.config.GetChannelNamespaceLabel(metricChannel)
}
c.node.metrics.incTransportMessagesReceived(c.transport.Name(), frameType, channelGroup, cmdSize)
Expand Down Expand Up @@ -1188,8 +1188,6 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec

if cmd.Connect != nil {
frameType = protocol.FrameTypeConnect
} else if cmd.Ping != nil {
frameType = protocol.FrameTypePing
} else if cmd.Subscribe != nil {
metricChannel = cmd.Subscribe.Channel
frameType = protocol.FrameTypeSubscribe
Expand Down Expand Up @@ -1296,7 +1294,7 @@ func (c *Client) writeEncodedCommandReply(ch string, frameType protocol.FrameTyp
}

item := queue.Item{Data: replyData, FrameType: frameType}
if ch != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent {
if ch != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForTransportMessagesSent {
item.Channel = ch
}

Expand Down Expand Up @@ -2451,7 +2449,7 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
MaxQueueSize: c.node.config.ClientQueueMaxSize,
WriteFn: func(item queue.Item) error {
channelGroup := "_"
if item.Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent {
if item.Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForTransportMessagesSent {
channelGroup = c.node.config.GetChannelNamespaceLabel(item.Channel)
}
c.node.metrics.incTransportMessagesSent(c.transport.Name(), item.FrameType, channelGroup, len(item.Data))
Expand Down Expand Up @@ -2488,7 +2486,7 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
}
messages = append(messages, items[i].Data)
channelGroup := "_"
if items[i].Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent {
if items[i].Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForTransportMessagesSent {
channelGroup = c.node.config.GetChannelNamespaceLabel(items[i].Channel)
}
c.node.metrics.incTransportMessagesSent(c.transport.Name(), items[i].FrameType, channelGroup, len(items[i].Data))
Expand Down
25 changes: 14 additions & 11 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ type Config struct {
// for client-side subscription requests.
// Zero value means 255.
ChannelMaxLength int
// MetricsNamespace is a Prometheus metrics namespace to use for internal metrics.
// If not set then the default namespace name `centrifuge` will be used.
MetricsNamespace string
// HistoryMaxPublicationLimit allows limiting the maximum number of publications to be
// asked over client API history call. This is useful when you have large streams and
// want to prevent a massive number of missed messages to be sent to a client when
Expand Down Expand Up @@ -92,19 +89,25 @@ type Config struct {
// When zero Centrifuge uses default 30 days which we believe is more than enough
// for most use cases.
HistoryMetaTTL time.Duration
// GetChannelNamespaceLabel if set will be used by Centrifuge to extract channel_group
// label for channel related metrics. Make sure to maintain low cardinality of returned
// values to avoid issues with Prometheus performance. This function may introduce
// sufficient overhead since it's called in hot paths - so should be fast.

// MetricsNamespace is a Prometheus metrics namespace to use for internal metrics.
// If not set then the default namespace name "centrifuge" will be used.
MetricsNamespace string
// GetChannelNamespaceLabel if set will be used by Centrifuge to extract channel_namespace
// label for some channel related metrics. Make sure to maintain low cardinality of returned
// values to avoid issues with Prometheus performance. This function may introduce sufficient
// overhead since it's called in hot paths - so it should be fast. Usage of this function for
// specific metrics must be enabled over ChannelNamespaceLabelForTransportMessagesSent and
// ChannelNamespaceLabelForTransportMessagesReceived options.
GetChannelNamespaceLabel func(channel string) string
// ChannelNamespaceLabelForMessagesSent enables using GetChannelNamespaceLabel
// ChannelNamespaceLabelForTransportMessagesSent enables using GetChannelNamespaceLabel
// function for extracting channel_namespace label for transport_messages_sent and
// transport_messages_sent_size.
ChannelNamespaceLabelForMessagesSent bool
// ChannelNamespaceLabelForMessagesReceived enables using GetChannelNamespaceLabel
ChannelNamespaceLabelForTransportMessagesSent bool
// ChannelNamespaceLabelForTransportMessagesReceived enables using GetChannelNamespaceLabel
// function for extracting channel_namespace label for transport_messages_received and
// transport_messages_received_size.
ChannelNamespaceLabelForMessagesReceived bool
ChannelNamespaceLabelForTransportMessagesReceived bool
}

const (
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ replace github.com/centrifugal/protocol => ../protocol

require (
github.com/FZambia/eagle v0.1.0
github.com/centrifugal/protocol v0.10.0
github.com/centrifugal/protocol v0.10.1-0.20231021200749-cfcfeb7e5916
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0
github.com/igm/sockjs-go/v3 v3.0.2
Expand Down
20 changes: 8 additions & 12 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type metrics struct {
commandDurationPresence prometheus.Observer
commandDurationPresenceStats prometheus.Observer
commandDurationHistory prometheus.Observer
commandDurationPing prometheus.Observer
commandDurationSend prometheus.Observer
commandDurationRPC prometheus.Observer
commandDurationRefresh prometheus.Observer
Expand All @@ -103,8 +102,6 @@ func (m *metrics) observeCommandDuration(frameType protocol.FrameType, d time.Du
observer = m.commandDurationPresenceStats
case protocol.FrameTypeHistory:
observer = m.commandDurationHistory
case protocol.FrameTypePing:
observer = m.commandDurationPing
case protocol.FrameTypeSend:
observer = m.commandDurationSend
case protocol.FrameTypeRPC:
Expand Down Expand Up @@ -319,21 +316,21 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
Namespace: metricsNamespace,
Subsystem: "node",
Name: "messages_sent_count",
Help: "Number of messages sent.",
Help: "Number of messages sent by node to broker.",
}, []string{"type"})

m.messagesReceivedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "messages_received_count",
Help: "Number of messages received from engine.",
Help: "Number of messages received from broker.",
}, []string{"type"})

m.actionCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "action_count",
Help: "Number of node actions called.",
Help: "Number of various actions called.",
}, []string{"action"})

m.numClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Expand Down Expand Up @@ -361,7 +358,7 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
Namespace: metricsNamespace,
Subsystem: "node",
Name: "num_nodes",
Help: "Number of nodes in cluster.",
Help: "Number of nodes in the cluster.",
})

m.buildInfoGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -426,28 +423,28 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_sent",
Help: "Number of messages sent over specific transport.",
Help: "Number of messages sent to client connections over specific transport.",
}, []string{"transport", "frame_type", "channel_namespace"})

m.transportMessagesSentSize = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_sent_size",
Help: "Size in bytes of messages sent over specific transport.",
Help: "Size in bytes of messages sent to client connections over specific transport.",
}, []string{"transport", "frame_type", "channel_namespace"})

m.transportMessagesReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_received",
Help: "Number of messages received over specific transport.",
Help: "Number of messages received from client connections over specific transport.",
}, []string{"transport", "frame_type", "channel_namespace"})

m.transportMessagesReceivedSize = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_received_size",
Help: "Size in bytes of messages received over specific transport.",
Help: "Size in bytes of messages received from client connections over specific transport.",
}, []string{"transport", "frame_type", "channel_namespace"})

m.messagesReceivedCountPublication = m.messagesReceivedCount.WithLabelValues("publication")
Expand Down Expand Up @@ -494,7 +491,6 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
m.commandDurationPresence = m.commandDurationSummary.WithLabelValues(labelForMethod(protocol.FrameTypePresence))
m.commandDurationPresenceStats = m.commandDurationSummary.WithLabelValues(labelForMethod(protocol.FrameTypePresenceStats))
m.commandDurationHistory = m.commandDurationSummary.WithLabelValues(labelForMethod(protocol.FrameTypeHistory))
m.commandDurationPing = m.commandDurationSummary.WithLabelValues(labelForMethod(protocol.FrameTypePing))
m.commandDurationSend = m.commandDurationSummary.WithLabelValues(labelForMethod(protocol.FrameTypeSend))
m.commandDurationRPC = m.commandDurationSummary.WithLabelValues(labelForMethod(protocol.FrameTypeRPC))
m.commandDurationRefresh = m.commandDurationSummary.WithLabelValues(labelForMethod(protocol.FrameTypeRefresh))
Expand Down

0 comments on commit b16fb47

Please sign in to comment.