suitcase Webots emulator¶
One solution for the suitcase is to develop performances as much as possible in simulation than use a compatibility layer to replace the Webots API with the physical hardware.
The sample implementation makes use of several Python features: multithreading,
multiple inheritance, and ‘duck typing’. The RealSuitcase
class
encapsulates the robot control process by inheriting from multiple classes:
The
suitcase.Suitcase
class is a duplicate of the Webots simulation controller. It inherits fromcontroller.Robot
which is normally provided by the Webots API. Instead, a proxy class is provided incontroller.py
with a partial implementation of this API.The
controller.SuitcaseRobot
class implements the specific devices required for the example in the form of methods which are normally implemented in the Webots Robot class. This works because Python isn’t particular about the exact parent class of a method.The
threading.Thread
class implements the methods to allow the control process to run as a separate thread.
The communication with the Arduino is handled by a separate thread to allow the
controller and Arduino to run on independent schedules. In practice, the
Arduino communication is slower than the Webots communication, so the actual
sensor and motor behavior will vary from the simulation, but the controller will
run in approximately real time. This communication is managed by the
Arduino
class which receives status information, passes it to the controller
emulator for interpretation, then requests and sends updated control commands.
running the system¶
The script is intended to be run from the command line, as it requires the Arduino serial port specified as an argument and it continuously prints out data traffic.
A typical invocation looks as follows:
python3 main.py --port /dev/cu.usbmodem1444301
Under Windows, Arduino serial ports are named using COM, e.g. COM5
.
On any platform, if you are not sure of the port name, please launch the Arduino IDE to identify the correct port.
main.py¶
1#!/usr/bin/env python3
2
3# main.py : top-level script to run a suitcase testbed show
4# No copyright, 2021, Garth Zeglin. This file is explicitly placed in the public domain.
5
6# This provides a skeleton Webots API to connect a Webots controller object to
7# physical hardware. Unlike Webots, it uses threads to run the controller as
8# a normal Python object within a single process.
9
10# Standard Python modules.
11import os, sys, time, threading, argparse
12
13# Import pySerial module for communicating with an Arduino. https://pythonhosted.org/pyserial/
14import serial
15
16# Import the skeleton Webots API from the current folder.
17import controller
18
19# Import the controller. The suitcase.py file was copied from a Webots project tree.
20import suitcase
21
22################################################################
23# Define wrapper classes which inherit from both a Webots robot controller and a
24# specialized physical interface class. The controller object inherits
25# controller.Robot with common API code, but some methods are implemented
26# instead by the SuitcaseRobot interface class.
27
28class RealSuitcase(suitcase.Suitcase, controller.SuitcaseRobot, threading.Thread):
29 def __init__(self):
30 print("Entering RealSuitcase.__init__ and initializing superclasses")
31 super(RealSuitcase, self).__init__()
32 self.daemon = True
33
34 def run(self):
35 print("Starting suitcase.")
36 self._lock.acquire()
37 super(RealSuitcase, self).run()
38
39################################################################
40class Arduino(threading.Thread):
41 """Connect an Arduino serial port to a controller object. The input and output
42 is assumed to be a text line of space-delimited integers. The specific data
43 format of both input is determined by the controller class and the Arduino
44 sketch. This process runs as a thread so it can easily block on the input.
45 """
46 def __init__(self, portname, controller):
47 print("Entering Arduino.__init__ and initializing superclasses")
48 super(Arduino, self).__init__()
49
50 self.controller = controller
51 self.portname = portname
52 self.port = serial.Serial(portname, baudrate=115200, timeout=1.0)
53 self.running = True
54
55 def run(self):
56 # wait briefly for the Arduino to complete waking up
57 time.sleep(0.2)
58
59 print(f"Entering Arduino event loop for {self.portname}.")
60 while(self.running):
61 line = self.controller.get_output_string()
62 self.port.write(line.encode())
63 print("Sending to Arduino: " + line.strip())
64
65 input = self.port.readline().decode('ascii')
66 print("Received from Arduino: " + input.strip())
67 self.controller.process_sensor_string(input)
68
69 def safe_exit(self):
70 self.running = False
71 self.port.write(b"enable 0\n")
72
73################################################################
74# The following section is run when this is loaded as a script.
75if __name__ == "__main__":
76
77 # Initialize the command parser.
78 parser = argparse.ArgumentParser( description = """Control script to connect Webots controller to StepperWinch firmware on an Arduino.""")
79 parser.add_argument( '-p', '--port', default='/dev/cu.usbmodem1444301',
80 help='Specify the name of the Arduino serial port device (default is %(default)s.')
81
82 # Parse the command line, returning a Namespace.
83 args = parser.parse_args()
84
85 # Create the suitcase control object.
86 suitcase = RealSuitcase()
87
88 # Create the Arduino interface which communicates with physical hardware.
89 # The serial device address will need to be modified for your specific computer.
90 suitcase_hw = Arduino(args.port, suitcase)
91
92 # Start the control thread.
93 suitcase.start()
94
95 # Start the hardware interface thread.
96 suitcase_hw.start()
97
98 # Wait for the hardware thread to finish.
99 try:
100 suitcase_hw.join()
101
102 except KeyboardInterrupt:
103 suitcase_hw.safe_exit()
104 print("Main thread received user interrupt, exiting.")
105
controller.py¶
1# controller.py : skeleton Webots API to connect the controller objects to the physical hardware.
2# No copyright, 2021, Garth Zeglin. This file is explicitly placed in the public domain.
3
4import time, threading, queue, math
5
6################################################################
7class Robot(object):
8 """This object emulates the portions of the Webots API common to all physical robots."""
9 def __init__(self):
10
11 print("Entering Robot.__init__ and initializing superclasses")
12 super(Robot, self).__init__()
13
14 # include a receiver/emitter pair by default
15 self.emitter = Emitter()
16 self.receiver = Receiver()
17
18 # keep track of elapsed time
19 self._time = 0.0
20
21 # lock to synchronize access for this object and its children
22 self._lock = threading.Lock()
23
24 def step(self, interval):
25 # print(f"Robot step function sleeping for {interval} msec on {self}.")
26
27 self._lock.release()
28 time.sleep(0.001 * interval)
29 self._lock.acquire()
30 self._time += 0.001 * interval
31
32 # print("Robot step function resuming.")
33 return 0
34
35 def getTime(self):
36 return self._time
37
38 # only support a single receiver and emitter
39 def getReceiver(self, name):
40 return self.receiver
41
42 def getEmitter(self, name):
43 return self.emitter
44
45################################################################
46class SuitcaseRobot(object):
47 """This object emulates the portions of the Webots API related to the physical suitcase robot."""
48 def __init__(self):
49 print("Entering SuitcaseRobot.__init__ and initializing superclasses")
50 super(SuitcaseRobot, self).__init__()
51
52 # The hardware is assumed to use 1/4 step microstepping and 800 steps per revolution.
53 self.radians_to_steps = 800 / (2*math.pi)
54 self.steps_to_radians = 1.0 / self.radians_to_steps
55
56 # Create objects representing the physical hardware.
57 self._motors = [RealMotor(name) for name in ['motor1', 'motor2', 'motor3', 'motor4']]
58 self._sensors = [RealSensor(name) for name in ['joint1', 'joint2', 'joint3', 'joint4']]
59
60 # Collect all the devices into a single dictionary for retrieval by the controller.
61 self._devices = {}
62 for m in self._motors:
63 self._devices[m.name] = m
64 for s in self._sensors:
65 self._devices[s.name] = s
66 # print("SuitcaseRobot initialized devices: ", self._devices)
67
68 def getDevice(self, name):
69 return self._devices[name]
70
71 def process_sensor_string(self, line):
72 # Process a bytestring of text from the hardware, typically called from
73 # a serial port thread with data from the Arduino.
74 # This assumes the StepperWinch sketch is running.
75 # Sample input line: txyza 1966200716 0 0 0 0
76 try:
77 tokens = line.split()
78 if len(tokens) >= 6:
79 # ignore the timestamp and extract the stepper motor counts
80 values = [int(s) for s in tokens[2:6]]
81
82 # update the sensor values in a thread-safe way
83 with self._lock:
84 self._sensors[0].value = self.steps_to_radians * values[0]
85 self._sensors[1].value = self.steps_to_radians * values[1]
86 self._sensors[2].value = self.steps_to_radians * values[2]
87 self._sensors[3].value = self.steps_to_radians * values[3]
88
89 except ValueError:
90 pass
91
92 def get_output_string(self):
93 # Generate the next string of output to the hardware, typically called
94 # from a serial port thread to generate output for the Arduino.
95 # This assumes the StepperWinch sketch is running.
96
97 with self._lock:
98 # fetch the current motor commands in a thread-safe way
99 positions = [m.target for m in self._motors]
100 velocities = [m.rate for m in self._motors]
101
102 # Accumulate a list of position targets and a list of velocity targets.
103 # The default mode is position, but an infinite position target
104 # indicates the use of velocity mode instead.
105 v_c = ""
106 p_c = ""
107 vel = []
108 pos = []
109 for l, p, v in zip('xyza', positions, velocities):
110 if not math.isinf(p):
111 p_c += l
112 pos.append(str(int(self.radians_to_steps * p)))
113 else:
114 v_c += l
115 vel.append(str(int(self.radians_to_steps * v)))
116
117 # create either one or two commands as output
118 if v_c != "":
119 vcommand = "v " + v_c + " " + " ".join(vel) + "\n"
120 else:
121 vcommand = ""
122
123 if p_c != "":
124 pcommand = "a " + p_c + " " + " ".join(pos) + "\n"
125 else:
126 pcommand = ""
127
128 return pcommand + vcommand
129
130################################################################
131# Emulate the Webots sensor API. This object just stores and retrieves a single value.
132class RealSensor(object):
133 def __init__(self, name):
134 self.value = 0
135 self.name = name
136
137 def enable(self, rate):
138 pass
139
140 def getValue(self):
141 return self.value
142
143################################################################
144# Emulate the Webots motor API. This object just stores the commanded state; the parent robot emulator
145# is responsible for communicating it to the hardware controller.
146
147class RealMotor(object):
148 def __init__(self, name):
149 self.rate = 0
150 self.target = 0
151 self.name = name
152
153 def setVelocity(self, rate):
154 self.rate = rate
155
156 def setPosition(self, target):
157 self.target = target
158
159################################################################
160# Emulate the Webots 'radio' communication receiver.
161class Receiver(object):
162 def __init__(self):
163
164 # create a thread-safe queue for received messages
165 self.queue = queue.Queue()
166 pass
167
168 def enable(self, rate):
169 pass
170
171 def getQueueLength(self):
172 # return the number of available packets
173 return self.queue.qsize()
174
175 def getData(self):
176 # return the next packet data as a bytestring
177 # this also removes it from the queue, which doesn't match the Webots behavior, but
178 # will work as long as it isn't called twice
179 data = self.queue.get()
180 print(f"receiver returning {data}")
181 return data
182
183 def nextPacket(self):
184 # remove the oldest packet from the queue
185 # because it is removed by getData, this is a no-op
186 pass
187
188# Emulate the Webots 'radio' communication transmitter.
189class Emitter(object):
190 def __init__(self):
191 self.receivers = []
192
193 def add_receiver(self, receiver):
194 self.receivers.append(receiver)
195
196 def send(self, data):
197 # send a bytestring to the subscribed receivers
198 print(f"emitter broadcasting {data}")
199 for r in self.receivers:
200 r.queue.put(data)
201
202################################################################