From 7082896aa771943711f9fdf902e6d0ea23253bd7 Mon Sep 17 00:00:00 2001 From: Dmitry Kotik <7944694+dkotik@users.noreply.github.com> Date: Fri, 3 Feb 2023 10:11:14 -0500 Subject: [PATCH 1/3] Add generic jsoendpoint adapter --- components/jsonendpoint/endpoint.go | 122 +++++++++++++++++++++++ components/jsonendpoint/endpoint_test.go | 103 +++++++++++++++++++ go.mod | 2 +- 3 files changed, 226 insertions(+), 1 deletion(-) create mode 100644 components/jsonendpoint/endpoint.go create mode 100644 components/jsonendpoint/endpoint_test.go diff --git a/components/jsonendpoint/endpoint.go b/components/jsonendpoint/endpoint.go new file mode 100644 index 000000000..60c487928 --- /dev/null +++ b/components/jsonendpoint/endpoint.go @@ -0,0 +1,122 @@ +/* + +Package jsonendpoint provides a generic adapter that converts a validated JSON request into a [message.Message] published to a certain topic. Use together with an HTTP router to build gateways to any [message.Publisher]. + +# Usage Example + +Imagine a fairly standard situation of collecting newsletter signups from a Restful API: + + type NewsletterSignup struct { + Name string + Email string + } + + // Validate satisfies [Validator] interface. + func (n *NewsletterSignup) Validate() error { + if n.Name == "" || n.Email == "" { + return errors.New("newsletter signup requires both name and email address") + } + return nil + } + + func main() { + pubSub := gochannel.NewGoChannel(gochannel.Config{ + OutputChannelBuffer: 100, + Persistent: true, + }, + watermill.NewStdLogger(true, true), + ) + + endpoint := New( + 1024*1024, // HTTP readLimit + func(m *NewsletterSignup) (*message.Message, error) { // converter + payload, err := json.Marshal(m) + if err != nil { + return nil, fmt.Errorf("failed to encode: %w", err) + } + return message.NewMessage(watermill.NewUUID(), payload), nil + }, + "newsletter/signup", // Watermill topic + pubSub) + + // ... setup HTTP server and router + router.Post("/api/v1/newsletter/signup", endpoint) + } + +JSON HTTP post requests that hit "/api/v1/newsletter/signup" will get parsed, validated, and converted into a [message.Message]. +*/ +package jsonendpoint + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + + "github.com/ThreeDotsLabs/watermill/message" +) + +// Validatable is a generic interface that requires type T to be a pointer and implement the Validate method. It complements the adapter definitions. See . +type Validatable[T any] interface { + *T + Validate() error +} + +// New creates an adapter that converts an HTTP request to a [message.Message] and sends it to the [message.Publisher] topic. Enforces message validation on the generic type. +func New[T any, P Validatable[T]](readLimit int64, converter func(P) (*message.Message, error), topic string, p message.Publisher) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + var ( + in = new(T) //*IN + err error + ) + defer func() { + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{ + "Error": err.Error(), + }) + } + }() + + if r.Method != http.MethodPost { + err = fmt.Errorf("request method %q is not supported", r.Method) + return + } + + reader := http.MaxBytesReader(w, r.Body, readLimit) + defer reader.Close() + + err = json.NewDecoder(reader).Decode(in) + if err != nil { + r.Body.Close() + err = errors.New("JSON decoding failure: " + err.Error()) + return + } + r.Body.Close() + + if in == nil { + err = errors.New("no post data provided") + return + } + + if err = P(in).Validate(); err != nil { + err = fmt.Errorf("failed to validate: %w", err) + return + } + + message, err := converter(in) + if err != nil { + err = fmt.Errorf("failed to construct a message: %w", err) + return + } + + if err = p.Publish(topic, message); err != nil { + err = fmt.Errorf("publisher rejected the message: %w", err) + return + } + + io.WriteString(w, "nil") + } +} diff --git a/components/jsonendpoint/endpoint_test.go b/components/jsonendpoint/endpoint_test.go new file mode 100644 index 000000000..f1bea6219 --- /dev/null +++ b/components/jsonendpoint/endpoint_test.go @@ -0,0 +1,103 @@ +package jsonendpoint + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" +) + +const testTopic = "testTopic" + +type testMessage struct { + Title string + Number int +} + +func (m *testMessage) ToBytes(t *testing.T) []byte { + result, err := json.Marshal(m) + if err != nil { + t.Fatalf("failed to encode to bytes: %v", err) + } + return result +} + +func (m *testMessage) Validate() error { + if m.Title == "" { + return errors.New("title is required") + } + if m.Number == 0 { + return errors.New("number is required") + } + return nil +} + +func TestEndpointCreationIntegration(t *testing.T) { + pubSub := gochannel.NewGoChannel(gochannel.Config{ + OutputChannelBuffer: 100, + Persistent: false, + }, + watermill.NewStdLogger(true, true), + ) + + m := &testMessage{ + Title: "title", + Number: 9, + } + + // err := pubSub.Publish(testTopic, m) + // if err != nil { + // t.Fatalf("cannot publish the test message: %v", err) + // } + + messages, err := pubSub.Subscribe(context.Background(), testTopic) + if err != nil { + t.Fatalf("failed to subscribe: %v", err) + } + + endpoint := New(999999, func(m *testMessage) (*message.Message, error) { + payload, err := json.Marshal(m) + if err != nil { + return nil, fmt.Errorf("failed to encode: %w", err) + } + return message.NewMessage(watermill.NewUUID(), payload), nil + }, testTopic, pubSub) + + request := httptest.NewRequest( + http.MethodPost, + "/url/path", + bytes.NewReader(m.ToBytes(t)), + ) + w := httptest.NewRecorder() + endpoint(w, request) + res := w.Result() + defer res.Body.Close() + + data, err := ioutil.ReadAll(res.Body) + if res.StatusCode != http.StatusOK { + t.Logf("Data: %s", data) + t.Fatalf("HTTP test request failed: status code is not OK: %d", res.StatusCode) + } + + replayedMessage := <-messages + var rm *testMessage + if err = json.Unmarshal(replayedMessage.Payload, &rm); err != nil { + t.Fatalf("cannot decode replayed message: %v", err) + } + + if rm.Title != m.Title { + t.Fatalf("title mismatch: %q vs %q", rm.Title, rm.Title) + } + if rm.Number != m.Number { + t.Fatalf("number mismatch: %q vs %q", rm.Number, rm.Number) + } +} diff --git a/go.mod b/go.mod index 694d1b08c..d59afca59 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ThreeDotsLabs/watermill -go 1.17 +go 1.18 require ( github.com/cenkalti/backoff/v3 v3.2.2 From 18d1587905871f3482973bc4e16891358cec6082 Mon Sep 17 00:00:00 2001 From: Dmitry Kotik <7944694+dkotik@users.noreply.github.com> Date: Sat, 4 Feb 2023 12:09:22 -0500 Subject: [PATCH 2/3] Add request context to message for JSON endpoint adapter --- components/jsonendpoint/endpoint.go | 1 + 1 file changed, 1 insertion(+) diff --git a/components/jsonendpoint/endpoint.go b/components/jsonendpoint/endpoint.go index 60c487928..150cbea21 100644 --- a/components/jsonendpoint/endpoint.go +++ b/components/jsonendpoint/endpoint.go @@ -111,6 +111,7 @@ func New[T any, P Validatable[T]](readLimit int64, converter func(P) (*message.M err = fmt.Errorf("failed to construct a message: %w", err) return } + message.SetContext(r.Context()) if err = p.Publish(topic, message); err != nil { err = fmt.Errorf("publisher rejected the message: %w", err) From 5922e8f45ce69cdc0bc526c4bf75f01061da0a80 Mon Sep 17 00:00:00 2001 From: Dmitry Kotik <7944694+dkotik@users.noreply.github.com> Date: Tue, 14 Feb 2023 09:58:16 -0500 Subject: [PATCH 3/3] Add new message UUID to jsonendpoint JSON response Previously, the endpoint adapter would return `nil` on success, which is invalid JavaScript Object Notation format. It should be "{}". Added UUID to make the adapter useful in more use cases. For example, the returned value can serve as a shopping cart order ID. --- components/jsonendpoint/endpoint.go | 14 ++++++++++--- components/jsonendpoint/endpoint_test.go | 26 ++++++++++++++++++------ 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/components/jsonendpoint/endpoint.go b/components/jsonendpoint/endpoint.go index 150cbea21..4d2496ba3 100644 --- a/components/jsonendpoint/endpoint.go +++ b/components/jsonendpoint/endpoint.go @@ -51,7 +51,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "net/http" "github.com/ThreeDotsLabs/watermill/message" @@ -111,13 +110,22 @@ func New[T any, P Validatable[T]](readLimit int64, converter func(P) (*message.M err = fmt.Errorf("failed to construct a message: %w", err) return } - message.SetContext(r.Context()) + message.SetContext(r.Context()) if err = p.Publish(topic, message); err != nil { err = fmt.Errorf("publisher rejected the message: %w", err) return } - io.WriteString(w, "nil") + response, err := json.Marshal(map[string]string{ + "UUID": message.UUID, + }) + if err != nil { + r.Body.Close() + err = errors.New("JSON encoding failure: " + err.Error()) + return + } + + _, err = w.Write(response) } } diff --git a/components/jsonendpoint/endpoint_test.go b/components/jsonendpoint/endpoint_test.go index f1bea6219..a7664e0be 100644 --- a/components/jsonendpoint/endpoint_test.go +++ b/components/jsonendpoint/endpoint_test.go @@ -79,13 +79,9 @@ func TestEndpointCreationIntegration(t *testing.T) { ) w := httptest.NewRecorder() endpoint(w, request) - res := w.Result() - defer res.Body.Close() - data, err := ioutil.ReadAll(res.Body) - if res.StatusCode != http.StatusOK { - t.Logf("Data: %s", data) - t.Fatalf("HTTP test request failed: status code is not OK: %d", res.StatusCode) + if err = validateEndpointResponse(w.Result()); err != nil { + t.Fatalf("HTTP test request failed: %v", err) } replayedMessage := <-messages @@ -101,3 +97,21 @@ func TestEndpointCreationIntegration(t *testing.T) { t.Fatalf("number mismatch: %q vs %q", rm.Number, rm.Number) } } + +func validateEndpointResponse(response *http.Response) (err error) { + defer response.Body.Close() + + data, err := ioutil.ReadAll(response.Body) + if response.StatusCode != http.StatusOK { + return fmt.Errorf("HTTP test request failed: status code is not OK: %d", response.StatusCode) + } + + var values map[string]string + if err = json.Unmarshal(data, &values); err != nil { + return fmt.Errorf("JSON decoding failure for %q: %w", data, err) + } + if _, ok := values["UUID"]; !ok { + return errors.New("response does not contain new message UUID") + } + return nil +}