# Source code for stage.valves

"""
valves.py : sample code in Python to communicate with an Arduino running ValveControl

No copyright, 2021-2024, Garth Zeglin.  This file is explicitly placed in the public domain.
"""

#================================================================
import logging
import os.path
import time

# This requires a pySerial installation.
#  Package details: https://pypi.python.org/pypi/pyserial,
#  Documentation: http://pythonhosted.org/pyserial/
import serial

# initialize logging for this module
log = logging.getLogger(__name__)

#================================================================
class ValveControlClient:
    """Class to manage a connection to a serial-connected Arduino running the
    ValveControl script.

    If no port is given, or the named device does not exist, the client runs
    offline: the serial port is left as None, commands are not transmitted, and
    :meth:`wait_for_wakeup` simulates a response.

    :param port: the name of the serial port device, or None to run offline
    :param debug: true when debug log is enabled (stored on the instance only)
    :param verbose: flag to increase logging output
    :param kwargs: collect any unused keyword arguments
    """

    def __init__(self, port=None, verbose=False, debug=False, **kwargs):
        # logging flags
        self.verbose = verbose
        self.debug = debug

        # initialize the client state
        self.arduino_time = 0                # timestamp from the most recent status message
        self.pwm = [0.0, 0.0, 0.0, 0.0]      # valve values from the most recent status message
        self.awake = False                   # set once an 'awake' or 'id' message arrives

        # open the serial port, which should also reset the Arduino
        # (the None check matters: os.path.exists(None) raises TypeError)
        if port is not None and os.path.exists(port):
            self.port = serial.Serial(port, 115200, timeout=5)
            if self.verbose:
                log.info("Opened serial port named %s", self.port.name)
                log.info("Sleeping briefly while Arduino boots...")

            # wait briefly for the Arduino to finish booting
            time.sleep(2)  # units are seconds

            # throw away any extraneous input
            self.port.reset_input_buffer()
        else:
            log.warning("Serial port %s not available, running offline.", port)
            self.port = None

    def close(self):
        """Shut down the serial connection to the Arduino, after which this
        object may no longer be used."""
        if self.port is not None:
            self.port.close()
            self.port = None

    #----------------------------------------------------------------
    def _wait_for_input(self):
        """Read one line from the serial port and update the client state.

        Does nothing when running offline or when the read returns no data.
        Recognized messages start with ``status``, ``awake``, ``id``, or
        ``dbg``; anything else is logged as a warning.
        """
        if self.port is None:
            return

        line = self.port.readline().rstrip()
        if not line:
            return

        elements = line.split()
        if self.verbose:
            log.debug("Received: '%s'", line)

        keyword = elements[0]
        if keyword == b'status':
            # Parse everything before storing anything, so that a truncated or
            # garbled line cannot leave the state half-updated or raise.
            try:
                arduino_time = int(elements[1])
                pwm = [float(s) for s in elements[2:6]]
            except (IndexError, ValueError):
                log.warning("Malformed status message: %s", line)
                return
            self.arduino_time = arduino_time
            self.pwm = pwm
            if self.verbose:
                log.debug("Valves at %s", self.pwm)

        elif keyword == b'awake':
            self.awake = True

        elif keyword == b'id':
            self.awake = True
            log.info("Arduino firmware: %s", line.decode())

        elif keyword == b'dbg':
            log.info("Received debugging message: %s", line)

        else:
            log.warning("Unknown status message: %s", line)

    def poll_status(self):
        """Process one message from the Arduino if any input bytes are already
        waiting on the serial port; otherwise return immediately."""
        if self.port is not None and self.port.in_waiting > 0:
            self._wait_for_input()

    #----------------------------------------------------------------
    def _send_command(self, string):
        """Transmit a single newline-terminated command line to the Arduino.

        :param string: command text without the trailing newline
        """
        if self.verbose:
            log.debug("Sending: %s", string)
        if self.port is not None:
            self.port.write(string.encode() + b'\n')

    def send_stop(self):
        """Issue a command to shut off all air flow."""
        self._send_command("stop")

    def send_empty(self):
        """Issue a command to shut motion and exhaust all air."""
        self._send_command("empty")

    def send_move(self, joint, pwm):
        """Issue a command to start an open-loop motion on the specified joint
        with the given pwm rate and return immediately.

        :param joint: joint identifier (usually 1 or 2)
        :param pwm: integer valve PWM from -100 to 100
        """
        self._send_command("move %d %d" % (joint, pwm))

    #----------------------------------------------------------------
    def wait_for_wakeup(self):
        """Repeatedly issue a version query until the Arduino has responded.

        The loop ends once an ``awake`` or ``id`` message has been received.
        When running offline the response is simulated so this returns at once.
        """
        while not self.awake:
            self._send_command("version")
            if self.port is not None:
                self._wait_for_input()
            else:
                self.awake = True  # simulate
#================================================================