Extraction and refactor of the python code from node-red flow

The rationale for this rewrite is to improve the readability, the modularity, the reliability and the future-proofing of the main python script.

All in all, this is now 124 commits that are going to be squashed and merged together, spanning more than two weeks of development and testing.

Please test away this release and break things. An upgrading guide will be published in the coming days, along with a new image for people to use if they don't want to upgrade on their own.

Read along if you want to know all the goodies!

As a starter, the python script was extracted from the main flow, and now lives in its own files at `PlantonScope/scripts/*`.

We set up the auto formatting of the code by using [Black](https://github.com/psf/black). This make the code clearer and uniform. We are using the default settings, so if you just install Black and set your editor to format on save using it, you should be good to go.

The code is separated in four main processes, each with a specific set of responsibilities:
- The main process controls all the others, starts everything up and cleans up on shutdown
- The stepper process manages the stepper movements. It's now possible to have simultaneous movements of both motors (this closes #38 ).
- The imager process controls the camera and the streaming server via a state machine.
- The segmenter process manages the segmentation and its outputs. The segmentation happens recursively in all folders in `/home/pi/PlanktonScope/img/`. Each folder has its own output archive, and bug #26 is now closed.


Those processes communicates together using MQTT and json messages. Each message is adressed to one topic. The high level topic controls which process receives the message. The details of each topic is at the end of this commit message.

Every imaging sessions has now its own folder, under the `img` root. Metadata are saved individually for every session, in a JSON file in the same directory as the pictures.

The configuration is not parsed from `config.json` anymore and passed directly through MQTT messages to the concerned process.

A new configuration file has been created: `hardware.json`. This file contains information related to your specific hardware configuration. You can choose to reverse the connection of the motors for example, but you can also define specific speed limits and steps number for your pump and focus stage. This will make it easier for people who wants to experiment with different kind of hardware. It's not necessary to have this file though. If it doesn't exists, the default configuration will be applied.

The code is architectured around 6 modules and about 10 classes. I encourage you to have a look at the files, they're pretty straightforward to understand.

There is a lot of work left around the node-red code refactoring, dashboard ui improvements, better and clearer LED messages, OLED screen integration and finer control of the segmentation process, but this is quite good for now.


Here is the topic lists for MQTT and the corresponding messages.
- actuator :  This topic adresses the stepper control thread
              No publication under this topic should happen from the python process
  - actuator/pump :   Control of the pump
                      The message is a json object
                      {"action":"move", "direction":"FORWARD", "volume":10, "flowrate":1}
                      to move 10mL forward at 1mL/min
                      action can be "move" or "stop"
                      Receive only
  - actuator/focus :  Control of the focus stage
                      The message is a json object, speed is optional
                      {"action":"move", "direction":"UP", "distance":0.26, "speed":1}
                      to move up 10mm
                      action can be "move" or "stop"
                      Receive only
- imager/image :      This topic adresses the imaging thread
                      Is a json object with
                      {"action":"image","sleep":5,"volume":1,"nb_frame":200}
                      sleep in seconds, volume in mL
                      Can also receive a config update message:
                      {"action":"config","config":[...]}
                      Can also receive a camera settings message:
                      {"action":"settings","iso":100,"shutter_speed":40}
                      Receive only
- segmenter/segment : This topic adresses the segmenter process
                      Is a json object with
                      {"action":"segment"}
                      Receive only
- status :    This topics sends feedback to Node-Red
              No publication or receive at this level
  - status/pump :     State of the pump
                      Is a json object with
                      {"status":"Start", "time_left":25}
                      Status is one of Started, Ready, Done, Interrupted
                      Publish only
  - status/focus :    State of the focus stage
                      Is a json object with
                      {"status":"Start", "time_left":25}
                      Status is one of Started, Ready, Done, Interrupted
                      Publish only
  - status/imager :   State of the imager
                      Is a json object with
                      {"status":"Start", "time_left":25}
                      Status is one of Started, Ready, Completed or 12_11_15_0.1.jpg has been imaged.
                      Publish only
  - status/segmenter :   Status of the segmentation
      - status/segmenter/name
      - status/segmenter/object_id
      - status/segmenter/metric




Here is the original commit history:
* Extract python main.py from flow

* Fix bug in server addresses

These addresses should be the loopback device instead of the network 
address of the device. Using the loopback address will not necessitate 
to update the script when the network address changes.

* clean up picamera import

* changes to main python and flow:
update MQTT requests address to localhost (bugfix)
update streaming output address to nothing
update main flow to remove python script references and location

* Automatically initialise imaging led on startup to off state.

* Add the ability to invert outputs of the motor

We added a key to config.json "hardware_config" with a subkey 
"stepper_reverse". If this key is present in the config file and set to 
1, the output of the motors are inversed (stepper2 becomes the pump 
motor and stepper1 the focus motor)

* move all non main script to a subfolder

* add __init__.py to package

* light module rewrite

* json cleanup and absolute path for config file

* light.py forgot to import subprocess

#oups

* Add command to turn the leds off

* Auto formatting of main.py

I've used Black with default settings, see https://github.com/psf/black

* First commit of stepper.py
Pump parameters still needs to be checked and tuned.

* addition of hardware details in config.json

* Introduce hardware.json to replace the `hardware_config` of config.json

* stepper.py: calibration, typos

* creates the MQTT_Client class

* pump_max_speed is now in ml/min to help readability

* forgot to add self to the class def

* addition of threading capabilities to stepper.py (UNTESTED)

* mqtt: fix topic bug

* remove counter

* mqtt add doc about topics

* stepper.py creates an "actuator/*/state" topic

* stepper.py: rename mqtt_client to pump_client

* mqtt.py: add details about topics

* stepper.py: rename pump_client to actuator_client

* topic was not split properly and a part was lost

* switch to f-strings for mqtt.py

* cosmetic update

* stepper.py: folder name will be planktoscope change calls

* hardware.json became more straightforward

* stepper.py syntax bugs

* stepper.py addition of a received stop command

* stepper.py: update to max travel distance

* stepper.py: several typos here

* rename folder

* main.py: reword to reflect folder rename

* main.py: remove logic that has been moved to stepper.py and mqtt.py

* main.py: update to add mqtt imaging client

* mqtt.py: make command and args local to class and output more verbose

* make stepper.py a class

* main.py: instantiate stepper class and call it

* main.py: name mqtt client

* update to main.json to reflect main.py changes

* fix bugs around pump control

* update flows to latest version from Thibault

* distance can be a small value, and definitevely should be a float.

* unify mqtt topics

* unify mqtt output in the main flow

* first logger implementation, uses loguru

* mqtt: add reason to on_connect

* mqtt: add on_disconnect handler

* stepper: add more logger calls for debug mainly

* main: add levels for logger

* imager.py: first move of the imager logic

* imager: time import cleanup

* imager: morphocut import cleanup

* imager: skimage import cleanup

* imager: finishing import cleanup

* imager: Class creation - WIP

Also provides a fix for #26 (see line 190).

* imager: threading is needed for Condition()

* streamer: get the streamer server its own file

* imager: creates start_camera and get the server creation out

* imager: subclass multiprocessing.Process

* imager: get Pipeline creation its own function

* imager: cleanup of self calls

* main: code removal and corresponding calls to newly created classes

* imager: various formatting changes

* main: management of signal shutdown

* add requirements.txt

* mqtt: messages are now json objects

Also, addition of a flag on receiving a new message

* mqtt: make message private and add logic to synchronise

* stepper: creates the stepper class

* stepper: use the new class

* stepper: uses the new logic

* stepper: add the shutdown event

* stepper: add shutdown method

* main: add shutdown event

* imager: graceful shutdown

* stepper: nicer way of checking the Eevnt

* self is a required first argument for a method in a class

Especially if you use said class private members!

* python: various typos and small errors in import

* stepper: create mqtt client during init

* stepper: instanciate the mqtt client inside run

Otherwise it's not accessible from inside the loop. It's a PITA,
more information at  https://stackoverflow.com/questions/17172878/using-pythons-multiprocessing-process-class

* stepper: little bugs and typos all around

* mqtt: add shutdown method

* mqtt: add connect in init

* stepper: fix bugs, sanitize inputs

* stepper: work on delay prediction improvements

* stepper: json is mean, double quote are mandatory inside

* mqtt: add details about message exchanged

* imager: first implementation of json messages

* main.json: add new tab for RPi management + json for payloads

* imager: add state_machine class

* stepper: publish last will

* imager: major refactor

* main: make streaming server process a daemon

* mqtt: insert debug statement on close

* main: reorder imports

* imager: make it work!

Reinsert the streaming server logic in there, because there is a problem with the synchronisation part otherwise.
Also, eventually, StreamingOuput() will have to be made not global

Final very critical learning: it's super duper important to make sure the memory split is at least 256Meg for the GPU.
Chaos ensues otherwise

* main: changes to accomodate the streamer/imager fusino

* imager_state_machine: insert states transition description

* stepper: cleanup of code

* segmenter: creation of the class

* python: include segmenter changes

* remove unused files

* stepper: check existence of hardware.json

* main.json: changes to reflect the python script evolution

* remove unecessary TODOs and add some others

* main: add check for config and directories

* imager: update_config is implemented and we have better management of directories now

* segmenter: now work recursively in all folders

* flow: the configuration is now sent via mqtt

* segmenter: better manage pipeline error

* segmenter: declaration of archive_fn in init

* imager: small bugs and typos

* main: add uniqueID output

* imager: add the camera settings message

We can now update the ISO, shutter speed and resolution from Node-Red

* package.json: update dependencies
This commit is contained in:
Romain Bazile 2020-09-28 11:05:27 +02:00 committed by GitHub
parent 0ceb7abf8d
commit 06ef401a3b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 3252 additions and 1826 deletions

View file

@ -27,4 +27,3 @@
"process_pixel": 1.19,
"process_id": 146
}

