diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..07ed7069 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build/* \ No newline at end of file diff --git a/README.md b/README.md index 3131f1c9..fb92a198 100644 --- a/README.md +++ b/README.md @@ -42,9 +42,11 @@ By default `Export` service looks for config file at [`../configs/config.toml`][ password = "" ca = "ca.crt" cert = "thing.crt" - mtls = "false" + mtls = false priv_key = "thing.key" - retain = "false" + retain = false + persist = false + persist_dir = "../mqtt_persist" skip_tls_ver = "false" url = "tcp://mainflux.com:1883" @@ -110,21 +112,24 @@ Edit Mainflux [docker-compose.yml][docker-compose]. NATS section must look like Service will look for `config.toml` first and if not found it will be configured with env variables and new config file specified with `MF_EXPORT_CONFIG_FILE` will be saved with values populated from env vars. The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|-------------------------------|---------------------------------------------------------------|-----------------------| -| MF_NATS_URL | Nats url | localhost:4222 | -| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 | -| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | | -| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | | -| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | | -| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true | -| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false | -| MF_EXPORT_MQTT_CA | CA for tls | ca.crt | -| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt | -| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key | -| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 | -| MF_EXPORT_MQTT_RETAIN | MQTT retain | false | -| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml | +| Variable | Description | Default | +|-------------------------------|-----------------------------------------------------------------------------------------|-----------------------| +| MF_NATS_URL | Nats url | localhost:4222 | +| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 | +| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | | +| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | | +| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | | +| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true | +| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false | +| MF_EXPORT_MQTT_CA | CA for tls | ca.crt | +| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt | +| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key | +| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 | +| MF_EXPORT_MQTT_RETAIN | MQTT retain | false | +| MF_EXPORT_MQTT_CLEAN_SESSION | MQTT clean session | false | +| MF_EXPORT_MQTT_PERSIST | persist MQTT QOS 2 pending messages in filesystem, to avoid data loss | false | +| MF_EXPORT_MQTT_PERSIST_DIR | directory in which pending messages will be saved if persist is enabled | false | +| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml | for values in environment variables to take effect make sure that there is no `MF_EXPORT_CONF` file. diff --git a/cmd/main.go b/cmd/main.go index e4a6ca45..4a78749d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -29,22 +29,25 @@ import ( ) const ( - svcName = "export" - defNatsURL = nats.DefaultURL - defLogLevel = "debug" - defPort = "8170" - defMqttHost = "tcp://localhost:1883" - defMqttUsername = "" - defMqttPassword = "" - defMqttChannel = "" - defMqttSkipTLSVer = "true" - defMqttMTLS = "false" - defMqttCA = "ca.crt" - defMqttQoS = "0" - defMqttRetain = "false" - defMqttCert = "thing.cert" - defMqttPrivKey = "thing.key" - defConfigFile = "../configs/config.toml" + svcName = "export" + defNatsURL = nats.DefaultURL + defLogLevel = "debug" + defPort = "8170" + defMqttHost = "tcp://localhost:1883" + defMqttUsername = "" + defMqttPassword = "" + defMqttChannel = "" + defMqttSkipTLSVer = "true" + defMqttMTLS = "false" + defMqttCA = "ca.crt" + defMqttQoS = "0" + defMqttRetain = "false" + defMqttCleanSession = "true" + defMqttPersist = "false" + defMqttPersistDir = "../mqtt_persist" + defMqttCert = "thing.cert" + defMqttPrivKey = "thing.key" + defConfigFile = "../configs/config.toml" defCacheURL = "localhost:6379" defCachePass = "" @@ -54,18 +57,21 @@ const ( envLogLevel = "MF_EXPORT_LOG_LEVEL" envPort = "MF_EXPORT_PORT" - envMqttHost = "MF_EXPORT_MQTT_HOST" - envMqttUsername = "MF_EXPORT_MQTT_USERNAME" - envMqttPassword = "MF_EXPORT_MQTT_PASSWORD" - envMqttChannel = "MF_EXPORT_MQTT_CHANNEL" - envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS" - envMqttMTLS = "MF_EXPORT_MQTT_MTLS" - envMqttCA = "MF_EXPORT_MQTT_CA" - envMqttQoS = "MF_EXPORT_MQTT_QOS" - envMqttRetain = "MF_EXPORT_MQTT_RETAIN" - envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT" - envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK" - envConfigFile = "MF_EXPORT_CONFIG_FILE" + envMqttHost = "MF_EXPORT_MQTT_HOST" + envMqttUsername = "MF_EXPORT_MQTT_USERNAME" + envMqttPassword = "MF_EXPORT_MQTT_PASSWORD" + envMqttChannel = "MF_EXPORT_MQTT_CHANNEL" + envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS" + envMqttMTLS = "MF_EXPORT_MQTT_MTLS" + envMqttCA = "MF_EXPORT_MQTT_CA" + envMqttQoS = "MF_EXPORT_MQTT_QOS" + envMqttRetain = "MF_EXPORT_MQTT_RETAIN" + envMqttCleanSession = "MF_EXPORT_MQTT_CLEAN_SESSION" + envMqttPersist = "MF_MQTT_PERSIST" + envMqttPersistDir = "MF_MQTT_PERSIST_FILE" + envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT" + envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK" + envConfigFile = "MF_EXPORT_CONFIG_FILE" envCacheURL = "MF_EXPORT_CACHE_URL" envCachePass = "MF_EXPORT_CACHE_PASS" @@ -145,6 +151,14 @@ func loadConfigs() (exp.Config, error) { if err != nil { mqttRetain = false } + mqttCleanSession, err := strconv.ParseBool(mainflux.Env(envMqttCleanSession, defMqttCleanSession)) + if err != nil { + mqttRetain = false + } + mqttPersist, err := strconv.ParseBool(mainflux.Env(envMqttPersist, defMqttPersist)) + if err != nil { + mqttPersist = false + } q, err := strconv.ParseInt(mainflux.Env(envMqttQoS, defMqttQoS), 10, 64) if err != nil { @@ -166,10 +180,13 @@ func loadConfigs() (exp.Config, error) { Password: mainflux.Env(envMqttPassword, defMqttPassword), Username: mainflux.Env(envMqttUsername, defMqttUsername), - Retain: mqttRetain, - QoS: QoS, - MTLS: mqttMTLS, - SkipTLSVer: mqttSkipTLSVer, + Retain: mqttRetain, + CleanSession: mqttCleanSession, + Persist: mqttPersist, + PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir), + QoS: QoS, + MTLS: mqttMTLS, + SkipTLSVer: mqttSkipTLSVer, CAPath: mainflux.Env(envMqttCA, defMqttCA), ClientCertPath: mainflux.Env(envMqttCert, defMqttCert), diff --git a/configs/config.toml b/configs/config.toml index f7fd688a..61618efe 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -19,6 +19,9 @@ File = "/configs/export/config.toml" password = "" qos = 0 retain = false + clean_session = true + persist = false + persist_dir = "../mqtt_persist" skip_tls_ver = true username = "" diff --git a/pkg/config/config.go b/pkg/config/config.go index 9b64f69d..0b9fd857 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -31,6 +31,9 @@ type MQTT struct { MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"` SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"` Retain bool `json:"retain" toml:"retain" mapstructure:"retain"` + CleanSession bool `json:"clean_session" toml:"clean_session" mapstructure:"clean_session"` + Persist bool `json:"persist" toml:"persist" mapstructure:"persist"` + PersistDir string `json:"persist_dir" toml:"persist_dir" mapstructure:"persist_dir"` QoS int `json:"qos" toml:"qos" mapstructure:"qos"` CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"` ClientCertPath string `json:"client_cert_path" toml:"client_cert_path" mapstructure:"client_cert_path"` diff --git a/pkg/export/service.go b/pkg/export/service.go index e45b4183..e32d3df2 100644 --- a/pkg/export/service.go +++ b/pkg/export/service.go @@ -162,11 +162,18 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C opts := mqtt.NewClientOptions(). AddBroker(conf.MQTT.Host). SetClientID(e.id). - SetCleanSession(true). + SetCleanSession(conf.MQTT.CleanSession). SetAutoReconnect(true). SetOnConnectHandler(e.conn). SetConnectionLostHandler(e.lost) + if conf.MQTT.Persist { + store := mqtt.NewFileStore(conf.MQTT.PersistDir) + opts.SetStore(store) + // Paho deletes persisted messages when restarts with clean session, so it is set to false + opts.SetCleanSession(false) + } + if conf.MQTT.Username != "" && conf.MQTT.Password != "" { opts.SetUsername(conf.MQTT.Username) opts.SetPassword(conf.MQTT.Password)