From 917a81bc42852c71bcfd9d0a20a481457262c324 Mon Sep 17 00:00:00 2001 From: PricelessRabbit Date: Thu, 5 Nov 2020 15:27:29 +0100 Subject: [PATCH 1/3] * enabled messages paho persistence * enabled persistent session to avoid paho to discard pending message if service restarted * updated readme Signed-off-by: PricelessRabbit --- .gitignore | 1 + README.md | 38 +++++++++++++++++++++----------------- cmd/main.go | 14 ++++++++++++-- configs/config.toml | 2 ++ pkg/config/config.go | 2 ++ pkg/export/service.go | 7 ++++++- 6 files changed, 44 insertions(+), 20 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..07ed7069 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build/* \ No newline at end of file diff --git a/README.md b/README.md index 3131f1c9..24b46ac9 100644 --- a/README.md +++ b/README.md @@ -42,9 +42,11 @@ By default `Export` service looks for config file at [`../configs/config.toml`][ password = "" ca = "ca.crt" cert = "thing.crt" - mtls = "false" + mtls = false priv_key = "thing.key" - retain = "false" + retain = false + persist = false + persist_dir = "../mqtt_persist" skip_tls_ver = "false" url = "tcp://mainflux.com:1883" @@ -110,21 +112,23 @@ Edit Mainflux [docker-compose.yml][docker-compose]. NATS section must look like Service will look for `config.toml` first and if not found it will be configured with env variables and new config file specified with `MF_EXPORT_CONFIG_FILE` will be saved with values populated from env vars. The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|-------------------------------|---------------------------------------------------------------|-----------------------| -| MF_NATS_URL | Nats url | localhost:4222 | -| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 | -| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | | -| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | | -| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | | -| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true | -| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false | -| MF_EXPORT_MQTT_CA | CA for tls | ca.crt | -| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt | -| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key | -| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 | -| MF_EXPORT_MQTT_RETAIN | MQTT retain | false | -| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml | +| Variable | Description | Default | +|-------------------------------|-----------------------------------------------------------------------------------------|-----------------------| +| MF_NATS_URL | Nats url | localhost:4222 | +| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 | +| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | | +| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | | +| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | | +| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true | +| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false | +| MF_EXPORT_MQTT_CA | CA for tls | ca.crt | +| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt | +| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key | +| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 | +| MF_EXPORT_MQTT_RETAIN | MQTT retain | false | +| MF_EXPORT_MQTT_PERSIST | persist MQTT QOS 2 pending messages in filesystem, to avoid data loss | false | +| MF_EXPORT_MQTT_PERSIST_DIR | directory in which pending messages will be saved if persist is enabled | false | +| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml | for values in environment variables to take effect make sure that there is no `MF_EXPORT_CONF` file. diff --git a/cmd/main.go b/cmd/main.go index b4e5cb15..9e046ae4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -41,7 +41,9 @@ const ( defMqttMTLS = "false" defMqttCA = "ca.crt" defMqttQoS = "0" - defMqttRetain = false + defMqttRetain = "false" + defMqttPersist = "false" + defMqttPersistDir = "../mqtt_persist" defMqttCert = "thing.cert" defMqttPrivKey = "thing.key" defConfigFile = "../configs/config.toml" @@ -63,6 +65,8 @@ const ( envMqttCA = "MF_EXPORT_MQTT_CA" envMqttQoS = "MF_EXPORT_MQTT_QOS" envMqttRetain = "MF_EXPORT_MQTT_RETAIN" + envMqttPersist = "MF_MQTT_PERSIST" + envMqttPersistDir = "MF_MQTT_PERSIST_FILE" envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT" envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK" envConfigFile = "MF_EXPORT_CONFIG_FILE" @@ -141,10 +145,14 @@ func loadConfigs() (exp.Config, error) { if err != nil { mqttMTLS = false } - mqttRetain, err := strconv.ParseBool(mainflux.Env(envMqttMTLS, defMqttMTLS)) + mqttRetain, err := strconv.ParseBool(mainflux.Env(envMqttRetain, defMqttRetain)) if err != nil { mqttRetain = false } + mqttPersist, err := strconv.ParseBool(mainflux.Env(envMqttPersist, defMqttPersist)) + if err != nil { + mqttPersist = false + } q, err := strconv.ParseInt(mainflux.Env(envMqttQoS, defMqttQoS), 10, 64) if err != nil { @@ -167,6 +175,8 @@ func loadConfigs() (exp.Config, error) { Username: mainflux.Env(envMqttUsername, defMqttUsername), Retain: mqttRetain, + Persist: mqttPersist, + PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir), QoS: QoS, MTLS: mqttMTLS, SkipTLSVer: mqttSkipTLSVer, diff --git a/configs/config.toml b/configs/config.toml index f7fd688a..ef8e210d 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -19,6 +19,8 @@ File = "/configs/export/config.toml" password = "" qos = 0 retain = false + persist = false + persist_dir = "../mqtt_persist" skip_tls_ver = true username = "" diff --git a/pkg/config/config.go b/pkg/config/config.go index 9b64f69d..2bd1d726 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -31,6 +31,8 @@ type MQTT struct { MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"` SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"` Retain bool `json:"retain" toml:"retain" mapstructure:"retain"` + Persist bool `json:"persist" toml:"persist" mapstructure:"persist"` + PersistDir string `json:"persist_dir" toml:"persist_dir" mapstructure:"persist_dir"` QoS int `json:"qos" toml:"qos" mapstructure:"qos"` CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"` ClientCertPath string `json:"client_cert_path" toml:"client_cert_path" mapstructure:"client_cert_path"` diff --git a/pkg/export/service.go b/pkg/export/service.go index e45b4183..0affc60f 100644 --- a/pkg/export/service.go +++ b/pkg/export/service.go @@ -162,11 +162,16 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C opts := mqtt.NewClientOptions(). AddBroker(conf.MQTT.Host). SetClientID(e.id). - SetCleanSession(true). + SetCleanSession(false). SetAutoReconnect(true). SetOnConnectHandler(e.conn). SetConnectionLostHandler(e.lost) + if conf.MQTT.Persist { + store := mqtt.NewFileStore(conf.MQTT.PersistDir) + opts.SetStore(store) + } + if conf.MQTT.Username != "" && conf.MQTT.Password != "" { opts.SetUsername(conf.MQTT.Username) opts.SetPassword(conf.MQTT.Password) From b9975c65bae3f1f042f7a942806e092dbcb044cb Mon Sep 17 00:00:00 2001 From: PricelessRabbit Date: Wed, 25 Nov 2020 18:59:36 +0100 Subject: [PATCH 2/3] - added clean session to env variables - automatically put clean session to false if file message persistence enabled - updated doc Signed-off-by: PricelessRabbit --- README.md | 1 + cmd/main.go | 83 +++++++++++++++++++++++-------------------- configs/config.toml | 1 + pkg/config/config.go | 1 + pkg/export/service.go | 4 ++- 5 files changed, 51 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 24b46ac9..fb92a198 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ The service is configured using the environment variables presented in the follo | MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key | | MF_EXPORT_MQTT_QOS | MQTT QOS | 0 | | MF_EXPORT_MQTT_RETAIN | MQTT retain | false | +| MF_EXPORT_MQTT_CLEAN_SESSION | MQTT clean session | false | | MF_EXPORT_MQTT_PERSIST | persist MQTT QOS 2 pending messages in filesystem, to avoid data loss | false | | MF_EXPORT_MQTT_PERSIST_DIR | directory in which pending messages will be saved if persist is enabled | false | | MF_EXPORT_CONFIG_FILE | Configuration file | config.toml | diff --git a/cmd/main.go b/cmd/main.go index 9e046ae4..4a78749d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -29,24 +29,25 @@ import ( ) const ( - svcName = "export" - defNatsURL = nats.DefaultURL - defLogLevel = "debug" - defPort = "8170" - defMqttHost = "tcp://localhost:1883" - defMqttUsername = "" - defMqttPassword = "" - defMqttChannel = "" - defMqttSkipTLSVer = "true" - defMqttMTLS = "false" - defMqttCA = "ca.crt" - defMqttQoS = "0" - defMqttRetain = "false" - defMqttPersist = "false" - defMqttPersistDir = "../mqtt_persist" - defMqttCert = "thing.cert" - defMqttPrivKey = "thing.key" - defConfigFile = "../configs/config.toml" + svcName = "export" + defNatsURL = nats.DefaultURL + defLogLevel = "debug" + defPort = "8170" + defMqttHost = "tcp://localhost:1883" + defMqttUsername = "" + defMqttPassword = "" + defMqttChannel = "" + defMqttSkipTLSVer = "true" + defMqttMTLS = "false" + defMqttCA = "ca.crt" + defMqttQoS = "0" + defMqttRetain = "false" + defMqttCleanSession = "true" + defMqttPersist = "false" + defMqttPersistDir = "../mqtt_persist" + defMqttCert = "thing.cert" + defMqttPrivKey = "thing.key" + defConfigFile = "../configs/config.toml" defCacheURL = "localhost:6379" defCachePass = "" @@ -56,20 +57,21 @@ const ( envLogLevel = "MF_EXPORT_LOG_LEVEL" envPort = "MF_EXPORT_PORT" - envMqttHost = "MF_EXPORT_MQTT_HOST" - envMqttUsername = "MF_EXPORT_MQTT_USERNAME" - envMqttPassword = "MF_EXPORT_MQTT_PASSWORD" - envMqttChannel = "MF_EXPORT_MQTT_CHANNEL" - envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS" - envMqttMTLS = "MF_EXPORT_MQTT_MTLS" - envMqttCA = "MF_EXPORT_MQTT_CA" - envMqttQoS = "MF_EXPORT_MQTT_QOS" - envMqttRetain = "MF_EXPORT_MQTT_RETAIN" - envMqttPersist = "MF_MQTT_PERSIST" - envMqttPersistDir = "MF_MQTT_PERSIST_FILE" - envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT" - envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK" - envConfigFile = "MF_EXPORT_CONFIG_FILE" + envMqttHost = "MF_EXPORT_MQTT_HOST" + envMqttUsername = "MF_EXPORT_MQTT_USERNAME" + envMqttPassword = "MF_EXPORT_MQTT_PASSWORD" + envMqttChannel = "MF_EXPORT_MQTT_CHANNEL" + envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS" + envMqttMTLS = "MF_EXPORT_MQTT_MTLS" + envMqttCA = "MF_EXPORT_MQTT_CA" + envMqttQoS = "MF_EXPORT_MQTT_QOS" + envMqttRetain = "MF_EXPORT_MQTT_RETAIN" + envMqttCleanSession = "MF_EXPORT_MQTT_CLEAN_SESSION" + envMqttPersist = "MF_MQTT_PERSIST" + envMqttPersistDir = "MF_MQTT_PERSIST_FILE" + envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT" + envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK" + envConfigFile = "MF_EXPORT_CONFIG_FILE" envCacheURL = "MF_EXPORT_CACHE_URL" envCachePass = "MF_EXPORT_CACHE_PASS" @@ -149,6 +151,10 @@ func loadConfigs() (exp.Config, error) { if err != nil { mqttRetain = false } + mqttCleanSession, err := strconv.ParseBool(mainflux.Env(envMqttCleanSession, defMqttCleanSession)) + if err != nil { + mqttRetain = false + } mqttPersist, err := strconv.ParseBool(mainflux.Env(envMqttPersist, defMqttPersist)) if err != nil { mqttPersist = false @@ -174,12 +180,13 @@ func loadConfigs() (exp.Config, error) { Password: mainflux.Env(envMqttPassword, defMqttPassword), Username: mainflux.Env(envMqttUsername, defMqttUsername), - Retain: mqttRetain, - Persist: mqttPersist, - PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir), - QoS: QoS, - MTLS: mqttMTLS, - SkipTLSVer: mqttSkipTLSVer, + Retain: mqttRetain, + CleanSession: mqttCleanSession, + Persist: mqttPersist, + PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir), + QoS: QoS, + MTLS: mqttMTLS, + SkipTLSVer: mqttSkipTLSVer, CAPath: mainflux.Env(envMqttCA, defMqttCA), ClientCertPath: mainflux.Env(envMqttCert, defMqttCert), diff --git a/configs/config.toml b/configs/config.toml index ef8e210d..61618efe 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -19,6 +19,7 @@ File = "/configs/export/config.toml" password = "" qos = 0 retain = false + clean_session = true persist = false persist_dir = "../mqtt_persist" skip_tls_ver = true diff --git a/pkg/config/config.go b/pkg/config/config.go index 2bd1d726..0b9fd857 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -31,6 +31,7 @@ type MQTT struct { MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"` SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"` Retain bool `json:"retain" toml:"retain" mapstructure:"retain"` + CleanSession bool `json:"clean_session" toml:"clean_session" mapstructure:"clean_session"` Persist bool `json:"persist" toml:"persist" mapstructure:"persist"` PersistDir string `json:"persist_dir" toml:"persist_dir" mapstructure:"persist_dir"` QoS int `json:"qos" toml:"qos" mapstructure:"qos"` diff --git a/pkg/export/service.go b/pkg/export/service.go index 0affc60f..ced8e00c 100644 --- a/pkg/export/service.go +++ b/pkg/export/service.go @@ -162,7 +162,7 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C opts := mqtt.NewClientOptions(). AddBroker(conf.MQTT.Host). SetClientID(e.id). - SetCleanSession(false). + SetCleanSession(conf.MQTT.CleanSession). SetAutoReconnect(true). SetOnConnectHandler(e.conn). SetConnectionLostHandler(e.lost) @@ -170,6 +170,8 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C if conf.MQTT.Persist { store := mqtt.NewFileStore(conf.MQTT.PersistDir) opts.SetStore(store) + //disable clean session because paho deletes stored messages when restarts + opts.SetCleanSession(false) } if conf.MQTT.Username != "" && conf.MQTT.Password != "" { From 5b75f38e20ab66b315d1aeae5427b0660828228e Mon Sep 17 00:00:00 2001 From: PricelessRabbit Date: Fri, 27 Nov 2020 09:05:59 +0100 Subject: [PATCH 3/3] - updated comment Signed-off-by: PricelessRabbit --- pkg/export/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/export/service.go b/pkg/export/service.go index ced8e00c..e32d3df2 100644 --- a/pkg/export/service.go +++ b/pkg/export/service.go @@ -170,7 +170,7 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C if conf.MQTT.Persist { store := mqtt.NewFileStore(conf.MQTT.PersistDir) opts.SetStore(store) - //disable clean session because paho deletes stored messages when restarts + // Paho deletes persisted messages when restarts with clean session, so it is set to false opts.SetCleanSession(false) }