Python: various sourcery fixes and suggestions

This commit is contained in:
Romain Bazile 2021-11-19 13:31:24 +01:00
parent 31c91a246a
commit 8e43977ed7
8 changed files with 32 additions and 32 deletions

View file

@ -31,6 +31,7 @@ class Display(object):
logger.success("planktoscope.display is ready!") logger.success("planktoscope.display is ready!")
except Exception as e: except Exception as e:
logger.error("Could not detect the display") logger.error("Could not detect the display")
logger.error(f"Exception was {e}")
self.display_available = False self.display_available = False
def display_machine_name(self): def display_machine_name(self):

View file

@ -181,7 +181,7 @@ class raspimjpeg(object):
return status return status
except FileNotFoundError as e: except FileNotFoundError as e:
logger.error( logger.error(
f"The status file was not found, make sure the filesystem has not been corrupted" "The status file was not found, make sure the filesystem has not been corrupted"
) )
return "" return ""
@ -489,7 +489,7 @@ class raspimjpeg(object):
""" """
logger.debug(f"Capturing an image to {path}") logger.debug(f"Capturing an image to {path}")
if path == "": if path == "":
self.__send_command(f"im") self.__send_command("im")
else: else:
self.__send_command(f"im {path}") self.__send_command(f"im {path}")
time.sleep(0.1) time.sleep(0.1)
@ -500,7 +500,7 @@ class raspimjpeg(object):
def stop(self): def stop(self):
"""Halt and release the camera.""" """Halt and release the camera."""
logger.debug("Releasing the camera now") logger.debug("Releasing the camera now")
self.__send_command(f"ru 0") self.__send_command("ru 0")
def close(self): def close(self):
"""Kill the process.""" """Kill the process."""

View file

@ -22,7 +22,7 @@ class ImagerState(object):
allowed = [] allowed = []
def switch(self, state): def switch(self, state):
""" Switch to new state """ """Switch to new state"""
if state.name in self.allowed: if state.name in self.allowed:
logger.info(f"Current:{self} => switched to new state {state.name}") logger.info(f"Current:{self} => switched to new state {state.name}")
self.__class__ = state self.__class__ = state
@ -39,33 +39,33 @@ class Stop(ImagerState):
class Imaging(ImagerState): class Imaging(ImagerState):
""" State of getting ready to start """ """State of getting ready to start"""
name = "imaging" name = "imaging"
allowed = ["waiting", "stop"] allowed = ["waiting", "stop"]
class Waiting(ImagerState): class Waiting(ImagerState):
""" State of waiting for the pump to finish """ """State of waiting for the pump to finish"""
name = "waiting" name = "waiting"
allowed = ["stop", "capture"] allowed = ["stop", "capture"]
class Capture(ImagerState): class Capture(ImagerState):
""" State of capturing image """ """State of capturing image"""
name = "capture" name = "capture"
allowed = ["stop", "waiting"] allowed = ["stop", "waiting"]
class Imager(object): class Imager(object):
""" A class representing the imager """ """A class representing the imager"""
def __init__(self): def __init__(self):
# State of the imager - default is stop. # State of the imager - default is stop.
self.state = Stop() self.state = Stop()
def change(self, state): def change(self, state):
""" Change state """ """Change state"""
self.state.switch(state) self.state.switch(state)

View file

@ -35,7 +35,7 @@ class StreamingHandler(http.server.BaseHTTPRequestHandler):
with open("/dev/shm/mjpeg/cam.jpg", "rb") as jpeg: # nosec with open("/dev/shm/mjpeg/cam.jpg", "rb") as jpeg: # nosec
frame = jpeg.read() frame = jpeg.read()
except FileNotFoundError as e: except FileNotFoundError as e:
logger.error(f"Camera has not been started yet") logger.error("Camera has not been started yet")
time.sleep(5) time.sleep(5)
except Exception as e: except Exception as e:
logger.exception(f"An exception occured {e}") logger.exception(f"An exception occured {e}")

View file

