RetryClient seemingly breaks MQTT delivery guarantees #238

Open
jameshalsall opened this issue Sep 30, 2023 · 3 comments
Labels
enhancement New feature or request

Comments

@jameshalsall

I believe the RetryClient may be violating the QoS guarantees of MQTT. When we send a message with QoS 1 (at-least-once delivery) via the RetryClient's Publish() method, we usually get a nil return value, which suggests that the message was successfully sent to the broker.

However, this is not always the case. Under certain network conditions the PUBACK from the broker does not arrive immediately, yet Publish() still returns, and all we can do as the caller is assume the message was sent OK. What actually happens is that the client holds the message for a potential retry while it waits for the PUBACK asynchronously.

The Client does not suffer from this problem, because its signaller blocks until the PUBACK arrives from the broker before Publish() returns.

Any application using this library needs to know when the RetryClient's messages have successfully arrived at the broker. However, there isn't really an easy way to do this in the current API. I would expect some kind of Wait() function that could be used during application shutdown, but I can't see one?

I am currently trying something like this, but it's fairly hard to prove it's correct as network conditions for this issue are hard to simulate:

NOTE: This is a heavily redacted and simplified case from my application, so some references to our types are left behind but it should still be clear enough to illustrate the point.

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/at-wat/mqtt-go"

	"acme/container"
	"acme/pkg/config"
)

func main() {
	stopCh := make(chan os.Signal, 1)
	diContainer := container.New(context.Background(), config.New(), stopCh)

	// this is a higher order type to represent an AWS IoT connection using https://github.com/seqsense/aws-iot-device-sdk-go
	iot := diContainer.IotConnection()
	iot.Connect(context.Background(), "blah")

	dev := iot.Device()
	retryer, ok := dev.(mqtt.Retryer)
	if !ok {
		panic("not a retryer")
	}

	err := dev.Publish(context.Background(), &mqtt.Message{
		Topic:   "update/status",
		QoS:     mqtt.QoS1,
		Payload: []byte("my message bytes"),
	})
	if err != nil {
		panic(err)
	}

	// simulate a signal sent to stop the application
	stopCh <- os.Interrupt

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

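	// Poll on a ticker instead of spinning, so the wait does not burn a CPU core.
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	for {
		stats := retryer.Stats()
		if stats.QueuedRetries == 0 && stats.QueuedTasks == 0 {
			fmt.Println("client has published everything, stopping")
			return
		}

		select {
		case <-ctx.Done():
			fmt.Println("timed out waiting for client to publish")
			return
		case <-ticker.C:
		}
	}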
}

Having something like this built into the client behind a Wait() method may be a nice addition 👍🏻

What are your thoughts on this problem? Is this a sensible solution? If so, I'm happy to contribute this in a PR.

@at-wat
Owner

at-wat commented Oct 2, 2023

@jameshalsall If you want to synchronously PUBLISH over RetryClient, use

retrycli.Client().Publish(...)

(c *RetryClient) Client() *BaseClient returns the underlying synchronous client, so Publish blocks until the PUBACK is received, or returns an error if the context times out.

@jameshalsall
Author

jameshalsall commented Oct 2, 2023

That's good to know 👍🏻 thanks.

I think there is still a benefit from having the RetryClient's current publish behaviour but also being able to wait until all queued tasks/retries are sent before shutting down.

I think a Wait(ctx context.Context) <-chan struct{} method on the Client interface would be a good addition for developers using this library (only RetryClient would need to implement real waiting behaviour; other clients can just finish immediately). For now, I've implemented this in my own application, but it is something this lib could maybe take care of in future. Let me know 👍🏻

@at-wat
Owner

at-wat commented Oct 2, 2023

> I think there is still a benefit from having the RetryClient's current publish behaviour but also being able to wait until all queued tasks/retries are sent before shutting down.

That makes sense to me, and adding Wait(ctx context.Context) <-chan struct{} to the Retryer interface would be nice.

at-wat added the enhancement label on Oct 2, 2023