File diff suppressed because one or more lines are too long

7
hardware.json Normal file
View file

@ -0,0 +1,7 @@
{
"stepper_reverse" : 1,
"focus_steps_per_mm" : 40,
"pump_steps_per_ml" : 614,
"focus_max_speed" : 0.5,
"pump_max_speed" : 30
}

View file

@ -3,12 +3,13 @@
"description": "Planktonscope main software project",
"version": "0.1",
"dependencies": {
"node-red-contrib-gpsd": "1.0.2",
"node-red-contrib-interval": "0.0.1",
"node-red-contrib-python3-function": "0.0.4",
"node-red-contrib-web-worldmap": "2.4.1",
"node-red-dashboard": "2.23.0",
"node-red-node-pi-gpio": "1.1.1"
"node-red-contrib-gpsd": "^1.0.2",
"node-red-contrib-interval": "^0.0.1",
"node-red-contrib-python3-function": "^0.0.4",
"node-red-contrib-web-worldmap": "^2.5.4",
"node-red-dashboard": "^2.23.4",
"node-red-node-pi-gpio": "^1.2.0",
"node-red-contrib-camerapi": "0.0.38"
},
"node-red": {
"settings": {

9
requirements.txt Normal file
View file

@ -0,0 +1,9 @@
RPi.GPIO==0.7.0
Adafruit-Blinka==5.2.3
adafruit-circuitpython-motorkit==1.5.2
Adafruit-SSD1306==1.6.2
paho-mqtt==1.5.0
picamera==1.13
opencv-contrib-python==4.1.0.25
morphocut==0.1.1+42.g01a051e
scipy==1.1.0

View file

@ -1,19 +0,0 @@
#!/usr/bin/python
import smbus
import sys
state = str(sys.argv[1])
bus = smbus.SMBus(1)
DEVICE_ADDRESS = 0x0d
def fan(state):
if state == "false":
bus.write_byte_data(DEVICE_ADDRESS, 0x08, 0x00)
bus.write_byte_data(DEVICE_ADDRESS, 0x08, 0x00)
if state == "true":
bus.write_byte_data(DEVICE_ADDRESS, 0x08, 0x01)
bus.write_byte_data(DEVICE_ADDRESS, 0x08, 0x01)
fan(state)

View file

@ -1,28 +0,0 @@
#!/usr/bin/env python
from adafruit_motor import stepper
from adafruit_motorkit import MotorKit
from time import sleep
import sys
nb_step = int(sys.argv[1])
orientation = str(sys.argv[2])
kit = MotorKit()
stage = kit.stepper2
stage.release()
#0.25mm/step
#31um/microsteps
if orientation == 'up':
for i in range(nb_step):
stage.onestep(direction=stepper.FORWARD, style=stepper.MICROSTEP)
sleep(0.001)
if orientation == 'down':
for i in range(nb_step):
stage.onestep(direction=stepper.BACKWARD, style=stepper.MICROSTEP)
sleep(0.001)
stage.release()

View file

@ -1,55 +0,0 @@
#!/usr/bin/env python
#Imaging a volume with a flowrate
#in folder named "/home/pi/PlanktonScope/acquisitions/sample_project/sample_id/acq_id"
#python3.7 $HOME/PlanktonScope/scripts/image.py tara_pacific sample_project sample_id acq_id volume flowrate
import time
from time import sleep
from picamera import PiCamera
from datetime import datetime, timedelta
import os
import sys
path=str(sys.argv[1])
#[i] : ex:24ml
volume=int(sys.argv[2])
#[f] : ex:3.2ml/min
flowrate = float(sys.argv[3])
warm_up_duration=3
duration = (volume/flowrate)*60 - warm_up_duration
max_fps = 0.7
nb_frame = int(duration/max_fps)
if not os.path.exists(path):
os.makedirs(path)
camera = PiCamera()
camera.resolution = (3280, 2464)
camera.iso = 60
def image(nb_frame, path):
sleep(3)
for frame in range(nb_frame):
time = datetime.now().timestamp()
filename=path+"/"+str(time)+".jpg"
camera.capture(filename)
print(time)
sleep(0.1)
image(nb_frame, path)

View file

@ -1,26 +0,0 @@
#!/usr/bin/env python
#Turn on using this command line :
#python3.7 path/to/file/light.py on
#Turn off using this command line :
#python3.7 path/to/file/light.py off
import RPi.GPIO as GPIO
import sys
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(21,GPIO.OUT)
state = str(sys.argv[1])
def light(state):
if state == "true":
GPIO.output(21,GPIO.HIGH)
if state == "false":
GPIO.output(21,GPIO.LOW)
light(state)

150
scripts/main.py Normal file
View file

@ -0,0 +1,150 @@
# Logger library compatible with multiprocessing
from loguru import logger
import sys
# multiprocessing module
import multiprocessing
# Time module so we can sleep
import time
# signal module is used to manage SINGINT/SIGTERM
import signal
# os module is used create paths
import os
# enqueue=True is necessary so we can log accross modules
# rotation happens everyday at 01:00 if not restarted
logger.add(
# sys.stdout,
"PlanktoScope_{time}.log",
rotation="01:00",
retention="1 month",
compression=".tar.gz",
enqueue=True,
level="INFO",
)
# The available level for the logger are as follows:
# Level name Severity Logger method
# TRACE 5 logger.trace()
# DEBUG 10 logger.debug()
# INFO 20 logger.info()
# SUCCESS 25 logger.success()
# WARNING 30 logger.warning()
# ERROR 40 logger.error()
# CRITICAL 50 logger.critical()
logger.info("Starting the PlanktoScope python script!")
# Library for exchaning messages with Node-RED
import planktoscope.mqtt
# Import the planktonscope stepper module
import planktoscope.stepper
# Import the planktonscope imager module
import planktoscope.imager
# Import the planktonscope segmenter module
import planktoscope.segmenter
# Import the planktonscope LED module
import planktoscope.light
# global variable that keeps the wheels spinning
run = True
def handler_stop_signals(signum, frame):
"""This handler simply stop the forever running loop in __main__"""
global run
run = False
if __name__ == "__main__":
logger.info("Welcome!")
logger.info(
"Initialising signals handling and sanitizing the directories (step 1/4)"
)
signal.signal(signal.SIGINT, handler_stop_signals)
signal.signal(signal.SIGTERM, handler_stop_signals)
# check if gpu_mem configuration is at least 256Meg, otherwise the camera will not run properly
with open("/boot/config.txt", "r") as config_file:
for i, line in enumerate(config_file):
if line.startswith("gpu_mem"):
if int(line.split("=")[1].strip()) < 256:
logger.error(
"The GPU memory size is less than 256, this will prevent the camera from running properly"
)
logger.error(
"Please edit the file /boot/config.txt to change the gpu_mem value to at least 256"
)
logger.error(
"or use raspi-config to change the memory split, in menu 7 Advanced Options, A3 Memory Split"
)
sys.exit(1)
# Let's make sure the used base path exists
img_path = "/home/pi/PlanktonScope/img"
# check if this path exists
if not os.path.exists(img_path):
# create the path!
os.makedirs(img_path)
export_path = "/home/pi/PlanktonScope/export"
# check if this path exists
if not os.path.exists(export_path):
# create the path!
os.makedirs(export_path)
with open("/sys/firmware/devicetree/base/serial-number", "r") as config_file:
logger.info(f"This PlanktoScope unique ID is {config_file.readline()}")
# Prepare the event for a gracefull shutdown
shutdown_event = multiprocessing.Event()
shutdown_event.clear()
# Starts the stepper process for actuators
logger.info("Starting the stepper control process (step 2/4)")
stepper_thread = planktoscope.stepper.StepperProcess(shutdown_event)
stepper_thread.start()
# Starts the imager control process
logger.info("Starting the imager control process (step 3/4)")
imager_thread = planktoscope.imager.ImagerProcess(shutdown_event)
imager_thread.start()
# Starts the segmenter process
logger.info("Starting the segmenter control process (step 4/4)")
segmenter_thread = planktoscope.segmenter.SegmenterProcess(shutdown_event)
segmenter_thread.start()
logger.info("Looks like everything is set up and running, have fun!")
while run:
# TODO look into ways of restarting the dead threads
logger.trace("Running around in circles while waiting for someone to die!")
if not stepper_thread.is_alive():
logger.error("The stepper process died unexpectedly! Oh no!")
break
if not imager_thread.is_alive():
logger.error("The imager process died unexpectedly! Oh no!")
break
if not segmenter_thread.is_alive():
logger.error("The segmenter process died unexpectedly! Oh no!")
break
time.sleep(1)
logger.info("Shutting down the shop")
shutdown_event.set()
stepper_thread.join()
imager_thread.join()
segmenter_thread.join()
stepper_thread.close()
imager_thread.close()
segmenter_thread.close()
logger.info("Bye")

View file

@ -1,389 +0,0 @@
import paho.mqtt.client as mqtt
from picamera import PiCamera
from datetime import datetime, timedelta
from adafruit_motor import stepper
from adafruit_motorkit import MotorKit
from time import sleep
import json
import os
import subprocess
from skimage.util import img_as_ubyte
from morphocut import Call
from morphocut.contrib.ecotaxa import EcotaxaWriter
from morphocut.contrib.zooprocess import CalculateZooProcessFeatures
from morphocut.core import Pipeline
from morphocut.file import Find
from morphocut.image import (
ExtractROI,
FindRegions,
ImageReader,
ImageWriter,
RescaleIntensity,
RGB2Gray,
)
from morphocut.stat import RunningMedian
from morphocut.str import Format
from morphocut.stream import TQDM, Enumerate, FilterVariables
from skimage.feature import canny
from skimage.color import rgb2gray, label2rgb
from skimage.morphology import disk
from skimage.morphology import erosion, dilation, closing
from skimage.measure import label, regionprops
import cv2, shutil
import smbus
#fan
bus = smbus.SMBus(1)
################################################################################
kit = MotorKit()
pump_stepper = kit.stepper1
pump_stepper.release()
focus_stepper = kit.stepper2
focus_stepper.release()
################################################################################
camera = PiCamera()
camera.resolution = (3280, 2464)
camera.iso = 60
camera.shutter_speed = 500
camera.exposure_mode = 'fixedfps'
################################################################################
message = ''
topic = ''
count=''
################################################################################
def on_connect(client, userdata, flags, rc):
print("Connected! - " + str(rc))
client.subscribe("actuator/#")
rgb(0,255,0)
def on_subscribe(client, obj, mid, granted_qos):
print("Subscribed! - "+str(mid)+" "+str(granted_qos))
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
global message
global topic
global count
message=str(msg.payload.decode())
topic=msg.topic.split("/")[1]
count=0
def on_log(client, obj, level, string):
print(string)
def rgb(R,G,B):
bus.write_byte_data(0x0d, 0x00, 0)
bus.write_byte_data(0x0d, 0x01, R)
bus.write_byte_data(0x0d, 0x02, G)
bus.write_byte_data(0x0d, 0x03, B)
bus.write_byte_data(0x0d, 0x00, 1)
bus.write_byte_data(0x0d, 0x01, R)
bus.write_byte_data(0x0d, 0x02, G)
bus.write_byte_data(0x0d, 0x03, B)
bus.write_byte_data(0x0d, 0x00, 2)
bus.write_byte_data(0x0d, 0x01, R)
bus.write_byte_data(0x0d, 0x02, G)
bus.write_byte_data(0x0d, 0x03, B)
cmd="i2cdetect -y 1"
subprocess.Popen(cmd.split(),stdout=subprocess.PIPE)
################################################################################
client = mqtt.Client()
client.connect("127.0.0.1",1883,60)
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_message = on_message
client.on_log = on_log
client.loop_start()
local_metadata = {
"process_datetime": datetime.now(),
"acq_camera_resolution" : camera.resolution,
"acq_camera_iso" : camera.iso,
"acq_camera_shutter_speed" : camera.shutter_speed
}
config_txt = open('/home/pi/PlanktonScope/config.txt','r')
node_red_metadata = json.loads(config_txt.read())
global_metadata = {**local_metadata, **node_red_metadata}
archive_fn = os.path.join("/home/pi/PlanktonScope/","export", "ecotaxa_export.zip")
# Define processing pipeline
with Pipeline() as p:
# Recursively find .jpg files in import_path.
# Sort to get consective frames.
abs_path = Find("/home/pi/PlanktonScope/tmp", [".jpg"], sort=True, verbose=True)
# Extract name from abs_path
name = Call(lambda p: os.path.splitext(os.path.basename(p))[0], abs_path)
Call(rgb, 0,255,0)
# Read image
img = ImageReader(abs_path)
# Show progress bar for frames
#TQDM(Format("Frame {name}", name=name))
# Apply running median to approximate the background image
flat_field = RunningMedian(img, 5)
# Correct image
img = img / flat_field
# Rescale intensities and convert to uint8 to speed up calculations
img = RescaleIntensity(img, in_range=(0, 1.1), dtype="uint8")
# frame_fn = Format(os.path.join("/home/pi/PlanktonScope/tmp","CLEAN", "{name}.jpg"), name=name)
# ImageWriter(frame_fn, img)
# Convert image to uint8 gray
img_gray = RGB2Gray(img)
# ?
img_gray = Call(img_as_ubyte, img_gray)
#Canny edge detection
img_canny = Call(cv2.Canny, img_gray, 50,100)
#Dilate
kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15))
img_dilate = Call(cv2.dilate, img_canny, kernel, iterations=2)
#Close
kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (5, 5))
img_close = Call(cv2.morphologyEx, img_dilate, cv2.MORPH_CLOSE, kernel, iterations=1)
#Erode
kernel = Call(cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15))
mask = Call(cv2.erode, img_close, kernel, iterations=2)
# Find objects
regionprops = FindRegions(
mask, img_gray, min_area=1000, padding=10, warn_empty=name
)
Call(rgb, 255,0,255)
# For an object, extract a vignette/ROI from the image
roi_orig = ExtractROI(img, regionprops, bg_color=255)
# Generate an object identifier
i = Enumerate()
#Call(print,i)
object_id = Format("{name}_{i:d}", name=name, i=i)
#Call(print,object_id)
object_fn = Format(os.path.join("/home/pi/PlanktonScope/","OBJECTS", "{name}.jpg"), name=object_id)
ImageWriter(object_fn, roi_orig)
# Calculate features. The calculated features are added to the global_metadata.
# Returns a Variable representing a dict for every object in the stream.
meta = CalculateZooProcessFeatures(
regionprops, prefix="object_", meta=global_metadata
)
# json_meta = Call(json.dumps,meta, sort_keys=True, default=str)
# Call(client.publish, "receiver/segmentation/metric", json_meta)
# Add object_id to the metadata dictionary
meta["object_id"] = object_id
# Generate object filenames
orig_fn = Format("{object_id}.jpg", object_id=object_id)
# Write objects to an EcoTaxa archive:
# roi image in original color, roi image in grayscale, metadata associated with each object
EcotaxaWriter(archive_fn, (orig_fn, roi_orig), meta)
# Progress bar for objects
TQDM(Format("Object {object_id}", object_id=object_id))
# Call(client.publish, "receiver/segmentation/object_id", object_id)
camera.start_preview(fullscreen=False, window = (160, 0, 640, 480))
################################################################################
while True:
################################################################################
if (topic=="pump"):
rgb(0,0,255)
direction=message.split(" ")[0]
delay=float(message.split(" ")[1])
nb_step=int(message.split(" ")[2])
client.publish("receiver/pump", "Start");
while True:
if direction == "BACKWARD":
direction=stepper.BACKWARD
if direction == "FORWARD":
direction=stepper.FORWARD
count+=1
# print(count,nb_step)
pump_stepper.onestep(direction=direction, style=stepper.DOUBLE)
sleep(delay)
if topic!="pump":
pump_stepper.release()
print("The pump has been interrompted.")
client.publish("receiver/pump", "Interrompted");
rgb(0,255,0)
break
if count>nb_step:
pump_stepper.release()
print("The pumping is done.")
topic="wait"
client.publish("receiver/pump", "Done");
rgb(0,255,0)
break
################################################################################
elif (topic=="focus"):
rgb(255,255,0)
direction=message.split(" ")[0]
nb_step=int(message.split(" ")[1])
client.publish("receiver/focus", "Start");
while True:
if direction == "FORWARD":
direction=stepper.FORWARD
if direction == "BACKWARD":
direction=stepper.BACKWARD
count+=1
# print(count,nb_step)
focus_stepper.onestep(direction=direction, style=stepper.MICROSTEP)
if topic!="focus":
focus_stepper.release()
print("The stage has been interrompted.")
client.publish("receiver/focus", "Interrompted");
rgb(0,255,0)
break
if count>nb_step:
focus_stepper.release()
print("The focusing is done.")
topic="wait"
client.publish("receiver/focus", "Done");
rgb(0,255,0)
break
################################################################################
elif (topic=="image"):
sleep_before=int(message.split(" ")[0])
nb_step=int(message.split(" ")[1])
path=str(message.split(" ")[2])
nb_frame=int(message.split(" ")[3])
sleep_during=int(message.split(" ")[4])
#sleep a duration before to start
sleep(sleep_before)
client.publish("receiver/image", "Start");
#flushing before to begin
rgb(0,0,255)
for i in range(nb_step):
pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE)
sleep(0.01)
rgb(0,255,0)
while True:
count+=1
# print(count,nb_frame)
filename = os.path.join("/home/pi/PlanktonScope/tmp",datetime.now().strftime("%M_%S_%f")+".jpg")
rgb(0,255,255)
camera.capture(filename)
rgb(0,255,0)
client.publish("receiver/image", datetime.now().strftime("%M_%S_%f")+".jpg has been imaged.");
rgb(0,0,255)
for i in range(10):
pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE)
sleep(0.01)
sleep(0.5)
rgb(0,255,0)
if(count>nb_frame):
# camera.stop_preview()
client.publish("receiver/image", "Completed");
# Meta data that is added to every object
client.publish("receiver/segmentation", "Start");
# Define processing pipeline
p.run()
#remove directory
#shutil.rmtree(import_path)
client.publish("receiver/segmentation", "Completed");
command = os.popen("rm -rf /home/pi/PlanktonScope/tmp/*.jpg")
rgb(255,255,255)
sleep(sleep_during)
rgb(0,255,0)
rgb(0,0,255)
for i in range(nb_step):
pump_stepper.onestep(direction=stepper.FORWARD, style=stepper.DOUBLE)
sleep(0.01)
rgb(0,255,0)
count=0
if topic!="image":
pump_stepper.release()
print("The imaging has been interrompted.")
client.publish("receiver/image", "Interrompted");
rgb(0,255,0)
count=0
break
else:
# print("Waiting")
sleep(1)

