stepper: add support for waveshare hat, for #11

This commit is contained in:
Romain Bazile 2020-10-04 23:28:26 +02:00
parent 1488459954
commit 20e07a110d

View file

@ -1,5 +1,5 @@
# Libraries to control the steppers for focusing and pumping # Libraries to control the steppers for focusing and pumping
import adafruit_motor import adafruit_motor.stepper
import adafruit_motorkit import adafruit_motorkit
import time import time
import json import json
@ -7,6 +7,7 @@ import os
import planktoscope.mqtt import planktoscope.mqtt
import planktoscope.light import planktoscope.light
import multiprocessing import multiprocessing
import RPi.GPIO
# Logger library compatible with multiprocessing # Logger library compatible with multiprocessing
from loguru import logger from loguru import logger
@ -14,12 +15,63 @@ from loguru import logger
logger.info("planktoscope.stepper is loaded") logger.info("planktoscope.stepper is loaded")
class StepperWaveshare:
"""A bipolar stepper motor."""
def __init__(self, dir_pin, step_pin, enable_pin):
self.dir_pin = dir_pin
self.step_pin = step_pin
self.enable_pin = enable_pin
RPi.GPIO.setmode(RPi.GPIO.BCM)
RPi.GPIO.setwarnings(False)
RPi.GPIO.setup(self.dir_pin, RPi.GPIO.OUT)
RPi.GPIO.setup(self.step_pin, RPi.GPIO.OUT)
RPi.GPIO.setup(self.enable_pin, RPi.GPIO.OUT)
self.release()
def release(self):
"""Releases all the coils so the motor can free spin, also won't use any power"""
self.__digital_write(self.enable_pin, 1)
def __digital_write(self, pin, value):
RPi.GPIO.output(pin, value)
def stop(self):
self.__digital_write(self.enable_pin, 1)
def onestep(self, *, direction=adafruit_motor.stepper.FORWARD, style=""):
"""Performs one step.
:param int direction: Either `FORWARD` or `BACKWARD`"""
if direction == adafruit_motor.stepper.FORWARD:
self.__digital_write(self.enable_pin, 0)
self.__digital_write(self.dir_pin, 1)
elif direction == adafruit_motor.stepper.BACKWARD:
self.__digital_write(self.enable_pin, 0)
self.__digital_write(self.dir_pin, 0)
else:
logger.error(
"The direction must be : adafruit_motor.stepper.FORWARD or adafruit_motor.stepper.BACKWARD"
)
self.release()
return
# This delay is just to make sure the chip had time to take the dir/enable pin
# into account, min delay is 650ns
time.sleep(0.000001)
self.__digital_write(self.step_pin, True)
# This delay is the minimal time high for the step impulse, 2µs
time.sleep(0.000005)
self.__digital_write(self.step_pin, False)
class stepper: class stepper:
def __init__(self, stepper, style, size=0): def __init__(self, stepper, style=adafruit_motor.stepper.SINGLE, size=0):
"""Initialize the stepper class """Initialize the stepper class
Args: Args:
stepper (adafruit_motorkit.Motorkit().stepper): reference to the object that controls the stepper stepper (adafruit_motorkit.Motorkit().stepper or StepperWaveshare): reference to the object that controls the stepper
style (adafruit_motor.stepper): style of the movement SINGLE, DOUBLE, MICROSTEP style (adafruit_motor.stepper): style of the movement SINGLE, DOUBLE, MICROSTEP
size (int): maximum number of steps of this stepper (aka stage size). Can be 0 if not applicable size (int): maximum number of steps of this stepper (aka stage size). Can be 0 if not applicable
""" """
@ -119,6 +171,8 @@ class StepperProcess(multiprocessing.Process):
# pump max speed is in ml/min # pump max speed is in ml/min
pump_max_speed = 30 pump_max_speed = 30
stepper_type = "adafruit"
def __init__(self, event): def __init__(self, event):
super(StepperProcess, self).__init__() super(StepperProcess, self).__init__()
@ -149,19 +203,46 @@ class StepperProcess(multiprocessing.Process):
"focus_max_speed", self.focus_max_speed "focus_max_speed", self.focus_max_speed
) )
self.pump_max_speed = configuration.get("pump_max_speed", self.pump_max_speed) self.pump_max_speed = configuration.get("pump_max_speed", self.pump_max_speed)
self.stepper_type = configuration.get("stepper_type", self.stepper_type)
# define the names for the 2 exsting steppers # define the names for the 2 exsting steppers
if self.stepper_type == "adafruit":
logger.info("Loading the adafruit configuration")
kit = adafruit_motorkit.MotorKit() kit = adafruit_motorkit.MotorKit()
if reverse: if reverse:
self.pump_stepper = stepper(kit.stepper2, adafruit_motor.stepper.DOUBLE) self.pump_stepper = stepper(kit.stepper2, adafruit_motor.stepper.DOUBLE)
self.focus_stepper = stepper( self.focus_stepper = stepper(
kit.stepper1, adafruit_motor.stepper.MICROSTEP, 45 kit.stepper1, adafruit_motor.stepper.MICROSTEP, size=45
) )
else: else:
self.pump_stepper = stepper(kit.stepper1, adafruit_motor.stepper.DOUBLE) self.pump_stepper = stepper(kit.stepper1, adafruit_motor.stepper.DOUBLE)
self.focus_stepper = stepper( self.focus_stepper = stepper(
kit.stepper2, adafruit_motor.stepper.MICROSTEP, 45 kit.stepper2, adafruit_motor.stepper.MICROSTEP, size=45
) )
elif self.stepper_type == "waveshare":
logger.info("Loading the waveshare configuration")
if reverse:
self.pump_stepper = stepper(
StepperWaveshare(dir_pin=24, step_pin=18, enable_pin=4)
)
self.focus_stepper = stepper(
StepperWaveshare(dir_pin=13, step_pin=19, enable_pin=12),
size=45,
)
else:
self.pump_stepper = stepper(
StepperWaveshare(dir_pin=13, step_pin=19, enable_pin=12)
)
self.focus_stepper = stepper(
StepperWaveshare(dir_pin=24, step_pin=18, enable_pin=4),
size=45,
)
else:
logger.error(
"The stepper control type is not recognized. Should be 'adafruit' or 'waveshare'"
)
logger.error(f"{self.stepper_type} is what was supplied")
return
logger.debug(f"Stepper initialisation is over") logger.debug(f"Stepper initialisation is over")
@ -315,7 +396,10 @@ class StepperProcess(multiprocessing.Process):
# see https://stackoverflow.com/questions/1133857/how-accurate-is-pythons-time-sleep # see https://stackoverflow.com/questions/1133857/how-accurate-is-pythons-time-sleep
# However we have a fixed delay of at least 2.5ms per step due to the library # However we have a fixed delay of at least 2.5ms per step due to the library
# Our maximum speed is thus about 400 pulses per second or 0.5mm/sec of stage speed # Our maximum speed is thus about 400 pulses per second or 0.5mm/sec of stage speed
if self.stepper_type == "adafruit":
delay = max((1 / steps_per_second) - 0.0025, 0) delay = max((1 / steps_per_second) - 0.0025, 0)
else:
delay = 1 / steps_per_second
logger.debug(f"The delay between two steps is {delay}s") logger.debug(f"The delay between two steps is {delay}s")
# Publish the status "Started" to via MQTT to Node-RED # Publish the status "Started" to via MQTT to Node-RED
@ -373,7 +457,10 @@ class StepperProcess(multiprocessing.Process):
# 15mL at 3mL/min # 15mL at 3mL/min
# nb_steps = 5200 * 15 = 78000 # nb_steps = 5200 * 15 = 78000
# sps = 3mL/min * 5200s/mL = 15600s/min / 60 => 260sps # sps = 3mL/min * 5200s/mL = 15600s/min / 60 => 260sps
if self.stepper_type == "adafruit":
delay = max((1 / steps_per_second) - 0.0025, 0) delay = max((1 / steps_per_second) - 0.0025, 0)
else:
delay = 1 / steps_per_second
logger.debug(f"The delay between two steps is {delay}s") logger.debug(f"The delay between two steps is {delay}s")
# Publish the status "Started" to via MQTT to Node-RED # Publish the status "Started" to via MQTT to Node-RED