suitcase Webots emulator¶
One solution for the suitcase is to develop performances as much as possible in simulation than use a compatibility layer to replace the Webots API with the physical hardware.
The sample implementation makes use of several Python features: multithreading,
multiple inheritance, and ‘duck typing’. The RealSuitcase
class
encapsulates the robot control process by inheriting from multiple classes:
The
suitcase.Suitcase
class is a duplicate of the Webots simulation controller. It inherits fromcontroller.Robot
which is normally provided by the Webots API. Instead, a proxy class is provided incontroller.py
with a partial implementation of this API.The
controller.SuitcaseRobot
class implements the specific devices required for the example in the form of methods which are normally implemented in the Webots Robot class. This works because Python isn’t particular about the exact parent class of a method.The
threading.Thread
class implements the methods to allow the control process to run as a separate thread.
The communication with the Arduino is handled by a separate thread to allow the
controller and Arduino to run on independent schedules. In practice, the
Arduino communication is slower than the Webots communication, so the actual
sensor and motor behavior will vary from the simulation, but the controller will
run in approximately real time. This communication is managed by the
Arduino
class which receives status information, passes it to the controller
emulator for interpretation, then requests and sends updated control commands.
running the system¶
The script is intended to be run from the command line, as it requires the Arduino serial port specified as an argument and it continuously prints out data traffic.
A typical invocation looks as follows:
python3 main.py --port /dev/cu.usbmodem1444301
Under Windows, Arduino serial ports are named using COM, e.g. COM5
.
On any platform, if you are not sure of the port name, please launch the Arduino IDE to identify the correct port.
main.py¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | #!/usr/bin/env python3
# main.py : top-level script to run a suitcase testbed show
# No copyright, 2021, Garth Zeglin. This file is explicitly placed in the public domain.
# This provides a skeleton Webots API to connect a Webots controller object to
# physical hardware. Unlike Webots, it uses threads to run the controller as
# a normal Python object within a single process.
# Standard Python modules.
import os, sys, time, threading, argparse
# Import pySerial module for communicating with an Arduino. https://pythonhosted.org/pyserial/
import serial
# Import the skeleton Webots API from the current folder.
import controller
# Import the controller. The suitcase.py file was copied from a Webots project tree.
import suitcase
################################################################
# Define wrapper classes which inherit from both a Webots robot controller and a
# specialized physical interface class. The controller object inherits
# controller.Robot with common API code, but some methods are implemented
# instead by the SuitcaseRobot interface class.
class RealSuitcase(suitcase.Suitcase, controller.SuitcaseRobot, threading.Thread):
def __init__(self):
print("Entering RealSuitcase.__init__ and initializing superclasses")
super(RealSuitcase, self).__init__()
self.daemon = True
def run(self):
print("Starting suitcase.")
self._lock.acquire()
super(RealSuitcase, self).run()
################################################################
class Arduino(threading.Thread):
"""Connect an Arduino serial port to a controller object. The input and output
is assumed to be a text line of space-delimited integers. The specific data
format of both input is determined by the controller class and the Arduino
sketch. This process runs as a thread so it can easily block on the input.
"""
def __init__(self, portname, controller):
print("Entering Arduino.__init__ and initializing superclasses")
super(Arduino, self).__init__()
self.controller = controller
self.portname = portname
self.port = serial.Serial(portname, baudrate=115200, timeout=1.0)
self.running = True
def run(self):
# wait briefly for the Arduino to complete waking up
time.sleep(0.2)
print(f"Entering Arduino event loop for {self.portname}.")
while(self.running):
line = self.controller.get_output_string()
self.port.write(line.encode())
print("Sending to Arduino: " + line.strip())
input = self.port.readline().decode('ascii')
print("Received from Arduino: " + input.strip())
self.controller.process_sensor_string(input)
def safe_exit(self):
self.running = False
self.port.write(b"enable 0\n")
################################################################
# The following section is run when this is loaded as a script.
if __name__ == "__main__":
# Initialize the command parser.
parser = argparse.ArgumentParser( description = """Control script to connect Webots controller to StepperWinch firmware on an Arduino.""")
parser.add_argument( '-p', '--port', default='/dev/cu.usbmodem1444301',
help='Specify the name of the Arduino serial port device (default is %(default)s.')
# Parse the command line, returning a Namespace.
args = parser.parse_args()
# Create the suitcase control object.
suitcase = RealSuitcase()
# Create the Arduino interface which communicates with physical hardware.
# The serial device address will need to be modified for your specific computer.
suitcase_hw = Arduino(args.port, suitcase)
# Start the control thread.
suitcase.start()
# Start the hardware interface thread.
suitcase_hw.start()
# Wait for the hardware thread to finish.
try:
suitcase_hw.join()
except KeyboardInterrupt:
suitcase_hw.safe_exit()
print("Main thread received user interrupt, exiting.")
|
controller.py¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 | # controller.py : skeleton Webots API to connect the controller objects to the physical hardware.
# No copyright, 2021, Garth Zeglin. This file is explicitly placed in the public domain.
import time, threading, queue, math
################################################################
class Robot(object):
"""This object emulates the portions of the Webots API common to all physical robots."""
def __init__(self):
print("Entering Robot.__init__ and initializing superclasses")
super(Robot, self).__init__()
# include a receiver/emitter pair by default
self.emitter = Emitter()
self.receiver = Receiver()
# keep track of elapsed time
self._time = 0.0
# lock to synchronize access for this object and its children
self._lock = threading.Lock()
def step(self, interval):
# print(f"Robot step function sleeping for {interval} msec on {self}.")
self._lock.release()
time.sleep(0.001 * interval)
self._lock.acquire()
self._time += 0.001 * interval
# print("Robot step function resuming.")
return 0
def getTime(self):
return self._time
# only support a single receiver and emitter
def getReceiver(self, name):
return self.receiver
def getEmitter(self, name):
return self.emitter
################################################################
class SuitcaseRobot(object):
"""This object emulates the portions of the Webots API related to the physical suitcase robot."""
def __init__(self):
print("Entering SuitcaseRobot.__init__ and initializing superclasses")
super(SuitcaseRobot, self).__init__()
# The hardware is assumed to use 1/4 step microstepping and 800 steps per revolution.
self.radians_to_steps = 800 / (2*math.pi)
self.steps_to_radians = 1.0 / self.radians_to_steps
# Create objects representing the physical hardware.
self._motors = [RealMotor(name) for name in ['motor1', 'motor2', 'motor3', 'motor4']]
self._sensors = [RealSensor(name) for name in ['joint1', 'joint2', 'joint3', 'joint4']]
# Collect all the devices into a single dictionary for retrieval by the controller.
self._devices = {}
for m in self._motors:
self._devices[m.name] = m
for s in self._sensors:
self._devices[s.name] = s
# print("SuitcaseRobot initialized devices: ", self._devices)
def getDevice(self, name):
return self._devices[name]
def process_sensor_string(self, line):
# Process a bytestring of text from the hardware, typically called from
# a serial port thread with data from the Arduino.
# This assumes the StepperWinch sketch is running.
# Sample input line: txyza 1966200716 0 0 0 0
try:
tokens = line.split()
if len(tokens) >= 6:
# ignore the timestamp and extract the stepper motor counts
values = [int(s) for s in tokens[2:6]]
# update the sensor values in a thread-safe way
with self._lock:
self._sensors[0].value = self.steps_to_radians * values[0]
self._sensors[1].value = self.steps_to_radians * values[1]
self._sensors[2].value = self.steps_to_radians * values[2]
self._sensors[3].value = self.steps_to_radians * values[3]
except ValueError:
pass
def get_output_string(self):
# Generate the next string of output to the hardware, typically called
# from a serial port thread to generate output for the Arduino.
# This assumes the StepperWinch sketch is running.
with self._lock:
# fetch the current motor commands in a thread-safe way
positions = [m.target for m in self._motors]
velocities = [m.rate for m in self._motors]
# Accumulate a list of position targets and a list of velocity targets.
# The default mode is position, but an infinite position target
# indicates the use of velocity mode instead.
v_c = ""
p_c = ""
vel = []
pos = []
for l, p, v in zip('xyza', positions, velocities):
if not math.isinf(p):
p_c += l
pos.append(str(int(self.radians_to_steps * p)))
else:
v_c += l
vel.append(str(int(self.radians_to_steps * v)))
# create either one or two commands as output
if v_c != "":
vcommand = "v " + v_c + " " + " ".join(vel) + "\n"
else:
vcommand = ""
if p_c != "":
pcommand = "a " + p_c + " " + " ".join(pos) + "\n"
else:
pcommand = ""
return pcommand + vcommand
################################################################
# Emulate the Webots sensor API. This object just stores and retrieves a single value.
class RealSensor(object):
def __init__(self, name):
self.value = 0
self.name = name
def enable(self, rate):
pass
def getValue(self):
return self.value
################################################################
# Emulate the Webots motor API. This object just stores the commanded state; the parent robot emulator
# is responsible for communicating it to the hardware controller.
class RealMotor(object):
def __init__(self, name):
self.rate = 0
self.target = 0
self.name = name
def setVelocity(self, rate):
self.rate = rate
def setPosition(self, target):
self.target = target
################################################################
# Emulate the Webots 'radio' communication receiver.
class Receiver(object):
def __init__(self):
# create a thread-safe queue for received messages
self.queue = queue.Queue()
pass
def enable(self, rate):
pass
def getQueueLength(self):
# return the number of available packets
return self.queue.qsize()
def getData(self):
# return the next packet data as a bytestring
# this also removes it from the queue, which doesn't match the Webots behavior, but
# will work as long as it isn't called twice
data = self.queue.get()
print(f"receiver returning {data}")
return data
def nextPacket(self):
# remove the oldest packet from the queue
# because it is removed by getData, this is a no-op
pass
# Emulate the Webots 'radio' communication transmitter.
class Emitter(object):
def __init__(self):
self.receivers = []
def add_receiver(self, receiver):
self.receivers.append(receiver)
def send(self, data):
# send a bytestring to the subscribed receivers
print(f"emitter broadcasting {data}")
for r in self.receivers:
r.queue.put(data)
################################################################
|