Skip to content

Commit

Permalink
chore: minor fixes (#1501)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: jyu6 <[email protected]>
Co-authored-by: Sidhant Kohli <[email protected]>
Co-authored-by: jyu6 <[email protected]>
  • Loading branch information
3 people authored Feb 15, 2024
1 parent 392315a commit 2254f2a
Show file tree
Hide file tree
Showing 29 changed files with 195 additions and 119 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/imdario/mergo v0.3.13
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/nats.go v1.32.0
github.com/numaproj/numaflow-go v0.5.3-0.20231213060340-dbd9016bbcb6
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
Expand All @@ -44,7 +44,7 @@ require (
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.17.0
golang.org/x/crypto v0.18.0
golang.org/x/net v0.17.0
golang.org/x/oauth2 v0.7.0
golang.org/x/sync v0.3.0
Expand Down Expand Up @@ -153,7 +153,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
Expand Down Expand Up @@ -191,8 +191,8 @@ require (
go.mongodb.org/mongo-driver v1.7.3 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,10 @@ github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus=
github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down Expand Up @@ -936,8 +936,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -1150,13 +1150,13 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
24 changes: 24 additions & 0 deletions pkg/isb/stores/jetstream/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,27 @@ var isbWriteTimeout = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "write_timeout_total",
Help: "Total number of jetstream write timeouts",
}, []string{"buffer"})

// isbWriteTime is a histogram to Observe isb write time for a buffer
var isbWriteTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "isb_jetstream",
Name: "write_time_total",
Help: "Processing times of Writes for jetstream",
Buckets: prometheus.ExponentialBucketsRange(100, 60000000*2, 10),
}, []string{"buffer"})

// isbReadTime is a histogram to Observe isb read time for a buffer
var isbReadTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "isb_jetstream",
Name: "read_time_total",
Help: "Processing times of reads for jetstream",
Buckets: prometheus.ExponentialBucketsRange(100, 60000000*2, 10),
}, []string{"buffer"})

