imager: bugfix for wrong message received

If we stop the imager when the pump is still running, we were removing the callback treating the messages from the pump.
The effect is that if a status message has already been sent by the pump, the imager would parse the message with its normal code. So yeah, not nice.
I've now integrated all message treatment in one function, and pump_callback disappeared!
This commit is contained in:
Romain Bazile 2020-12-15 16:44:01 +01:00
parent 87fb866942
commit f801d74c0e

View file

@ -262,35 +262,6 @@ class ImagerProcess(multiprocessing.Process):
logger.success("planktoscope.imager is initialised and ready to go!") logger.success("planktoscope.imager is initialised and ready to go!")
def pump_callback(self, client, userdata, msg):
"""Callback for when we receive an MQTT message
Args:
client: Paho MQTT client information
userdata: userdata of the message
msg: actual message received
"""
# Print the topic and the message
logger.info(f"{self.name}: {msg.topic} {str(msg.qos)} {str(msg.payload)}")
if msg.topic != "status/pump":
logger.error(
f"The received message has the wrong topic {msg.topic}, payload was {str(msg.payload)}"
)
return
payload = json.loads(msg.payload.decode())
logger.debug(f"parsed payload is {payload}")
if self.__imager.state.name == "waiting":
if payload["status"] == "Done":
self.__imager.change(planktoscope.imager_state_machine.Capture)
self.imager_client.client.message_callback_remove("status/pump")
self.imager_client.client.unsubscribe("status/pump")
else:
logger.info(f"the pump is not done yet {payload}")
else:
logger.error(
"There is an error, status is not waiting for the pump and yet we received a pump message"
)
def __message_image(self, last_message): def __message_image(self, last_message):
"""Actions for when we receive a message""" """Actions for when we receive a message"""
if ( if (
@ -302,7 +273,6 @@ class ImagerProcess(multiprocessing.Process):
logger.error(f"The received message has the wrong argument {last_message}") logger.error(f"The received message has the wrong argument {last_message}")
self.imager_client.client.publish("status/imager", '{"status":"Error"}') self.imager_client.client.publish("status/imager", '{"status":"Error"}')
return return
# Change the state of the machine
self.__imager.change(planktoscope.imager_state_machine.Imaging) self.__imager.change(planktoscope.imager_state_machine.Imaging)
# Get duration to wait before an image from the different received arguments # Get duration to wait before an image from the different received arguments
@ -322,9 +292,7 @@ class ImagerProcess(multiprocessing.Process):
self.imager_client.client.publish("status/imager", '{"status":"Started"}') self.imager_client.client.publish("status/imager", '{"status":"Started"}')
def __message_stop(self, last_message): def __message_stop(self):
# Remove callback for "status/pump" and unsubscribe
self.imager_client.client.message_callback_remove("status/pump")
self.imager_client.client.unsubscribe("status/pump") self.imager_client.client.unsubscribe("status/pump")
# Stops the pump # Stops the pump
@ -337,7 +305,6 @@ class ImagerProcess(multiprocessing.Process):
planktoscope.light.interrupted() planktoscope.light.interrupted()
# Change state to Stop
self.__imager.change(planktoscope.imager_state_machine.Stop) self.__imager.change(planktoscope.imager_state_machine.Stop)
def __message_update(self, last_message): def __message_update(self, last_message):
@ -509,13 +476,31 @@ class ImagerProcess(multiprocessing.Process):
@logger.catch @logger.catch
def treat_message(self): def treat_message(self):
action = "" action = ""
if self.imager_client.new_message_received(): logger.info("We received a new message")
logger.info("We received a new message") if self.imager_client.msg["topic"].startswith("imager/"):
last_message = self.imager_client.msg["payload"] last_message = self.imager_client.msg["payload"]
logger.debug(last_message) logger.debug(last_message)
action = self.imager_client.msg["payload"]["action"] action = self.imager_client.msg["payload"]["action"]
logger.debug(action) logger.debug(action)
self.imager_client.read_message() elif self.imager_client.msg["topic"] == "status/pump":
logger.debug(
f"Status message payload is {self.imager_client.msg['payload']}"
)
if self.__imager.state.name == "waiting":
if self.imager_client.msg["payload"]["status"] == "Done":
self.__imager.change(planktoscope.imager_state_machine.Capture)
self.imager_client.client.unsubscribe("status/pump")
else:
logger.info(f"The pump is not done yet {payload}")
else:
logger.error(
"There is an error, we received an unexpected pump message"
)
else:
logger.error(
f"The received message was not for us! Topic was {self.imager_client.msg['topic']}"
)
self.imager_client.read_message()
# If the command is "image" # If the command is "image"
if action == "image": if action == "image":
@ -523,7 +508,7 @@ class ImagerProcess(multiprocessing.Process):
self.__message_image(last_message) self.__message_image(last_message)
elif action == "stop": elif action == "stop":
self.__message_stop(last_message) self.__message_stop()
elif action == "update_config": elif action == "update_config":
self.__message_update(last_message) self.__message_update(last_message)
@ -557,9 +542,6 @@ class ImagerProcess(multiprocessing.Process):
def __state_imaging(self): def __state_imaging(self):
# subscribe to status/pump # subscribe to status/pump
self.imager_client.client.subscribe("status/pump") self.imager_client.client.subscribe("status/pump")
self.imager_client.client.message_callback_add(
"status/pump", self.pump_callback
)
# Definition of the few important metadata # Definition of the few important metadata
local_metadata = { local_metadata = {
@ -609,7 +591,6 @@ class ImagerProcess(multiprocessing.Process):
) )
# Reset the counter to 0 # Reset the counter to 0
self.__img_done = 0 self.__img_done = 0
# Change state towards stop
self.__imager.change(planktoscope.imager_state_machine.Stop) self.__imager.change(planktoscope.imager_state_machine.Stop)
planktoscope.light.error() planktoscope.light.error()
return return
@ -643,7 +624,6 @@ class ImagerProcess(multiprocessing.Process):
self.__pump_message() self.__pump_message()
# Change state towards Waiting for pump
self.__imager.change(planktoscope.imager_state_machine.Waiting) self.__imager.change(planktoscope.imager_state_machine.Waiting)
def __state_capture(self): def __state_capture(self):
@ -674,7 +654,6 @@ class ImagerProcess(multiprocessing.Process):
# Reset the counter to 0 # Reset the counter to 0
self.__img_done = 0 self.__img_done = 0
self.__img_goal = 0 self.__img_goal = 0
# Change state towards stop
self.__imager.change(planktoscope.imager_state_machine.Stop) self.__imager.change(planktoscope.imager_state_machine.Stop)
planktoscope.light.error() planktoscope.light.error()
return return
@ -684,7 +663,7 @@ class ImagerProcess(multiprocessing.Process):
planktoscope.integrity.append_to_integrity_file(filename_path) planktoscope.integrity.append_to_integrity_file(filename_path)
except FileNotFoundError as e: except FileNotFoundError as e:
logger.error( logger.error(
f"{filename_path} was not found, the camera may not have worked properly!" f"{filename_path} was not found, the camera did not work properly!"
) )
# Publish the name of the image to via MQTT to Node-RED # Publish the name of the image to via MQTT to Node-RED
@ -704,22 +683,16 @@ class ImagerProcess(multiprocessing.Process):
# Publish the status "Done" to via MQTT to Node-RED # Publish the status "Done" to via MQTT to Node-RED
self.imager_client.client.publish("status/imager", '{"status":"Done"}') self.imager_client.client.publish("status/imager", '{"status":"Done"}')
# Change state towards done
self.__imager.change(planktoscope.imager_state_machine.Stop) self.__imager.change(planktoscope.imager_state_machine.Stop)
planktoscope.light.ready() planktoscope.light.ready()
return return
else: else:
# We have not reached the final stage, let's keep imaging # We have not reached the final stage, let's keep imaging
# subscribe to status/pump
self.imager_client.client.subscribe("status/pump") self.imager_client.client.subscribe("status/pump")
self.imager_client.client.message_callback_add(
"status/pump", self.pump_callback
)
self.__pump_message() self.__pump_message()
# Change state towards Waiting for pump
self.__imager.change(planktoscope.imager_state_machine.Waiting) self.__imager.change(planktoscope.imager_state_machine.Waiting)
@logger.catch @logger.catch
@ -782,9 +755,10 @@ class ImagerProcess(multiprocessing.Process):
# This is the main loop # This is the main loop
while not self.stop_event.is_set(): while not self.stop_event.is_set():
self.treat_message() if self.imager_client.new_message_received():
self.treat_message()
self.state_machine() self.state_machine()
time.sleep(0.0001) time.sleep(0.001)
logger.info("Shutting down the imager process") logger.info("Shutting down the imager process")
self.imager_client.client.publish("status/imager", '{"status":"Dead"}') self.imager_client.client.publish("status/imager", '{"status":"Dead"}')