Skip to content

Commit

Permalink
code refactoring on clearing retained messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ftylitak committed Apr 26, 2022
1 parent 65e911d commit 1dca133
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 105 deletions.
151 changes: 79 additions & 72 deletions insighioNode/apps/demo_console/ota.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,80 +10,87 @@ def checkAndApply(client):
return False

logging.info("Waiting for incoming control message (OTA)...")
message = client.get_control_message()
messageDict = client.get_control_message()

if message is None or message["message"] is None:
if messageDict is None or messageDict["topic"] is None or messageDict["message"] is None:
logging.info("No control message received")
return
else:
logging.debug("mqtt message received: " + str(message["message"]) + " in topic: " + str(message["topic"]))

# if it is a device configuration with content
if message["topic"].endswith("/config"):
# if modem based, clear message and ignore
if client.modem_based:
client.clearConfigMessages()
return

message_parsed = message["message"]
# if message is byte array, decode it, or else ignore and proceed
try:
message_parsed = message["message"].decode("utf-8")
except Exception as e:
pass

applyDeviceConfiguration(client, message_parsed)
# if it is a configuration request without content
elif message["topic"].endswith("/configRequest"):
if not client.modem_based:
client.clearConfigRequestMessages()
return

message_parsed = downloadDeviceConfigurationHTTP(client)
# if message is byte array, decode it, or else ignore and proceed
try:
message_parsed = message["message"].decode("utf-8")
except Exception as e:
pass

applyDeviceConfiguration(client, message_parsed)
elif message["topic"] is None or message["topic"].endswith("/ota"):
from external.kpn_senml.senml_pack_json import SenmlPackJson
senmlMessage = SenmlPackJson("")
senmlMessage.from_json(message["message"])
eventId = None
fileId = None
fileType = None
fileSize = None
for el in senmlMessage:
name = str(el.name)
if name == "e":
eventId = el.value
elif name == "i":
fileId = el.value
elif name == "t":
fileType = el.value
elif name == "s":
fileSize = el.value
# eventId ==0 => pending for installation
if str(eventId) == "0" and fileId and fileType and fileSize:
downloaded_file = downloadOTA(client, fileId, fileType, fileSize)
if downloaded_file:
from . import apply_ota
applied = apply_ota.do_apply(downloaded_file)
if applied:
print("about to reset...")
sendStatusMessage(client, fileId, True)
client.clear_control_message_ota()
import utils
utils.clearCachedStates()
import machine
machine.reset()
else:
sendStatusMessage(client, fileId, False, "can not apply")
client.clear_control_message_ota()
topic = messageDict["topic"]
message = messageDict["message"]
# if message is byte array, decode it, or else ignore and proceed
try:
topic = topic.decode("utf-8")
except Exception as e:
pass
try:
message = message.decode("utf-8")
except Exception as e:
pass

logging.debug("mqtt message received: " + str(message) + " in topic: " + str(topic))

# if it is a device configuration with content
if topic.endswith("/config"):
# if modem based, clear message and ignore
if client.modem_based:
client.clear_retained(topic)
checkAndApply(client)
return

# first clear non-modem based config request
client.clear_retained(topic + "Request")
applyDeviceConfiguration(client, message, topic)
# if it is a configuration request without content
elif topic.endswith("/configRequest") and message == "pending":
if not client.modem_based:
client.clear_retained(topic)
checkAndApply(client)
return

# first clear non-modem based config request
client.clear_retained(topic.replace("Request", ""))
config_content = downloadDeviceConfigurationHTTP(client)

applyDeviceConfiguration(client, config_content, topic)
# topic is None ? why is this?
elif topic is None or topic.endswith("/ota"):
from external.kpn_senml.senml_pack_json import SenmlPackJson
senmlMessage = SenmlPackJson("")
senmlMessage.from_json(message)
eventId = None
fileId = None
fileType = None
fileSize = None
for el in senmlMessage:
name = str(el.name)
if name == "e":
eventId = el.value
elif name == "i":
fileId = el.value
elif name == "t":
fileType = el.value
elif name == "s":
fileSize = el.value
# eventId ==0 => pending for installation
if str(eventId) == "0" and fileId and fileType and fileSize:
downloaded_file = downloadOTA(client, fileId, fileType, fileSize)
if downloaded_file:
from . import apply_ota
applied = apply_ota.do_apply(downloaded_file)
if applied:
print("about to reset...")
sendStatusMessage(client, fileId, True)
client.clear_retained(topic)
import utils
utils.clearCachedStates()
import machine
machine.reset()
else:
sendStatusMessage(client, fileId, False, "can not download")
sendStatusMessage(client, fileId, False, "can not apply")
client.clear_retained(topic)
else:
sendStatusMessage(client, fileId, False, "can not download")