View file

View file

@ -0,0 +1,518 @@
################################################################################
# Practical Libraries
################################################################################
# Logger library compatible with multiprocessing
from loguru import logger
# Library to get date and time for folder name and filename
import datetime
# Library to be able to sleep for a given duration
import time
# Libraries manipulate json format, execute bash commands
import json, shutil, os
# Library to control the PiCamera
import picamera
# Library for starting processes
import multiprocessing
# Basic planktoscope libraries
import planktoscope.mqtt
import planktoscope.light
# import planktoscope.streamer
import planktoscope.imager_state_machine
################################################################################
# Streaming PiCamera over server
################################################################################
import io
import socketserver
import http.server
import threading
################################################################################
# Classes for the PiCamera Streaming
################################################################################
class StreamingOutput(object):
def __init__(self):
self.frame = None
self.buffer = io.BytesIO()
self.condition = threading.Condition()
def write(self, buf):
if buf.startswith(b"\xff\xd8"):
# New frame, copy the existing buffer's content and notify all
# clients it's available
self.buffer.truncate()
with self.condition:
self.frame = self.buffer.getvalue()
self.condition.notify_all()
self.buffer.seek(0)
return self.buffer.write(buf)
class StreamingHandler(http.server.BaseHTTPRequestHandler):
# Webpage content containing the PiCamera Streaming
PAGE = """\
<html>
<head>
<title>PlanktonScope v2 | PiCamera Streaming</title>
</head>
<body>
<img src="stream.mjpg" width="100%" height="100%" />
</body>
</html>
"""
@logger.catch
def do_GET(self):
if self.path == "/":
self.send_response(301)
self.send_header("Location", "/index.html")
self.end_headers()
elif self.path == "/index.html":
content = self.PAGE.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", len(content))
self.end_headers()
self.wfile.write(content)
elif self.path == "/stream.mjpg":
self.send_response(200)
self.send_header("Age", 0)
self.send_header("Cache-Control", "no-cache, private")
self.send_header("Pragma", "no-cache")
self.send_header(
"Content-Type", "multipart/x-mixed-replace; boundary=FRAME"
)
self.end_headers()
try:
while True:
with output.condition:
output.condition.wait()
frame = output.frame
self.wfile.write(b"--FRAME\r\n")
self.send_header("Content-Type", "image/jpeg")
self.send_header("Content-Length", len(frame))
self.end_headers()
self.wfile.write(frame)
self.wfile.write(b"\r\n")
except Exception as e:
logger.exception(f"Removed streaming client {self.client_address}")
else:
self.send_error(404)
self.end_headers()
class StreamingServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
allow_reuse_address = True
daemon_threads = True
output = StreamingOutput()
logger.info("planktoscope.imager is loaded")
################################################################################
# Main Imager class
################################################################################
class ImagerProcess(multiprocessing.Process):
"""This class contains the main definitions for the imager of the PlanktoScope"""
@logger.catch
def __init__(self, event, resolution=(3280, 2464), iso=60, shutter_speed=500):
"""Initialize the Imager class
Args:
event (multiprocessing.Event): shutdown event
resolution (tuple, optional): Camera native resolution. Defaults to (3280, 2464).
iso (int, optional): ISO sensitivity. Defaults to 60.
shutter_speed (int, optional): Shutter speed of the camera. Defaults to 500.
"""
super(ImagerProcess, self).__init__(name="imager")
logger.info("planktoscope.imager is initialising")
self.stop_event = event
self.__imager = planktoscope.imager_state_machine.Imager()
self.__img_goal = 0
self.__img_done = 0
self.__sleep_before = None
self.__pump_volume = None
self.__img_goal = None
self.imager_client = None
self.__camera = None
self.__resolution = resolution
self.__iso = iso
self.__shutter_speed = shutter_speed
self.__exposure_mode = "fixedfps"
self.__base_path = "/home/pi/PlanktonScope/img"
self.__export_path = ""
self.__global_metadata = None
# TODO implement a way to receive directly the metadata from Node-Red via MQTT
# FIXME We should save the metadata to a file in the folder too
# TODO create a directory structure per day/per imaging session
logger.info("planktoscope.imager is initialised and ready to go!")
@logger.catch
def start_camera(self):
"""Start the camera streaming process"""
self.__camera.start_recording(output, format="mjpeg", resize=(640, 480))
def pump_callback(self, client, userdata, msg):
# Print the topic and the message
logger.info(f"{self.name}: {msg.topic} {str(msg.qos)} {str(msg.payload)}")
if msg.topic != "status/pump":
logger.error(
f"The received message has the wrong topic {msg.topic}, payload was {str(msg.payload)}"
)
return
payload = json.loads(msg.payload.decode())
logger.debug(f"parsed payload is {payload}")
if self.__imager.state.name is "waiting":
if payload["status"] == "Done":
self.__imager.change(planktoscope.imager_state_machine.Capture)
self.imager_client.client.message_callback_remove("status/pump")
self.imager_client.client.unsubscribe("status/pump")
else:
logger.info(f"the pump is not done yet {payload}")
else:
logger.error(
"There is an error, status is not waiting for the pump and yet we received a pump message"
)
@logger.catch
def treat_message(self):
action = ""
if self.imager_client.new_message_received():
logger.info("We received a new message")
last_message = self.imager_client.msg["payload"]
logger.debug(last_message)
action = self.imager_client.msg["payload"]["action"]
logger.debug(action)
self.imager_client.read_message()
# If the command is "image"
if action == "image":
# {"action":"image","sleep":5,"volume":1,"nb_frame":200}
if (
"sleep" not in last_message
or "volume" not in last_message
or "nb_frame" not in last_message
):
logger.error(
f"The received message has the wrong argument {last_message}"
)
self.imager_client.client.publish("status/imager", '{"status":"Error"}')
return
# Change the state of the machine
self.__imager.change(planktoscope.imager_state_machine.Imaging)
# Get duration to wait before an image from the different received arguments
self.__sleep_before = float(last_message["sleep"])
# Get volume in between two images from the different received arguments
self.__pump_volume = float(last_message["volume"])
# Get the number of frames to image from the different received arguments
self.__img_goal = int(last_message["nb_frame"])
self.imager_client.client.publish("status/imager", '{"status":"Started"}')
elif action == "stop":
# Remove callback for "status/pump" and unsubscribe
self.imager_client.client.message_callback_remove("status/pump")
self.imager_client.client.unsubscribe("status/pump")
# Stops the pump
self.imager_client.client.publish("actuator/pump", '{"action": "stop"}')
logger.info("The imaging has been interrupted.")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.imager_client.client.publish(
"status/imager", '{"status":"Interrupted"}'
)
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 0)
# Change state to Stop
self.__imager.change(planktoscope.imager_state_machine.Stop)
elif action == "update_config":
if self.__imager.state.name is "stop":
if "config" not in last_message:
logger.error(
f"The received message has the wrong argument {last_message}"
)
self.imager_client.client.publish(
"status/imager", '{"status":"Configuration message error"}'
)
return
logger.info("Updating the configuration now with the received data")
# Updating the configuration with the passed parameter in payload["config"]
nodered_metadata = last_message["config"]
# Definition of the few important metadata
local_metadata = {
"process_datetime": datetime.datetime.now().isoformat(),
"acq_camera_resolution": self.__resolution,
"acq_camera_iso": self.__iso,
"acq_camera_shutter_speed": self.__shutter_speed,
}
# Concat the local metadata and the metadata from Node-RED
self.__global_metadata = {**local_metadata, **nodered_metadata}
# Publish the status "Config updated" to via MQTT to Node-RED
self.imager_client.client.publish(
"status/imager", '{"status":"Config updated"}'
)
logger.info("Configuration has been updated")
else:
logger.error("We can't update the configuration while we are imaging.")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.imager_client.client.publish("status/imager", '{"status":"Busy"}')
pass
elif action == "settings":
if self.__imager.state.name is "stop":
if "settings" not in last_message:
logger.error(
f"The received message has the wrong argument {last_message}"
)
self.imager_client.client.publish(
"status/imager", '{"status":"Camera settings error"}'
)
return
logger.info("Updating the camera settings now with the received data")
# Updating the configuration with the passed parameter in payload["config"]
settings = last_message["settings"]
if "resolution" in settings:
self.__resolution = settings.get("resolution", self.__resolution)
logger.debug(
f"Updating the camera resolution to {self.__resolution}"
)
self.__camera.resolution = self.__resolution
if "iso" in settings:
self.__iso = settings.get("iso", self.__iso)
logger.debug(f"Updating the camera iso to {self.__iso}")
self.__camera.iso = self.__iso
if "shutter_speed" in settings:
self.__shutter_speed = settings.get(
"shutter_speed", self.__shutter_speed
)
logger.debug(
f"Updating the camera shutter speed to {self.__shutter_speed}"
)
self.__camera.shutter_speed = self.__shutter_speed
# Publish the status "Config updated" to via MQTT to Node-RED
self.imager_client.client.publish(
"status/imager", '{"status":"Camera settings updated"}'
)
logger.info("Camera settings have been updated")
else:
logger.error(
"We can't update the camera settings while we are imaging."
)
# Publish the status "Interrupted" to via MQTT to Node-RED
self.imager_client.client.publish("status/imager", '{"status":"Busy"}')
pass
elif action != "":
logger.warning(
f"We did not understand the received request {action} - {last_message}"
)
@logger.catch
def state_machine(self):
if self.__imager.state.name is "imaging":
# subscribe to status/pump
self.imager_client.client.subscribe("status/pump")
self.imager_client.client.message_callback_add(
"status/pump", self.pump_callback
)
logger.info("Setting up the directory structure for storing the pictures")
self.__export_path = os.path.join(
self.__base_path,
# We only keep the date '2020-09-25T15:25:21.079769'
self.__global_metadata["process_datetime"].split("T")[0],
str(self.__global_metadata["sample_id"]),
)
if not os.path.exists(self.__export_path):
# create the path!
os.makedirs(self.__export_path)
# Export the metadata to a json file
logger.info("Exporting the metadata to a metadata.json")
config_path = os.path.join(self.__export_path, "metadata.json")
with open(config_path, "w") as metadata_file:
json.dump(self.__global_metadata, metadata_file)
logger.debug(
f"Metadata dumped in {metadata_file} are {self.__global_metadata}"
)
# Sleep a duration before to start acquisition
time.sleep(self.__sleep_before)
# Set the LEDs as Blue
planktoscope.light.setRGB(0, 0, 255)
self.imager_client.client.publish(
"actuator/pump",
json.dumps(
{
"action": "move",
"direction": "BACKWARD",
"volume": self.__pump_volume,
"flowrate": 2,
}
),
)
# FIXME We should probably update the global metadata here with the current datetime/position/etc...
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 0)
# Change state towards Waiting for pump
self.__imager.change(planktoscope.imager_state_machine.Waiting)
return
elif self.__imager.state.name is "capture":
# Set the LEDs as Cyan
planktoscope.light.setRGB(0, 255, 255)
filename = f"{datetime.datetime.now().strftime('%H_%M_%S_%f')}.jpg"
# Define the filename of the image
filename_path = os.path.join(self.__export_path, filename)
logger.info(f"Capturing an image to {filename_path}")
# Capture an image with the proper filename
self.__camera.capture(filename_path)
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 0)
# Publish the name of the image to via MQTT to Node-RED
self.imager_client.client.publish(
"status/imager",
f'{{"status":"{filename} has been imaged."}}',
)
# Increment the counter
self.__img_done += 1
# If counter reach the number of frame, break
if self.__img_done >= self.__img_goal:
# Reset the counter to 0
self.__img_done = 0
# Publish the status "Done" to via MQTT to Node-RED
self.imager_client.client.publish("status/imager", '{"status":"Done"}')
# Change state towards done
self.__imager.change(planktoscope.imager_state_machine.Stop)
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 255)
return
else:
# We have not reached the final stage, let's keep imaging
# Set the LEDs as Blue
planktoscope.light.setRGB(0, 0, 255)
# subscribe to status/pump
self.imager_client.client.subscribe("status/pump")
self.imager_client.client.message_callback_add(
"status/pump", self.pump_callback
)
# Pump during a given volume
self.imager_client.client.publish(
"actuator/pump",
json.dumps(
{
"action": "move",
"direction": "BACKWARD",
"volume": self.__pump_volume,
"flowrate": 2,
}
),
)
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 0)
# Change state towards Waiting for pump
self.__imager.change(planktoscope.imager_state_machine.Waiting)
return
elif self.__imager.state.name is "waiting":
return
elif self.__imager.state.name is "stop":
return
################################################################################
# While loop for capturing commands from Node-RED
################################################################################
@logger.catch
def run(self):
"""This is the function that needs to be started to create a thread"""
logger.info(
f"The imager control thread has been started in process {os.getpid()}"
)
# MQTT Service connection
self.imager_client = planktoscope.mqtt.MQTT_Client(
topic="imager/#", name="imager_client"
)
# PiCamera settings
self.__camera = picamera.PiCamera(resolution=self.__resolution)
self.__camera.iso = self.__iso
self.__camera.shutter_speed = self.__shutter_speed
self.__camera.exposure_mode = self.__exposure_mode
address = ("", 8000)
server = StreamingServer(address, StreamingHandler)
# Starts the streaming server process
logger.info("Starting the streaming server thread")
self.start_camera()
self.streaming_thread = threading.Thread(
target=server.serve_forever, daemon=True
)
self.streaming_thread.start()
# Publish the status "Ready" to via MQTT to Node-RED
self.imager_client.client.publish("status/imager", '{"status":"Ready"}')
logger.info("Let's rock and roll!")
# This is the loop
while not self.stop_event.is_set():
self.treat_message()
self.state_machine()
time.sleep(0.001)
logger.info("Shutting down the imager process")
self.imager_client.client.publish("status/imager", '{"status":"Dead"}')
logger.debug("Stopping the camera")
self.__camera.stop_recording()
self.__camera.close()
logger.debug("Stopping the streaming thread")
server.shutdown()
self.imager_client.shutdown()
# self.streaming_thread.kill()
logger.info("Imager process shut down! See you!")

