planktoscope/scripts/mqtt_pump_focus_image.py
2020-02-02 04:20:55 +01:00

440 lines
15 KiB
Python

import paho.mqtt.client as mqtt
from picamera import PiCamera
from datetime import datetime, timedelta
from adafruit_motor import stepper
from adafruit_motorkit import MotorKit
from time import sleep
import json
import os
import subprocess
from skimage.util import img_as_ubyte
from morphocut import Call
from morphocut.contrib.ecotaxa import EcotaxaWriter
from morphocut.contrib.zooprocess import CalculateZooProcessFeatures
from morphocut.core import Pipeline
from morphocut.file import Find
from morphocut.image import (
ExtractROI,
FindRegions,
ImageReader,
ImageWriter,
RescaleIntensity,
RGB2Gray,
)
from morphocut.stat import RunningMedian
from morphocut.str import Format
from morphocut.stream import TQDM, Enumerate, FilterVariables
from skimage.feature import canny
from skimage.color import rgb2gray, label2rgb
from skimage.morphology import disk
from skimage.morphology import erosion, dilation, closing
from skimage.measure import label, regionprops
import cv2, shutil
import smbus
#fan
bus = smbus.SMBus(1)
################################################################################
kit = MotorKit()
pump_stepper = kit.stepper1
pump_stepper.release()
focus_stepper = kit.stepper2
focus_stepper.release()
################################################################################
camera = PiCamera()
camera.resolution = (3280, 2464)
camera.iso = 60
camera.shutter_speed = 500
camera.exposure_mode = 'fixedfps'
################################################################################
message = ''
topic = ''
count=''
################################################################################
def on_connect(client, userdata, flags, rc):
print("Connected! - " + str(rc))
client.subscribe("actuator/#")
rgb(0,255,0)
def on_subscribe(client, obj, mid, granted_qos):
print("Subscribed! - "+str(mid)+" "+str(granted_qos))
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
global message
global topic
global count
message=str(msg.payload.decode())
topic=msg.topic.split("/")[1]
count=0
def on_log(client, obj, level, string):
print(string)
def rgb(R,G,B):
bus.write_byte_data(0x0d, 0x00, 0)
bus.write_byte_data(0x0d, 0x01, R)
bus.write_byte_data(0x0d, 0x02, G)
bus.write_byte_data(0x0d, 0x03, B)
bus.write_byte_data(0x0d, 0x00, 1)
bus.write_byte_data(0x0d, 0x01, R)
bus.write_byte_data(0x0d, 0x02, G)
bus.write_byte_data(0x0d, 0x03, B)
bus.write_byte_data(0x0d, 0x00, 2)
bus.write_byte_data(0x0d, 0x01, R)
bus.write_byte_data(0x0d, 0x02, G)
bus.write_byte_data(0x0d, 0x03, B)
cmd="i2cdetect -y 1"
subprocess.Popen(cmd.split(),stdout=subprocess.PIPE)
################################################################################
client = mqtt.Client()
client.connect("127.0.0.1",1883,60)
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_message = on_message
client.on_log = on_log
client.loop_start()
################################################################################
while True:
################################################################################
if (topic=="pump"):
rgb(0,0,255)
direction=message.split(" ")[0]
delay=float(message.split(" ")[1])
nb_step=int(message.split(" ")[2])
client.publish("receiver/pump", "Start");
while True:
if direction == "BACKWARD":
direction=stepper.BACKWARD
if direction == "FORWARD":
direction=stepper.FORWARD
count+=1
# print(count,nb_step)
pump_stepper.onestep(direction=direction, style=stepper.DOUBLE)
sleep(delay)
if topic!="pump":
pump_stepper.release()
print("The pump has been interrompted.")
client.publish("receiver/pump", "Interrompted");
rgb(0,255,0)
break
if count>nb_step:
pump_stepper.release()
print("The pumping is done.")
topic="wait"
client.publish("receiver/pump", "Done");
rgb(0,255,0)
break
################################################################################
elif (topic=="focus"):
rgb(255,255,0)
direction=message.split(" ")[0]
nb_step=int(message.split(" ")[1])
client.publish("receiver/focus", "Start");
while True:
if direction == "FORWARD":
direction=stepper.FORWARD
if direction == "BACKWARD":
direction=stepper.BACKWARD
count+=1
# print(count,nb_step)
focus_stepper.onestep(direction=direction, style=stepper.MICROSTEP)
if topic!="focus":
focus_stepper.release()
print("The stage has been interrompted.")
client.publish("receiver/focus", "Interrompted");
rgb(0,255,0)
break
if count>nb_step:
focus_stepper.release()
print("The focusing is done.")
topic="wait"
client.publish("receiver/focus", "Done");
rgb(0,255,0)
break
################################################################################
elif (topic=="image"):
camera.start_preview(fullscreen=False, window = (160, 0, 640, 480))
sleep_before=int(message.split(" ")[0])
nb_step=int(message.split(" ")[1])
path=str(message.split(" ")[2])
nb_frame=int(message.split(" ")[3])
sleep_during=int(message.split(" ")[4])
#sleep a duration before to start
sleep(sleep_before)
client.publish("receiver/image", "Start");
#flushing before to begin
rgb(0,0,255)
for i in range(nb_step):
pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE)
sleep(0.01)
rgb(0,255,0)
directory = os.path.join(path, "PlanktonScope")
os.makedirs(directory, exist_ok=True)
export = os.path.join(directory, "export")
os.makedirs(export, exist_ok=True)
date=datetime.now().strftime("%m_%d_%Y")
time=datetime.now().strftime("%H_%M")
path_date = os.path.join(directory, date)
os.makedirs(path_date, exist_ok=True)
path_time = os.path.join(path_date,time)
os.makedirs(path_time, exist_ok=True)
while True:
count+=1
# print(count,nb_frame)
filename = os.path.join(path_time,datetime.now().strftime("%M_%S_%f")+".jpg")
rgb(0,255,255)
camera.capture(filename)
rgb(0,255,0)
client.publish("receiver/image", datetime.now().strftime("%M_%S_%f")+".jpg has been imaged.");
rgb(0,0,255)
for i in range(10):
pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE)
sleep(0.01)
sleep(0.5)
rgb(0,255,0)
if(count>nb_frame):
camera.stop_preview()
client.publish("receiver/image", "Completed");
# Meta data that is added to every object
local_metadata = {
"process_datetime": datetime.now(),
"acq_camera_resolution" : camera.resolution,
"acq_camera_iso" : camera.iso,
"acq_camera_shutter_speed" : camera.shutter_speed
}
global_metadata = None
config_txt = None
RAW = None
CLEAN = None
ANNOTATED = None
OBJECTS = None
archive_fn = None
config_txt = open('/home/pi/PlanktonScope/config.txt','r')
node_red_metadata = json.loads(config_txt.read())
global_metadata = {**local_metadata, **node_red_metadata}
RAW = os.path.join(path_time, "RAW")
os.makedirs(RAW, exist_ok=True)
os.system("mv "+str(path_time)+"/*.jpg "+str(RAW))
CLEAN = os.path.join(path_time, "CLEAN")
os.makedirs(CLEAN, exist_ok=True)
ANNOTATED = os.path.join(path_time, "ANNOTATED")
os.makedirs(ANNOTATED, exist_ok=True)
OBJECTS = os.path.join(path_time, "OBJECTS")
os.makedirs(OBJECTS, exist_ok=True)
archive_fn = os.path.join(directory,"export", str(date)+"_"+str(time)+"_ecotaxa_export.zip")
client.publish("receiver/segmentation", "Start");
# Define processing pipeline
# Define processing pipeline
with Pipeline() as p:
# Recursively find .jpg files in import_path.
# Sort to get consective frames.
abs_path = Find(RAW, [".jpg"], sort=True, verbose=True)
FilterVariables(abs_path)
# Extract name from abs_path
name = Call(lambda p: os.path.splitext(os.path.basename(p))[0], abs_path)
Call(rgb, 0,255,0)
# Read image
img = ImageReader(abs_path)
# Show progress bar for frames
#TQDM(Format("Frame {name}", name=name))
# Apply running median to approximate the background image
flat_field = RunningMedian(img, 5)
# Correct image
img = img / flat_field
FilterVariables(name,img)
# Rescale intensities and convert to uint8 to speed up calculations
img = RescaleIntensity(img, in_range=(0, 1.1), dtype="uint8")
frame_fn = Format(os.path.join(CLEAN, "{name}.jpg"), name=name)
ImageWriter(frame_fn, img)
FilterVariables(name,img)
# Convert image to uint8 gray
img_gray = RGB2Gray(img)
# ?
img_gray = Call(img_as_ubyte, img_gray)
#Canny edge detection
img_canny = Call(cv2.Canny, img_gray, 50,100)
#Dilate
kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15))
img_dilate = Call(cv2.dilate, img_canny, kernel, iterations=2)
#Close
kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (5, 5))
img_close = Call(cv2.morphologyEx, img_dilate, cv2.MORPH_CLOSE, kernel, iterations=1)
#Erode
kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15))
mask = Call(cv2.erode, img_close, kernel, iterations=2)
FilterVariables(name,img,img_gray,mask)
# Find objects
regionprops = FindRegions(
mask, img_gray, min_area=1000, padding=10, warn_empty=name
)
Call(rgb, 255,0,255)
# For an object, extract a vignette/ROI from the image
roi_orig = ExtractROI(img, regionprops, bg_color=255)
# Generate an object identifier
i = Enumerate()
#Call(print,i)
object_id = Format("{name}_{i:d}", name=name, i=i)
#Call(print,object_id)
object_fn = Format(os.path.join(OBJECTS, "{name}.jpg"), name=object_id)
ImageWriter(object_fn, roi_orig)
# Calculate features. The calculated features are added to the global_metadata.
# Returns a Variable representing a dict for every object in the stream.
meta = CalculateZooProcessFeatures(
regionprops, prefix="object_", meta=global_metadata
)
json_meta = Call(json.dumps,meta, sort_keys=True, default=str)
Call(client.publish, "receiver/segmentation/metric", json_meta)
# Add object_id to the metadata dictionary
meta["object_id"] = object_id
# Generate object filenames
orig_fn = Format("{object_id}.jpg", object_id=object_id)
FilterVariables(orig_fn,roi_orig,meta,object_id)
# Write objects to an EcoTaxa archive:
# roi image in original color, roi image in grayscale, metadata associated with each object
EcotaxaWriter(archive_fn, (orig_fn, roi_orig), meta)
# Progress bar for objects
TQDM(Format("Object {object_id}", object_id=object_id))
Call(client.publish, "receiver/segmentation/object_id", object_id)
meta=None
FilterVariables(meta)
p.run()
#remove directory
#shutil.rmtree(import_path)
client.publish("receiver/segmentation", "Completed");
rgb(255,255,255)
sleep(sleep_during)
rgb(0,255,0)
date=datetime.now().strftime("%m_%d_%Y")
time=datetime.now().strftime("%H_%M")
path_date = os.path.join(directory, date)
os.makedirs(path_date, exist_ok=True)
path_time = os.path.join(path_date,time)
os.makedirs(path_time, exist_ok=True)
rgb(0,0,255)
for i in range(nb_step):
pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE)
sleep(0.01)
rgb(0,255,0)
os.makedirs(path_time, exist_ok=True)
count=0
if topic!="image":
pump_stepper.release()
print("The imaging has been interrompted.")
client.publish("receiver/image", "Interrompted");
rgb(0,255,0)
count=0
break
else:
# print("Waiting")
sleep(1)