Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-28 - enabled messages paho persistence #27

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build/*
38 changes: 21 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ By default `Export` service looks for config file at [`../configs/config.toml`][
password = "<thing_password>"
ca = "ca.crt"
cert = "thing.crt"
mtls = "false"
mtls = false
priv_key = "thing.key"
retain = "false"
retain = false
persist = false
persist_dir = "../mqtt_persist"
skip_tls_ver = "false"
url = "tcp://mainflux.com:1883"

Expand Down Expand Up @@ -110,21 +112,23 @@ Edit Mainflux [docker-compose.yml][docker-compose]. NATS section must look like
Service will look for `config.toml` first and if not found it will be configured with env variables and new config file specified with `MF_EXPORT_CONFIG_FILE` will be saved with values populated from env vars.
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.

| Variable | Description | Default |
|-------------------------------|---------------------------------------------------------------|-----------------------|
| MF_NATS_URL | Nats url | localhost:4222 |
| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 |
| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | |
| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | |
| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | |
| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true |
| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false |
| MF_EXPORT_MQTT_CA | CA for tls | ca.crt |
| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt |
| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key |
| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 |
| MF_EXPORT_MQTT_RETAIN | MQTT retain | false |
| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml |
| Variable | Description | Default |
|-------------------------------|-----------------------------------------------------------------------------------------|-----------------------|
| MF_NATS_URL | Nats url | localhost:4222 |
| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 |
| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | |
| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | |
| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | |
| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true |
| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false |
| MF_EXPORT_MQTT_CA | CA for tls | ca.crt |
| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt |
| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key |
| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 |
| MF_EXPORT_MQTT_RETAIN | MQTT retain | false |
| MF_EXPORT_MQTT_PERSIST | persist MQTT QOS 2 pending messages in filesystem, to avoid data loss | false |
| MF_EXPORT_MQTT_PERSIST_DIR | directory in which pending messages will be saved if persist is enabled | false |
| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml |

for values in environment variables to take effect make sure that there is no `MF_EXPORT_CONF` file.

Expand Down
10 changes: 10 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = "false"
defMqttPersist = "false"
defMqttPersistDir = "../mqtt_persist"
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "../configs/config.toml"
Expand All @@ -63,6 +65,8 @@ const (
envMqttCA = "MF_EXPORT_MQTT_CA"
envMqttQoS = "MF_EXPORT_MQTT_QOS"
envMqttRetain = "MF_EXPORT_MQTT_RETAIN"
envMqttPersist = "MF_MQTT_PERSIST"
envMqttPersistDir = "MF_MQTT_PERSIST_FILE"
envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK"
envConfigFile = "MF_EXPORT_CONFIG_FILE"
Expand Down Expand Up @@ -145,6 +149,10 @@ func loadConfigs() (exp.Config, error) {
if err != nil {
mqttRetain = false
}
mqttPersist, err := strconv.ParseBool(mainflux.Env(envMqttPersist, defMqttPersist))
if err != nil {
mqttPersist = false
}

q, err := strconv.ParseInt(mainflux.Env(envMqttQoS, defMqttQoS), 10, 64)
if err != nil {
Expand All @@ -167,6 +175,8 @@ func loadConfigs() (exp.Config, error) {
Username: mainflux.Env(envMqttUsername, defMqttUsername),

Retain: mqttRetain,
Persist: mqttPersist,
PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir),
QoS: QoS,
MTLS: mqttMTLS,
SkipTLSVer: mqttSkipTLSVer,
Expand Down
2 changes: 2 additions & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ File = "/configs/export/config.toml"
password = ""
qos = 0
retain = false
persist = false
persist_dir = "../mqtt_persist"
skip_tls_ver = true
username = ""

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type MQTT struct {
MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"`
SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"`
Retain bool `json:"retain" toml:"retain" mapstructure:"retain"`
Persist bool `json:"persist" toml:"persist" mapstructure:"persist"`
PersistDir string `json:"persist_dir" toml:"persist_dir" mapstructure:"persist_dir"`
QoS int `json:"qos" toml:"qos" mapstructure:"qos"`
CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"`
ClientCertPath string `json:"client_cert_path" toml:"client_cert_path" mapstructure:"client_cert_path"`
Expand Down
7 changes: 6 additions & 1 deletion pkg/export/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,16 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C
opts := mqtt.NewClientOptions().
AddBroker(conf.MQTT.Host).
SetClientID(e.id).
SetCleanSession(true).
SetCleanSession(false).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be optional, so that when started for the first time it can start with the new session. Should be added to ENV vars.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added variable to the env in new commit

SetAutoReconnect(true).
SetOnConnectHandler(e.conn).
SetConnectionLostHandler(e.lost)

if conf.MQTT.Persist {
store := mqtt.NewFileStore(conf.MQTT.PersistDir)
opts.SetStore(store)
}

if conf.MQTT.Username != "" && conf.MQTT.Password != "" {
opts.SetUsername(conf.MQTT.Username)
opts.SetPassword(conf.MQTT.Password)
Expand Down