Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Requeuer and Message Delay #469

Merged
merged 45 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
bf59a61
Requeue PoC
m110 Aug 20, 2024
0c63143
More config
m110 Aug 21, 2024
7facfcc
More generic
m110 Aug 22, 2024
e7449b0
Update requeue.go
m110 Aug 22, 2024
556ea4c
select
m110 Aug 22, 2024
a560bcc
Merge branch 'requeue' of github.com:ThreeDotsLabs/watermill into req…
m110 Aug 22, 2024
7068466
Merge branch 'master' into requeue
m110 Aug 23, 2024
1dee3ef
Merge branch 'master' into requeue
m110 Sep 6, 2024
c9b6cc7
Change the API
m110 Sep 6, 2024
7fb304d
Add example
m110 Sep 6, 2024
6b932de
Move some things out
m110 Sep 6, 2024
f8e5a7b
Renames
m110 Sep 6, 2024
ef81333
Merge branch 'master' into requeue
m110 Oct 5, 2024
ca4cdb1
Cleanup
m110 Oct 5, 2024
7bc94df
Merge branch 'master' into requeue
m110 Oct 10, 2024
455e44d
WIP generic delay
m110 Oct 10, 2024
156ccf3
Examples
m110 Oct 10, 2024
42afaf5
Update interfaces
m110 Oct 10, 2024
6399a86
Update
m110 Oct 10, 2024
dccd1ce
Docs
m110 Oct 10, 2024
1cb09cd
Docs
m110 Oct 11, 2024
1b05cc3
Merge branch 'master' into requeue
m110 Oct 11, 2024
e65a78b
pq
m110 Oct 11, 2024
79fb3ef
Merge branch 'requeue' of github.com:ThreeDotsLabs/watermill into req…
m110 Oct 11, 2024
b9dd16f
Update pq models
m110 Oct 11, 2024
8a75bed
Naive requeue and ack
m110 Oct 11, 2024
472169f
Tests
m110 Oct 14, 2024
deee5dd
Context
m110 Oct 14, 2024
a44a025
Tests
m110 Oct 15, 2024
0344486
Refactor pq
m110 Oct 15, 2024
1b0dcb3
Dialogs
m110 Oct 15, 2024
7fcfe2e
Delayed Requeuer example
m110 Oct 16, 2024
bb0f5f3
rename
m110 Oct 16, 2024
dbbde2f
Update example
m110 Oct 22, 2024
a27c36f
Cleanup
m110 Oct 22, 2024
0973bb4
Adjust config
m110 Oct 22, 2024
b7fb98b
Allow no delay publisher
m110 Oct 22, 2024
30815a0
Rework into redis + forwarder
m110 Oct 23, 2024
840a10d
Merge branch 'master' into requeue
m110 Oct 23, 2024
eaa5ff9
Docs
m110 Oct 23, 2024
aaaa495
README
m110 Oct 24, 2024
cb068b7
Exponential backoff
m110 Oct 24, 2024
675f6b7
Update versions
m110 Oct 24, 2024
cc91ad3
Update README
m110 Oct 24, 2024
d9a7baa
Merge branch 'master' into requeue
m110 Oct 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions _examples/real-world-examples/delayed-messages/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
services:
server:
image: golang:1.23
restart: unless-stopped
volumes:
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app
command: go run main.go

postgres:
image: postgres:15
restart: unless-stopped
ports:
- 5432:5432
environment:
POSTGRES_USER: watermill
POSTGRES_DB: watermill
POSTGRES_PASSWORD: "password"
25 changes: 25 additions & 0 deletions _examples/real-world-examples/delayed-messages/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module delayed-messsages

go 1.23.0

require (
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241011082756-1cb09cdf7d08
github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.1-0.20241011111920-9b207ae2da1c
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.2
)

require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
)

replace github.com/ThreeDotsLabs/watermill => ../../../

