Source code for ase.MiniMaestroProtocol

"""A protocol for communicating with a Pololu Mini-Maestro USB Servo Interface using a serial port transport.

Copyright (c) 2015-2018, Garth Zeglin.  All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""

# The Pololu Mini Maestro 18 is a USB interface for driving up to 18 servos or
# digital I/O lines from a computer.  It is also available in several other sizes.

# Pololu documentation: https://www.pololu.com/docs/0J40
# Details of serial commands: https://www.pololu.com/docs/0J40/5.e

# Board Configuration
#
# The Mini Maestro board should be configured using the Pololu-supplied configuration program (Windows or Linux) as follows:

# 1. Serial set to USB Dual Port mode.
# 2. CRC should be left disabled.
# 3. All required servo channels enabled.
# 4. Servo timing ranges should generally be increased to enable full servo travel.
# 5. Verify that firmware 1.03 or later is installed in order to work with macOS 10.11.

#================================================================

# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals

# Standard library modules.
import struct, logging

# Import modules from the course library.
import ase.base

#================================================================
[docs]class MiniMaestroProtocol(ase.base.Protocol): """Communicate with a Pololu Mini-Maestro USB servo interface. An instance of this class can be passed to a serial transport class (e.g. SerialTransport or QtSerialTransport) which handles the device port. The Mini Maestro enumerates as two separate ports; this protocol should be attached to the 'command' port at 115200 baud. """ # This class uses the 'compact protocol', not the 'Pololu protocol'. def __init__(self): self.transport = None #: object representing the device connection self.messages_received = 0 #: Count of total messages received. self.messages_sent = 0 #: Count of total messages sent. # configure logging self.logger = logging.getLogger(__name__) #: logging object for controlling or capturing an event log self.logger.setLevel(logging.DEBUG) # private attributes self._requests = list() # keep track of pending requests for data self._buffer = b'' return
[docs] def data_received(self, data): """Process a bytestring received from the port, matching it to any data requests.""" self.logger.debug('Status received: %r', data) # Manage the possibility of partial reads by appending new data to any previously received partial line. self._buffer += data # Match received values with requests. while(len(self._requests) > 0): req = self._requests[0] if req[0] == 'get-moving-state': if len(self._buffer) >= 1: if req[1] is not None: value = (self._buffer[0] == b'\x01') # True if any servos are moving req[1](value) self._buffer = self._buffer[1:] del self._requests[0] else: # insufficient data return elif req[0] == 'get-position': if len(self._buffer) >= 2: if req[2] is not None: low_bits = ord(self._buffer[0]) high_bits = ord(self._buffer[1]) value = (high_bits << 8) | low_bits req[2](req[1], 0.00025 * value) self._buffer = self._buffer[2:] del self._requests[0] else: # insufficient data return # This point will only be reached when all requests have been answered. # Check if there is more returned data than requests. if len(self._buffer) > 0: self.logger.warning("Extraneous response received: %r", self._buffer) self._buffer = b'' return
# ----------------------------------------------------------------
[docs] def is_connected(self): """Return true if the serial port device is open.""" return self.transport is not None
[docs] def send_target(self, channel, pulse_width): """Send a single Set Target command to the Mini Maestro. :param channel: integer channel number; the first servo channel is 0 :param pulse_width: servo pulse width in milliseconds; the typical range is 1.0 to 2.0 """ # convert the pulse width to a 14 bit integer number with units of 250 ns, clamped # to the valid range pulse_units = min(max(int(pulse_width * 4000), 0), 16383) message = struct.pack(b'BBBB', 0x84, channel, pulse_units & 0x7f, pulse_units >> 7) self.logger.debug("Sending: %r", message) self.transport.write(message) self.messages_sent += 1 return
[docs] def send_multiple_targets(self, first_channel, pulse_widths): """Send a single Set Target command to the Mini Maestro. :param first_channel: integer channel number for the first value :param pulse_widths: list of servo pulse width in milliseconds; the typical range is 1.0 to 2.0 """ # Convert the pulse widths to 14 bit integers with units of 250 ns, clamped to the valid range pulse_units = [min(max(int(width * 4000), 0), 16383) for width in pulse_widths] # Convert a list of two-byte arrays with the low and high 7-bit values. values = [bytearray((units & 0x7f, units >> 7)) for units in pulse_units] # Create the header bytes and construct the message. header = struct.pack(b'BBB', 0x9f, len(pulse_widths), first_channel) message = header + bytearray().join(values) self.logger.debug("Sending: %r", message) self.transport.write(message) self.messages_sent += 1 return
[docs] def send_speed(self, channel, speed): """Send a single Set Speed command to the Mini Maestro. The valid range is 0.025 to 409.575, but for most servos practical values are less than 10. Zero sets unlimited speed. Out-of-range values are silently clamped. :param channel: integer channel number; the first servo channel is 0 :param speed: maximum rate of change of servo pulse width in milliseconds/second """ # The board expects units of 250 ns/10ms; we are providing a rate in # ms/sec. Convert the speed to a 14 bit integer number with units # of 250 ns/10ms, clamped to the valid range. if speed == 0: pulse_speed = 0 else: pulse_speed = min(max(int(speed * 4000 * 0.01), 1), 16383) message = struct.pack(b'BBBB', 0x87, channel, pulse_speed & 0x7f, pulse_speed >> 7) self.logger.debug("Sending: %r", message) self.transport.write(message) self.messages_sent += 1 return
[docs] def send_multiple_speeds(self, first_channel, speeds): """Send multiple Set Speed commands to a Mini Maestro. N.B. there is no special protocol support, this is a convenience function to issue multiple Set Speed commands. :param first_channel: integer channel number for the first value :param speeds: list of speed values in milliseconds-of-pulse-width/second """ for channel, speed in enumerate(speeds): self.send_speed(first_channel+channel, speed) return
[docs] def send_acceleration(self, channel, accel): """Send a single Set Acceleration command to the Mini Maestro. The valid range is 0.3125 to 79.6875. Zero sets unlimited acceleration. Out-of-range values are silently clamped. :param channel: integer channel number; the first servo channel is 0 :param accel: acceleration of servo pulse width in milliseconds/second/second """ # The board expects units of 250 ns/10ms/80ms; we are providing a rate in # ms/sec/sec. Convert the accel to a 8 bit integer number with units # of 250 ns/10ms, clamped to the valid range. if accel == 0: pulse_accel = 0 else: pulse_accel = min(max(int(accel * 4000 * 0.01 * 0.08), 1), 255) message = struct.pack(b'BBBB', 0x89, channel, pulse_accel & 0x7f, pulse_accel >> 7) self.logger.debug("Sending: %r", message) self.transport.write(message) self.messages_sent += 1 return
[docs] def send_multiple_accelerations(self, first_channel, accels): """Send multiple Set Accel commands to a Mini Maestro. N.B. there is no special protocol support, this is a convenience function to issue multiple Set Acceleration commands. :param first_channel: integer channel number for the first value :param accels: list of accel values in milliseconds-of-pulse-width/second/second """ for channel, accel in enumerate(accels): self.send_acceleration(first_channel+channel, accel) return
[docs] def send_get_moving_state(self, callback=None): """Send a Get Moving State request to the Mini Maestro. This is an asynchronous request, but a callback function can be provided to receive the eventual return value. :param callback: callable to be called with a Boolean value """ message = b'\x93' self.logger.debug("Sending: %r", message) self.transport.write(message) self.messages_sent += 1 self._requests.append(("get-moving-state", callback)) return
[docs] def send_get_position(self, channel, callback=None): """Send a Get Position request to the Mini Maestro. This is an asynchronous request, but a callback function can be provided to receive the eventual return value. :param channel: integer channel number; the first servo channel is 0 :param callback: callable to be called with a (channel, position) tuple, with position specified in milliseconds of pulse width """ message = struct.pack(b'BB', 0x90, channel) self.logger.debug("Sending: %r", message) self.transport.write(message) self.messages_sent += 1 self._requests.append(("get-position", channel, callback)) return
#================================================================