View file

@ -0,0 +1,71 @@
# Logger library compatible with multiprocessing
from loguru import logger
# TODO rewrite this in PlantUML
# This works with https://www.diagram.codes/d/state-machine
# "wait for pump" as pump
# "start imager" as imager
# "capture image" as capture
#
# START->imager["init"]
# imager->pump["start pumping"]
# pump->stop["stop"]
# stop->imager["start"]
# pump->capture["pumping is done"]
# capture->pump["start pump"]
# capture->stop["stop or done"]
# State machine class
class ImagerState(object):
name = "state"
allowed = []
def switch(self, state):
""" Switch to new state """
if state.name in self.allowed:
logger.info(f"Current:{self} => switched to new state {state.name}")
self.__class__ = state
else:
logger.error(f"Current:{self} => switching to {state.name} not possible.")
def __str__(self):
return self.name
class Stop(ImagerState):
name = "stop"
allowed = ["imaging"]
class Imaging(ImagerState):
""" State of getting ready to start """
name = "imaging"
allowed = ["waiting"]
class Waiting(ImagerState):
""" State of waiting for the pump to finish """
name = "waiting"
allowed = ["stop", "capture"]
class Capture(ImagerState):
""" State of capturing image """
name = "capture"
allowed = ["stop", "waiting"]
class Imager(object):
""" A class representing the imager """
def __init__(self):
# State of the imager - default is stop.
self.state = Stop()
def change(self, state):
""" Change state """
self.state.switch(state)

