#!/usr/bin/env python3
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals
# Standard library modules.
import math, time, argparse, threading, queue
# Third-party library modules.
import numpy as np
# Use the scipy fmin optimizer if available.
# ref: https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.fmin.html
import scipy.optimize
have_optimizer = True
except ImportError:
print("Unable to load scipy.optimize.")
have_optimizer = False
# This uses python-osc to communicate with a Max/MSP patch.
# installation: pip3 install python-osc
# source code: https://github.com/attwad/python-osc
# pypi description: https://pypi.org/project/python-osc/
from pythonosc import udp_client
from pythonosc import dispatcher
from pythonosc import osc_server
[docs]class DoublePendulumController(object):
"""Prototype for a double-pendulum controller. This is where you should
customize the underlying control functions, optimization strategy, and
reward functions.
:param args: namespace of command-line arguments returned from argparse
def __init__(self, args):
# initialize mode and state variables for controller
self.mode = 'dissipate'
self.cycle_start = 0.0
self.torque_cost = 0.0
# fixed controller parameters
self.friction_damping = -0.2
self.hand_up_target = np.array((1.0, 0.5*math.pi, 0.0, 0.0))
self.kp = np.array((16.0, 8.0))
self.ki = np.array((4.0, 2.0))
self.kd = np.array((4.0, 2.0))
# integrated error
self.ierr = np.array((0.0, 0.0, 0.0, 0.0))
# keep track of the worst case cost to know when to cancel a trial
self.worst_cost = None
# Construct default parameter vector for the optimization problem. This
# builds a single array with all the default parameter values. See the
# set_optimization_params() method to update the mapping from parameter
# vector back to controller parameters.
self.default_params = np.hstack((self.kp, self.ki, self.kd))
# launch a thread to run the optimizer as a co-routine
self.evaluation_queue = queue.Queue() # next parameter array to evaluate
self.result_queue = queue.Queue() # next computed cost value
self.optimizer_thread = threading.Thread(target=self.optimizer_task)
self.optimizer_thread.daemon = True
[docs] def message(self, msgaddr, *args):
"""Process messages from the Max/MSP display received via OSC over UDP.
:param msgaddr: the address string of the OSC message, eg '/control/mode'
:param args: tuple of OSC message arguments
print("Controller received message %s: %s" % (msgaddr, args))
if msgaddr == "/control/kp1":
self.kp[0] = args[0]
elif msgaddr == "/control/kd1":
self.kd[0] = args[0]
elif msgaddr == "/control/mode":
self.mode = args[0]
[docs] def optimizer_task(self):
"""Entry point for starting the optimizer on a background thread. It communicates with the simulator using a pair of queues."""
print("Starting optimizer thread.")
if have_optimizer:
result = scipy.optimize.fmin(func=self.optimizer_eval, x0=self.default_params)
print("Optimizer finished:", result)
print("Optimizer not available, evaluating default parameters.")
result = self.optimizer_eval(self.default_params)
[docs] def optimizer_eval(self, params):
"""Optimizer callback function to evaluate a parameter vector. This pushes the
vector into a queue and blocks the optimizer thread waiting for a cost result.
:param params: numpy ndarray for the optimization problem parameters
:returns: a scalar cost value
print("Optimizer requesting evaluation of:", params)
# wait for result
cost = self.result_queue.get()
print("Trial yielded cost:", cost)
return cost
[docs] def set_optimization_params(self, params):
"""Configure controller gains for a single trial of the optimization problem.
Called from within the controller state machine when a new parameter
vector is received for evaluation.
:param params: numpy ndarray of optimization parameters
self.kp = params[0:2]
self.ki = params[2:4]
self.kd = params[4:6]
print("Set optimization params: kp is %s, ki is %s, kd is %s" % (self.kp, self.ki, self.kd))
[docs] def compute_control(self, t, dt, state, tau):
"""Method called from simulator to calculate the next step of applied torques.
:param t: time in seconds since simulation began
:param state: four element ndarray of joint positions q and joint velocities qd as [q1, q2, qd1, qd2], expressed in radians and radians/sec
:param tau: two element ndarray to fill in with joint torques to apply
# convenience variables to notate the state variables
q1 = state[0] # 'shoulder' angle in radians
q2 = state[1] # 'elbow' angle in radians
qd1 = state[2] # 'shoulder' velocity in radians/second
qd2 = state[3] # 'elbow' velocity in radians/second
# select action based on mode
if self.mode == 'dissipate':
# add viscous damping to dissipate energy
tau[0] = self.friction_damping * qd1
tau[1] = self.friction_damping * qd2
elif self.mode == 'pose':
# calculate position and velocity error as difference from reference state
qerr = self.hand_up_target - state
# apply PD control to reach the pose (no integral term)
tau[0] = (self.kp[0] * qerr[0]) + (self.kd[0] * qerr[2])
tau[1] = (self.kp[1] * qerr[1]) + (self.kd[1] * qerr[3])
# The following states alternate between driving the pendulum to a
# 'hand-up' pose and relaxing back to the bottom. The 'hand-up' pose
# has the first link horizontal, with the second link pointing straight
# up. These can be used in an optimization process to find the most
# efficient transition between the two poses.
elif self.mode == 'hand-init' or self.mode == 'optimize':
# wait for the next optimization trial to begin
tau[0] = 16.0 * self.friction_damping * qd1
tau[1] = 16.0 * self.friction_damping * qd2
if not self.evaluation_queue.empty():
params = self.evaluation_queue.get()
self.torque_cost = 0.0
self.mode = 'hand-up'
elif self.mode == 'hand-up':
# calculate position and velocity error as difference from reference state
qerr = self.hand_up_target - state
# apply PID control to reach the pose
tau[0] = (self.kp[0] * qerr[0]) + (self.kd[0] * qerr[2]) + (self.ki[0] * self.ierr[0])
tau[1] = (self.kp[1] * qerr[1]) + (self.kd[1] * qerr[3]) + (self.ki[1] * self.ierr[1])
# integrate the error
self.ierr = self.ierr + qerr * dt
# once at the target, relax, and finish the trial
if max(abs(qerr[0:2])) < 0.01 and max(abs(qerr[2:4])) < 0.1:
self.mode = 'hand-down'
self.ierr = np.array((0.0, 0.0, 0.0, 0.0)) # reset integrator
if self.worst_cost is None: self.worst_cost = self.torque_cost
else: self.worst_cost = max(self.worst_cost, self.torque_cost)
# if the trial isn't converging, quit it early
elif self.worst_cost is not None and self.torque_cost > 2*self.worst_cost:
print("Cancelling trial at cost: ", self.torque_cost)
self.mode = 'hand-down'
self.ierr = np.array((0.0, 0.0, 0.0, 0.0)) # reset integrator
elif self.mode == 'hand-down':
# relax and let the arm fall back down to the bottom
tau[0] = 16.0 * self.friction_damping * qd1
tau[1] = 16.0 * self.friction_damping * qd2
# if completely at rest, restart the process
if abs(qd1) < 0.01 and abs(q1) < 0.01:
self.mode = 'hand-init'
elif self.mode == 'swingup':
tau[1] = self.friction_damping * qd2 # assume unpowered elbow
# if completely at rest, add a slight perturbation to kick-start the process
if abs(qd1) < 0.01 and abs(q1) < 0.01:
tau[0] = 1.0
# unstable positive velocity feedback near bottom
elif abs(q1) < 0.5:
tau[0] = 2.0 * qd1
else: # else coast
tau[0] = self.friction_damping * qd1
else: # free-swinging, no torques
tau[0] = 0.0
tau[1] = 0.0
# accumulate sum of squared torques cost
self.torque_cost += tau.dot(tau)
[docs]class DoublePendulumSimulator(object):
"""Dynamic simulation of a frictionless double pendulum. This object can send
state updates to an external server for rendering. It communicates with a
user-supplied control object to compute applied joint torques.
:param args: namespace of command-line arguments returned from argparse
:param cartoon: python-osc UDP client object for sending messages to GUI
:param control: instance of DoublePendulumControl for computing applied torques
def __init__(self, args, cartoon, control):
# save the OSC client object used to update the display
self.cartoon = cartoon
# save the object used to calculate joint torques
self.control = control
# save selected command line flags
self.verbose = args.verbose
self.fast = args.fast
# set default dynamics
# configure transient state
[docs] def reset(self):
"""Reset or initialize all simulator state variables."""
self.t = 0.0
self.dt = 0.001
self.state = np.array([0.0, 0.0, 0.0, 0.0])
self.tau = np.array([0.0, 0.0])
self.dydt = np.ndarray((4,))
[docs] def set_default_dynamic_parameters(self):
"""Set the default dynamics coefficients defining the rigid-body model physics."""
self.l1 = 1.0 # proximal link length, link1
self.l2 = 1.0 # distal link length, link2
self.lc1 = 0.5 # distance from proximal joint to link1 COM
self.lc2 = 0.5 # distance from distal joint to link2 COM
self.m1 = 1.0 # link1 mass
self.m2 = 1.0 # link2 mass
self.I1 = (self.m1 * self.l1**2) / 12 # link1 moment of inertia
self.I2 = (self.m2 * self.l2**2) / 12 # link2 moment of inertia
self.gravity = -9.81
[docs] def deriv(self):
"""Calculate the accelerations for a rigid body double-pendulum dynamics model.
:returns: system derivative vector as a numpy ndarray
q1 = self.state[0]
q2 = self.state[1]
qd1 = self.state[2]
qd2 = self.state[3]
LC1 = self.lc1
LC2 = self.lc2
L1 = self.l1
M1 = self.m1
M2 = self.m2
d11 = M1*LC1*LC1 + M2*(L1*L1 + LC2*LC2 + 2*L1*LC2*math.cos(q2)) + self.I1 + self.I2
d12 = M2*(LC2*LC2 + L1*LC2*math.cos(q2)) + self.I2
d21 = d12
d22 = M2*LC2*LC2 + self.I2
h1 = -M2*L1*LC2*math.sin(q2)*qd2*qd2 - 2*M2*L1*LC2*math.sin(q2)*qd2*qd1
h2 = M2*L1*LC2*math.sin(q2)*qd1*qd1
phi1 = -M2*LC2*self.gravity*math.sin(q1+q2) - (M1*LC1 + M2*L1) * self.gravity * math.sin(q1)
phi2 = -M2*LC2*self.gravity*math.sin(q1+q2)
# now solve the equations for qdd:
# d11 qdd1 + d12 qdd2 + h1 + phi1 = tau1
# d21 qdd1 + d22 qdd2 + h2 + phi2 = tau2
rhs1 = self.tau[0] - h1 - phi1
rhs2 = self.tau[1] - h2 - phi2
# Apply Cramer's Rule to compute the accelerations using
# determinants by solving D qdd = rhs. First compute the
# denominator as the determinant of D:
denom = (d11 * d22) - (d21 * d12)
# the derivative of the position is trivially the current velocity
self.dydt[0] = qd1
self.dydt[1] = qd2
# the derivative of the velocity is the acceleration.
# the numerator of qdd[n] is the determinant of the matrix in
# which the nth column of D is replaced by RHS
self.dydt[2] = ((rhs1 * d22 ) - (rhs2 * d12)) / denom
self.dydt[3] = (( d11 * rhs2) - (d21 * rhs1)) / denom
return self.dydt
[docs] def timer_tick(self, delta_t):
"""Run the simulation for an interval.
:param delta_t: length of interval in simulated time seconds
while delta_t > 0:
# calculate next control outputs
self.control.compute_control(self.t, self.dt, self.state, self.tau)
# calculate dynamics model
qd = self.deriv()
# Euler integration
self.state = self.state + self.dt * qd
delta_t -= self.dt
self.t += self.dt
[docs] def update_display(self):
"""Send the current state to the GUI using a UDP OSC message."""
radian_to_degrees = 180.0 / math.pi
pose = (self.state[0]*radian_to_degrees, self.state[1]*radian_to_degrees)
self.cartoon.send_message("/q", pose)
if self.verbose: print("Position: ", pose)
[docs] def event_loop(self):
"""Simulator run loop; never exits."""
frame_interval = 0.040 if not self.fast else 10.0
while True:
if not self.fast: time.sleep(frame_interval)
# Methods to process messages from the Max/MSP display received via OSC over UDP.
[docs] def message(self, msgaddr, *args):
"""Process messages from the Max/MSP display received via OSC over UDP.
:param msgaddr: the address string of the OSC message, eg '/sim/verbose'
:param args: tuple of OSC message arguments
print("Simulator received message %s: %s" % (msgaddr, args))
if msgaddr == "/sim/verbose":
self.verbose = (args[0] != 0)
[docs] def unknown_message(self, msgaddr, *args):
"""Default handler for unrecognized OSC messages."""
print("Simulator received unmapped message %s: %s" % (msgaddr, args))
def start_osc_server(args, sim, control):
"""Start a background thread running an OSC server listening for messages on an UDP socket."""
# Initialize the OSC message dispatch system.
dispatch = dispatcher.Dispatcher()
dispatch.map("/sim/*", sim.message)
dispatch.map("/control/*", control.message)
# Start and run the server.
server = osc_server.ThreadingOSCUDPServer((args.simaddr, args.simport), dispatch)
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
# Script entry point.
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Double-pendulum or two-link arm simulator.")
parser.add_argument('--verbose', action='store_true', help='Enable debugging output.')
parser.add_argument('--maxport', type=int, default=16375, help='UDP port for Max/MSP patch (default 16375).')
parser.add_argument('--maxaddr', default="", help='IP address for the Max/MSP patch (default localhost).')
parser.add_argument('--simport', type=int, default=54375, help='UDP port for Python simulator (default 54375).')
parser.add_argument('--simaddr', default="", help='IP address for the simulator (default localhost).')
parser.add_argument('--fast', action='store_true', help='Run as fast as possible.')
args = parser.parse_args()
# create an OSC client endpoint to send display updates to Max/MSP
cartoon = udp_client.SimpleUDPClient(args.maxaddr, args.maxport)
# initializes the simulator
control = DoublePendulumController(args)
sim = DoublePendulumSimulator(args, cartoon, control)
# set up the OSC message server to receive parameters from Max/MSP
start_osc_server(args, sim, control)
# begin the simulation