From ec43f11f9816fb21e5e77cb44e6bee47ced2670a Mon Sep 17 00:00:00 2001 From: Romain Bazile Date: Mon, 18 Oct 2021 12:44:47 +0200 Subject: [PATCH] segmenter: add process_id --- scripts/planktoscope/segmenter/__init__.py | 101 ++++++++++++++++----- 1 file changed, 77 insertions(+), 24 deletions(-) diff --git a/scripts/planktoscope/segmenter/__init__.py b/scripts/planktoscope/segmenter/__init__.py index 4ad6673..89b1d09 100644 --- a/scripts/planktoscope/segmenter/__init__.py +++ b/scripts/planktoscope/segmenter/__init__.py @@ -98,6 +98,7 @@ class SegmenterProcess(multiprocessing.Process): # combination of self.__debug_objects_root and actual sample folder name self.__working_debug_path = "" self.__archive_fn = "" + self.__process_id = "" self.__flat = None self.__mask_array = None self.__mask_to_remove = None @@ -565,7 +566,13 @@ class SegmenterProcess(multiprocessing.Process): logger.debug(f"Images found are {images_list}") images_count = len(images_list) - logger.debug(f"We found {images_count} images, good luck!") + if images_count == 0: + logger.error( + "There is no image to run the segmentation on. Please check your selection." + ) + raise FileNotFoundError + else: + logger.debug(f"We found {images_count} images, good luck!") first_start = time.monotonic() self.__mask_to_remove = None @@ -665,7 +672,9 @@ class SegmenterProcess(multiprocessing.Process): objects_count, _ = self._slice_image(img, name, mask, total_objects) total_objects += objects_count # Simple heuristic to detect a movement of the flow cell and a change in the resulting flat + # TODO: this heuristic should be improved or removed if deemed unnecessary if average_objects != 0 and objects_count > average_objects + 20: + # FIXME: this should force a new slice of the current image logger.debug( f"We need to recalculate a flat since we have {objects_count} new objects instead of the average of {average_objects}" ) @@ -735,14 +744,29 @@ class SegmenterProcess(multiprocessing.Process): """ logger.info(f"The pipeline will be run in {len(path_list)} directories") logger.debug(f"Those are {path_list}") + + self.__process_uuid = planktoscope.uuidName.uuidMachine( + machine=planktoscope.uuidName.getSerial() + ) + + if self.__process_id == "": + self.__process_id = self.__process_uuid + + logger.info(f"The process_uuid of this run is {self.__process_uuid}") + logger.info(f"The process_id of this run is {self.__process_id}") + exception = None + for path in path_list: logger.debug(f"{path}: Checking for the presence of metadata.json") if os.path.exists(os.path.join(path, "metadata.json")): # The file exists, let's check if we force or not if force: # forcing, let's gooooo - if not self.segment_path(path, ecotaxa_export): - logger.error(f"There was en error while segmenting {path}") + try: + self.segment_path(path, ecotaxa_export) + except Exception as e: + logger.error(f"There was an error while segmenting {path}") + exception = e else: # we need to check for the presence of done.txt in each folder logger.debug(f"{path}: Checking for the presence of done.txt") @@ -751,12 +775,25 @@ class SegmenterProcess(multiprocessing.Process): f"Moving to the next folder, {path} has already been segmented" ) else: - if not self.segment_path(path, ecotaxa_export): - logger.error(f"There was en error while segmenting {path}") + try: + self.segment_path(path, ecotaxa_export) + except Exception as e: + logger.error(f"There was an error while segmenting {path}") + exception = e else: logger.debug(f"Moving to the next folder, {path} has no metadata.json") - # Publish the status "Done" to via MQTT to Node-RED - self.segmenter_client.client.publish("status/segmenter", '{"status":"Done"}') + if exception is None: + # Publish the status "Done" to via MQTT to Node-RED + self.segmenter_client.client.publish( + "status/segmenter", '{"status":"Done"}' + ) + else: + self.segmenter_client.client.publish( + "status/segmenter", + f'{{"status":"An exception was raised during the segmentation: {e}."}}', + ) + # Reset process_id + self.__process_id = "" def segment_path(self, path, ecotaxa_export): """Starts the segmentation in the given path @@ -778,8 +815,12 @@ class SegmenterProcess(multiprocessing.Process): ) project = self.__global_metadata["sample_project"].replace(" ", "_") - date = datetime.datetime.utcnow().isoformat() sample = self.__global_metadata["sample_id"].replace(" ", "_") + date = datetime.datetime.utcnow().isoformat() + + self.__global_metadata["process_datetime"] = date + self.__global_metadata["process_uuid"] = self.__process_uuid + self.__global_metadata["process_id"] = f"{project}_{sample}_{self.__process_id}" # TODO Make this dynamic: if we change operations order and/or parameters, we need to make this evolve. self.__global_metadata["process_1st_operation"] = { @@ -851,12 +892,13 @@ class SegmenterProcess(multiprocessing.Process): self._pipe(ecotaxa_export) except Exception as e: logger.exception(f"There was an error in the pipeline {e}") - return False + raise e # Add file 'done' to path to mark the folder as already segmented with open(os.path.join(self.__working_path, "done.txt"), "w") as done_file: done_file.writelines(datetime.datetime.utcnow().isoformat()) logger.info(f"Pipeline has been run for {path}") + return True @logger.catch @@ -871,28 +913,39 @@ class SegmenterProcess(multiprocessing.Process): if "action" in last_message: # If the command is "segment" if last_message["action"] == "segment": - path = None - recursive = True - force = False - ecotaxa_export = True # {"action":"segment"} if "settings" in last_message: - if "force" in last_message["settings"]: - # force rework of already done folder - force = last_message["settings"]["force"] - if "recursive" in last_message["settings"]: - # parse folders recursively starting from the given parameter - recursive = last_message["settings"]["recursive"] - if "ecotaxa" in last_message["settings"]: - # generate ecotaxa output archive - ecotaxa_export = last_message["settings"]["ecotaxa"] + # force rework of already done folder + force = ( + last_message["settings"]["force"] + if "force" in last_message + else False + ) + + # parse folders recursively starting from the given parameter + recursive = ( + last_message["settings"]["recursive"] + if "recursive" in last_message + else True + ) + + # generate ecotaxa output archive + ecotaxa_export = ( + last_message["settings"]["ecotaxa"] + if "ecotaxa" in last_message + else True + ) + if "keep" in last_message["settings"]: # keep debug images self.__save_debug_img = last_message["settings"]["keep"] + + if "process_id" in last_message["settings"]: + # keep debug images + self.__process_id = last_message["settings"]["process_id"] # TODO eventually add customisation to segmenter parameters here - if "path" in last_message: - path = last_message["path"] + path = last_message["path"] if "path" in last_message else None # Publish the status "Started" to via MQTT to Node-RED self.segmenter_client.client.publish(