Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use generic CQRS handlers in all examples #519

Merged
merged 1 commit into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 8 additions & 50 deletions _examples/basic/5-cqrs-protobuf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,7 @@ type BookRoomHandler struct {
eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}

// NewCommand returns type of command which this handle should handle. It must be a pointer.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c is always the type returned by `NewCommand`, so casting is always safe
cmd := c.(*BookRoom)

func (b BookRoomHandler) Handle(ctx context.Context, cmd *BookRoom) error {
// some random price, in production you probably will calculate in wiser way
price := (rand.Int63n(40) + 1) * 10

Expand Down Expand Up @@ -70,18 +58,7 @@ type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
// this name is passed to EventsSubscriberConstructor and used to generate queue name
return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, event *RoomBooked) error {
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
Expand All @@ -100,13 +77,7 @@ func (o OrderBeerHandler) HandlerName() string {
return "OrderBeerHandler"
}

func (o OrderBeerHandler) NewCommand() interface{} {
return &OrderBeer{}
}

func (o OrderBeerHandler) Handle(ctx context.Context, c interface{}) error {
cmd := c.(*OrderBeer)

func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error {
if rand.Int63n(10) == 0 {
// sometimes there is no beer left, command will be retried
return fmt.Errorf("no beer left for room %s, please try later", cmd.RoomId)
Expand Down Expand Up @@ -137,22 +108,11 @@ func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
// this name is passed to EventsSubscriberConstructor and used to generate queue name
return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked) error {
// Handle may be called concurrently, so it need to be thread safe.
b.lock.Lock()
defer b.lock.Unlock()

event := e.(*RoomBooked)

// When we are using Pub/Sub which doesn't provide exactly-once delivery semantics, we need to deduplicate messages.
// GoChannel Pub/Sub provides exactly-once delivery,
// but let's make this example ready for other Pub/Sub implementations.
Expand Down Expand Up @@ -323,19 +283,17 @@ func main() {
}

err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
cqrs.NewCommandHandler("BookRoomHandler", BookRoomHandler{eventBus}.Handle),
cqrs.NewCommandHandler("OrderBeerHandler", OrderBeerHandler{eventBus}.Handle),
)
if err != nil {
panic(err)
}

err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},

NewBookingsFinancialReport(),

cqrs.NewGroupEventHandler(OrderBeerOnRoomBooked{commandBus}.Handle),
cqrs.NewGroupEventHandler(NewBookingsFinancialReport().Handle),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
Expand Down
56 changes: 8 additions & 48 deletions _examples/real-world-examples/consumer-groups/crm-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToCRM8Handler{},
AddToSupport8Handler{},
cqrs.NewEventHandler("AddToCRM-8", AddToCRM8Handler{}.Handle),
cqrs.NewEventHandler("AddToSupport-8", AddToSupport8Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -120,8 +120,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToCRM9Handler{},
AddToSupport9Handler{},
cqrs.NewEventHandler("AddToCRM-9", AddToCRM9Handler{}.Handle),
cqrs.NewEventHandler("AddToSupport-9", AddToSupport9Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -152,71 +152,31 @@ func main() {

type AddToCRM8Handler struct{}

func (h AddToCRM8Handler) HandlerName() string {
return "AddToCRM-8"
}

func (h AddToCRM8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToCRM8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToCRM8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the CRM")

return nil
}

type AddToSupport8Handler struct{}

func (h AddToSupport8Handler) HandlerName() string {
return "AddToSupport-8"
}

func (h AddToSupport8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToSupport8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToSupport8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the support channel")

return nil
}

type AddToCRM9Handler struct{}

func (h AddToCRM9Handler) HandlerName() string {
return "AddToCRM-9"
}

func (h AddToCRM9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToCRM9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToCRM9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the CRM")

return nil
}

type AddToSupport9Handler struct{}

func (h AddToSupport9Handler) HandlerName() string {
return "AddToSupport-9"
}

func (h AddToSupport9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToSupport9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToSupport9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the support channel")

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToPromotionsList8Handler{},
AddToNewsList8Handler{},
cqrs.NewEventHandler("AddToPromotionsList-8", AddToPromotionsList8Handler{}.Handle),
cqrs.NewEventHandler("AddToNewsList-8", AddToNewsList8Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -345,8 +345,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToPromotionsList9Handler{},
AddToNewsList9Handler{},
cqrs.NewEventHandler("AddToPromotionsList-9", AddToPromotionsList9Handler{}.Handle),
cqrs.NewEventHandler("AddToNewsList-9", AddToNewsList9Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -381,13 +381,7 @@ func (h AddToPromotionsList8Handler) HandlerName() string {
return "AddToPromotionsList-8"
}

func (h AddToPromotionsList8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToPromotionsList8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToPromotionsList8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.Marketing {
return nil
}
Expand All @@ -399,17 +393,7 @@ func (h AddToPromotionsList8Handler) Handle(ctx context.Context, event interface

type AddToNewsList8Handler struct{}

func (h AddToNewsList8Handler) HandlerName() string {
return "AddToNewsList-8"
}

func (h AddToNewsList8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToNewsList8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToNewsList8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.News {
return nil
}
Expand All @@ -420,17 +404,7 @@ func (h AddToNewsList8Handler) Handle(ctx context.Context, event interface{}) er

type AddToPromotionsList9Handler struct{}

func (h AddToPromotionsList9Handler) HandlerName() string {
return "AddToPromotionsList-9"
}

func (h AddToPromotionsList9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToPromotionsList9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToPromotionsList9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.Marketing {
return nil
}
Expand All @@ -442,17 +416,7 @@ func (h AddToPromotionsList9Handler) Handle(ctx context.Context, event interface

type AddToNewsList9Handler struct{}

func (h AddToNewsList9Handler) HandlerName() string {
return "AddToNewsList-9"
}

func (h AddToNewsList9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToNewsList9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToNewsList9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.News {
return nil
}
Expand Down
Loading