Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Requeuer and Message Delay #469

Merged
merged 45 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
bf59a61
Requeue PoC
m110 Aug 20, 2024
0c63143
More config
m110 Aug 21, 2024
7facfcc
More generic
m110 Aug 22, 2024
e7449b0
Update requeue.go
m110 Aug 22, 2024
556ea4c
select
m110 Aug 22, 2024
a560bcc
Merge branch 'requeue' of github.com:ThreeDotsLabs/watermill into req…
m110 Aug 22, 2024
7068466
Merge branch 'master' into requeue
m110 Aug 23, 2024
1dee3ef
Merge branch 'master' into requeue
m110 Sep 6, 2024
c9b6cc7
Change the API
m110 Sep 6, 2024
7fb304d
Add example
m110 Sep 6, 2024
6b932de
Move some things out
m110 Sep 6, 2024
f8e5a7b
Renames
m110 Sep 6, 2024
ef81333
Merge branch 'master' into requeue
m110 Oct 5, 2024
ca4cdb1
Cleanup
m110 Oct 5, 2024
7bc94df
Merge branch 'master' into requeue
m110 Oct 10, 2024
455e44d
WIP generic delay
m110 Oct 10, 2024
156ccf3
Examples
m110 Oct 10, 2024
42afaf5
Update interfaces
m110 Oct 10, 2024
6399a86
Update
m110 Oct 10, 2024
dccd1ce
Docs
m110 Oct 10, 2024
1cb09cd
Docs
m110 Oct 11, 2024
1b05cc3
Merge branch 'master' into requeue
m110 Oct 11, 2024
e65a78b
pq
m110 Oct 11, 2024
79fb3ef
Merge branch 'requeue' of github.com:ThreeDotsLabs/watermill into req…
m110 Oct 11, 2024
b9dd16f
Update pq models
m110 Oct 11, 2024
8a75bed
Naive requeue and ack
m110 Oct 11, 2024
472169f
Tests
m110 Oct 14, 2024
deee5dd
Context
m110 Oct 14, 2024
a44a025
Tests
m110 Oct 15, 2024
0344486
Refactor pq
m110 Oct 15, 2024
1b0dcb3
Dialogs
m110 Oct 15, 2024
7fcfe2e
Delayed Requeuer example
m110 Oct 16, 2024
bb0f5f3
rename
m110 Oct 16, 2024
dbbde2f
Update example
m110 Oct 22, 2024
a27c36f
Cleanup
m110 Oct 22, 2024
0973bb4
Adjust config
m110 Oct 22, 2024
b7fb98b
Allow no delay publisher
m110 Oct 22, 2024
30815a0
Rework into redis + forwarder
m110 Oct 23, 2024
840a10d
Merge branch 'master' into requeue
m110 Oct 23, 2024
eaa5ff9
Docs
m110 Oct 23, 2024
aaaa495
README
m110 Oct 24, 2024
cb068b7
Exponential backoff
m110 Oct 24, 2024
675f6b7
Update versions
m110 Oct 24, 2024
cc91ad3
Update README
m110 Oct 24, 2024
d9a7baa
Merge branch 'master' into requeue
m110 Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion _examples/pubsubs/googlecloud/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2Aawl
cloud.google.com/go/iam v0.10.0 h1:fpP/gByFs6US1ma53v7VxhvbJpO2Aapng6wabJ99MuI=
cloud.google.com/go/iam v0.10.0/go.mod h1:nXAECrMt2qHpF6RZUZseteD6QyanL68reN4OXPw0UWM=
cloud.google.com/go/kms v1.6.0 h1:OWRZzrPmOZUzurjI2FBGtgY2mB1WaJkqhw6oIwSj0Yg=
cloud.google.com/go/kms v1.6.0/go.mod h1:Jjy850yySiasBUDi6KFUwUv2n1+o7QZFyuUJg6OgjA0=
cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
cloud.google.com/go/longrunning v0.3.0/go.mod h1:qth9Y41RRSUE69rDcOn6DdK3HfQfsUI0YSmW3iIlLJc=
cloud.google.com/go/pubsub v1.28.0 h1:XzabfdPx/+eNrsVVGLFgeUnQQKPGkMb8klRCeYK52is=
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down Expand Up @@ -137,7 +139,7 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20230202175211-008b39050e57 h1:vArvWooPH749rNHpBGgVl+U9B9dATjiEhJzcWGlovNs=
google.golang.org/genproto v0.0.0-20230202175211-008b39050e57/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
google.golang.org/grpc v1.21.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
Expand Down
64 changes: 64 additions & 0 deletions components/requeue/requeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package requeue

import (
"context"
"time"

"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/pkg/errors"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)

const RequeueDelayKey = "requeue_delay"

type Requeue struct {
router *message.Router
}

type Config struct {
Subscriber message.Subscriber
Topic string
Publisher message.Publisher
Delay time.Duration
}

func NewRequeue(
config Config,
logger watermill.LoggerAdapter,
) (*Requeue, error) {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
return nil, err
}

router.AddNoPublisherHandler(
"requeue",
config.Topic,
config.Subscriber,
func(msg *message.Message) error {
time.Sleep(config.Delay)

topic := msg.Metadata.Get(middleware.PoisonedTopicKey)
if topic == "" {
return errors.New("missing requeue topic")
}

err := config.Publisher.Publish(topic, msg)
if err != nil {
return err
}

return nil
},
)

return &Requeue{
router: router,
}, nil
}

func (r *Requeue) Run(ctx context.Context) error {
return r.router.Run(ctx)
}
97 changes: 97 additions & 0 deletions components/requeue/requeue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package requeue

import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestRequeue(t *testing.T) {
logger := watermill.NewStdLogger(false, false)

pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

requeue, err := NewRequeue(Config{
Subscriber: pubSub,
Topic: "requeue",
Publisher: pubSub,
Delay: time.Millisecond * 200,
}, logger)
require.NoError(t, err)

go func() {
err := requeue.Run(context.Background())
require.NoError(t, err)
}()

router, err := message.NewRouter(message.RouterConfig{}, logger)
require.NoError(t, err)

pq, err := middleware.PoisonQueue(pubSub, "requeue")
m110 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

router.AddMiddleware(pq)

receivedMessages := make(chan int, 10)

counter := 0

router.AddNoPublisherHandler(
"test",
"test",
pubSub,
func(msg *message.Message) error {
i, err := strconv.Atoi(string(msg.Payload))
if err != nil {
return err
}

counter++

if counter < 10 && i%2 == 0 {
return errors.New("error")
}

receivedMessages <- i

return nil
},
)

go func() {
err := router.Run(context.Background())
require.NoError(t, err)
}()

time.Sleep(time.Second)

for i := 0; i < 10; i++ {
msg := message.NewMessage(watermill.NewUUID(), []byte(fmt.Sprint(i)))
err := pubSub.Publish("test", msg)
require.NoError(t, err)
}

var received []int

timeout := false
for !timeout {
select {
case i := <-receivedMessages:
received = append(received, i)
case <-time.After(5 * time.Second):
timeout = true
break
}
}

require.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, received)
}
Loading