#!/usr/bin/env python3
# servo_rigging_example.py - operate a Mini Maestro by mapping DFMoco channels to multiple servos
# Written in 2018 by Garth Zeglin <garthz@cmu.edu>
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals
# Standard library modules.
import os, sys, argparse, math, logging, socket
# This requires a pySerial installation.
from serial.serialutil import SerialException
# Configure load path to include the parent folder so the course library can be found in a parallel folder.
sys.path.insert(0, os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
# Import modules from the course library.
import ase.DFMocoProtocol
import ase.MiniMaestroProtocol
import ase.SerialTransport
import ase.SocketTransport
import ase.logging
import ase.events
################################################################
class ServoRiggingDFMocoServerProtocol(ase.DFMocoProtocol.DFMocoServerProtocol):
    """Control a Mini Maestro servo interface from Dragonframe using the DF Moco
    protocol.  This demo maps two motor channels to multiple servo axes.

    Each DFMoco motor channel drives a group of three servos.  As the motor
    position sweeps across its nominal 0-4000 step range, the three servos in
    the group move one after another: each servo follows a quarter sine wave
    during its own third of the travel and holds at an end value otherwise.

    The attributes ``self.targets``, ``self.positions`` and ``self.logger`` are
    used here but not defined here; they are presumably provided by the parent
    DFMocoServerProtocol class (confirm against ase.DFMocoProtocol).
    """

    # Nominal full-scale travel of a DFMoco motor channel, in steps.
    _FULL_SCALE_STEPS = 4000

    # Phase offset applied to the wave for each servo within a group.  The
    # offsets are a quarter turn apart so the servos move in sequence.
    _PHASES = (0.0, -0.5 * math.pi, -1.0 * math.pi)

    # Servo channels driven by motor 0 and by any other motor, respectively.
    _MOTOR0_CHANNELS = (0, 2, 4)
    _OTHER_CHANNELS = (1, 3, 5)

    def __init__(self, servos):
        """Create the protocol handler.

        :param servos: transport object connected to the Mini Maestro board; it
            must expose a ``protocol`` attribute providing
            ``send_target(channel, value)``.
        """
        # initialize parent class for two DFMoco axes
        ase.DFMocoProtocol.DFMocoServerProtocol.__init__(self, 2)

        # connection to the Mini Maestro board
        self.servos = servos

    def _steps_to_ms(self, steps):
        """Linearly map a step count to a servo command value.

        0 steps maps to 1.0 and 4000 steps maps to 2.0; judging by the method
        name, the result is presumably a pulse width in milliseconds.
        """
        return 1.0 + 0.00025 * steps

    def _send_target(self, motor):
        """Update a virtual target; this maps a single motor position to multiple servos.

        :param motor: index of the DFMoco motor channel whose target changed.
        """
        # The motor channel is assumed to have the same 0-4000 range as before.
        steps = self.targets[motor]

        # Compute a wave function with a different phase for each servo
        # position.  Full travel spans 1.5*pi radians, so with the quarter-turn
        # phase offsets each servo is active for one third of the range.
        radians_per_step = 1.5 * math.pi / self._FULL_SCALE_STEPS
        angles = [phase + steps * radians_per_step for phase in self._PHASES]

        # Clamping the angle to [0, pi/2] keeps only the rising quarter of the
        # sine wave, which also bounds every output to the 1.0 to 2.0 range
        # even when the step count lies outside 0-4000.
        outputs = []
        for angle in angles:
            clamped = max(0, min(0.5 * math.pi, angle))
            outputs.append(self._steps_to_ms(self._FULL_SCALE_STEPS * math.sin(clamped)))

        self.logger.info("angles: %r outputs: %r", angles, outputs)

        # Apply the wave function to the appropriate set of servos.
        if motor == 0:
            channels = self._MOTOR0_CHANNELS
        else:
            channels = self._OTHER_CHANNELS

        for channel, output in zip(channels, outputs):
            self.servos.protocol.send_target(channel, output)

        # Update the internal record of the new virtual target.
        self.positions[motor] = self.targets[motor]

    def motor_move(self, motor, position):
        """Set the target of a motor channel to an absolute position and apply it."""
        self.targets[motor] = position
        self._send_target(motor)

    def jog_motor(self, motor, direction):
        """Move the target of a motor channel by 20 steps times ``direction`` and apply it."""
        self.targets[motor] += 20 * direction
        self._send_target(motor)

    def inch_motor(self, motor, direction):
        """Move the target of a motor channel by ``direction`` steps and apply it."""
        self.targets[motor] += direction
        self._send_target(motor)
################################################################
if __name__ == '__main__':
    # Script entry point: open the Mini Maestro serial port, then serve DFMoco
    # requests from Dragonframe over TCP, one client connection at a time.

    parser = argparse.ArgumentParser(description="Dragonframe motion control for the Mini Maestro servo interface. Uses the DFMoco protocol over TCP.")
    parser.add_argument('--host', action="store", dest='host', default='127.0.0.1', help='Host IP address to use. Default: 127.0.0.1.')
    parser.add_argument('--port', action="store", dest='port', default=9999, type=int, help='TCP port number to use. Default: 9999.')
    ase.logging.add_logging_arguments(parser)
    parser.add_argument('--list', action='store_true', help='Print list of available serial ports.')

    default_serial_port = '/dev/tty.usbmodem00146911'
    parser.add_argument('--maestro', default=default_serial_port,
                        help='Specify the name of the Mini Maestro Command Port device. Default: %s.' % default_serial_port)

    args = parser.parse_args()

    # Configure a typical logging setup.
    logger, handlers = ase.logging.create_script_logger('minimaestro', args)

    # Print the list of serial ports to console if requested.  Note that the
    # script still goes on to open the serial port afterwards.
    if args.list:
        ase.SerialTransport.print_serial_port_list()

    # Create an object to manage the serial port.
    servos = ase.SerialTransport.SerialTransport(ase.MiniMaestroProtocol.MiniMaestroProtocol(), args.maestro, 115200)

    # Connect to the Mini Maestro board.
    logger.info("Connecting to Mini Maestro on %s.", args.maestro)
    try:
        servos.open()
    except SerialException as exp:
        logger.error("Unable to open serial port: %r", exp)
        logger.info("You might try the --list argument to see the available serial ports.\nExiting.")
        sys.exit(1)

    # Create a protocol object which can send commands to the Mini Maestro.
    protocol = ServoRiggingDFMocoServerProtocol(servos)

    # Create a TCP server to receive one connection at a time from Dragonframe.
    # The host argument may carry an embedded port as 'host:port', in which
    # case that port overrides --port.
    # NOTE(review): a bare IPv6 address also contains ':' and would be
    # mis-parsed by this split; confirm whether IPv6 needs to be supported.
    if ':' in args.host:
        args.host, port = args.host.split(':', 1)
        args.port = int(port)

    # Create a server socket which will attach new client sockets to the protocol.
    # The same protocol object is handed out for every connection.
    server = ase.SocketTransport.ServerSocket(lambda: protocol, args.host, args.port)
    server.open()

    # Wait for a connection, then process messages until the connection closes, then repeat.
    try:
        while True:
            logger.info("Waiting for connection...")
            client = server.wait_for_connection()

            try:
                # Run the event loop to process server requests and interface
                # board replies.  ase.events.sleep presumably services the
                # listed transports for up to the given number of seconds
                # (confirm against ase.events).
                while True:
                    ase.events.sleep(1.0, [client, servos])

            except socket.error as exc:
                # A socket error ends this client session; go back to waiting.
                logger.info("Socket exception: %r", exc)

            finally:
                # Exiting event loop.  Close the client however the loop
                # ended, including on a user interrupt.
                client.close()

    except KeyboardInterrupt:
        logger.info("User interrupted operation.")

    except Exception as exc:
        logger.warning("Event loop exited on exception: %r", exc)

    finally:
        # Always release the listening socket, even for exits that the
        # handlers above do not catch.
        server.close()