diff --git a/scripts/planktoscope/imager.py b/scripts/planktoscope/imager.py index 870d8ca..45f008d 100644 --- a/scripts/planktoscope/imager.py +++ b/scripts/planktoscope/imager.py @@ -262,35 +262,6 @@ class ImagerProcess(multiprocessing.Process): logger.success("planktoscope.imager is initialised and ready to go!") - def pump_callback(self, client, userdata, msg): - """Callback for when we receive an MQTT message - - Args: - client: Paho MQTT client information - userdata: userdata of the message - msg: actual message received - """ - # Print the topic and the message - logger.info(f"{self.name}: {msg.topic} {str(msg.qos)} {str(msg.payload)}") - if msg.topic != "status/pump": - logger.error( - f"The received message has the wrong topic {msg.topic}, payload was {str(msg.payload)}" - ) - return - payload = json.loads(msg.payload.decode()) - logger.debug(f"parsed payload is {payload}") - if self.__imager.state.name == "waiting": - if payload["status"] == "Done": - self.__imager.change(planktoscope.imager_state_machine.Capture) - self.imager_client.client.message_callback_remove("status/pump") - self.imager_client.client.unsubscribe("status/pump") - else: - logger.info(f"the pump is not done yet {payload}") - else: - logger.error( - "There is an error, status is not waiting for the pump and yet we received a pump message" - ) - def __message_image(self, last_message): """Actions for when we receive a message""" if ( @@ -302,7 +273,6 @@ class ImagerProcess(multiprocessing.Process): logger.error(f"The received message has the wrong argument {last_message}") self.imager_client.client.publish("status/imager", '{"status":"Error"}') return - # Change the state of the machine self.__imager.change(planktoscope.imager_state_machine.Imaging) # Get duration to wait before an image from the different received arguments @@ -322,9 +292,7 @@ class ImagerProcess(multiprocessing.Process): self.imager_client.client.publish("status/imager", '{"status":"Started"}') - def __message_stop(self, last_message): - # Remove callback for "status/pump" and unsubscribe - self.imager_client.client.message_callback_remove("status/pump") + def __message_stop(self): self.imager_client.client.unsubscribe("status/pump") # Stops the pump @@ -337,7 +305,6 @@ class ImagerProcess(multiprocessing.Process): planktoscope.light.interrupted() - # Change state to Stop self.__imager.change(planktoscope.imager_state_machine.Stop) def __message_update(self, last_message): @@ -509,13 +476,31 @@ class ImagerProcess(multiprocessing.Process): @logger.catch def treat_message(self): action = "" - if self.imager_client.new_message_received(): - logger.info("We received a new message") + logger.info("We received a new message") + if self.imager_client.msg["topic"].startswith("imager/"): last_message = self.imager_client.msg["payload"] logger.debug(last_message) action = self.imager_client.msg["payload"]["action"] logger.debug(action) - self.imager_client.read_message() + elif self.imager_client.msg["topic"] == "status/pump": + logger.debug( + f"Status message payload is {self.imager_client.msg['payload']}" + ) + if self.__imager.state.name == "waiting": + if self.imager_client.msg["payload"]["status"] == "Done": + self.__imager.change(planktoscope.imager_state_machine.Capture) + self.imager_client.client.unsubscribe("status/pump") + else: + logger.info(f"The pump is not done yet {payload}") + else: + logger.error( + "There is an error, we received an unexpected pump message" + ) + else: + logger.error( + f"The received message was not for us! Topic was {self.imager_client.msg['topic']}" + ) + self.imager_client.read_message() # If the command is "image" if action == "image": @@ -523,7 +508,7 @@ class ImagerProcess(multiprocessing.Process): self.__message_image(last_message) elif action == "stop": - self.__message_stop(last_message) + self.__message_stop() elif action == "update_config": self.__message_update(last_message) @@ -557,9 +542,6 @@ class ImagerProcess(multiprocessing.Process): def __state_imaging(self): # subscribe to status/pump self.imager_client.client.subscribe("status/pump") - self.imager_client.client.message_callback_add( - "status/pump", self.pump_callback - ) # Definition of the few important metadata local_metadata = { @@ -609,7 +591,6 @@ class ImagerProcess(multiprocessing.Process): ) # Reset the counter to 0 self.__img_done = 0 - # Change state towards stop self.__imager.change(planktoscope.imager_state_machine.Stop) planktoscope.light.error() return @@ -643,7 +624,6 @@ class ImagerProcess(multiprocessing.Process): self.__pump_message() - # Change state towards Waiting for pump self.__imager.change(planktoscope.imager_state_machine.Waiting) def __state_capture(self): @@ -674,7 +654,6 @@ class ImagerProcess(multiprocessing.Process): # Reset the counter to 0 self.__img_done = 0 self.__img_goal = 0 - # Change state towards stop self.__imager.change(planktoscope.imager_state_machine.Stop) planktoscope.light.error() return @@ -684,7 +663,7 @@ class ImagerProcess(multiprocessing.Process): planktoscope.integrity.append_to_integrity_file(filename_path) except FileNotFoundError as e: logger.error( - f"{filename_path} was not found, the camera may not have worked properly!" + f"{filename_path} was not found, the camera did not work properly!" ) # Publish the name of the image to via MQTT to Node-RED @@ -704,22 +683,16 @@ class ImagerProcess(multiprocessing.Process): # Publish the status "Done" to via MQTT to Node-RED self.imager_client.client.publish("status/imager", '{"status":"Done"}') - # Change state towards done self.__imager.change(planktoscope.imager_state_machine.Stop) planktoscope.light.ready() return else: # We have not reached the final stage, let's keep imaging - # subscribe to status/pump self.imager_client.client.subscribe("status/pump") - self.imager_client.client.message_callback_add( - "status/pump", self.pump_callback - ) self.__pump_message() - # Change state towards Waiting for pump self.__imager.change(planktoscope.imager_state_machine.Waiting) @logger.catch @@ -782,9 +755,10 @@ class ImagerProcess(multiprocessing.Process): # This is the main loop while not self.stop_event.is_set(): - self.treat_message() + if self.imager_client.new_message_received(): + self.treat_message() self.state_machine() - time.sleep(0.0001) + time.sleep(0.001) logger.info("Shutting down the imager process") self.imager_client.client.publish("status/imager", '{"status":"Dead"}')