segmenter: add process_id

This commit is contained in:
Romain Bazile 2021-10-18 12:44:47 +02:00
parent 5d0c067eb6
commit ec43f11f98

View file

@ -98,6 +98,7 @@ class SegmenterProcess(multiprocessing.Process):
# combination of self.__debug_objects_root and actual sample folder name
self.__working_debug_path = ""
self.__archive_fn = ""
self.__process_id = ""
self.__flat = None
self.__mask_array = None
self.__mask_to_remove = None
@ -565,6 +566,12 @@ class SegmenterProcess(multiprocessing.Process):
logger.debug(f"Images found are {images_list}")
images_count = len(images_list)
if images_count == 0:
logger.error(
"There is no image to run the segmentation on. Please check your selection."
)
raise FileNotFoundError
else:
logger.debug(f"We found {images_count} images, good luck!")
first_start = time.monotonic()
@ -665,7 +672,9 @@ class SegmenterProcess(multiprocessing.Process):
objects_count, _ = self._slice_image(img, name, mask, total_objects)
total_objects += objects_count
# Simple heuristic to detect a movement of the flow cell and a change in the resulting flat
# TODO: this heuristic should be improved or removed if deemed unnecessary
if average_objects != 0 and objects_count > average_objects + 20:
# FIXME: this should force a new slice of the current image
logger.debug(
f"We need to recalculate a flat since we have {objects_count} new objects instead of the average of {average_objects}"
)
@ -735,14 +744,29 @@ class SegmenterProcess(multiprocessing.Process):
"""
logger.info(f"The pipeline will be run in {len(path_list)} directories")
logger.debug(f"Those are {path_list}")
self.__process_uuid = planktoscope.uuidName.uuidMachine(
machine=planktoscope.uuidName.getSerial()
)
if self.__process_id == "":
self.__process_id = self.__process_uuid
logger.info(f"The process_uuid of this run is {self.__process_uuid}")
logger.info(f"The process_id of this run is {self.__process_id}")
exception = None
for path in path_list:
logger.debug(f"{path}: Checking for the presence of metadata.json")
if os.path.exists(os.path.join(path, "metadata.json")):
# The file exists, let's check if we force or not
if force:
# forcing, let's gooooo
if not self.segment_path(path, ecotaxa_export):
logger.error(f"There was en error while segmenting {path}")
try:
self.segment_path(path, ecotaxa_export)
except Exception as e:
logger.error(f"There was an error while segmenting {path}")
exception = e
else:
# we need to check for the presence of done.txt in each folder
logger.debug(f"{path}: Checking for the presence of done.txt")
@ -751,12 +775,25 @@ class SegmenterProcess(multiprocessing.Process):
f"Moving to the next folder, {path} has already been segmented"
)
else:
if not self.segment_path(path, ecotaxa_export):
logger.error(f"There was en error while segmenting {path}")
try:
self.segment_path(path, ecotaxa_export)
except Exception as e:
logger.error(f"There was an error while segmenting {path}")
exception = e
else:
logger.debug(f"Moving to the next folder, {path} has no metadata.json")
if exception is None:
# Publish the status "Done" to via MQTT to Node-RED
self.segmenter_client.client.publish("status/segmenter", '{"status":"Done"}')
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Done"}'
)
else:
self.segmenter_client.client.publish(
"status/segmenter",
f'{{"status":"An exception was raised during the segmentation: {e}."}}',
)
# Reset process_id
self.__process_id = ""
def segment_path(self, path, ecotaxa_export):
"""Starts the segmentation in the given path
@ -778,8 +815,12 @@ class SegmenterProcess(multiprocessing.Process):
)
project = self.__global_metadata["sample_project"].replace(" ", "_")
date = datetime.datetime.utcnow().isoformat()
sample = self.__global_metadata["sample_id"].replace(" ", "_")
date = datetime.datetime.utcnow().isoformat()
self.__global_metadata["process_datetime"] = date
self.__global_metadata["process_uuid"] = self.__process_uuid
self.__global_metadata["process_id"] = f"{project}_{sample}_{self.__process_id}"
# TODO Make this dynamic: if we change operations order and/or parameters, we need to make this evolve.
self.__global_metadata["process_1st_operation"] = {
@ -851,12 +892,13 @@ class SegmenterProcess(multiprocessing.Process):
self._pipe(ecotaxa_export)
except Exception as e:
logger.exception(f"There was an error in the pipeline {e}")
return False
raise e
# Add file 'done' to path to mark the folder as already segmented
with open(os.path.join(self.__working_path, "done.txt"), "w") as done_file:
done_file.writelines(datetime.datetime.utcnow().isoformat())
logger.info(f"Pipeline has been run for {path}")
return True
@logger.catch
@ -871,28 +913,39 @@ class SegmenterProcess(multiprocessing.Process):
if "action" in last_message:
# If the command is "segment"
if last_message["action"] == "segment":
path = None
recursive = True
force = False
ecotaxa_export = True
# {"action":"segment"}
if "settings" in last_message:
if "force" in last_message["settings"]:
# force rework of already done folder
force = last_message["settings"]["force"]
if "recursive" in last_message["settings"]:
force = (
last_message["settings"]["force"]
if "force" in last_message
else False
)
# parse folders recursively starting from the given parameter
recursive = last_message["settings"]["recursive"]
if "ecotaxa" in last_message["settings"]:
recursive = (
last_message["settings"]["recursive"]
if "recursive" in last_message
else True
)
# generate ecotaxa output archive
ecotaxa_export = last_message["settings"]["ecotaxa"]
ecotaxa_export = (
last_message["settings"]["ecotaxa"]
if "ecotaxa" in last_message
else True
)
if "keep" in last_message["settings"]:
# keep debug images
self.__save_debug_img = last_message["settings"]["keep"]
if "process_id" in last_message["settings"]:
# use the operator-supplied process_id for this run
self.__process_id = last_message["settings"]["process_id"]
# TODO eventually add customisation to segmenter parameters here
if "path" in last_message:
path = last_message["path"]
path = last_message["path"] if "path" in last_message else None
# Publish the status "Started" to via MQTT to Node-RED
self.segmenter_client.client.publish(