// isbAckTime is a histogram to Observe isb ack time for a buffer
var isbAckTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "isb_jetstream",
Name: "ack_time_total",
Help: "Processing times of acks for jetstream",
Buckets: prometheus.ExponentialBucketsRange(100, 60000000*2, 10),
}, []string{"buffer"})
12 changes: 10 additions & 2 deletions pkg/isb/stores/jetstream/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type jetStreamReader struct {
name string
stream string
subject string
client *jsclient.NATSClient
client *jsclient.Client
sub *nats.Subscription
opts *readOptions
inProgressTickDuration time.Duration
Expand All @@ -45,7 +45,7 @@ type jetStreamReader struct {
}

// NewJetStreamBufferReader is used to provide a new JetStream buffer reader connection
func NewJetStreamBufferReader(ctx context.Context, client *jsclient.NATSClient, name, stream, subject string, partitionIdx int32, opts ...ReadOption) (isb.BufferReader, error) {
func NewJetStreamBufferReader(ctx context.Context, client *jsclient.Client, name, stream, subject string, partitionIdx int32, opts ...ReadOption) (isb.BufferReader, error) {
log := logging.FromContext(ctx).With("bufferReader", name).With("stream", stream).With("subject", subject)
o := defaultReadOptions()
for _, opt := range opts {
Expand Down Expand Up @@ -113,6 +113,10 @@ func (jr *jetStreamReader) Pending(_ context.Context) (int64, error) {
}

func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) {
labels := map[string]string{"buffer": jr.GetName()}
defer func(t time.Time) {
isbReadTime.With(labels).Observe(float64(time.Since(t).Microseconds()))
}(time.Now())
var err error
var result []*isb.ReadMessage
msgs, err := jr.sub.Fetch(int(count), nats.MaxWait(jr.opts.readTimeOut))
Expand Down Expand Up @@ -144,6 +148,10 @@ func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMess
}

func (jr *jetStreamReader) Ack(_ context.Context, offsets []isb.Offset) []error {
labels := map[string]string{"buffer": jr.GetName()}
defer func(t time.Time) {
isbAckTime.With(labels).Observe(float64(time.Since(t).Microseconds()))
}(time.Now())
errs := make([]error, len(offsets))
done := make(chan struct{})
wg := &sync.WaitGroup{}
Expand Down
23 changes: 19 additions & 4 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ type jetStreamWriter struct {
partitionIdx int32
stream string
subject string
client *jsclient.NATSClient
client *jsclient.Client
js nats.JetStreamContext
opts *writeOptions
isFull *atomic.Bool
log *zap.SugaredLogger
}

// NewJetStreamBufferWriter is used to provide a new instance of JetStreamBufferWriter
func NewJetStreamBufferWriter(ctx context.Context, client *jsclient.NATSClient, name, stream, subject string, partitionIdx int32, opts ...WriteOption) (isb.BufferWriter, error) {
func NewJetStreamBufferWriter(ctx context.Context, client *jsclient.Client, name, stream, subject string, partitionIdx int32, opts ...WriteOption) (isb.BufferWriter, error) {
o := defaultWriteOptions()
for _, opt := range opts {
if opt != nil {
Expand Down Expand Up @@ -197,7 +197,13 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message,
Subject: jw.subject,
Data: payload,
}
if future, err := jw.js.PublishMsgAsync(m, nats.MsgId(message.Header.ID)); err != nil { // nats.MsgId() is for exactly-once writing
var pubOpts []nats.PubOpt
// nats.MsgId() is for exactly-once writing
// we don't need to set MsgId for control message
if message.Header.Kind != isb.WMB {
pubOpts = append(pubOpts, nats.MsgId(message.Header.ID))
}
if future, err := jw.js.PublishMsgAsync(m, pubOpts...); err != nil { // nats.MsgId() is for exactly-once writing
errs[index] = err
} else {
futures[index] = future
Expand Down Expand Up @@ -244,6 +250,9 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message,
}

func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message, errs []error, metricsLabels map[string]string) ([]isb.Offset, []error) {
defer func(t time.Time) {
isbWriteTime.With(metricsLabels).Observe(float64(time.Since(t).Microseconds()))
}(time.Now())
var writeOffsets = make([]isb.Offset, len(messages))
wg := new(sync.WaitGroup)
for index, msg := range messages {
Expand All @@ -259,7 +268,13 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message,
Subject: jw.subject,
Data: payload,
}
if pubAck, err := jw.js.PublishMsg(m, nats.MsgId(message.Header.ID), nats.AckWait(2*time.Second)); err != nil { // nats.MsgId() is for exactly-once writing
pubOpts := []nats.PubOpt{nats.AckWait(2 * time.Second)}
// nats.MsgId() is for exactly-once writing
// we don't need to set MsgId for control message
if message.Header.Kind != isb.WMB {
pubOpts = append(pubOpts, nats.MsgId(message.Header.ID))
}
if pubAck, err := jw.js.PublishMsg(m, pubOpts...); err != nil {
errs[idx] = err
isbWriteErrors.With(metricsLabels).Inc()
} else {
Expand Down
7 changes: 3 additions & 4 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ import (

type jetStreamSvc struct {
pipelineName string

jsClient *jsclient.NATSClient
js nats.JetStreamContext
jsClient *jsclient.Client
js nats.JetStreamContext
}

func NewISBJetStreamSvc(pipelineName string, opts ...JSServiceOption) (ISBService, error) {
Expand All @@ -51,7 +50,7 @@ func NewISBJetStreamSvc(pipelineName string, opts ...JSServiceOption) (ISBServic

type JSServiceOption func(*jetStreamSvc) error

func WithJetStreamClient(jsClient *jsclient.NATSClient) JSServiceOption {
func WithJetStreamClient(jsClient *jsclient.Client) JSServiceOption {
return func(j *jetStreamSvc) error {
j.jsClient = jsClient
return nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,13 @@ var (
Help: "Total number of closed windows",
}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})
)

// Ctrl Message Metric
var (
// CtrlMessagesCount is used to indicate the number of total ctrl messages sent.
CtrlMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "idlemanager",
Name: "ctrl_msg_total",
Help: "Total number of ctrl Messages sent",
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
)
6 changes: 3 additions & 3 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
for toVertexName, toVertexBuffer := range df.toBuffers {
if publisher, ok := df.wmPublishers[toVertexName]; ok {
for _, bufferPartition := range toVertexBuffer {
idlehandler.PublishIdleWatermark(ctx, bufferPartition, publisher, df.idleManager, df.log, dfv1.VertexTypeReduceUDF, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
idlehandler.PublishIdleWatermark(ctx, bufferPartition, publisher, df.idleManager, df.log, df.vertexName, df.pipelineName, dfv1.VertexTypeReduceUDF, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
}
}
}
Expand All @@ -237,7 +237,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
for toVertexName, toVertexBuffer := range df.toBuffers {
if publisher, ok := df.wmPublishers[toVertexName]; ok {
for _, bufferPartition := range toVertexBuffer {
idlehandler.PublishIdleWatermark(ctx, bufferPartition, publisher, df.idleManager, df.log, dfv1.VertexTypeReduceUDF, wmb.Watermark(watermark))
idlehandler.PublishIdleWatermark(ctx, bufferPartition, publisher, df.idleManager, df.log, df.vertexName, df.pipelineName, dfv1.VertexTypeReduceUDF, df.vertexReplica, wmb.Watermark(watermark))
}
}
}
Expand Down Expand Up @@ -388,7 +388,7 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)
for toVertexName, toVertexBuffer := range df.toBuffers {
if publisher, ok := df.wmPublishers[toVertexName]; ok {
for _, bufferPartition := range toVertexBuffer {
idlehandler.PublishIdleWatermark(ctx, bufferPartition, publisher, df.idleManager, df.log, dfv1.VertexTypeReduceUDF, wmb.Watermark(watermark))
idlehandler.PublishIdleWatermark(ctx, bufferPartition, publisher, df.idleManager, df.log, df.vertexName, df.pipelineName, dfv1.VertexTypeReduceUDF, df.vertexReplica, wmb.Watermark(watermark))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reduce/pnf/processandforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (p *processAndForward) publishWM(ctx context.Context, endTime time.Time) {
for index, activePartition := range activeWatermarkBuffers[toVertexName] {
if !activePartition {
if publisher, ok := p.wmPublishers[toVertexName]; ok {
idlehandler.PublishIdleWatermark(ctx, p.toBuffers[toVertexName][index], publisher, p.idleManager, p.log, dfv1.VertexTypeReduceUDF, wm)
idlehandler.PublishIdleWatermark(ctx, p.toBuffers[toVertexName][index], publisher, p.idleManager, p.log, p.vertexName, p.pipelineName, dfv1.VertexTypeReduceUDF, p.vertexReplica, wm)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/shared/clients/nats/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewClientPool(ctx context.Context, opts ...Option) (*ClientPool, error) {

// NextAvailableClient returns the next available NATS client. This code need not be optimized because this is
// not in hot code path. It is only during connection creation/startup.
func (p *ClientPool) NextAvailableClient() *NATSClient {
func (p *ClientPool) NextAvailableClient() *Client {
p.mutex.Lock()
defer p.mutex.Unlock()

Expand All @@ -61,7 +61,7 @@ func (p *ClientPool) NextAvailableClient() *NATSClient {
// get the first client and move it to the back of the list
front := p.clients.Front()
p.clients.MoveToBack(front)
return front.Value.(*NATSClient)
return front.Value.(*Client)
}

// CloseAll closes all the clients in the pool
Expand All @@ -70,6 +70,6 @@ func (p *ClientPool) CloseAll() {
defer p.mutex.Unlock()

for e := p.clients.Front(); e != nil; e = e.Next() {
e.Value.(*NATSClient).Close()
e.Value.(*Client).Close()
}
}
Loading

0 comments on commit 2254f2a

Please sign in to comment.