Skip to content

Commit

Permalink
Bump watermill
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 committed May 31, 2024
1 parent 7299cda commit b8790a1
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 40 deletions.
88 changes: 53 additions & 35 deletions _examples/real-world-examples/server-sent-events-htmx/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,26 @@ type Routers struct {
func NewRouters(cfg config, repo *Repository) (Routers, error) {
logger := watermill.NewStdLogger(false, false)

publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{
ProjectID: cfg.PubSubProjectID,
}, logger)
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: cfg.PubSubProjectID,
},
logger,
)
if err != nil {
return Routers{}, err
}

eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
return params.EventName, nil
eventBus, err := cqrs.NewEventBusWithConfig(
publisher,
cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
return params.EventName, nil
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
})
)
if err != nil {
return Routers{}, err
}
Expand All @@ -63,21 +69,27 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) {

eventsRouter.AddMiddleware(middleware.Recoverer)

eventProcessor, err := cqrs.NewEventProcessorWithConfig(eventsRouter, cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return params.EventName, nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
ProjectID: cfg.PubSubProjectID,
GenerateSubscriptionName: func(topic string) string {
return fmt.Sprintf("%v_%v", topic, params.HandlerName)
},
}, logger)
eventProcessor, err := cqrs.NewEventProcessorWithConfig(
eventsRouter,
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return params.EventName, nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return googlecloud.NewSubscriber(
googlecloud.SubscriberConfig{
ProjectID: cfg.PubSubProjectID,
GenerateSubscriptionName: func(topic string) string {
return fmt.Sprintf("%v_%v", topic, params.HandlerName)
},
},
logger,
)
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
})
)
if err != nil {
return Routers{}, err
}
Expand Down Expand Up @@ -124,23 +136,29 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) {
return Routers{}, err
}

sseSubscriber, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
ProjectID: cfg.PubSubProjectID,
GenerateSubscriptionName: func(topic string) string {
return fmt.Sprintf("%v_%v", topic, watermill.NewShortUUID())
},
SubscriptionConfig: pubsub.SubscriptionConfig{
ExpirationPolicy: time.Hour * 24,
sseSubscriber, err := googlecloud.NewSubscriber(
googlecloud.SubscriberConfig{
ProjectID: cfg.PubSubProjectID,
GenerateSubscriptionName: func(topic string) string {
return fmt.Sprintf("%v_%v", topic, watermill.NewShortUUID())
},
SubscriptionConfig: pubsub.SubscriptionConfig{
ExpirationPolicy: time.Hour * 24,
},
},
}, logger)
logger,
)
if err != nil {
return Routers{}, err
}

sseRouter, err := http.NewSSERouter(http.SSERouterConfig{
UpstreamSubscriber: sseSubscriber,
Marshaler: http.BytesSSEMarshaler{},
}, logger)
sseRouter, err := http.NewSSERouter(
http.SSERouterConfig{
UpstreamSubscriber: sseSubscriber,
Marshaler: http.StringSSEMarshaler{},
},
logger,
)
if err != nil {
return Routers{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
cloud.google.com/go/pubsub v1.36.1
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-googlecloud v1.2.0
github.com/ThreeDotsLabs/watermill-http/v2 v2.1.1-0.20240420100340-dbcfd0330cce
github.com/ThreeDotsLabs/watermill-http/v2 v2.2.0
github.com/a-h/templ v0.2.663
github.com/kelseyhightower/envconfig v1.4.0
github.com/labstack/echo/v4 v4.11.4
Expand Down
4 changes: 2 additions & 2 deletions _examples/real-world-examples/server-sent-events-htmx/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/ThreeDotsLabs/watermill-googlecloud v1.2.0 h1:HEpoDiIU4K/wGVADVkuIcAUZSjvtAWS3lqiyk+LVt3E=
github.com/ThreeDotsLabs/watermill-googlecloud v1.2.0/go.mod h1:cLifbk8zyGZ4MEqCmjE5vyTQOnTmy0kZUlrMzoixBBk=
github.com/ThreeDotsLabs/watermill-http/v2 v2.1.1-0.20240420100340-dbcfd0330cce h1:Ffot3YR4kPrl3CdwDk9Q4YKV6b/8+LBWxHjmZmTv5Wc=
github.com/ThreeDotsLabs/watermill-http/v2 v2.1.1-0.20240420100340-dbcfd0330cce/go.mod h1:Ily2cdTrvlj9dLB8BqAy5OIzWwS8B5WrFh+Ey7wOgQQ=
github.com/ThreeDotsLabs/watermill-http/v2 v2.2.0 h1:SyKKGRCDMvbExmm4AQ9tPDbMm+uyExBSiKl5DH24V1U=
github.com/ThreeDotsLabs/watermill-http/v2 v2.2.0/go.mod h1:Ily2cdTrvlj9dLB8BqAy5OIzWwS8B5WrFh+Ey7wOgQQ=
github.com/a-h/templ v0.2.663 h1:aa0WMm27InkYHGjimcM7us6hJ6BLhg98ZbfaiDPyjHE=
github.com/a-h/templ v0.2.663/go.mod h1:SA7mtYwVEajbIXFRh3vKdYm/4FYyLQAtPH1+KxzGPA8=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
Expand Down
4 changes: 2 additions & 2 deletions _examples/real-world-examples/server-sent-events-htmx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewHandler(repo *Repository, eventBus *cqrs.EventBus, sseRouter watermillht
e.Use(middleware.Recover())
e.Use(middleware.Logger())

e.GET("/", h.Index)
e.GET("/", h.AllPosts)
e.POST("/posts/:id/reactions", h.AddReaction)
e.GET("/posts/:id/stats", func(c echo.Context) error {
postID := c.Param("id")
Expand All @@ -48,7 +48,7 @@ func NewHandler(repo *Repository, eventBus *cqrs.EventBus, sseRouter watermillht
return e
}

func (h Handler) Index(c echo.Context) error {
func (h Handler) AllPosts(c echo.Context) error {
posts, err := h.repo.AllPosts(c.Request().Context())
if err != nil {
return err
Expand Down

0 comments on commit b8790a1

Please sign in to comment.