# Source code for theater.steppers

"""
steppers.py : sample code in Python to communicate with an Arduino running StepperSpline

No copyright, 2021-2023, Garth Zeglin.  This file is explicitly placed in the public domain.
"""

#================================================================
import logging
import os.path
import time

# This requires a pySerial installation.
#  Package details: https://pypi.python.org/pypi/pyserial,
#  Documentation: http://pythonhosted.org/pyserial/
import serial

# initialize logging for this module
log = logging.getLogger(__name__)

#================================================================
class StepperSplineClient(object):
    """Class to manage a connection to a serial-connected Arduino running the
    StepperSpline script.

    If the named serial port does not exist (or no port is given), the client
    runs "offline": commands are silently dropped and no input is read.

    :param port: the name of the serial port device
    :param debug: true when debug log is enabled
    :param verbose: flag to increase logging output
    :param kwargs: collect any unused keyword arguments
    """

    def __init__(self, port=None, verbose=False, debug=False, **kwargs):
        # logging flags
        self.verbose = verbose
        self.debug = debug

        # initialize the client state
        self.arduino_time = 0
        self.position = [0, 0, 0, 0]
        self.awake = False

        # open the serial port, which should also reset the Arduino
        # (os.path.exists(None) raises TypeError, so check for None first to
        # let the default port=None fall through to offline mode)
        if port is not None and os.path.exists(port):
            self.port = serial.Serial(port, 115200, timeout=5)
            if self.verbose:
                log.info("Opened serial port named %s", self.port.name)
                log.info("Sleeping briefly while Arduino boots...")

            # wait briefly for the Arduino to finish booting
            time.sleep(2)  # units are seconds

            # throw away any extraneous input
            self.port.reset_input_buffer()
        else:
            log.warning("Serial port %s not available, running offline.", port)
            self.port = None

    def close(self):
        """Shut down the serial connection to the Arduino, after which this
        object may no longer be used."""
        if self.port is not None:
            self.port.close()
            self.port = None

    #----------------------------------------------------------------
    def _wait_for_input(self):
        """Read one line from the serial port and update the client state.

        Does nothing when running offline.  An empty read is ignored.  The
        first whitespace-separated token of the line selects the handling:
        ``txyza`` updates ``arduino_time`` and ``position``, ``awake`` and
        ``id`` set the ``awake`` flag, ``dbg`` is logged, and anything else is
        reported as an unknown status message.
        """
        if self.port is None:
            return

        line = self.port.readline().rstrip()
        if not line:
            return

        elements = line.split()
        if self.verbose:
            log.debug("Received: '%s'", line)

        tag = elements[0]
        if tag == b'txyza':
            # Parse into temporaries first so that a truncated or garbled
            # line cannot leave the state half-updated or raise to the caller.
            try:
                arduino_time = int(elements[1])
                position = [int(s) for s in elements[2:]]
            except (IndexError, ValueError):
                log.warning("Malformed status message: %s", line)
                return
            self.arduino_time = arduino_time
            self.position = position
            if self.verbose:
                log.debug("Steppers at %s", self.position)

        elif tag == b'awake':
            self.awake = True

        elif tag == b'id':
            self.awake = True
            log.info("Arduino firmware: %s", line.decode())

        elif tag == b'dbg':
            log.info("Received debugging message: %s", line)

        else:
            log.warning("Unknown status message: %s", line)

    def poll_status(self):
        """Process one line of input, but only if the serial port reports
        that input is already waiting.  Does nothing when running offline."""
        if self.port is not None and self.port.in_waiting > 0:
            self._wait_for_input()

    #----------------------------------------------------------------
    def _send_command(self, string):
        """Send a single newline-terminated command line to the Arduino.

        The command is dropped when running offline.

        :param string: the command text, without the trailing newline
        """
        if self.verbose:
            log.debug("Sending: %s", string)

        if self.port is not None:
            self.port.write(string.encode() + b'\n')

    def motor_enable(self, value=True):
        """Issue a command to enable or disable the stepper motor drivers.

        :param value: any truthy value enables the drivers, any falsy value disables them
        """
        self._send_command("enable 1" if value else "enable 0")

    def send_move(self, position):
        """Issue a command to move to a [x, y, z, a] absolute position (specified in
        microsteps) and return immediately.

        :param position: a list or tuple with at least four elements; only
                         the first four are used
        """
        # Truncate to four values: the format string has exactly four fields
        # and would raise TypeError if handed a longer tuple.
        self._send_command("a xyza %d %d %d %d" % tuple(position)[:4])

    def send_relative_move(self, position, mask='xyza'):
        """Issue a command to move to a relative position (specified in
        microsteps) and return immediately.

        :param position: a list or tuple with one element per channel mask entry
        :param mask: a string specifying the channels to include
        """
        self._send_command("d " + mask + " " + " ".join([str(int(x)) for x in position]))

    def send_tempo(self, tempo, mask='xyza'):
        """Issue a command to set the spline rate and return immediately.

        :param tempo: a scalar spline rate in segments/minute; must be non-zero
        :param mask: a string specifying the channels to include (default is all)
        """
        # the spline rate command expects milliseconds per spline segment; limit
        # the value to 200 ms/segment, i.e. a maximum rate of 5 segs/sec (300 segs/min)
        value = max(int(60000 / tempo), 200)

        # create a string with the value duplicated for each mask element
        values = [value] * len(mask)
        self._send_command("r " + mask + " " + " ".join([str(x) for x in values]))

    def send_knots(self, knots, mask='xyza'):
        """Issue a series of knot commands to append Bezier spline segments
        (specified in microsteps) to the current path.

        :param knots: a matrix in which each row specifies a knot point value for each active channel
        :param mask: a string specifying the channels to include
        """
        # reset the knot origin
        self._send_command("s " + mask)

        # send the knots
        for row in knots:
            self._send_command("k " + mask + " " + " ".join([str(int(x)) for x in row]))

    #----------------------------------------------------------------
    def wait_for_wakeup(self):
        """Issue a status query and wait until status has been received.

        The query is re-sent after every line read until the ``awake`` flag is
        set; there is no upper bound on the number of attempts.  When running
        offline the wakeup is simulated immediately.
        """
        while not self.awake:
            self._send_command("version")
            if self.port is not None:
                self._wait_for_input()
            else:
                self.awake = True  # simulate
#================================================================