View file

@ -0,0 +1,102 @@
#!/usr/bin/env python
# Turn on using this command line :
# python3.7 path/to/file/light.py on
# Turn off using this command line :
# python3.7 path/to/file/light.py off
# Library to send command over I2C for the light module on the fan
import smbus
import RPi.GPIO
import subprocess
# define the bus used to actuate the light module on the fan
bus = smbus.SMBus(1)
DEVICE_ADDRESS = 0x0D
rgb_effect_reg = 0x04
rgb_speed_reg = 0x05
rgb_color_reg = 0x06
rgb_off_reg = 0x07
################################################################################
# LEDs functions
################################################################################
def setRGB(R, G, B):
"""Update all LED at the same time"""
bus.write_byte_data(DEVICE_ADDRESS, 0x00, 0xFF)
bus.write_byte_data(DEVICE_ADDRESS, 0x01, R & 0xFF)
bus.write_byte_data(DEVICE_ADDRESS, 0x02, G & 0xFF)
bus.write_byte_data(DEVICE_ADDRESS, 0x03, B & 0xFF)
# Update the I2C Bus in order to really update the LEDs new values
cmd = "i2cdetect -y 1"
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
def setRGBOff():
"""Turn off the RGB LED"""
bus.write_byte_data(DEVICE_ADDRESS, 0x07, 0x00)
# Update the I2C Bus in order to really update the LEDs new values
cmd = "i2cdetect -y 1"
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
def setRGBEffect(effect):
"""Choose an effect, 0-4
0: Water light
1: Breathing light
2: Marquee
3: Rainbow lights
4: Colorful lights
"""
if effect >= 0 and effect <= 4:
bus.write_byte_data(DEVICE_ADDRESS, rgb_effect_reg, effect & 0xFF)
def setRGBSpeed(speed):
"""Set the effect speed, 1-3, 3 being the fastest speed"""
if speed >= 1 and speed <= 3:
bus.write_byte_data(DEVICE_ADDRESS, rgb_speed_reg, speed & 0xFF)
def setRGBColor(color):
"""Set the color of the water light and breathing light effect, 0-6
0: Red
1: Green (default)
2: Blue
3: Yellow
4: Purple
5: Cyan
6: White
"""
if color >= 0 and color <= 6:
bus.write_byte_data(DEVICE_ADDRESS, rgb_color_reg, color & 0xFF)
def light(state):
"""Turn the LED on or off"""
if state == "on":
RPi.GPIO.output(21, RPi.GPIO.HIGH)
if state == "off":
RPi.GPIO.output(21, RPi.GPIO.LOW)
# This is called if this script is launched directly
if __name__ == "__main__":
import RPi.GPIO as GPIO
import sys
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(21, GPIO.OUT)
state = str(sys.argv[1])
light(state)

View file

