-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add PostgreSQLQueue and DelayedPostgreSQL Pub/Sub (#34)
* Added `PostgreSQLQueueSchema` and `PostgreSQLQueueOffsetsAdapter`. * This schema is similar to the original one, except it allows filtering messages with a custom `WHERE` clause. * It doesn't support customer groups. * It allows deleting messages from the table after acking (optional). * Added `NewDelayedPostgreSQLPublisher` and `NewDelayedPostgresSQLSubscriber` * They work on top of the conditional schema and use the `delay` component implemented in ThreeDotsLabs/watermill#469 * The idea is to receive messages with a delay or at a given time in a simple way. * Reworked the `SchemaAdapter` and `OffsetsAdapter` interfaces **(Breaking change -> will bump the major to v4)**. * We had a few instances where some details have been missing from one of the interface methods. It makes it difficult to extend this library with new features, as each will require a major version bump, and we want to avoid it if possible. * To mitigate this, we're moving to a `params` struct passed to the methods instead of raw arguments. It adds some verbosity but allows extending the params if needed with no breaking changes to the library. * The methods now also return errors. * Migration: if you don't use a custom schema/offsets adapter, you don't need to do anything. If you do, change the methods to implement the new interfaces. Use values from the `params` argument and return errors.
- Loading branch information
Showing
24 changed files
with
1,190 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package main | ||
|
||
import ( | ||
stdSQL "database/sql" | ||
"fmt" | ||
"os" | ||
"time" | ||
|
||
driver "github.com/go-sql-driver/mysql" | ||
_ "github.com/lib/pq" | ||
) | ||
|
||
func main() { | ||
for i := 0; i < 10; i++ { | ||
err := tryConnecting() | ||
if err == nil { | ||
os.Exit(0) | ||
} | ||
|
||
time.Sleep(1 * time.Second) | ||
} | ||
|
||
fmt.Println("Failed to connect") | ||
os.Exit(1) | ||
} | ||
|
||
func tryConnecting() error { | ||
err := connectToMySQL() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = connectToPostgreSQL() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func connectToMySQL() error { | ||
addr := os.Getenv("WATERMILL_TEST_MYSQL_HOST") | ||
if addr == "" { | ||
addr = "localhost" | ||
} | ||
conf := driver.NewConfig() | ||
conf.Net = "tcp" | ||
conf.User = "root" | ||
conf.Addr = addr | ||
|
||
conf.DBName = "watermill" | ||
|
||
db, err := stdSQL.Open("mysql", conf.FormatDSN()) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = db.Ping() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func connectToPostgreSQL() error { | ||
addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") | ||
if addr == "" { | ||
addr = "localhost" | ||
} | ||
|
||
connStr := fmt.Sprintf("postgres://watermill:password@%s/watermill?sslmode=disable", addr) | ||
db, err := stdSQL.Open("postgres", connStr) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = db.Ping() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package sql | ||
|
||
import ( | ||
"database/sql" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/ThreeDotsLabs/watermill" | ||
"github.com/ThreeDotsLabs/watermill/components/delay" | ||
"github.com/ThreeDotsLabs/watermill/message" | ||
) | ||
|
||
type DelayedPostgreSQLPublisherConfig struct { | ||
// DelayPublisherConfig is a configuration for the delay.Publisher. | ||
DelayPublisherConfig delay.PublisherConfig | ||
|
||
// OverridePublisherConfig allows overriding the default PublisherConfig. | ||
OverridePublisherConfig func(config *PublisherConfig) error | ||
|
||
Logger watermill.LoggerAdapter | ||
} | ||
|
||
func (c *DelayedPostgreSQLPublisherConfig) setDefaults() { | ||
if c.Logger == nil { | ||
c.Logger = watermill.NopLogger{} | ||
} | ||
} | ||
|
||
// NewDelayedPostgreSQLPublisher creates a new Publisher that stores messages in PostgreSQL with a delay. | ||
// The delay can be set per message with the Watermill's components/delay metadata. | ||
func NewDelayedPostgreSQLPublisher(db *sql.DB, config DelayedPostgreSQLPublisherConfig) (message.Publisher, error) { | ||
config.setDefaults() | ||
|
||
publisherConfig := PublisherConfig{ | ||
SchemaAdapter: PostgreSQLQueueSchema{}, | ||
AutoInitializeSchema: true, | ||
} | ||
|
||
if config.OverridePublisherConfig != nil { | ||
err := config.OverridePublisherConfig(&publisherConfig) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
var publisher message.Publisher | ||
var err error | ||
|
||
publisher, err = NewPublisher(db, publisherConfig, config.Logger) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
publisher, err = delay.NewPublisher(publisher, config.DelayPublisherConfig) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return publisher, nil | ||
} | ||
|
||
type DelayedPostgreSQLSubscriberConfig struct { | ||
// OverrideSubscriberConfig allows overriding the default SubscriberConfig. | ||
OverrideSubscriberConfig func(config *SubscriberConfig) error | ||
|
||
// DeleteOnAck deletes the message from the queue when it's acknowledged. | ||
DeleteOnAck bool | ||
|
||
// AllowNoDelay allows receiving messages without the delay metadata. | ||
// By default, such messages will be skipped. | ||
// If set to true, messages without delay metadata will be received immediately. | ||
AllowNoDelay bool | ||
|
||
Logger watermill.LoggerAdapter | ||
} | ||
|
||
func (c *DelayedPostgreSQLSubscriberConfig) setDefaults() { | ||
if c.Logger == nil { | ||
c.Logger = watermill.NopLogger{} | ||
} | ||
} | ||
|
||
// NewDelayedPostgreSQLSubscriber creates a new Subscriber that reads messages from PostgreSQL with a delay. | ||
// The delay can be set per message with the Watermill's components/delay metadata. | ||
func NewDelayedPostgreSQLSubscriber(db *sql.DB, config DelayedPostgreSQLSubscriberConfig) (message.Subscriber, error) { | ||
config.setDefaults() | ||
|
||
where := fmt.Sprintf("(metadata->>'%v')::timestamptz < NOW() AT TIME ZONE 'UTC'", delay.DelayedUntilKey) | ||
|
||
if config.AllowNoDelay { | ||
where += fmt.Sprintf(` OR (metadata->>'%s') IS NULL`, delay.DelayedUntilKey) | ||
} | ||
|
||
schemaAdapter := delayedPostgreSQLSchemaAdapter{ | ||
PostgreSQLQueueSchema: PostgreSQLQueueSchema{ | ||
GenerateWhereClause: func(params GenerateWhereClauseParams) (string, []any) { | ||
return where, nil | ||
}, | ||
}, | ||
} | ||
|
||
subscriberConfig := SubscriberConfig{ | ||
SchemaAdapter: schemaAdapter, | ||
OffsetsAdapter: PostgreSQLQueueOffsetsAdapter{ | ||
DeleteOnAck: config.DeleteOnAck, | ||
}, | ||
InitializeSchema: true, | ||
} | ||
|
||
if config.OverrideSubscriberConfig != nil { | ||
err := config.OverrideSubscriberConfig(&subscriberConfig) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
sub, err := NewSubscriber(db, subscriberConfig, config.Logger) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return sub, nil | ||
} | ||
|
||
type delayedPostgreSQLSchemaAdapter struct { | ||
PostgreSQLQueueSchema | ||
} | ||
|
||
func (a delayedPostgreSQLSchemaAdapter) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) { | ||
queries, err := a.PostgreSQLQueueSchema.SchemaInitializingQueries(params) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
table := a.MessagesTable(params.Topic) | ||
index := fmt.Sprintf(`"%s_delayed_until_idx"`, strings.ReplaceAll(table, `"`, "")) | ||
|
||
queries = append(queries, Query{ | ||
Query: fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s ((metadata->>'%s'))`, index, table, delay.DelayedUntilKey), | ||
}) | ||
|
||
return queries, nil | ||
} |
Oops, something went wrong.