@ -117,20 +117,20 @@ class MQTT_Client:
# When connected, run subscribe() # When connected, run subscribe()
self.client.subscribe(self.topic) self.client.subscribe(self.topic)
@logger.catch
# Run this function in order to subscribe to all the topics begining by actuator # Run this function in order to subscribe to all the topics begining by actuator
@logger.catch
def on_subscribe(self, client, obj, mid, granted_qos): def on_subscribe(self, client, obj, mid, granted_qos):
# Print when subscribed # Print when subscribed
# TODO Fix bug when this is called outside of this init function (for example when the imager subscribe to status/pump) # TODO Fix bug when this is called outside of this init function (for example when the imager subscribe to status/pump)
logger.success( logger.success(
f"{self.name} subscribed to {self.topic}! - mid:{str(mid)} qos:{str(granted_qos)}" f"{self.name} subscribed to {self.topic}! - mid:{mid} qos:{granted_qos}"
) )
# Run this command when Node-RED is sending a message on the subscribed topic # Run this command when Node-RED is sending a message on the subscribed topic
@logger.catch @logger.catch
def on_message(self, client, userdata, msg): def on_message(self, client, userdata, msg):
# Print the topic and the message # Print the topic and the message
logger.info(f"{self.name}: {msg.topic} {str(msg.qos)} {str(msg.payload)}") logger.info(f"{self.name}: {msg.topic} {msg.qos} {msg.payload}")
# Parse the topic to find the command. ex : actuator/pump -> pump # Parse the topic to find the command. ex : actuator/pump -> pump
# This only removes the top-level topic! # This only removes the top-level topic!
self.command = msg.topic.split("/", 1)[1] self.command = msg.topic.split("/", 1)[1]
@ -157,7 +157,7 @@ class MQTT_Client:
return self.__new_message return self.__new_message
def read_message(self): def read_message(self):
logger.debug(f"clearing the __new_message flag") logger.debug("clearing the __new_message flag")
self.__new_message = False self.__new_message = False
@logger.catch @logger.catch

View file

@ -36,7 +36,7 @@ def adaptative_threshold(img):
# logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# logger.debug(time.monotonic() - start) # logger.debug(time.monotonic() - start)
# logger.success(f"Threshold used was {ret}") # logger.success(f"Threshold used was {ret}")
logger.success(f"Adaptative threshold is done") logger.success("Adaptative threshold is done")
return mask return mask
@ -62,7 +62,7 @@ def simple_threshold(img):
# logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# logger.debug(time.monotonic() - start) # logger.debug(time.monotonic() - start)
logger.info(f"Threshold value used was {ret}") logger.info(f"Threshold value used was {ret}")
logger.success(f"Simple threshold is done") logger.success("Simple threshold is done")
return mask return mask

View file

@ -35,11 +35,10 @@ def uuidMachineName(machine="", type=1):
""" """
if type == 4: if type == 4:
id = str(uuid.uuid4()) id = str(uuid.uuid4())
elif machine == "":
id = str(uuid.uuid1())
else: else:
if machine == "": id = str(uuid.uuid1(node=int(str(machine)[-12:], 16)))
id = str(uuid.uuid1())
else:
id = str(uuid.uuid1(node=int(str(machine)[-12:], 16)))
name = "" name = ""
x = id.rsplit("-", 1) x = id.rsplit("-", 1)
machine = x[1] machine = x[1]
@ -84,13 +83,13 @@ def uuidMachine(machine="", type=1):
str: universally unique id str: universally unique id
""" """
if type == 4: if type == 4:
id = str(uuid.uuid4()) return str(uuid.uuid4())
else: else:
if machine == "": return (
id = str(uuid.uuid1()) str(uuid.uuid1())
else: if machine == ""
id = str(uuid.uuid1(node=int(str(machine)[-12:], 16))) else str(uuid.uuid1(node=int(str(machine)[-12:], 16)))
return id )
def uuidName(): def uuidName():
@ -151,12 +150,12 @@ def getSerial():
Returns: Returns:
str: serial number or MAC address str: serial number or MAC address
""" """
if os.path.exists("/sys/firmware/devicetree/base/serial-number"): if not os.path.exists("/sys/firmware/devicetree/base/serial-number"):
with open("/sys/firmware/devicetree/base/serial-number", "r") as serial_file:
return serial_file.readline().strip("\x00")
else:
return str(uuid.getnode()) return str(uuid.getnode())
with open("/sys/firmware/devicetree/base/serial-number", "r") as serial_file:
return serial_file.readline().strip("\x00")
if __name__ == "__main__": if __name__ == "__main__":
print("Type 4:") print("Type 4:")