@ -0,0 +1,167 @@
# Library for exchaning messages with Node-RED
# We are using MQTT V3.1.1
# The documentation for Paho can be found here:
# https://www.eclipse.org/paho/clients/python/docs/
# MQTT Topics follows this architecture:
# - actuator : This topic adresses the stepper control thread
# No publication under this topic should happen from Python
# - actuator/pump : Control of the pump
# The message is a json object
# {"action":"move", "direction":"FORWARD", "volume":10, "flowrate":1}
# to move 10mL forward at 1mL/min
# action can be "move" or "stop"
# Receive only
# - actuator/focus : Control of the focus stage
# The message is a json object, speed is optional
# {"action":"move", "direction":"UP", "distance":0.26, "speed":1}
# to move up 10mm
# action can be "move" or "stop"
# Receive only
# - imager/image : This topic adresses the imaging thread
# Is a json object with
# {"action":"image","sleep":5,"volume":1,"nb_frame":200}
# sleep in seconds, volume in mL
# Receive only
# - segmenter/segment : This topic adresses the segmenter process
# Is a json object with
# {"action":"segment"}
# Receive only
# - status : This topics sends feedback to Node-Red
# No publication or receive at this level
# - status/pump : State of the pump
# Is a json object with
# {"status":"Start", "time_left":25}
# Status is one of Started, Ready, Done, Interrupted
# Publish only
# - status/focus : State of the focus stage
# Is a json object with
# {"status":"Start", "time_left":25}
# Status is one of Started, Ready, Done, Interrupted
# Publish only
# - status/imager : State of the imager
# Is a json object with
# {"status":"Start", "time_left":25}
# Status is one of Started, Ready, Completed or 12_11_15_0.1.jpg has been imaged.
# Publish only
# - status/segmenter : Status of the segmentation
# - status/segmenter/name
# - status/segmenter/object_id
# - status/segmenter/metric
# TODO Evaluate the opportunity of saving the last x received messages in a queue for treatment
# We can use collections.deque https://docs.python.org/3/library/collections.html#collections.deque
import paho.mqtt.client as mqtt
import json
import planktoscope.light
# Logger library compatible with multiprocessing
from loguru import logger
logger.info("planktoscope.mqtt is loaded")
class MQTT_Client:
"""A client for MQTT
Do not forget to include the wildcards in the topic
when creating this object
"""
def __init__(self, topic, server="127.0.0.1", port=1883, name="client"):
# Declare the global variables command and args
self.command = ""
self.args = ""
self.__new_message = False
self.msg = None
# MQTT Client functions definition
self.client = mqtt.Client()
# self.client.enable_logger(logger)
self.topic = topic
self.server = server
self.port = port
self.name = name
self.connect()
@logger.catch
def connect(self):
logger.info(f"trying to connect to {self.server}:{self.port}")
self.client.connect(self.server, self.port, 60)
self.client.on_connect = self.on_connect
self.client.on_subscribe = self.on_subscribe
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.loop_start()
################################################################################
# MQTT core functions
################################################################################
@logger.catch
# Run this function in order to connect to the client (Node-RED)
def on_connect(self, client, userdata, flags, rc):
reason = [
"0: Connection successful",
"1: Connection refused - incorrect protocol version",
"2: Connection refused - invalid client identifier",
"3: Connection refused - server unavailable",
"4: Connection refused - bad username or password",
"5: Connection refused - not authorised",
]
# Print when connected
logger.success(
f"{self.name} connected to {self.server}:{self.port}! - {reason[rc]}"
)
# When connected, run subscribe()
self.client.subscribe(self.topic)
# Turn green the light module
planktoscope.light.setRGB(0, 255, 0)
@logger.catch
# Run this function in order to subscribe to all the topics begining by actuator
def on_subscribe(self, client, obj, mid, granted_qos):
# Print when subscribed
logger.success(
f"{self.name} subscribed to {self.topic}! - mid:{str(mid)} qos:{str(granted_qos)}"
)
# Run this command when Node-RED is sending a message on the subscribed topic
@logger.catch
def on_message(self, client, userdata, msg):
# Print the topic and the message
logger.info(f"{self.name}: {msg.topic} {str(msg.qos)} {str(msg.payload)}")
# Parse the topic to find the command. ex : actuator/pump -> pump
# This only removes the top-level topic!
self.command = msg.topic.split("/", 1)[1]
logger.debug(f"command is {self.command}")
# Decode the message to find the arguments
self.args = json.loads(msg.payload.decode())
logger.debug(f"args are {self.args}")
self.msg = {"topic": msg.topic, "payload": self.args}
logger.debug(f"msg is {self.msg} or {msg}")
self.__new_message = True
@logger.catch
def on_disconnect(self, client, userdata, rc):
if rc != 0:
logger.error(
f"Connection to the MQTT server is unexpectedly lost by {self.name}"
)
else:
logger.warning(f"Connection to the MQTT server is closed by {self.name}")
# TODO for now, we just log the disconnection, we need to evaluate what to do
# in case of communication loss with the server
def new_message_received(self):
return self.__new_message
def read_message(self):
logger.debug(f"clearing the __new_message flag")
self.__new_message = False
@logger.catch
def shutdown(self, topic="", message=""):
logger.info(f"Shutting down mqtt client {self.name}")
self.client.loop_stop()
logger.debug(f"Mqtt client {self.name} shut down")

View file

@ -0,0 +1,338 @@
################################################################################
# Practical Libraries
################################################################################
# Logger library compatible with multiprocessing
from loguru import logger
# Library to get date and time for folder name and filename
import datetime
# Library to be able to sleep for a given duration
import time
# Libraries manipulate json format, execute bash commands
import json, shutil, os
# Library for starting processes
import multiprocessing
# Basic planktoscope libraries
import planktoscope.mqtt
import planktoscope.light
################################################################################
# Morphocut Libraries
################################################################################
import morphocut
import morphocut.file
import morphocut.image
import morphocut.stat
import morphocut.stream
import morphocut.str
import morphocut.contrib.ecotaxa
import morphocut.contrib.zooprocess
################################################################################
# Other image processing Libraries
################################################################################
import skimage.util
import cv2
logger.info("planktoscope.segmenter is loaded")
################################################################################
# Main Segmenter class
################################################################################
class SegmenterProcess(multiprocessing.Process):
"""This class contains the main definitions for the segmenter of the PlanktoScope"""
@logger.catch
def __init__(self, event):
"""Initialize the Segmenter class
Args:
event (multiprocessing.Event): shutdown event
"""
super(SegmenterProcess, self).__init__(name="segmenter")
logger.info("planktoscope.segmenter is initialising")
self.stop_event = event
self.__pipe = None
self.segmenter_client = None
self.__img_path = "/home/pi/PlanktonScope/img"
self.__export_path = "/home/pi/PlanktonScope/export"
self.__ecotaxa_path = os.path.join(self.__export_path, "ecotaxa")
self.__global_metadata = None
self.__working_path = ""
self.__archive_fn = ""
if not os.path.exists(self.__ecotaxa_path):
# create the path!
os.makedirs(self.__ecotaxa_path)
# Morphocut's pipeline will be created at runtime otherwise shit ensues
logger.info("planktoscope.segmenter is initialised and ready to go!")
def __create_morphocut_pipeline(self):
"""Creates the Morphocut Pipeline"""
logger.debug("Let's start creating the Morphocut Pipeline")
with morphocut.Pipeline() as self.__pipe:
# TODO wrap morphocut.Call(logger.debug()) in something that allows it not to be added to the pipeline
# if the logger.level is not debug. Might not be as easy as it sounds.
# Recursively find .jpg files in import_path.
# Sort to get consective frames.
abs_path = morphocut.file.Find(
self.__working_path, [".jpg"], sort=True, verbose=True
)
# Extract name from abs_path
name = morphocut.Call(
lambda p: os.path.splitext(os.path.basename(p))[0], abs_path
)
# Set the LEDs as Green
morphocut.Call(planktoscope.light.setRGB, 0, 255, 0)
# Read image
img = morphocut.image.ImageReader(abs_path)
# Show progress bar for frames
morphocut.stream.TQDM(morphocut.str.Format("Frame {name}", name=name))
# Apply running median to approximate the background image
flat_field = morphocut.stat.RunningMedian(img, 5)
# Correct image
img = img / flat_field
# Rescale intensities and convert to uint8 to speed up calculations
img = morphocut.image.RescaleIntensity(
img, in_range=(0, 1.1), dtype="uint8"
)
# Filter variable to reduce memory load
morphocut.stream.FilterVariables(name, img)
# Save cleaned images
# frame_fn = morphocut.str.Format(os.path.join("/home/pi/PlanktonScope/tmp","CLEAN", "{name}.jpg"), name=name)
# morphocut.image.ImageWriter(frame_fn, img)
# Convert image to uint8 gray
img_gray = morphocut.image.RGB2Gray(img)
# ?
img_gray = morphocut.Call(skimage.util.img_as_ubyte, img_gray)
# Canny edge detection using OpenCV
img_canny = morphocut.Call(cv2.Canny, img_gray, 50, 100)
# Dilate using OpenCV
kernel = morphocut.Call(
cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15)
)
img_dilate = morphocut.Call(cv2.dilate, img_canny, kernel, iterations=2)
# Close using OpenCV
kernel = morphocut.Call(
cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (5, 5)
)
img_close = morphocut.Call(
cv2.morphologyEx, img_dilate, cv2.MORPH_CLOSE, kernel, iterations=1
)
# Erode using OpenCV
kernel = morphocut.Call(
cv2.getStructuringElement, cv2.MORPH_ELLIPSE, (15, 15)
)
mask = morphocut.Call(cv2.erode, img_close, kernel, iterations=2)
# Find objects
regionprops = morphocut.image.FindRegions(
mask, img_gray, min_area=1000, padding=10, warn_empty=name
)
# Set the LEDs as Purple
morphocut.Call(planktoscope.light.setRGB, 255, 0, 255)
# For an object, extract a vignette/ROI from the image
roi_orig = morphocut.image.ExtractROI(img, regionprops, bg_color=255)
# Generate an object identifier
i = morphocut.stream.Enumerate()
# morphocut.Call(print,i)
# Define the ID of each object
object_id = morphocut.str.Format("{name}_{i:d}", name=name, i=i)
# morphocut.Call(print,object_id)
# Define the name of each object
object_fn = morphocut.str.Format(
os.path.join("/home/pi/PlanktonScope/", "OBJECTS", "{name}.jpg"),
name=object_id,
)
# Save the image of the object with its name
morphocut.image.ImageWriter(object_fn, roi_orig)
# Calculate features. The calculated features are added to the global_metadata.
# Returns a Variable representing a dict for every object in the stream.
meta = morphocut.contrib.zooprocess.CalculateZooProcessFeatures(
regionprops, prefix="object_", meta=self.__global_metadata
)
# Get all the metadata
json_meta = morphocut.Call(json.dumps, meta, sort_keys=True, default=str)
# Publish the json containing all the metadata to via MQTT to Node-RED
morphocut.Call(
self.segmenter_client.client.publish,
"status/segmentater/metric",
json_meta,
)
# Add object_id to the metadata dictionary
meta["object_id"] = object_id
# Generate object filenames
orig_fn = morphocut.str.Format("{object_id}.jpg", object_id=object_id)
# Write objects to an EcoTaxa archive:
# roi image in original color, roi image in grayscale, metadata associated with each object
morphocut.contrib.ecotaxa.EcotaxaWriter(
self.__archive_fn, (orig_fn, roi_orig), meta
)
# Progress bar for objects
morphocut.stream.TQDM(
morphocut.str.Format("Object {object_id}", object_id=object_id)
)
# Publish the object_id to via MQTT to Node-RED
morphocut.Call(
self.segmenter_client.client.publish,
"status/segmentater/object_id",
f'{{"object_id":"{object_id}"}}',
)
# Set the LEDs as Green
morphocut.Call(planktoscope.light.setRGB, 0, 255, 0)
logger.info("Morphocut's Pipeline has been created")
@logger.catch
def treat_message(self):
action = ""
if self.segmenter_client.new_message_received():
logger.info("We received a new message")
last_message = self.segmenter_client.msg["payload"]
logger.debug(last_message)
action = self.segmenter_client.msg["payload"]["action"]
self.segmenter_client.read_message()
# If the command is "segment"
if action == "segment":
# {"action":"segment"}
# Publish the status "Started" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Started"}'
)
img_paths = [x[0] for x in os.walk(self.__img_path)]
logger.info(f"The pipeline will be run in {len(img_paths)} directories")
for path in img_paths:
logger.info(f"Loading the metadata file for {path}")
with open(os.path.join(path, "metadata.json"), "r") as config_file:
self.__global_metadata = json.load(config_file)
logger.debug(f"Configuration loaded is {self.__global_metadata}")
# Define the name of the .zip file that will contain the images and the .tsv table for EcoTaxa
self.__archive_fn = os.path.join(
self.__ecotaxa_path,
# filename includes project name, timestamp and sample id
f"export_{self.__global_metadata['sample_project']}_{self.__global_metadata['process_datetime']}_{self.__global_metadata['sample_id']}.zip",
)
logger.info(f"Starting the pipeline in {path}")
# Start the MorphoCut Pipeline on the found path
self.__working_path = path
try:
self.__pipe.run()
except Exception as e:
logger.exception(f"There was an error in the pipeline {e}")
logger.info(f"Pipeline has been run for {path}")
# remove directory
# shutil.rmtree(import_path)
# Publish the status "Done" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Done"}'
)
# Set the LEDs as White
planktoscope.light.setRGB(255, 255, 255)
# cmd = os.popen("rm -rf /home/pi/PlanktonScope/tmp/*.jpg")
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 0)
elif action == "stop":
logger.info("The segmentation has been interrupted.")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Interrupted"}'
)
elif action == "update_config":
logger.error("We can't update the configuration while we are segmenting.")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.segmenter_client.client.publish(
"status/segmenter", '{"status":"Busy"}'
)
pass
elif action != "":
logger.warning(
f"We did not understand the received request {action} - {last_message}"
)
################################################################################
# While loop for capturing commands from Node-RED
################################################################################
@logger.catch
def run(self):
"""This is the function that needs to be started to create a thread"""
logger.info(
f"The segmenter control thread has been started in process {os.getpid()}"
)
# MQTT Service connection
self.segmenter_client = planktoscope.mqtt.MQTT_Client(
topic="segmenter/#", name="segmenter_client"
)
# Instantiate the morphocut pipeline
self.__create_morphocut_pipeline()
# Publish the status "Ready" to via MQTT to Node-RED
self.segmenter_client.client.publish("status/segmenter", '{"status":"Ready"}')
logger.info("Ready to roll!")
# This is the loop
while not self.stop_event.is_set():
self.treat_message()
time.sleep(0.5)
logger.info("Shutting down the segmenter process")
self.segmenter_client.client.publish("status/segmenter", '{"status":"Dead"}')
self.segmenter_client.shutdown()
logger.info("Segmenter process shut down! See you!")

