"""A protocol for communicating with a Pololu Mini-Maestro USB Servo Interface using a serial port transport.
Copyright (c) 2015-2018, Garth Zeglin. All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""
# The Pololu Mini Maestro 18 is a USB interface for driving up to 18 servos or
# digital I/O lines from a computer. It is also available in several other sizes.
# Pololu documentation: https://www.pololu.com/docs/0J40
# Details of serial commands: https://www.pololu.com/docs/0J40/5.e
# Board Configuration
#
# The Mini Maestro board should be configured using the Pololu-supplied configuration program (Windows or Linux) as follows:
# 1. Serial set to USB Dual Port mode.
# 2. CRC should be left disabled.
# 3. All required servo channels enabled.
# 4. Servo timing ranges should generally be increased to enable full servo travel.
# 5. Verify that firmware 1.03 or later is installed in order to work with macOS 10.11.
#================================================================
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals
# Standard library modules.
import struct, logging
# Import modules from the course library.
import ase.base
#================================================================
class MiniMaestroProtocol(ase.base.Protocol):
    """Communicate with a Pololu Mini-Maestro USB servo interface.

    An instance of this class can be passed to a serial transport class (e.g.
    SerialTransport or QtSerialTransport) which handles the device port. The
    Mini Maestro enumerates as two separate ports; this protocol should be
    attached to the 'command' port at 115200 baud.

    Commands are encoded with the Pololu 'compact protocol', not the 'Pololu
    protocol'. Every ``send_*`` method writes through ``self.transport`` and
    will raise AttributeError while no transport is attached; use
    :meth:`is_connected` to check first. Channel numbers and counts are packed
    as unsigned bytes, so values outside 0-255 raise ``struct.error``.
    """

    # Compact-protocol command bytes issued by this class.
    _CMD_SET_TARGET = 0x84
    _CMD_SET_SPEED = 0x87
    _CMD_SET_ACCELERATION = 0x89
    _CMD_GET_POSITION = 0x90
    _CMD_GET_MOVING_STATE = 0x93
    _CMD_SET_MULTIPLE_TARGETS = 0x9f

    # Number of response bytes expected for each kind of pending request.
    _RESPONSE_SIZES = {
        'get-moving-state': 1,
        'get-position': 2,
    }

    # Largest value representable in the two 7-bit data bytes of a command.
    _MAX_14BIT = 16383

    # Scale factor from milliseconds of pulse width to device units of 250 ns.
    _UNITS_PER_MS = 4000

    def __init__(self):
        # NOTE(review): the base class initializer is not invoked, matching the
        # original implementation; confirm ase.base.Protocol does not require it.
        self.transport = None  #: object representing the device connection
        self.messages_received = 0  #: Count of total responses decoded.
        self.messages_sent = 0  #: Count of total messages sent.

        # configure logging
        self.logger = logging.getLogger(__name__)  #: logging object for controlling or capturing an event log
        self.logger.setLevel(logging.DEBUG)

        # private attributes
        self._requests = list()  # pending data requests, oldest first
        self._buffer = b''  # response bytes received but not yet consumed
        return

    # ----------------------------------------------------------------
    # Private helpers.

    @staticmethod
    def _clamp(value, lowest, highest):
        """Return value limited to the closed interval [lowest, highest]."""
        return min(max(value, lowest), highest)

    @staticmethod
    def _split_7bit(value):
        """Return the (low, high) 7-bit data bytes encoding a non-negative integer."""
        return value & 0x7f, value >> 7

    def _send(self, message):
        """Log one encoded command, write it to the transport, and count it as sent."""
        self.logger.debug("Sending: %r", message)
        self.transport.write(message)
        self.messages_sent += 1

    # ----------------------------------------------------------------
    def data_received(self, data):
        """Process a bytestring received from the port, matching it to any data requests.

        Responses carry no identifying header, so they are matched to pending
        requests strictly in the order the requests were sent. Partial reads are
        buffered until enough bytes arrive for the oldest pending request.

        :param data: bytestring of newly received response data
        """
        self.logger.debug('Status received: %r', data)

        # Manage the possibility of partial reads by appending new data to any
        # previously received partial response.
        self._buffer += data

        # Match received values with requests, oldest request first.
        while self._requests:
            request = self._requests[0]
            kind = request[0]
            size = self._RESPONSE_SIZES.get(kind)

            if size is None:
                # Without this guard an unknown entry would spin this loop forever.
                self.logger.error("Discarding unrecognized request: %r", request)
                del self._requests[0]
                continue

            if len(self._buffer) < size:
                # insufficient data; wait for the next read
                return

            # Indexing a bytearray yields integers on both Python 2 and Python 3,
            # whereas indexing the raw buffer yields a str on 2 and an int on 3.
            payload = bytearray(self._buffer[:size])

            # Consume the request and its bytes before invoking the callback so
            # that an exception raised by the callback cannot desynchronize the
            # response stream.
            self._buffer = self._buffer[size:]
            del self._requests[0]
            self.messages_received += 1

            if kind == 'get-moving-state':
                callback = request[1]
                if callback is not None:
                    callback(payload[0] == 1)  # True if any servos are moving
            else:  # 'get-position'
                channel, callback = request[1], request[2]
                if callback is not None:
                    # Little-endian 16 bit value in units of 250 ns.
                    position = (payload[1] << 8) | payload[0]
                    callback(channel, 0.00025 * position)

        # This point will only be reached when all requests have been answered.
        # Check if there is more returned data than requests.
        if len(self._buffer) > 0:
            self.logger.warning("Extraneous response received: %r", self._buffer)
            self._buffer = b''
        return

    # ----------------------------------------------------------------
    def is_connected(self):
        """Return true if a transport object has been attached to this protocol."""
        return self.transport is not None

    def send_target(self, channel, pulse_width):
        """Send a single Set Target command to the Mini Maestro.

        :param channel: integer channel number; the first servo channel is 0
        :param pulse_width: servo pulse width in milliseconds; the typical range is 1.0 to 2.0
        """
        # Convert the pulse width to a 14 bit integer number with units of
        # 250 ns, clamped to the valid range.
        pulse_units = self._clamp(int(pulse_width * self._UNITS_PER_MS), 0, self._MAX_14BIT)
        low, high = self._split_7bit(pulse_units)
        self._send(struct.pack(b'BBBB', self._CMD_SET_TARGET, channel, low, high))
        return

    def send_multiple_targets(self, first_channel, pulse_widths):
        """Send a single Set Multiple Targets command to the Mini Maestro.

        The values are applied to consecutive channels starting at first_channel.

        :param first_channel: integer channel number for the first value
        :param pulse_widths: list of servo pulse widths in milliseconds; the typical range is 1.0 to 2.0
        """
        # Encode each pulse width as a 14 bit integer with units of 250 ns,
        # clamped to the valid range and split into low and high 7-bit bytes.
        payload = bytearray()
        for width in pulse_widths:
            units = self._clamp(int(width * self._UNITS_PER_MS), 0, self._MAX_14BIT)
            payload.extend(self._split_7bit(units))

        # Create the header bytes and construct the message.
        header = struct.pack(b'BBB', self._CMD_SET_MULTIPLE_TARGETS, len(pulse_widths), first_channel)
        self._send(header + payload)
        return

    def send_speed(self, channel, speed):
        """Send a single Set Speed command to the Mini Maestro.

        The valid range is 0.025 to 409.575, but for most servos practical
        values are less than 10. Zero sets unlimited speed. Out-of-range values
        are silently clamped.

        :param channel: integer channel number; the first servo channel is 0
        :param speed: maximum rate of change of servo pulse width in milliseconds/second
        """
        # The board expects units of 250 ns/10ms; we are providing a rate in
        # ms/sec. Convert the speed to a 14 bit integer number with units of
        # 250 ns/10ms. A non-zero request is clamped to at least 1 because the
        # value 0 means 'unlimited'.
        if speed == 0:
            pulse_speed = 0
        else:
            pulse_speed = self._clamp(int(speed * 4000 * 0.01), 1, self._MAX_14BIT)
        low, high = self._split_7bit(pulse_speed)
        self._send(struct.pack(b'BBBB', self._CMD_SET_SPEED, channel, low, high))
        return

    def send_multiple_speeds(self, first_channel, speeds):
        """Send multiple Set Speed commands to a Mini Maestro.

        N.B. there is no special protocol support, this is a convenience
        function to issue one Set Speed command per value.

        :param first_channel: integer channel number for the first value
        :param speeds: list of speed values in milliseconds-of-pulse-width/second
        """
        for offset, speed in enumerate(speeds):
            self.send_speed(first_channel + offset, speed)
        return

    def send_acceleration(self, channel, accel):
        """Send a single Set Acceleration command to the Mini Maestro.

        The valid range is 0.3125 to 79.6875. Zero sets unlimited acceleration.
        Out-of-range values are silently clamped.

        :param channel: integer channel number; the first servo channel is 0
        :param accel: acceleration of servo pulse width in milliseconds/second/second
        """
        # The board expects units of 250 ns/10ms/80ms; we are providing a rate
        # in ms/sec/sec. Convert the accel to an 8 bit integer number with units
        # of 250 ns/10ms/80ms. A non-zero request is clamped to at least 1
        # because the value 0 means 'unlimited'.
        if accel == 0:
            pulse_accel = 0
        else:
            pulse_accel = self._clamp(int(accel * 4000 * 0.01 * 0.08), 1, 255)
        low, high = self._split_7bit(pulse_accel)
        self._send(struct.pack(b'BBBB', self._CMD_SET_ACCELERATION, channel, low, high))
        return

    def send_multiple_accelerations(self, first_channel, accels):
        """Send multiple Set Acceleration commands to a Mini Maestro.

        N.B. there is no special protocol support, this is a convenience
        function to issue one Set Acceleration command per value.

        :param first_channel: integer channel number for the first value
        :param accels: list of accel values in milliseconds-of-pulse-width/second/second
        """
        for offset, accel in enumerate(accels):
            self.send_acceleration(first_channel + offset, accel)
        return

    def send_get_moving_state(self, callback=None):
        """Send a Get Moving State request to the Mini Maestro.

        This is an asynchronous request, but a callback function can be provided
        to receive the eventual return value. The response is consumed by
        :meth:`data_received` even when no callback is given.

        :param callback: callable to be called with a Boolean value, True if any servos are moving
        """
        self._send(struct.pack(b'B', self._CMD_GET_MOVING_STATE))
        self._requests.append(("get-moving-state", callback))
        return

    def send_get_position(self, channel, callback=None):
        """Send a Get Position request to the Mini Maestro.

        This is an asynchronous request, but a callback function can be provided
        to receive the eventual return value. The response is consumed by
        :meth:`data_received` even when no callback is given.

        :param channel: integer channel number; the first servo channel is 0
        :param callback: callable to be called with two arguments (channel, position), with position specified in milliseconds of pulse width
        """
        self._send(struct.pack(b'BB', self._CMD_GET_POSITION, channel))
        self._requests.append(("get-position", channel, callback))
        return
#================================================================