Skip to content

Commit

Permalink
add producerMgr pub with timeout func
Browse files Browse the repository at this point in the history
  • Loading branch information
liushengjie committed Jul 23, 2019
1 parent 1b759be commit 2b60aae
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
23 changes: 23 additions & 0 deletions producer.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,29 @@ func (self *TopicProducerMgr) MultiPublishAndTrace(topic string, traceIDList []u
return id, offset, rawSize, err
}

func (self *TopicProducerMgr) PublishWithTimeout(topic string, body []byte, time time.Duration) error {
_, err := self.doCommandWithTimeoutAndRetry(topic, nil, time, 2, func(pid int) (*Command, error) {
if pid < 0 || pid == OLD_VERSION_PID {
// pub to old nsqd that not support partition
return Publish(topic, body), nil
}
return PublishWithPart(topic, strconv.Itoa(pid), body), nil
})
return err
}

func (self *TopicProducerMgr) PublishWithTimeoutAndPartition(topic string, partition int, body []byte, time time.Duration) error {
_, err := self.doCommandWithTimeoutAndRetryAndPartition(topic, partition, time, 2, func(pid int) (*Command, error) {
if pid < 0 || pid == OLD_VERSION_PID {
// pub to old nsqd that not support partition
return Publish(topic, body), nil
}
return PublishWithPart(topic, strconv.Itoa(pid), body), nil
})
return err
}


func (self *TopicProducerMgr) doCommandWithTimeoutAndRetryAndPartition(topic string, partition int, timeout time.Duration, maxRetry uint32,
commandFunc func(pid int) (*Command, error)) ([]byte, error) {
return self.doCommandWithTimeoutAndRetryTemplate(topic, timeout, maxRetry, commandFunc,
Expand Down
53 changes: 53 additions & 0 deletions producer_test.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -559,6 +560,58 @@ func TestTopicProducerMgrPubBackground(t *testing.T) {
time.Sleep(time.Second * 5)
}

func TestProducerMgrPublishWithTimeout(t *testing.T) {

topicName := "topic_producer_mgr_pub_timeout" + strconv.Itoa(int(time.Now().Unix()))
EnsureTopic(t, 4150, topicName, 0)

testingTimeout = true
testingSendTimeout = true
defer func() {
testingSendTimeout = false
testingTimeout = false
}()
time.Sleep(time.Second)

config := NewConfig()
config.PubTimeout = time.Second * 2
config.PubMaxBackgroundRetry = 10
config.PubStrategy = PubRR
w, err := NewTopicProducerMgr([]string{topicName}, config)
if err != nil {
t.Fatal(err)
}
w.SetLogger(newTestLogger(t), LogLevelDebug)
lookupList := make([]string, 0)
lookupList = append(lookupList, "127.0.0.1:4161")
w.AddLookupdNodes(lookupList)
defer w.Stop()

moreBeginTime := time.Now().Unix()
err = w.PublishWithTimeout(topicName, []byte("publish_test_case"), time.Second * 3)
if time.Now().Unix() - moreBeginTime <= 3 {
t.Fatalf("func timeout param have not override config timeout")
}
checkDeadlineExceeded(err, t)


lessBeginTime := time.Now().Unix()
err = w.PublishWithTimeout(topicName, []byte("publish_test_case"), time.Second)
if time.Now().Unix() - lessBeginTime >= 2 {
t.Fatalf("func timeout param have not override config timeout")
}
checkDeadlineExceeded(err, t)
}

func checkDeadlineExceeded(err error, t *testing.T) {
if err != context.DeadlineExceeded {
if err != nil && strings.Contains(err.Error(), "deadline exceeded") {
return
}
t.Fatalf("error %s", err)
}
}

func TestTopicProducerMgrPubOrdered(t *testing.T) {
stopC := make(chan struct{})
var meta metaInfo
Expand Down

0 comments on commit 2b60aae

Please sign in to comment.