planktoscope/scripts/planktoscope/segmenter.py
Romain Bazile 51ff3b7375 update the light library
also replaces smbus by smbus2 to allow for cleaner setups
2020-11-30 00:17:30 +01:00

374 lines
15 KiB
Python

################################################################################
# Practical Libraries
################################################################################
# Logger library compatible with multiprocessing
from loguru import logger
# Library to get date and time for folder name and filename
import datetime
# Library to be able to sleep for a given duration
import time
# Libraries manipulate json format, execute bash commands
import json, shutil, os
# Library for starting processes
import multiprocessing
# Basic planktoscope libraries
import planktoscope.mqtt
import planktoscope.light
logger.info("planktoscope.segmenter is loaded")
################################################################################
# Main Segmenter class
################################################################################
class SegmenterProcess(multiprocessing.Process):
"""This class contains the main definitions for the segmenter of the PlanktoScope"""
@logger.catch
def __init__(self, event):
"""Initialize the Segmenter class
Args:
event (multiprocessing.Event): shutdown event
"""
super(SegmenterProcess, self).__init__(name="segmenter")
logger.info("planktoscope.segmenter is initialising")
self.stop_event = event
self.__pipe = None
self.segmenter_client = None
self.__img_path = "/home/pi/data/img/"
self.__export_path = "/home/pi/data/export/"
self.__objects_base_path = "/home/pi/data/objects/"
self.__ecotaxa_path = os.path.join(self.__export_path, "ecotaxa")
self.__global_metadata = None
self.__working_path = ""
self.__working_obj_path = ""
self.__archive_fn = ""
if not os.path.exists(self.__ecotaxa_path):
# create the path!
os.makedirs(self.__ecotaxa_path)
if not os.path.exists(self.__objects_base_path):
# create the path!
os.makedirs(self.__objects_base_path)
logger.success("planktoscope.segmenter is initialised and ready to go!")
def __create_morphocut_pipeline(self):
"""Creates the Morphocut Pipeline"""
logger.debug("Let's start creating the Morphocut Pipeline")
with morphocut.Pipeline() as self.__pipe:
# TODO wrap morphocut.Call(logger.debug()) in something that allows it not to be added to the pipeline
# if the logger.level is not debug. Might not be as easy as it sounds.
# Recursively find .jpg files in import_path.
# Sort to get consecutive frames.
abs_path = morphocut.file.Find(
self.__working_path, [".jpg"], sort=True, verbose=True
)
# Extract name from abs_path
name = morphocut.Call(
lambda p: os.path.splitext(os.path.basename(p))[0], abs_path
)
morphocut.Call(planktoscope.light.segmenting())
# Read image
img = morphocut.image.ImageReader(abs_path)
# Show progress bar for frames
morphocut.stream.TQDM(morphocut.str.Format("Frame {name}", name=name))
# Apply running median to approximate the background image
flat_field = morphocut.stat.RunningMedian(img, 5)
# Correct image
img = img / flat_field
# Rescale intensities and convert to uint8 to speed up calculations
img = morphocut.image.RescaleIntensity(
img, in_range=(0, 1.1), dtype="uint8"
)
# Filter variable to reduce memory load
morphocut.stream.FilterVariables(name, img)
# Save cleaned images
# frame_fn = morphocut.str.Format(os.path.join("/home/pi/PlanktonScope/tmp","CLEAN", "{name}.jpg"), name=name)
# morphocut.image.ImageWriter(frame_fn, img)
# Convert image to uint8 gray
img_gray = morphocut.image.RGB2Gray(img)
# ?
img_gray = morphocut.Call(skimage.util.img_as_ubyte, img_gray)
# Canny edge detection using OpenCV
img_canny = morphocut.Call(cv2.Canny, img_gray, 50, 100)
# Dilate using OpenCV
kernel = morphocut.Call(
cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15)
)
img_dilate = morphocut.Call(cv2.dilate, img_canny, kernel, iterations=2)
# Close using OpenCV
kernel = morphocut.Call(
cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (5, 5)
)
img_close = morphocut.Call(
cv2.morphologyEx, img_dilate, cv2.MORPH_CLOSE, kernel, iterations=1
)
# Erode using OpenCV
kernel = morphocut.Call(
cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15)
)
mask = morphocut.Call(cv2.erode, img_close, kernel, iterations=2)
# Find objects
regionprops = morphocut.image.FindRegions(
mask, img_gray, min_area=1000, padding=10, warn_empty=name
)
# For an object, extract a vignette/ROI from the image
roi_orig = morphocut.image.ExtractROI(img, regionprops, bg_color=255)
# Generate an object identifier
i = morphocut.stream.Enumerate()
# morphocut.Call(print,i)
# Define the ID of each object
object_id = morphocut.str.Format("{name}_{i:d}", name=name, i=i)
# morphocut.Call(print,object_id)
# Define the name of each object
object_fn = morphocut.str.Format(
os.path.join(self.__working_obj_path, "{name}.jpg"),
name=object_id,
)
# Save the image of the object with its name
morphocut.image.ImageWriter(object_fn, roi_orig)
# Calculate features. The calculated features are added to the global_metadata.
# Returns a Variable representing a dict for every object in the stream.
meta = morphocut.contrib.zooprocess.CalculateZooProcessFeatures(
regionprops, prefix="object_", meta=self.__global_metadata
)
# Get all the metadata
json_meta = morphocut.Call(json.dumps, meta, sort_keys=True, default=str)
# Publish the json containing all the metadata to via MQTT to Node-RED
morphocut.Call(
self.segmenter_client.client.publish,
"status/segmenter/metric",
json_meta,
)
# Add object_id to the metadata dictionary
meta["object_id"] = object_id
# Generate object filenames
orig_fn = morphocut.str.Format("{object_id}.jpg", object_id=object_id)
# Write objects to an EcoTaxa archive:
# roi image in original color, roi image in grayscale, metadata associated with each object
morphocut.contrib.ecotaxa.EcotaxaWriter(
self.__archive_fn, (orig_fn, roi_orig), meta
)
# Progress bar for objects
morphocut.stream.TQDM(
morphocut.str.Format("Object {object_id}", object_id=object_id)
)
id_json = morphocut.str.Format(
'{{"object_id":"{object_id}"}}', object_id=object_id
)
# Publish the object_id to via MQTT to Node-RED
morphocut.Call(
self.segmenter_client.client.publish,
"status/segmenter/object_id",
id_json,
)
logger.info("Morphocut's Pipeline has been created")
@logger.catch
def treat_message(self):
action = ""
if self.segmenter_client.new_message_received():
logger.info("We received a new message")
last_message = self.segmenter_client.msg["payload"]
logger.debug(last_message)
action = self.segmenter_client.msg["payload"]["action"]
self.segmenter_client.read_message()
# If the command is "segment"
if action == "segment":
# {"action":"segment"}
# Publish the status "Started" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Started"}'
)
img_paths = [x[0] for x in os.walk(self.__img_path)]
logger.info(f"The pipeline will be run in {len(img_paths)} directories")
logger.debug(f"Those are {img_paths}")
for path in img_paths:
logger.info(f"{path}: Checking for the presence of metadata.json")
if os.path.exists(os.path.join(path, "metadata.json")):
# The file exists, let's run the pipe!
logger.info(f"Loading the metadata file for {path}")
with open(os.path.join(path, "metadata.json"), "r") as config_file:
self.__global_metadata = json.load(config_file)
logger.debug(
f"Configuration loaded is {self.__global_metadata}"
)
project = self.__global_metadata["sample_project"].replace(" ", "_")
date = self.__global_metadata["process_datetime"]
sample = self.__global_metadata["sample_id"]
# Define the name of the .zip file that will contain the images and the .tsv table for EcoTaxa
self.__archive_fn = os.path.join(
self.__ecotaxa_path,
# filename includes project name, timestamp and sample id
f"export_{project}_{date}_{sample}.zip",
)
self.__working_path = path
# recreate the subfolder img architecture of this folder inside objects
# when we split the working path with the base img path, we get the date/sample architecture back
# "/home/pi/data/img/2020-10-17/5/5".split("/home/pi/data/img/")[1] => '2020-10-17/5/5'
sample_path = self.__working_path.split(self.__img_path)[1].strip()
logger.debug(f"base obj path is {self.__objects_base_path}")
logger.debug(f"sample path is {sample_path}")
self.__working_obj_path = os.path.join(
self.__objects_base_path, sample_path
)
logger.debug(
f"The working objects path is {self.__working_obj_path}"
)
# Create the objects path
if not os.path.exists(self.__working_obj_path):
# create the path!
os.makedirs(self.__working_obj_path)
logger.debug(f"The archive folder is {self.__archive_fn}")
self.__create_morphocut_pipeline()
logger.info(f"Starting the pipeline in {path}")
# Start the MorphoCut Pipeline on the found path
try:
self.__pipe.run()
except Exception as e:
planktoscope.light.error()
logger.exception(f"There was an error in the pipeline {e}")
logger.info(f"Pipeline has been run for {path}")
else:
logger.info(f"Moving to the next folder, {path} is empty")
# remove directory
# shutil.rmtree(import_path)
# Publish the status "Done" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Done"}'
)
planktoscope.light.ready()
elif action == "stop":
logger.info("The segmentation has been interrupted.")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Interrupted"}'
)
planktoscope.light.interrupted()
elif action == "update_config":
logger.error("We can't update the configuration while we are segmenting.")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Busy"}'
)
elif action != "":
logger.warning(
f"We did not understand the received request {action} - {last_message}"
)
################################################################################
# While loop for capturing commands from Node-RED
################################################################################
@logger.catch
def run(self):
"""This is the function that needs to be started to create a thread"""
logger.info(
f"The segmenter control thread has been started in process {os.getpid()}"
)
# MQTT Service connection
self.segmenter_client = planktoscope.mqtt.MQTT_Client(
topic="segmenter/#", name="segmenter_client"
)
################################################################################
# Morphocut Libraries
################################################################################
import morphocut
import morphocut.file
import morphocut.image
import morphocut.stat
import morphocut.stream
import morphocut.str
import morphocut.contrib.ecotaxa
import morphocut.contrib.zooprocess
################################################################################
# Other image processing Libraries
################################################################################
import skimage.util
import cv2
# Instantiate the morphocut pipeline
# self.__create_morphocut_pipeline()
# Publish the status "Ready" to via MQTT to Node-RED
self.segmenter_client.client.publish("status/segmenter", '{"status":"Ready"}')
logger.success("Segmenter is READY!")
# This is the loop
while not self.stop_event.is_set():
self.treat_message()
time.sleep(0.5)
logger.info("Shutting down the segmenter process")
self.segmenter_client.client.publish("status/segmenter", '{"status":"Dead"}')
self.segmenter_client.shutdown()
logger.success("Segmenter process shut down! See you!")
# This is called if this script is launched directly
if __name__ == "__main__":
# TODO This should be a test suite for this library
pass