replace github.com/ThreeDotsLabs/watermill-sql/v3 => ../../../../watermill-sql
100 changes: 100 additions & 0 deletions _examples/real-world-examples/delayed-messages/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906115919-b00f99a56e28 h1:Yw7uu/UI47iavtR16hO35RrX2g0PcPQOgUXSLgDQJAo=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906115919-b00f99a56e28/go.mod h1:G8/otZYWLTCeYL2Ww3ujQ7gQ/3+jw5Bj0UtyKn7bBjA=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906121028-f42a4cbde116 h1:x5TprGnmHVFMIvaQlM63IL4sJbKi5ivADe7con4lzXg=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906121028-f42a4cbde116/go.mod h1:G8/otZYWLTCeYL2Ww3ujQ7gQ/3+jw5Bj0UtyKn7bBjA=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906121631-62f93534b600 h1:OsYZlahUgx8wbKS2cLt5GIHw1xoHjGmeFE/CyNhy2hY=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906121631-62f93534b600/go.mod h1:G8/otZYWLTCeYL2Ww3ujQ7gQ/3+jw5Bj0UtyKn7bBjA=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906121730-aaf911c0adb7 h1:x7KZq9ZD/XEpMzgSlQbvOI/NcuKwBgeX5inhDYN7prw=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906121730-aaf911c0adb7/go.mod h1:G8/otZYWLTCeYL2Ww3ujQ7gQ/3+jw5Bj0UtyKn7bBjA=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906122508-e0de57ad1d8e h1:/ZDF+iVaHYaFkGbdDBG1+AkXOae69aQQvWx+SQ+04KE=
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.4-0.20240906122508-e0de57ad1d8e/go.mod h1:G8/otZYWLTCeYL2Ww3ujQ7gQ/3+jw5Bj0UtyKn7bBjA=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w=
github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag=
github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw=
github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4=
github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU=
github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ=
github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg=
golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
128 changes: 128 additions & 0 deletions _examples/real-world-examples/delayed-messages/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package main

import (
"context"
stdSQL "database/sql"
"fmt"
"math/rand"
"time"

"github.com/google/uuid"
_ "github.com/lib/pq"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/components/delay"
"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
roblaszczak marked this conversation as resolved.
Show resolved Hide resolved
db, err := stdSQL.Open("postgres", "postgres://watermill:password@localhost:5432/watermill?sslmode=disable")
if err != nil {
panic(err)
}

logger := watermill.NewStdLogger(false, false)

publisher, err := sql.NewDelayedPostgreSQLPublisher(db, sql.DelayedPostgreSQLPublisherConfig{
DelayPublisherConfig: delay.PublisherConfig{
DefaultDelay: delay.For(10 * time.Second),
},
Logger: logger,
})
if err != nil {
panic(err)
}

eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
return params.EventName, nil
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
})
if err != nil {
panic(err)
}

router := message.NewDefaultRouter(logger)

eventProcessor, err := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return params.EventName, nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{
DeleteOnAck: true,
Logger: logger,
})
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
})
if err != nil {
panic(err)
}

err = eventProcessor.AddHandlers(
cqrs.NewEventHandler(
"OnOrderPlacedHandler",
func(ctx context.Context, event *OrderPlaced) error {
fmt.Println("Received order placed:", event.OrderID)

msg := cqrs.OriginalMessageFromCtx(ctx)
delayedUntil := msg.Metadata.Get(delay.DelayedUntilKey)
delayedFor := msg.Metadata.Get(delay.DelayedForKey)

if delayedUntil != "" {
fmt.Println("\tDelayed until:", delayedUntil)
fmt.Println("\tDelayed for:", delayedFor)
}

return nil
},
),
)
if err != nil {
panic(err)
}

go func() {
err = router.Run(context.Background())
if err != nil {
panic(err)
}
}()

<-router.Running()

msg := message.NewMessage(watermill.NewUUID(), nil)
delay.Message(msg, delay.For(10*time.Second))

for {
e := OrderPlaced{
OrderID: uuid.NewString(),
}

ctx := context.Background()

chance := rand.Intn(10)
if chance > 8 {
ctx = delay.WithContext(ctx, delay.Until(time.Now().UTC().Add(time.Minute)))
} else if chance > 5 {
ctx = delay.WithContext(ctx, delay.For(20*time.Second))
}

err = eventBus.Publish(ctx, e)
if err != nil {
panic(err)
}

time.Sleep(1 * time.Second)
}
}

type OrderPlaced struct {
OrderID string `json:"order_id"`
}
20 changes: 20 additions & 0 deletions _examples/real-world-examples/delayed-requeue/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
services:
server:
image: golang:1.23
#restart: unless-stopped
volumes:
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app
#command: go run main.go
roblaszczak marked this conversation as resolved.
Show resolved Hide resolved
command: echo

postgres:
image: postgres:15
restart: unless-stopped
ports:
- 5432:5432
environment:
POSTGRES_USER: watermill
POSTGRES_DB: watermill
POSTGRES_PASSWORD: "password"
27 changes: 27 additions & 0 deletions _examples/real-world-examples/delayed-requeue/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module delayed-poison-queue

go 1.23.0

require (
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241011082756-1cb09cdf7d08
github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.1-0.20241011111920-9b207ae2da1c
github.com/brianvoe/gofakeit/v6 v6.28.0
github.com/lib/pq v1.10.9
)

require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
)

replace github.com/ThreeDotsLabs/watermill => ../../../

replace github.com/ThreeDotsLabs/watermill-sql/v3 => ../../../../watermill-sql
Loading
Loading