python: reminder to add tests

This commit is contained in:
Romain Bazile 2020-10-06 11:42:07 +02:00
parent 1a42f05941
commit e89cbdc2aa
5 changed files with 30 additions and 12 deletions

View file

@ -526,3 +526,8 @@ class ImagerProcess(multiprocessing.Process):
# self.streaming_thread.kill() # self.streaming_thread.kill()
logger.success("Imager process shut down! See you!") logger.success("Imager process shut down! See you!")
# This is called if this script is launched directly
if __name__ == "__main__":
# TODO This should be a test suite for this library
pass

View file

@ -90,6 +90,7 @@ def light(state):
# This is called if this script is launched directly # This is called if this script is launched directly
if __name__ == "__main__": if __name__ == "__main__":
# TODO This should be a test suite for this library
import RPi.GPIO as GPIO import RPi.GPIO as GPIO
import sys import sys

View file

@ -164,4 +164,10 @@ class MQTT_Client:
def shutdown(self, topic="", message=""): def shutdown(self, topic="", message=""):
logger.info(f"Shutting down mqtt client {self.name}") logger.info(f"Shutting down mqtt client {self.name}")
self.client.loop_stop() self.client.loop_stop()
logger.debug(f"Mqtt client {self.name} shut down") logger.debug(f"Mqtt client {self.name} shut down")
# This is called if this script is launched directly
if __name__ == "__main__":
# TODO This should be a test suite for this library
pass

View file

@ -361,3 +361,8 @@ class SegmenterProcess(multiprocessing.Process):
self.segmenter_client.shutdown() self.segmenter_client.shutdown()
logger.success("Segmenter process shut down! See you!") logger.success("Segmenter process shut down! See you!")
# This is called if this script is launched directly
if __name__ == "__main__":
# TODO This should be a test suite for this library
pass

View file

@ -209,15 +209,15 @@ class StepperProcess(multiprocessing.Process):
# define the names for the 2 exsting steppers # define the names for the 2 exsting steppers
if self.stepper_type == "adafruit": if self.stepper_type == "adafruit":
logger.info("Loading the adafruit configuration") logger.info("Loading the adafruit configuration")
kit = adafruit_motorkit.MotorKit() kit = adafruit_motorkit.MotorKit()
if reverse: if reverse:
self.pump_stepper = stepper(kit.stepper2, adafruit_motor.stepper.DOUBLE) self.pump_stepper = stepper(kit.stepper2, adafruit_motor.stepper.DOUBLE)
self.focus_stepper = stepper( self.focus_stepper = stepper(
kit.stepper1, adafruit_motor.stepper.MICROSTEP, size=45 kit.stepper1, adafruit_motor.stepper.MICROSTEP, size=45
) )
else: else:
self.pump_stepper = stepper(kit.stepper1, adafruit_motor.stepper.DOUBLE) self.pump_stepper = stepper(kit.stepper1, adafruit_motor.stepper.DOUBLE)
self.focus_stepper = stepper( self.focus_stepper = stepper(
kit.stepper2, adafruit_motor.stepper.MICROSTEP, size=45 kit.stepper2, adafruit_motor.stepper.MICROSTEP, size=45
) )
elif self.stepper_type == "waveshare": elif self.stepper_type == "waveshare":
@ -237,7 +237,7 @@ class StepperProcess(multiprocessing.Process):
self.focus_stepper = stepper( self.focus_stepper = stepper(
StepperWaveshare(dir_pin=24, step_pin=18, enable_pin=4), StepperWaveshare(dir_pin=24, step_pin=18, enable_pin=4),
size=45, size=45,
) )
else: else:
logger.error( logger.error(
"The stepper control type is not recognized. Should be 'adafruit' or 'waveshare'" "The stepper control type is not recognized. Should be 'adafruit' or 'waveshare'"
@ -398,7 +398,7 @@ class StepperProcess(multiprocessing.Process):
# However we have a fixed delay of at least 2.5ms per step due to the library # However we have a fixed delay of at least 2.5ms per step due to the library
# Our maximum speed is thus about 400 pulses per second or 0.5mm/sec of stage speed # Our maximum speed is thus about 400 pulses per second or 0.5mm/sec of stage speed
if self.stepper_type == "adafruit": if self.stepper_type == "adafruit":
delay = max((1 / steps_per_second) - 0.0025, 0) delay = max((1 / steps_per_second) - 0.0025, 0)
else: else:
delay = 1 / steps_per_second delay = 1 / steps_per_second
logger.debug(f"The delay between two steps is {delay}s") logger.debug(f"The delay between two steps is {delay}s")
@ -459,7 +459,7 @@ class StepperProcess(multiprocessing.Process):
# nb_steps = 5200 * 15 = 78000 # nb_steps = 5200 * 15 = 78000
# sps = 3mL/min * 5200s/mL = 15600s/min / 60 => 260sps # sps = 3mL/min * 5200s/mL = 15600s/min / 60 => 260sps
if self.stepper_type == "adafruit": if self.stepper_type == "adafruit":
delay = max((1 / steps_per_second) - 0.0025, 0) delay = max((1 / steps_per_second) - 0.0025, 0)
else: else:
delay = 1 / steps_per_second delay = 1 / steps_per_second
logger.debug(f"The delay between two steps is {delay}s") logger.debug(f"The delay between two steps is {delay}s")
@ -523,6 +523,7 @@ class StepperProcess(multiprocessing.Process):
# This is called if this script is launched directly # This is called if this script is launched directly
if __name__ == "__main__": if __name__ == "__main__":
# TODO This should be a test suite for this library
# Starts the stepper thread for actuators # Starts the stepper thread for actuators
# This needs to be in a threading or multiprocessing wrapper # This needs to be in a threading or multiprocessing wrapper
stepper_thread = StepperProcess() stepper_thread = StepperProcess()