View file

@ -0,0 +1,440 @@
# Libraries to control the steppers for focusing and pumping
import adafruit_motor
import adafruit_motorkit
import time
import json
import os
import planktoscope.mqtt
import planktoscope.light
import multiprocessing
# Logger library compatible with multiprocessing
from loguru import logger
logger.info("planktoscope.stepper is loaded")
class stepper:
def __init__(self, stepper, style, size=0):
"""Initialize the stepper class
Args:
stepper (adafruit_motorkit.Motorkit().stepper): reference to the object that controls the stepper
style (adafruit_motor.stepper): style of the movement SINGLE, DOUBLE, MICROSTEP
size (int): maximum number of steps of this stepper (aka stage size). Can be 0 if not applicable
"""
self.__stepper = stepper
self.__style = style
self.__size = size
self.__position = 0
self.__goal = 0
self.__direction = ""
self.__next_step_date = time.monotonic()
self.__delay = 0
# Make sure the stepper is released and do not use any power
self.__stepper.release()
def step_waiting(self):
"""Is there a step waiting to be actuated
Returns:
Bool: if time has come to push the step
"""
return time.monotonic() > self.__next_step_date
def at_goal(self):
"""Is the motor at its goal
Returns:
Bool: True if position and goal are identical
"""
return self.__position == self.__goal
def next_step_date(self):
"""set the next step date"""
self.__next_step_date = self.__next_step_date + self.__delay
def initial_step_date(self):
"""set the initial step date"""
self.__next_step_date = time.monotonic() + self.__delay
def move(self):
"""move the stepper
Returns:
Bool: True if the step done was the last one of the movement
"""
if self.step_waiting() and not self.at_goal():
self.__stepper.onestep(
direction=self.__direction,
style=self.__style,
)
if self.__direction == adafruit_motor.stepper.FORWARD:
self.__position += 1
elif self.__direction == adafruit_motor.stepper.BACKWARD:
self.__position -= 1
if not self.at_goal():
self.next_step_date()
else:
logger.success("The stepper has reached its goal")
self.__stepper.release()
return True
return False
def go(self, direction, distance, delay):
"""move in the given direction for the given distance
Args:
direction (adafruit_motor.stepper): gives the movement direction
distance
delay
"""
self.__delay = delay
self.__direction = direction
if self.__direction == adafruit_motor.stepper.FORWARD:
self.__goal = self.__position + distance
elif self.__direction == adafruit_motor.stepper.BACKWARD:
self.__goal = self.__position - distance
else:
logger.error(f"The given direction is wrong {direction}")
self.initial_step_date()
def shutdown(self):
"""Shutdown everything ASAP"""
self.__goal = self.__position
self.__stepper.release()
def release(self):
self.__stepper.release()
class StepperProcess(multiprocessing.Process):
focus_steps_per_mm = 40
# 507 steps per ml for Planktonscope standard
# 5200 for custom NEMA14 pump with 0.8mm ID Tube
pump_steps_per_ml = 507
# focus max speed is in mm/sec and is limited by the maximum number of pulses per second the Planktonscope can send
focus_max_speed = 0.5
# pump max speed is in ml/min
pump_max_speed = 30
def __init__(self, event):
super(StepperProcess, self).__init__()
self.stop_event = event
if os.path.exists("/home/pi/PlanktonScope/hardware.json"):
# load hardware.json
with open("/home/pi/PlanktonScope/hardware.json", "r") as config_file:
configuration = json.load(config_file)
logger.debug(f"Hardware configuration loaded is {configuration}")
else:
logger.info(
"The hardware configuration file doesn't exists, using defaults"
)
configuration = dict()
reverse = False
# parse the config data. If the key is absent, we are using the default value
reverse = configuration.get("stepper_reverse", reverse)
self.focus_steps_per_mm = configuration.get(
"focus_steps_per_mm", self.focus_steps_per_mm
)
self.pump_steps_per_ml = configuration.get(
"pump_steps_per_ml", self.pump_steps_per_ml
)
self.focus_max_speed = configuration.get(
"focus_max_speed", self.focus_max_speed
)
self.pump_max_speed = configuration.get("pump_max_speed", self.pump_max_speed)
# define the names for the 2 exsting steppers
kit = adafruit_motorkit.MotorKit()
if reverse:
self.pump_stepper = stepper(kit.stepper2, adafruit_motor.stepper.DOUBLE)
self.focus_stepper = stepper(
kit.stepper1, adafruit_motor.stepper.MICROSTEP, 45
)
else:
self.pump_stepper = stepper(kit.stepper1, adafruit_motor.stepper.DOUBLE)
self.focus_stepper = stepper(
kit.stepper2, adafruit_motor.stepper.MICROSTEP, 45
)
logger.debug(f"Stepper initialisation is over")
def treat_command(self):
command = ""
if self.actuator_client.new_message_received():
logger.info("We received a new message")
last_message = self.actuator_client.msg["payload"]
logger.debug(last_message)
command = self.actuator_client.msg["topic"].split("/", 1)[1]
logger.debug(command)
self.actuator_client.read_message()
# If the command is "pump"
if command == "pump":
logger.debug("We have received a pumping command")
if last_message["action"] == "stop":
logger.debug("We have received a stop pump command")
self.pump_stepper.shutdown()
# Print status
logger.info("The pump has been interrupted")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.actuator_client.client.publish(
"status/pump", '{"status":"Interrupted"}'
)
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 0)
elif last_message["action"] == "move":
logger.debug("We have received a move pump command")
# Set the LEDs as Blue
planktoscope.light.setRGB(0, 0, 255)
if (
"direction" not in last_message
or "volume" not in last_message
or "flowrate" not in last_message
):
logger.error(
f"The received message has the wrong argument {last_message}"
)
self.actuator_client.client.publish(
"status/pump", '{"status":"Error"}'
)
return
# Get direction from the different received arguments
direction = last_message["direction"]
# Get delay (in between steps) from the different received arguments
volume = float(last_message["volume"])
# Get number of steps from the different received arguments
flowrate = float(last_message["flowrate"])
# Print status
logger.info("The pump is started.")
self.pump(direction, volume, flowrate)
else:
logger.warning(
f"The received message was not understood {last_message}"
)
# If the command is "focus"
elif command == "focus":
logger.debug("We have received a focusing request")
# If a new received command is "focus" but args contains "stop" we stop!
if last_message["action"] == "stop":
logger.debug("We have received a stop focus command")
self.focus_stepper.shutdown()
# Print status
logger.info("The focus has been interrupted")
# Publish the status "Interrupted" to via MQTT to Node-RED
self.actuator_client.client.publish(
"status/focus", '{"status":"Interrupted"}'
)
# Set the LEDs as Green
planktoscope.light.setRGB(0, 255, 0)
elif last_message["action"] == "move":
logger.debug("We have received a move focus command")
# Set the LEDs as Yellow
planktoscope.light.setRGB(255, 255, 0)
if (
"direction" not in last_message
or "distance" not in last_message
):
logger.error(
f"The received message has the wrong argument {last_message}"
)
self.actuator_client.client.publish(
"status/focus", '{"status":"Error"}'
)
# Get direction from the different received arguments
direction = last_message["direction"]
# Get number of steps from the different received arguments
distance = float(last_message["distance"])
# Print status
logger.info("The focus movement is started.")
self.focus(direction, distance)
else:
logger.warning(
f"The received message was not understood {last_message}"
)
elif command != "":
logger.warning(
f"We did not understand the received request {command} - {last_message}"
)
return
def focus(self, direction, distance, speed=focus_max_speed):
"""moves the focus stepper
direction is either UP or DOWN
distance is received in mm
speed is in mm/sec"""
logger.info(
f"The focus stage will move {direction} for {distance}mm at {speed}mm/sec"
)
# Validation of inputs
if direction != "UP" and direction != "DOWN":
logger.error("The direction command is not recognised")
logger.error("It should be either UP or DOWN")
return
if distance > 45:
logger.error("You are trying to move more than the stage physical size")
return
# We are going to use microsteps, so we need to multiply by 16 the steps number
nb_steps = self.focus_steps_per_mm * distance * 16
logger.debug(f"The number of steps that will be applied is {nb_steps}")
steps_per_second = speed * self.focus_steps_per_mm * 16
logger.debug(f"There will be a speed of {steps_per_second} steps per second")
if steps_per_second > 400:
steps_per_second = 400
logger.warning("The requested speed is faster than the maximum safe speed")
logger.warning(
f"The speed of the motor is going to be limited to {steps_per_second/16/self.focus_steps_per_mm}mm/sec"
)
# On linux, the minimal acceptable delay managed by the system is 0.1ms
# see https://stackoverflow.com/questions/1133857/how-accurate-is-pythons-time-sleep
# However we have a fixed delay of at least 2.5ms per step due to the library
# Our maximum speed is thus about 400 pulses per second or 0.5mm/sec of stage speed
delay = max((1 / steps_per_second) - 0.0025, 0)
logger.debug(f"The delay between two steps is {delay}s")
# Publish the status "Started" to via MQTT to Node-RED
self.actuator_client.client.publish(
"status/focus",
f'{{"status":"Started", "duration":{nb_steps / steps_per_second}}}',
)
# Depending on direction, select the right direction for the focus
if direction == "UP":
self.focus_stepper.go(adafruit_motor.stepper.FORWARD, nb_steps, delay)
if direction == "DOWN":
self.focus_stepper.go(adafruit_motor.stepper.BACKWARD, nb_steps, delay)
# The pump max speed will be at about 400 full steps per second
# This amounts to 0.9mL per seconds maximum, or 54mL/min
# NEMA14 pump with 3 rollers is 0.509 mL per round, actual calculation at
# Stepper is 200 steps/round, or 393steps/ml
# https://www.wolframalpha.com/input/?i=pi+*+%280.8mm%29%C2%B2+*+54mm+*+3
def pump(self, direction, volume, speed=pump_max_speed):
"""moves the pump stepper
direction is either FORWARD or BACKWARD
volume is in mL
speed is in mL/min"""
logger.info(f"The pump will move {direction} for {volume}mL at {speed}mL/min")
# Validation of inputs
if direction != "FORWARD" and direction != "BACKWARD":
logger.error("The direction command is not recognised")
logger.error("It should be either FORWARD or BACKWARD")
return
nb_steps = self.pump_steps_per_ml * volume
logger.debug(f"The number of steps that will be applied is {nb_steps}")
steps_per_second = speed * self.pump_steps_per_ml / 60
logger.debug(f"There will be a speed of {steps_per_second} steps per second")
if steps_per_second > 400:
steps_per_second = 400
logger.warning("The requested speed is faster than the maximum safe speed")
logger.warning(
f"The speed of the motor is going to be limited to {steps_per_second*60/self.pump_steps_per_ml}mL/min"
)
# On linux, the minimal acceptable delay managed by the system is 0.1ms
# see https://stackoverflow.com/questions/1133857/how-accurate-is-pythons-time-sleep
# However we have a fixed delay of at least 2.5ms per step due to the library
# Our maximum speed is thus about 400 pulses per second or 2 turn per second of the pump
# 10mL => 6140 pas
# 1.18gr => 1.18mL
# Actual pas/mL => 5200
# Max speed is 400 steps/sec, or 4.6mL/min
# 15mL at 3mL/min
# nb_steps = 5200 * 15 = 78000
# sps = 3mL/min * 5200s/mL = 15600s/min / 60 => 260sps
delay = max((1 / steps_per_second) - 0.0025, 0)
logger.debug(f"The delay between two steps is {delay}s")
# Publish the status "Started" to via MQTT to Node-RED
self.actuator_client.client.publish(
"status/pump",
f'{{"status":"Started", "duration":{nb_steps / steps_per_second}}}',
)
# Depending on direction, select the right direction for the focus
if direction == "FORWARD":
self.pump_stepper.go(adafruit_motor.stepper.FORWARD, nb_steps, delay)
if direction == "BACKWARD":
self.pump_stepper.go(adafruit_motor.stepper.BACKWARD, nb_steps, delay)
@logger.catch
def run(self):
"""This is the function that needs to be started to create a thread"""
logger.info(
f"The stepper control process has been started in process {os.getpid()}"
)
# Creates the MQTT Client
# We have to create it here, otherwise when the process running run is started
# it doesn't see changes and calls made by self.actuator_client because this one
# only exist in the master process
# see https://stackoverflow.com/questions/17172878/using-pythons-multiprocessing-process-class
self.actuator_client = planktoscope.mqtt.MQTT_Client(
topic="actuator/#", name="actuator_client"
)
# Publish the status "Ready" to via MQTT to Node-RED
self.actuator_client.client.publish("status/pump", '{"status":"Ready"}')
# Publish the status "Ready" to via MQTT to Node-RED
self.actuator_client.client.publish("status/focus", '{"status":"Ready"}')
while not self.stop_event.is_set():
# check if a new message has been received
self.treat_command()
if self.pump_stepper.move():
self.actuator_client.client.publish(
"status/pump",
'{"status":"Done"}',
)
if self.focus_stepper.move():
self.actuator_client.client.publish(
"status/focus",
'{"status":"Done"}',
)
time.sleep(0)
logger.info("Shutting down the stepper process")
self.actuator_client.client.publish("status/pump", '{"status":"Dead"}')
self.actuator_client.client.publish("status/focus", '{"status":"Dead"}')
self.pump_stepper.shutdown()
self.focus_stepper.shutdown()
self.actuator_client.shutdown()
logger.info("Stepper process shut down! See you!")
# This is called if this script is launched directly
if __name__ == "__main__":
# Starts the stepper thread for actuators
# This needs to be in a threading or multiprocessing wrapper
stepper_thread = StepperProcess()
stepper_thread.start()
stepper_thread.join()

View file

@ -1,46 +0,0 @@
#!/usr/bin/env python
#Strating pumping foward a vol of 24ml with a flowrate of 3.2ml/min, use this command line :
#python3.7 path/to/file/pump.py 24 3.2 foward
from adafruit_motor import stepper
from adafruit_motorkit import MotorKit
from time import sleep
import sys
volume = int(sys.argv[1])
flowrate = float(sys.argv[2])
action = str(sys.argv[3])
kit = MotorKit()
pump_stepper = kit.stepper1
pump_stepper.release()
def pump(volume, flowrate, action):
if action == "foward":
action=stepper.BACKWARD
if action == "backward":
action=stepper.FORWARD
nb_step=volume*507 #if sleep(0.05) in between 2 steps
#35000steps for 69g
#nb_step=vol*460 if sleep(0) in between 2 steps
duration=(volume*60)/flowrate
delay=(duration/nb_step)-0.005
for i in range(nb_step):
pump_stepper.onestep(direction=action, style=stepper.DOUBLE)
sleep(delay)
sleep(1)
pump_stepper.release()
#volume, flowrate (from 0 to 20), direction (foward or backward)
pump(volume, flowrate, action)