From f801d74c0e239a3e78a87940233e5f5e1fdc824a Mon Sep 17 00:00:00 2001 From: Romain Bazile Date: Tue, 15 Dec 2020 16:44:01 +0100 Subject: [PATCH] imager: bugfix for wrong message received If we stop the imager when the pump is still running, we were removing the callback treating the messages from the pump. The effect is that if a status message has already been sent by the pump, the imager would parse the message with its normal code. So yeah, not nice. I've now integrated all message treatment in one function, and pump_callback disappeared! --- scripts/planktoscope/imager.py | 80 ++++++++++++---------------------- 1 file changed, 27 insertions(+), 53 deletions(-) diff --git a/scripts/planktoscope/imager.py b/scripts/planktoscope/imager.py index 870d8ca..45f008d 100644 --- a/scripts/planktoscope/imager.py +++ b/scripts/planktoscope/imager.py @@ -262,35 +262,6 @@ class ImagerProcess(multiprocessing.Process): logger.success("planktoscope.imager is initialised and ready to go!") - def pump_callback(self, client, userdata, msg): - """Callback for when we receive an MQTT message - - Args: - client: Paho MQTT client information - userdata: userdata of the message - msg: actual message received - """ - # Print the topic and the message - logger.info(f"{self.name}: {msg.topic} {str(msg.qos)} {str(msg.payload)}") - if msg.topic != "status/pump": - logger.error( - f"The received message has the wrong topic {msg.topic}, payload was {str(msg.payload)}" - ) - return - payload = json.loads(msg.payload.decode()) - logger.debug(f"parsed payload is {payload}") - if self.__imager.state.name == "waiting": - if payload["status"] == "Done": - self.__imager.change(planktoscope.imager_state_machine.Capture) - self.imager_client.client.message_callback_remove("status/pump") - self.imager_client.client.unsubscribe("status/pump") - else: - logger.info(f"the pump is not done yet {payload}") - else: - logger.error( - "There is an error, status is not waiting for the pump and yet we received a pump message" - ) - def __message_image(self, last_message): """Actions for when we receive a message""" if ( @@ -302,7 +273,6 @@ class ImagerProcess(multiprocessing.Process): logger.error(f"The received message has the wrong argument {last_message}") self.imager_client.client.publish("status/imager", '{"status":"Error"}') return - # Change the state of the machine self.__imager.change(planktoscope.imager_state_machine.Imaging) # Get duration to wait before an image from the different received arguments @@ -322,9 +292,7 @@ class ImagerProcess(multiprocessing.Process): self.imager_client.client.publish("status/imager", '{"status":"Started"}') - def __message_stop(self, last_message): - # Remove callback for "status/pump" and unsubscribe - self.imager_client.client.message_callback_remove("status/pump") + def __message_stop(self): self.imager_client.client.unsubscribe("status/pump") # Stops the pump @@ -337,7 +305,6 @@ class ImagerProcess(multiprocessing.Process): planktoscope.light.interrupted() - # Change state to Stop self.__imager.change(planktoscope.imager_state_machine.Stop) def __message_update(self, last_message): @@ -509,13 +476,31 @@ class ImagerProcess(multiprocessing.Process): @logger.catch def treat_message(self): action = "" - if self.imager_client.new_message_received(): - logger.info("We received a new message") + logger.info("We received a new message") + if self.imager_client.msg["topic"].startswith("imager/"): last_message = self.imager_client.msg["payload"] logger.debug(last_message) action = self.imager_client.msg["payload"]["action"] logger.debug(action) - self.imager_client.read_message() + elif self.imager_client.msg["topic"] == "status/pump": + logger.debug( + f"Status message payload is {self.imager_client.msg['payload']}" + ) + if self.__imager.state.name == "waiting": + if self.imager_client.msg["payload"]["status"] == "Done": + self.__imager.change(planktoscope.imager_state_machine.Capture) + self.imager_client.client.unsubscribe("status/pump") + else: + logger.info(f"The pump is not done yet {payload}") + else: + logger.error( + "There is an error, we received an unexpected pump message" + ) + else: + logger.error( + f"The received message was not for us! Topic was {self.imager_client.msg['topic']}" + ) + self.imager_client.read_message() # If the command is "image" if action == "image": @@ -523,7 +508,7 @@ class ImagerProcess(multiprocessing.Process): self.__message_image(last_message) elif action == "stop": - self.__message_stop(last_message) + self.__message_stop() elif action == "update_config": self.__message_update(last_message) @@ -557,9 +542,6 @@ class ImagerProcess(multiprocessing.Process): def __state_imaging(self): # subscribe to status/pump self.imager_client.client.subscribe("status/pump") - self.imager_client.client.message_callback_add( - "status/pump", self.pump_callback - ) # Definition of the few important metadata local_metadata = { @@ -609,7 +591,6 @@ class ImagerProcess(multiprocessing.Process): ) # Reset the counter to 0 self.__img_done = 0 - # Change state towards stop self.__imager.change(planktoscope.imager_state_machine.Stop) planktoscope.light.error() return @@ -643,7 +624,6 @@ class ImagerProcess(multiprocessing.Process): self.__pump_message() - # Change state towards Waiting for pump self.__imager.change(planktoscope.imager_state_machine.Waiting) def __state_capture(self): @@ -674,7 +654,6 @@ class ImagerProcess(multiprocessing.Process): # Reset the counter to 0 self.__img_done = 0 self.__img_goal = 0 - # Change state towards stop self.__imager.change(planktoscope.imager_state_machine.Stop) planktoscope.light.error() return @@ -684,7 +663,7 @@ class ImagerProcess(multiprocessing.Process): planktoscope.integrity.append_to_integrity_file(filename_path) except FileNotFoundError as e: logger.error( - f"{filename_path} was not found, the camera may not have worked properly!" + f"{filename_path} was not found, the camera did not work properly!" ) # Publish the name of the image to via MQTT to Node-RED @@ -704,22 +683,16 @@ class ImagerProcess(multiprocessing.Process): # Publish the status "Done" to via MQTT to Node-RED self.imager_client.client.publish("status/imager", '{"status":"Done"}') - # Change state towards done self.__imager.change(planktoscope.imager_state_machine.Stop) planktoscope.light.ready() return else: # We have not reached the final stage, let's keep imaging - # subscribe to status/pump self.imager_client.client.subscribe("status/pump") - self.imager_client.client.message_callback_add( - "status/pump", self.pump_callback - ) self.__pump_message() - # Change state towards Waiting for pump self.__imager.change(planktoscope.imager_state_machine.Waiting) @logger.catch @@ -782,9 +755,10 @@ class ImagerProcess(multiprocessing.Process): # This is the main loop while not self.stop_event.is_set(): - self.treat_message() + if self.imager_client.new_message_received(): + self.treat_message() self.state_machine() - time.sleep(0.0001) + time.sleep(0.001) logger.info("Shutting down the imager process") self.imager_client.client.publish("status/imager", '{"status":"Dead"}')