def hasEnoughFreeSpace(fileSize):
Expand Down Expand Up @@ -161,7 +168,7 @@ def downloadDeviceConfigurationHTTP(client):
if x < len(buf):
buf = buf[:x]
try:
return buf.decode()
return buf.decode("utf-8")
except Exception as e:
pass

Expand Down Expand Up @@ -219,7 +226,7 @@ def downloadOTA(client, fileId, fileType, fileSize):
return None


def applyDeviceConfiguration(client, configurationParameters):
def applyDeviceConfiguration(client, configurationParameters, topic):
if configurationParameters is None:
return
# remove '?' if the string starts with it
Expand All @@ -242,7 +249,7 @@ def applyDeviceConfiguration(client, configurationParameters):

from www import configuration_handler
configuration_handler.apply_configuration(keyValueDict)
client.clear_control_message_config()
client.clear_retained(topic)
client.disconnect()
import machine
machine.reset()
27 changes: 6 additions & 21 deletions insighioNode/apps/demo_console/transfer_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,12 @@ def get_control_message(self):
elif self.protocol == 'mqtt':
return self.client.subscribe_and_get_first_message()

def clear_control_message_ota(self):
return self.clear_control_messages("/ota")

def clear_control_message_config(self):
statusDefault = self.clear_control_messages("/config")
statusHTTP = self.clear_control_messages("/configRequest")

def clear_control_messages(self, controlMessagePostfix):

if not self.connected:
logging.info("TransferProtocol not connected")
return False

def clear_retained(self, topic):
logging.info("About to clear retained message of topic: " + topic)
if self.modem_based:
topic = 'channels/{}/messages/{}{}'.format(self.protocol_config.control_channel_id, self.protocol_config.thing_id, controlMessagePostfix)
return self.modem_instance.mqtt_publish(topic, "", 3, True)
elif self.protocol == 'mqtt':
logging.info("About to clear retained messages")
if controlMessagePostfix == "/ota":
self.client.clearOtaMessages()
elif controlMessagePostfix == "/config":
self.client.clearConfigMessages()
# self.client.sendMessage("", self.protocol_config.control_channel_id, True)
logging.info("Done.")
return self.client.sendMessage("", topic, True)
else:
logging.error("clear_retained: protocol not supported")
return False
12 changes: 0 additions & 12 deletions insighioNode/lib/protocols/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ def prepareChannelNames(self, mqtt_config):
controlChannel = "channels/" + mqtt_config.control_channel_id + "/messages"
self.controlChannelGeneric = controlChannel + "/" + mqtt_config.thing_id + "/#"
self.otaChannel = controlChannel + "/" + mqtt_config.thing_id + "/ota"
self.configChannel = controlChannel + "/" + mqtt_config.thing_id + "/config"
self.configResponseChannel = controlChannel + "/" + mqtt_config.thing_id + "/configResponse"
self.messageChannel = "channels/" + mqtt_config.message_channel_id + "/messages/" + mqtt_config.thing_id

logging.debug("Selected channels:")
logging.debug(" ota channel: " + self.otaChannel)
logging.debug(" config channel: " + self.configChannel)
logging.debug(" messageChannel: " + self.messageChannel)

def subscribe_callback(self, topic, message):
Expand Down Expand Up @@ -86,15 +83,6 @@ def sendMessage(self, message, topic=None, retained=False):
def sendOtaMessage(self, message):
return self.sendMessage(message, self.otaChannel, False)

def clearOtaMessages(self):
return self.sendMessage("", self.otaChannel, True)

def clearConfigMessages(self):
return self.sendMessage("", self.configChannel, True)

def clearConfigRequestMessages(self):
return self.sendMessage("", self.configResponseChannel, True)

def subscribe_and_get_first_message(self, channel=None):
try:
if channel is None:
Expand Down

0 comments on commit 1dca133

Please sign in to comment.