diff --git a/scripts/mqtt_pump_focus_image_segment_strem.py b/scripts/mqtt_pump_focus_image_segment_strem.py new file mode 100644 index 0000000..c1e7a27 --- /dev/null +++ b/scripts/mqtt_pump_focus_image_segment_strem.py @@ -0,0 +1,529 @@ +################################################################################ +#Actuator Libraries +################################################################################ + +#Library for exchaning messages with Node-RED +import paho.mqtt.client as mqtt + +#Library to control the PiCamera +from picamera import PiCamera + +#Libraries to control the steppers for focusing and pumping +from adafruit_motor import stepper +from adafruit_motorkit import MotorKit + +#Library to send command over I2C for the light module on the fan +import smbus + +################################################################################ +#Practical Libraries +################################################################################ + +#Library to get date and time for folder name and filename +from datetime import datetime, timedelta + +#Library to be able to sleep for a duration +from time import sleep + +#Libraries manipulate json format, execute bash commands +import json, shutil, os, subprocess + +################################################################################ +#Morphocut Libraries +################################################################################ + +from skimage.util import img_as_ubyte +from morphocut import Call +from morphocut.contrib.ecotaxa import EcotaxaWriter +from morphocut.contrib.zooprocess import CalculateZooProcessFeatures +from morphocut.core import Pipeline +from morphocut.file import Find +from morphocut.image import (ExtractROI, + FindRegions, + ImageReader, + ImageWriter, + RescaleIntensity, + RGB2Gray +) +from morphocut.stat import RunningMedian +from morphocut.str import Format +from morphocut.stream import TQDM, Enumerate, FilterVariables + +################################################################################ +#Other image processing Libraries +################################################################################ + +from skimage.feature import canny +from skimage.color import rgb2gray, label2rgb +from skimage.morphology import disk +from skimage.morphology import erosion, dilation, closing +from skimage.measure import label, regionprops +#pip3 install opencv-python +import cv2 + + +################################################################################ +#STREAMING +################################################################################ +import io +import picamera +import logging +import socketserver +from threading import Condition +from http import server +import threading + +PAGE="""\ + + +picamera MJPEG streaming demo + + + + + +""" + +class StreamingOutput(object): + def __init__(self): + self.frame = None + self.buffer = io.BytesIO() + self.condition = Condition() + + def write(self, buf): + if buf.startswith(b'\xff\xd8'): + # New frame, copy the existing buffer's content and notify all + # clients it's available + self.buffer.truncate() + with self.condition: + self.frame = self.buffer.getvalue() + self.condition.notify_all() + self.buffer.seek(0) + return self.buffer.write(buf) + +class StreamingHandler(server.BaseHTTPRequestHandler): + def do_GET(self): + if self.path == '/': + self.send_response(301) + self.send_header('Location', '/index.html') + self.end_headers() + elif self.path == '/index.html': + content = PAGE.encode('utf-8') + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.send_header('Content-Length', len(content)) + self.end_headers() + self.wfile.write(content) + elif self.path == '/stream.mjpg': + self.send_response(200) + self.send_header('Age', 0) + self.send_header('Cache-Control', 'no-cache, private') + self.send_header('Pragma', 'no-cache') + self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME') + self.end_headers() + try: + while True: + with output.condition: + output.condition.wait() + frame = output.frame + self.wfile.write(b'--FRAME\r\n') + self.send_header('Content-Type', 'image/jpeg') + self.send_header('Content-Length', len(frame)) + self.end_headers() + self.wfile.write(frame) + self.wfile.write(b'\r\n') + except Exception as e: + logging.warning( + 'Removed streaming client %s: %s', + self.client_address, str(e)) + else: + self.send_error(404) + self.end_headers() + +class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer): + allow_reuse_address = True + daemon_threads = True +################################################################################ +#MQTT core functions +################################################################################ +#Run this function in order to connect to the client (Node-RED) +def on_connect(client, userdata, flags, rc): + #Print when connected + print("Connected! - " + str(rc)) + #When connected, run subscribe() + client.subscribe("actuator/#") + #Turn green the light module + rgb(0,255,0) + +#Run this function in order to subscribe to all the topics begining by actuator +def on_subscribe(client, obj, mid, granted_qos): + #Print when subscribed + print("Subscribed! - "+str(mid)+" "+str(granted_qos)) + +#Run this command when Node-RED is sending a message on the subscribed topic +def on_message(client, userdata, msg): + #Print the topic and the message + print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) + #Update the global variables command, args and counter + global command + global args + global counter + #Parse the topic to find the command. ex : actuator/pump -> pump + command=msg.topic.split("/")[1] + #Decode the message to find the arguments + args=str(msg.payload.decode()) + #Reset the counter to 0 + counter=0 + +################################################################################ +#Actuators core functions +################################################################################ +def rgb(R,G,B): + #Update LED n°1 + bus.write_byte_data(0x0d, 0x00, 0) + bus.write_byte_data(0x0d, 0x01, R) + bus.write_byte_data(0x0d, 0x02, G) + bus.write_byte_data(0x0d, 0x03, B) + + #Update LED n°2 + bus.write_byte_data(0x0d, 0x00, 1) + bus.write_byte_data(0x0d, 0x01, R) + bus.write_byte_data(0x0d, 0x02, G) + bus.write_byte_data(0x0d, 0x03, B) + + #Update LED n°3 + bus.write_byte_data(0x0d, 0x00, 2) + bus.write_byte_data(0x0d, 0x01, R) + bus.write_byte_data(0x0d, 0x02, G) + bus.write_byte_data(0x0d, 0x03, B) + + #Update the I2C Bus in order to really update the LEDs new values + cmd="i2cdetect -y 1" + subprocess.Popen(cmd.split(),stdout=subprocess.PIPE) + + + +################################################################################ +#Init function - executed only once +################################################################################ + +#define the bus used to actuate the light module on the fan +bus = smbus.SMBus(1) + +#define the names for the 2 exsting steppers +kit = MotorKit() +pump_stepper = kit.stepper1 +pump_stepper.release() +focus_stepper = kit.stepper2 +focus_stepper.release() + +#Precise the settings of the PiCamera +camera = PiCamera() +camera.resolution = (3280, 2464) +camera.iso = 60 +camera.shutter_speed = 500 +camera.exposure_mode = 'fixedfps' + +#Declare the global variables command, args and counter +command = '' +args = '' +counter='' + +client = mqtt.Client() +client.connect("127.0.0.1",1883,60) +client.on_connect = on_connect +client.on_subscribe = on_subscribe +client.on_message = on_message +client.loop_start() + +################################################################################ + +local_metadata = { + "process_datetime": datetime.now(), + "acq_camera_resolution" : camera.resolution, + "acq_camera_iso" : camera.iso, + "acq_camera_shutter_speed" : camera.shutter_speed +} + +config_txt = open('/home/pi/PlanktonScope/config.txt','r') +node_red_metadata = json.loads(config_txt.read()) + +global_metadata = {**local_metadata, **node_red_metadata} + +archive_fn = os.path.join("/home/pi/PlanktonScope/","export", "ecotaxa_export.zip") +# Define processing pipeline + +with Pipeline() as p: + # Recursively find .jpg files in import_path. + # Sort to get consective frames. + abs_path = Find("/home/pi/PlanktonScope/tmp", [".jpg"], sort=True, verbose=True) + + + # Extract name from abs_path + name = Call(lambda p: os.path.splitext(os.path.basename(p))[0], abs_path) + + Call(rgb, 0,255,0) + + # Read image + img = ImageReader(abs_path) + + # Show progress bar for frames + TQDM(Format("Frame {name}", name=name)) + + # Apply running median to approximate the background image + flat_field = RunningMedian(img, 5) + + # Correct image + img = img / flat_field + + # Rescale intensities and convert to uint8 to speed up calculations + img = RescaleIntensity(img, in_range=(0, 1.1), dtype="uint8") + + FilterVariables(name,img) +# frame_fn = Format(os.path.join("/home/pi/PlanktonScope/tmp","CLEAN", "{name}.jpg"), name=name) + +# ImageWriter(frame_fn, img) + + # Convert image to uint8 gray + img_gray = RGB2Gray(img) + + # ? + img_gray = Call(img_as_ubyte, img_gray) + + #Canny edge detection + img_canny = Call(cv2.Canny, img_gray, 50,100) + + #Dilate + kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15)) + img_dilate = Call(cv2.dilate, img_canny, kernel, iterations=2) + + #Close + kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (5, 5)) + img_close = Call(cv2.morphologyEx, img_dilate, cv2.MORPH_CLOSE, kernel, iterations=1) + + #Erode + kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15)) + mask = Call(cv2.erode, img_close, kernel, iterations=2) + + # Find objects + regionprops = FindRegions( + mask, img_gray, min_area=1000, padding=10, warn_empty=name + ) + + Call(rgb, 255,0,255) + # For an object, extract a vignette/ROI from the image + roi_orig = ExtractROI(img, regionprops, bg_color=255) + + # Generate an object identifier + i = Enumerate() + #Call(print,i) + + object_id = Format("{name}_{i:d}", name=name, i=i) + #Call(print,object_id) + object_fn = Format(os.path.join("/home/pi/PlanktonScope/","OBJECTS", "{name}.jpg"), name=object_id) + + ImageWriter(object_fn, roi_orig) + + # Calculate features. The calculated features are added to the global_metadata. + # Returns a Variable representing a dict for every object in the stream. + meta = CalculateZooProcessFeatures( + regionprops, prefix="object_", meta=global_metadata + ) + + json_meta = Call(json.dumps,meta, sort_keys=True, default=str) + + Call(client.publish, "receiver/segmentation/metric", json_meta) + + # Add object_id to the metadata dictionary + meta["object_id"] = object_id + + # Generate object filenames + orig_fn = Format("{object_id}.jpg", object_id=object_id) + + # Write objects to an EcoTaxa archive: + # roi image in original color, roi image in grayscale, metadata associated with each object + EcotaxaWriter(archive_fn, (orig_fn, roi_orig), meta) + + # Progress bar for objects + TQDM(Format("Object {object_id}", object_id=object_id)) + + Call(client.publish, "receiver/segmentation/object_id", object_id) + + +output = StreamingOutput() +address = ('', 8000) +server = StreamingServer(address, StreamingHandler) + +threading.Thread(target=server.serve_forever).start() +################################################################################ + +camera.start_recording(output, format='mjpeg', resize=(640, 480)) +while True: + + if (command=="pump"): + rgb(0,0,255) + direction=args.split(" ")[0] + delay=float(args.split(" ")[1]) + nb_step=int(args.split(" ")[2]) + + client.publish("receiver/pump", "Start"); + + + while True: + + if direction == "BACKWARD": + direction=stepper.BACKWARD + if direction == "FORWARD": + direction=stepper.FORWARD + pump_stepper.onestep(direction=direction, style=stepper.DOUBLE) + counter+=1 + sleep(delay) + + if command!="pump": + pump_stepper.release() + print("The pump has been interrompted.") + client.publish("receiver/pump", "Interrompted"); + rgb(0,255,0) + break + + if counter>nb_step: + pump_stepper.release() + print("The pumping is done.") + command="wait" + client.publish("receiver/pump", "Done"); + rgb(0,255,0) + break + +################################################################################ + + elif (command=="focus"): + + rgb(255,255,0) + direction=args.split(" ")[0] + nb_step=int(args.split(" ")[1]) + client.publish("receiver/focus", "Start"); + + + while True: + + if direction == "FORWARD": + direction=stepper.FORWARD + if direction == "BACKWARD": + direction=stepper.BACKWARD + counter+=1 + focus_stepper.onestep(direction=direction, style=stepper.MICROSTEP) + + if command!="focus": + focus_stepper.release() + print("The stage has been interrompted.") + client.publish("receiver/focus", "Interrompted"); + rgb(0,255,0) + break + + if counter>nb_step: + focus_stepper.release() + print("The focusing is done.") + command="wait" + client.publish("receiver/focus", "Done"); + rgb(0,255,0) + break + +################################################################################ + + elif (command=="image"): + + + sleep_before=int(args.split(" ")[0]) + + nb_step=int(args.split(" ")[1]) + + path=str(args.split(" ")[2]) + + nb_frame=int(args.split(" ")[3]) + + sleep_during=int(args.split(" ")[4]) + + #sleep a duration before to start + sleep(sleep_before) + + + client.publish("receiver/image", "Start"); + + #flushing before to begin + + rgb(0,0,255) + for i in range(nb_step): + if (command=="image"): + pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE) + sleep(0.01) + else: + break + rgb(0,255,0) + + while True: + + counter+=1 + print(datetime.now().strftime("%H_%M_%S_%f")) + filename = os.path.join("/home/pi/PlanktonScope/tmp",datetime.now().strftime("%M_%S_%f")+".jpg") + + rgb(0,255,255) + camera.capture(filename) + rgb(0,255,0) + + client.publish("receiver/image", datetime.now().strftime("%M_%S_%f")+".jpg has been imaged."); + + rgb(0,0,255) + for i in range(10): + pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE) + sleep(0.01) + sleep(0.5) + rgb(0,255,0) + + if(counter>nb_frame): + +# camera.stop_preview() + + client.publish("receiver/image", "Completed"); + # Meta data that is added to every object + + + + client.publish("receiver/segmentation", "Start"); + # Define processing pipeline + + + p.run() + + #remove directory + #shutil.rmtree(import_path) + + client.publish("receiver/segmentation", "Completed"); + + + cmd = os.popen("rm -rf /home/pi/PlanktonScope/tmp/*.jpg") + + rgb(255,255,255) + sleep(sleep_during) + rgb(0,255,0) + + + rgb(0,0,255) + for i in range(nb_step): + pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE) + sleep(0.01) + rgb(0,255,0) + + counter=0 + + + if command!="image": + pump_stepper.release() + print("The imaging has been interrompted.") + client.publish("receiver/image", "Interrompted"); + rgb(0,255,0) + counter=0 + break + + else: +# print("Waiting") + sleep(1)